def __convert_kqueue_events(self, events):
    """
    Convert kqueue events to standard events.

    Every kevent in ``events`` is stored in ``self.__kqueue_event_map``
    under its ``ident`` and translated into a tuple
    ``(ident, std_event, user_data)``:

    * ``std_event`` is a bitwise OR of ``EV_TYPE_READ``, ``EV_TYPE_WRITE``
      and ``EV_TYPE_ERR`` (0 when none of them applies).
    * ``user_data`` is ``self.__users_data.get(ident)``, i.e. ``None`` when
      the ident has no entry.

    :param events: iterable of kevent objects exposing ``ident``,
        ``filter``, ``flags`` and ``udata``.
    :return: list of ``(ident, std_event, user_data)`` tuples, in the same
        order as ``events``.
    """
    std_events = []
    for kevent in events:
        ident = kevent.ident
        filter_ = kevent.filter
        std_event = 0

        # kqueue filters are distinct identifiers, not bit flags, so they
        # must be compared by equality. On BSD/macOS KQ_FILTER_READ is -1
        # and KQ_FILTER_WRITE is -2, hence a bitwise test such as
        # (filter_ & KQ_FILTER_WRITE) == KQ_FILTER_WRITE is also true for a
        # read event and would report it as writable.
        if filter_ == select.KQ_FILTER_READ:
            std_event |= EV_TYPE_READ
        elif filter_ == select.KQ_FILTER_WRITE:
            # A write event is only reported when the kevent's udata also
            # carries the EV_TYPE_WRITE bit.
            if (kevent.udata & EV_TYPE_WRITE) == EV_TYPE_WRITE:
                std_event |= EV_TYPE_WRITE

        # Unlike filters, kevent flags are genuine bit flags.
        if (kevent.flags & select.KQ_EV_ERROR) == select.KQ_EV_ERROR:
            std_event |= EV_TYPE_ERR

        # Remember the most recent kevent seen for this ident.
        self.__kqueue_event_map[ident] = kevent
        std_events.append((ident, std_event, self.__users_data.get(ident)))

    return std_events
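# Comment list (stray web-page navigation text, not part of the code)
# Article table of contents (stray web-page navigation text, not part of the code)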