def handle_io(self, timeout):
# max_events must be > 0 or kqueue gets cranky
# and we generally want this to be strictly larger than the actual
# number of events we get, so that we can tell that we've gotten
# all the events in just 1 call.
max_events = len(self._registered) + 1
events = []
while True:
batch = self._kqueue.control([], max_events, timeout)
events += batch
if len(batch) < max_events:
break
else:
timeout = 0
# and loop back to the start
for event in events:
key = (event.ident, event.filter)
receiver = self._registered[key]
if event.flags & select.KQ_EV_ONESHOT:
del self._registered[key]
if type(receiver) is _core.Task:
_core.reschedule(receiver, _core.Value(event))
else:
receiver.put_nowait(event)
# kevent registration is complicated -- e.g. aio submission can
# implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will
# automatically register filters for child processes. So our lowlevel
# API is *very* low-level: we expose the kqueue itself for adding
# events or sticking into AIO submission structs, and split waiting
# off into separate methods. It's your responsibility to make sure
# that handle_io never receives an event without a corresponding
# registration! This may be challenging if you want to be careful
# about e.g. KeyboardInterrupt. Possibly this API could be improved to
# be more ergonomic...
python类KQ_EV_ONESHOT的实例源码
def _wait_common(self, fd, filter):
if not isinstance(fd, int):
fd = fd.fileno()
flags = select.KQ_EV_ADD | select.KQ_EV_ONESHOT
event = select.kevent(fd, filter, flags)
self._kqueue.control([event], 0)
def abort(_):
event = select.kevent(fd, filter, select.KQ_EV_DELETE)
self._kqueue.control([event], 0)
return _core.Abort.SUCCEEDED
await self.wait_kevent(fd, filter, abort)
def test_create_event(self):
fd = sys.stderr.fileno()
ev = select.kevent(fd)
other = select.kevent(1000)
self.assertEqual(ev.ident, fd)
self.assertEqual(ev.filter, select.KQ_FILTER_READ)
self.assertEqual(ev.flags, select.KQ_EV_ADD)
self.assertEqual(ev.fflags, 0)
self.assertEqual(ev.data, 0)
self.assertEqual(ev.udata, 0)
self.assertEqual(ev, ev)
self.assertNotEqual(ev, other)
self.assertEqual(cmp(ev, other), -1)
self.assertTrue(ev < other)
self.assertTrue(other >= ev)
self.assertRaises(TypeError, cmp, ev, None)
self.assertRaises(TypeError, cmp, ev, 1)
self.assertRaises(TypeError, cmp, ev, "ev")
ev = select.kevent(fd, select.KQ_FILTER_WRITE)
self.assertEqual(ev.ident, fd)
self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
self.assertEqual(ev.flags, select.KQ_EV_ADD)
self.assertEqual(ev.fflags, 0)
self.assertEqual(ev.data, 0)
self.assertEqual(ev.udata, 0)
self.assertEqual(ev, ev)
self.assertNotEqual(ev, other)
ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
self.assertEqual(ev.ident, fd)
self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
self.assertEqual(ev.fflags, 0)
self.assertEqual(ev.data, 0)
self.assertEqual(ev.udata, 0)
self.assertEqual(ev, ev)
self.assertNotEqual(ev, other)
ev = select.kevent(1, 2, 3, 4, 5, 6)
self.assertEqual(ev.ident, 1)
self.assertEqual(ev.filter, 2)
self.assertEqual(ev.flags, 3)
self.assertEqual(ev.fflags, 4)
self.assertEqual(ev.data, 5)
self.assertEqual(ev.udata, 6)
self.assertEqual(ev, ev)
self.assertNotEqual(ev, other)
bignum = sys.maxsize * 2 + 1
ev = select.kevent(bignum, 1, 2, 3, sys.maxsize, bignum)
self.assertEqual(ev.ident, bignum)
self.assertEqual(ev.filter, 1)
self.assertEqual(ev.flags, 2)
self.assertEqual(ev.fflags, 3)
self.assertEqual(ev.data, sys.maxsize)
self.assertEqual(ev.udata, bignum)
self.assertEqual(ev, ev)
self.assertNotEqual(ev, other)