def test_force_close(self):
    """Verify that ``_force_close`` shuts the transport down, and only once.

    After the first ``_force_close(None)`` the transport must report that
    it is closing, its write buffer must equal a fresh ``list_to_buffer()``
    (i.e. the pending ``b'1'`` is discarded), and the loop must have no
    readers or writers left.  A second ``_force_close(None)`` must not
    unregister the reader again.
    """
    # NOTE(review): 7 is presumably the fd of the transport's socket as set
    # up by the test fixture -- confirm against create_transport().
    fd = 7

    tr = self.create_transport()
    # Leave unsent data in the buffer so the test can see it being dropped.
    tr._buffer.extend(b'1')
    self.loop.add_reader(fd, mock.sentinel)
    self.loop.add_writer(fd, mock.sentinel)
    tr._force_close(None)

    self.assertTrue(tr.is_closing())
    self.assertEqual(tr._buffer, list_to_buffer())
    self.assertFalse(self.loop.readers)
    self.assertFalse(self.loop.writers)

    # second close should not remove reader
    tr._force_close(None)
    self.assertFalse(self.loop.readers)
    # Still exactly one removal recorded for this fd after both closes.
    self.assertEqual(1, self.loop.remove_reader_count[fd])
评论列表
文章目录