def test_control_and_wait(self):
client, server = self._connected_pair()
ep = select.epoll(16)
ep.register(server.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
ep.register(client.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.1, then - now)
events.sort()
expected = [(client.fileno(), select.EPOLLOUT),
(server.fileno(), select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
events = ep.poll(timeout=2.1, maxevents=4)
self.assertFalse(events)
client.send("Hello!")
server.send("world!!!")
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
events.sort()
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
ep.unregister(client.fileno())
ep.modify(server.fileno(), select.EPOLLOUT)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
expected = [(server.fileno(), select.EPOLLOUT)]
self.assertEqual(events, expected)
评论列表
文章目录