def test_read_while_writeable(self):
# Ensure that write events don't come in while we're waiting for
# a read and haven't asked for writeability. (the reverse is
# difficult to test for)
client, server = socket.socketpair()
try:
def handler(fd, events):
self.assertEqual(events, IOLoop.READ)
self.stop()
self.io_loop.add_handler(client.fileno(), handler, IOLoop.READ)
self.io_loop.add_timeout(self.io_loop.time() + 0.01,
functools.partial(server.send, b'asdf'))
self.wait()
self.io_loop.remove_handler(client.fileno())
finally:
client.close()
server.close()
评论列表
文章目录