def handle_io(self, timeout):
# max_events must be > 0 or kqueue gets cranky
# and we generally want this to be strictly larger than the actual
# number of events we get, so that we can tell that we've gotten
# all the events in just 1 call.
max_events = len(self._registered) + 1
events = []
while True:
batch = self._kqueue.control([], max_events, timeout)
events += batch
if len(batch) < max_events:
break
else:
timeout = 0
# and loop back to the start
for event in events:
key = (event.ident, event.filter)
receiver = self._registered[key]
if event.flags & select.KQ_EV_ONESHOT:
del self._registered[key]
if type(receiver) is _core.Task:
_core.reschedule(receiver, _core.Value(event))
else:
receiver.put_nowait(event)
# kevent registration is complicated -- e.g. aio submission can
# implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will
# automatically register filters for child processes. So our lowlevel
# API is *very* low-level: we expose the kqueue itself for adding
# events or sticking into AIO submission structs, and split waiting
# off into separate methods. It's your responsibility to make sure
# that handle_io never receives an event without a corresponding
# registration! This may be challenging if you want to be careful
# about e.g. KeyboardInterrupt. Possibly this API could be improved to
# be more ergonomic...
评论列表
文章目录