def __call__(self):
    '''Run the selector poll loop until ``self._running`` becomes false.

    Each pass of the loop:

    1. Holding ``self._socket_map_lock``, applies every queued entry in
       ``self._unregister_sockets`` and ``self._register_sockets`` to
       ``self.selector`` (registrations use ``EVENT_READ``) and clears
       both queues.
    2. Waits up to 0.1 s in ``self.selector.select`` with the lock
       released.
    3. Re-takes the lock. If more (un)registrations were queued during
       the wait, the pass is restarted so they are applied before any
       socket is looked up; otherwise each ready socket is resolved to
       an ``(object, socket)`` pair via ``self.socket_to_id``,
       ``self.objects`` and ``self.id_to_socket``.
    4. With the lock released again, performs one ``recvfrom`` per ready
       socket and passes the result to ``obj.received(data, address)``.

    A ``recvfrom`` failing with ``EAGAIN`` is skipped. Any other
    ``OSError`` is reported to the object as ``(b'', None)``
    (presumably the "connection lost" signal; confirm against the
    ``received`` implementations).
    '''
    # Smallest read size requested, regardless of what FIONREAD reports.
    min_read_size = 4096
    # Single-int buffer that the FIONREAD ioctl overwrites in place with
    # the number of bytes waiting on the socket.
    avail_buf = array.array('i', [0])

    while self._running:
        with self._socket_map_lock:
            for sock in self._unregister_sockets:
                self.selector.unregister(sock)
            self._unregister_sockets.clear()

            for sock in self._register_sockets:
                self.selector.register(sock, selectors.EVENT_READ)
            self._register_sockets.clear()

        # Lock is deliberately not held here, so other threads can queue
        # socket changes while this thread waits.
        events = self.selector.select(timeout=0.1)

        with self._socket_map_lock:
            if self._unregister_sockets or self._register_sockets:
                # The socket maps changed while waiting; start over so
                # the selector is brought up to date first.
                continue
            ready_ids = [self.socket_to_id[key.fileobj]
                         for key, _mask in events]
            ready_objs = [(self.objects[obj_id], self.id_to_socket[obj_id])
                          for obj_id in ready_ids]

        for obj, sock in ready_objs:
            # TODO: consider thread pool for recv and command_loop
            # NOTE: fcntl.ioctl normally raises OSError on failure, so
            # this negative-return check is purely defensive.
            if fcntl.ioctl(sock, termios.FIONREAD, avail_buf) < 0:
                continue

            bytes_available = avail_buf[0]

            try:
                bytes_recv, address = sock.recvfrom(
                    max(min_read_size, bytes_available))
            except OSError as ex:
                if ex.errno == errno.EAGAIN:
                    # Nothing to read after all; try again next pass.
                    continue
                bytes_recv, address = b'', None

            obj.received(bytes_recv, address)
评论列表
文章目录