def process(self, fd):
'''??????????logic??????'''
logs.dblog("proces: proces start")
# ??socket
sock_state = self.conn_state[fd]
# ????
response = self.logic(sock_state.buff_read)
# ????????????????????buff_write
sock_state.buff_write = "%010d%s" % (len(response), response)
# ???????
sock_state.need_write = len(sock_state.buff_write)
# ???????
sock_state.state = "write"
# ??epoll??????????epoll??????epoll???????????
# ????????
self.epoll_sock.modify(fd, select.EPOLLOUT)
# ???????????
logs.dblog("***process: process end fd state change to write***")
sock_state.state_log()
python类EPOLLOUT的实例源码
def process(self, fd):
'''read????? process ????
'''
# ??socket
sock_state = self.conn_state[fd]
# ????
response = self.logic(sock_state.buff_read)
# ????????????????????buff_write
sock_state.buff_write = "%010d%s" % (len(response), response)
# ???????
sock_state.need_write = len(sock_state.buff_write)
# ???????
sock_state.state = "write"
# ??epoll??????
self.epoll_sock.modify(fd, select.EPOLLOUT)
# ??write??
self.state_machine(fd)
def process(self, fd):
'''??????????logic??????'''
logs.dblog("proces: proces start")
# ??socket
sock_state = self.conn_state[fd]
# ????
response = self.logic(sock_state.buff_read)
# ????????????????????buff_write
sock_state.buff_write = "%010d%s" % (len(response), response)
# ???????
sock_state.need_write = len(sock_state.buff_write)
# ???????
sock_state.state = "write"
# ??epoll??????????epoll??????epoll???????????
# ????????
self.epoll_sock.modify(fd, select.EPOLLOUT)
# ???????????
logs.dblog("***process: process end fd state change to write***")
sock_state.state_log()
def process(self, fd):
'''read????? process ????
'''
# ??socket
sock_state = self.conn_state[fd]
# ????
response = self.logic(sock_state.buff_read)
# ????????????????????buff_write
sock_state.buff_write = "%010d%s" % (len(response), response)
# ???????
sock_state.need_write = len(sock_state.buff_write)
# ???????
sock_state.state = "write"
# ??epoll??????
self.epoll_sock.modify(fd, select.EPOLLOUT)
# ??write??
self.state_machine(fd)
def select(self, timeout=None):
if timeout is not None:
if timeout <= 0:
timeout = 0.0
else:
# select.epoll.poll() has a resolution of 1 millisecond
# but luckily takes seconds so we don't need a wrapper
# like PollSelector. Just for better rounding.
timeout = math.ceil(timeout * 1e3) * 1e-3
timeout = float(timeout)
else:
timeout = -1.0 # epoll.poll() must have a float.
# We always want at least 1 to ensure that select can be called
# with no file descriptors registered. Otherwise will fail.
max_events = max(len(self._fd_to_key), 1)
ready = []
fd_events = _syscall_wrapper(self._epoll.poll, True,
timeout=timeout,
maxevents=max_events)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.EPOLLIN:
events |= EVENT_WRITE
if event_mask & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def register(self, sock, events, callback, *args, **kwargs):
ev = select.EPOLLERR | select.EPOLLHUP
need_modify = False
if sock in self.rd_socks:
ev |= select.EPOLLIN
need_modify = True
if sock in self.wr_socks:
ev |= select.EPOLLOUT
need_modify = True
if events & EV_READ:
ev |= select.EPOLLIN
if events & EV_WRITE:
ev |= select.EPOLLOUT
if need_modify:
self.epoll.modify(sock.fileno(), ev)
else:
try:
self.epoll.register(sock.fileno(), ev)
except IOError:
return False
else:
self.fd2socks[sock.fileno()] = sock
super(Epoll, self).register(sock, events, callback, *args, **kwargs)
return True
def unregister(self, sock, events=EV_READ | EV_WRITE):
super(Epoll, self).unregister(sock, events)
if events == EV_READ | EV_WRITE:
self.epoll.unregister(sock)
ck = self.fd2socks.pop(sock.fileno(), None)
if ck:
return True
else:
return False
else:
ev = select.EPOLLERR | select.EPOLLHUP | \
select.EPOLLIN | select.EPOLLOUT
if events & EV_READ:
ev ^= select.EPOLLIN
if events & EV_WRITE:
ev ^= select.EPOLLOUT
self.epoll.modify(sock.fileno(), ev)
return True
def run(self):
while True:
try:
self.check_timer()
events = self.epoll.poll(self.MIN_INTERVAL)
for fd, event in events:
sock = self.fd2socks.get(fd)
if not sock:
continue
if event & select.EPOLLERR or event & select.EPOLLHUP:
if self.err_callback:
self.err_callback[0](sock, *self.err_callback[1],
**self.err_callback[2])
elif event & select.EPOLLIN:
callback, args, kwargs = self.rd_socks.get(sock)
if callback:
callback(sock, *args, **kwargs)
elif event & select.EPOLLOUT:
callback, args, kwargs = self.wr_socks.get(sock)
if callback:
callback(sock, *args, **kwargs)
except Exception as e:
print("exception, %s\n%s" % (e, traceback.format_exc()))
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key
def flags(self):
flags = 0
if self.read_task is not None:
flags |= select.EPOLLIN
if self.write_task is not None:
flags |= select.EPOLLOUT
if not flags:
return None
# XX not sure if EPOLLEXCLUSIVE is actually safe... I think
# probably we should use it here unconditionally, but:
# https://stackoverflow.com/questions/41582560/how-does-epolls-epollexclusive-mode-interact-with-level-triggering
#flags |= select.EPOLLEXCLUSIVE
# We used to use ONESHOT here also, but it turns out that it's
# confusing/complicated: you can't use ONESHOT+EPOLLEXCLUSIVE
# together, you ONESHOT doesn't delete the registration but just
# "disables" it so you re-enable with CTL rather than ADD (or
# something?)...
# https://lkml.org/lkml/2016/2/4/541
return flags
def handle_io(self, timeout):
# max_events must be > 0 or epoll gets cranky
max_events = max(1, len(self._registered))
events = self._epoll.poll(timeout, max_events)
for fd, flags in events:
waiters = self._registered[fd]
# Clever hack stolen from selectors.EpollSelector: an event
# with EPOLLHUP or EPOLLERR flags wakes both readers and
# writers.
if flags & ~select.EPOLLIN and waiters.write_task is not None:
_core.reschedule(waiters.write_task)
waiters.write_task = None
if flags & ~select.EPOLLOUT and waiters.read_task is not None:
_core.reschedule(waiters.read_task)
waiters.read_task = None
self._update_registrations(fd, True)
def test_fromfd(self):
server, client = self._connected_pair()
ep = select.epoll(2)
ep2 = select.epoll.fromfd(ep.fileno())
ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
events = ep.poll(1, 4)
events2 = ep2.poll(0.9, 4)
self.assertEqual(len(events), 2)
self.assertEqual(len(events2), 2)
ep.close()
try:
ep2.poll(1, 4)
except IOError as e:
self.assertEqual(e.args[0], errno.EBADF, e)
else:
self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
server, client = self._connected_pair()
ep = select.epoll(2)
ep2 = select.epoll.fromfd(ep.fileno())
ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
events = ep.poll(1, 4)
events2 = ep2.poll(0.9, 4)
self.assertEqual(len(events), 2)
self.assertEqual(len(events2), 2)
ep.close()
try:
ep2.poll(1, 4)
except IOError, e:
self.assertEqual(e.args[0], errno.EBADF, e)
else:
self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
server, client = self._connected_pair()
ep = select.epoll(2)
ep2 = select.epoll.fromfd(ep.fileno())
ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
events = ep.poll(1, 4)
events2 = ep2.poll(0.9, 4)
self.assertEqual(len(events), 2)
self.assertEqual(len(events2), 2)
ep.close()
try:
ep2.poll(1, 4)
except IOError, e:
self.assertEqual(e.args[0], errno.EBADF, e)
else:
self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
server, client = self._connected_pair()
ep = select.epoll(2)
ep2 = select.epoll.fromfd(ep.fileno())
ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
events = ep.poll(1, 4)
events2 = ep2.poll(0.9, 4)
self.assertEqual(len(events), 2)
self.assertEqual(len(events2), 2)
ep.close()
try:
ep2.poll(1, 4)
except IOError as e:
self.assertEqual(e.args[0], errno.EBADF, e)
else:
self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
server, client = self._connected_pair()
ep = select.epoll(2)
ep2 = select.epoll.fromfd(ep.fileno())
ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
events = ep.poll(1, 4)
events2 = ep2.poll(0.9, 4)
self.assertEqual(len(events), 2)
self.assertEqual(len(events2), 2)
ep.close()
try:
ep2.poll(1, 4)
except IOError, e:
self.assertEqual(e.args[0], errno.EBADF, e)
else:
self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
server, client = self._connected_pair()
ep = select.epoll(2)
ep2 = select.epoll.fromfd(ep.fileno())
ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
events = ep.poll(1, 4)
events2 = ep2.poll(0.9, 4)
self.assertEqual(len(events), 2)
self.assertEqual(len(events2), 2)
ep.close()
try:
ep2.poll(1, 4)
except OSError as e:
self.assertEqual(e.args[0], errno.EBADF, e)
else:
self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
server, client = self._connected_pair()
ep = select.epoll(2)
ep2 = select.epoll.fromfd(ep.fileno())
ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
events = ep.poll(1, 4)
events2 = ep2.poll(0.9, 4)
self.assertEqual(len(events), 2)
self.assertEqual(len(events2), 2)
ep.close()
try:
ep2.poll(1, 4)
except IOError, e:
self.assertEqual(e.args[0], errno.EBADF, e)
else:
self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
server, client = self._connected_pair()
ep = select.epoll(2)
ep2 = select.epoll.fromfd(ep.fileno())
ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
events = ep.poll(1, 4)
events2 = ep2.poll(0.9, 4)
self.assertEqual(len(events), 2)
self.assertEqual(len(events2), 2)
ep.close()
try:
ep2.poll(1, 4)
except OSError as e:
self.assertEqual(e.args[0], errno.EBADF, e)
else:
self.fail("epoll on closed fd didn't raise EBADF")
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key
def select(self, timeout=None):
if timeout is not None:
if timeout <= 0:
timeout = 0.0
else:
# select.epoll.poll() has a resolution of 1 millisecond
# but luckily takes seconds so we don't need a wrapper
# like PollSelector. Just for better rounding.
timeout = math.ceil(timeout * 1e3) * 1e-3
timeout = float(timeout)
else:
timeout = -1.0 # epoll.poll() must have a float.
# We always want at least 1 to ensure that select can be called
# with no file descriptors registered. Otherwise will fail.
max_events = max(len(self._fd_to_key), 1)
ready = []
fd_events = _syscall_wrapper(self._epoll.poll, True,
timeout=timeout,
maxevents=max_events)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.EPOLLIN:
events |= EVENT_WRITE
if event_mask & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key
def select(self, timeout=None):
if timeout is not None:
if timeout <= 0:
timeout = 0.0
else:
# select.epoll.poll() has a resolution of 1 millisecond
# but luckily takes seconds so we don't need a wrapper
# like PollSelector. Just for better rounding.
timeout = math.ceil(timeout * 1e3) * 1e-3
timeout = float(timeout)
else:
timeout = -1.0 # epoll.poll() must have a float.
# We always want at least 1 to ensure that select can be called
# with no file descriptors registered. Otherwise will fail.
max_events = max(len(self._fd_to_key), 1)
ready = []
fd_events = _syscall_wrapper(self._epoll.poll, True,
timeout=timeout,
maxevents=max_events)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.EPOLLIN:
events |= EVENT_WRITE
if event_mask & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key
def select(self, timeout=None):
if timeout is not None:
if timeout <= 0:
timeout = 0.0
else:
# select.epoll.poll() has a resolution of 1 millisecond
# but luckily takes seconds so we don't need a wrapper
# like PollSelector. Just for better rounding.
timeout = math.ceil(timeout * 1e3) * 1e-3
timeout = float(timeout)
else:
timeout = -1.0 # epoll.poll() must have a float.
# We always want at least 1 to ensure that select can be called
# with no file descriptors registered. Otherwise will fail.
max_events = max(len(self._fd_to_key), 1)
ready = []
fd_events = _syscall_wrapper(self._epoll.poll, True,
timeout=timeout,
maxevents=max_events)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.EPOLLIN:
events |= EVENT_WRITE
if event_mask & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key
def select(self, timeout=None):
if timeout is not None:
if timeout <= 0:
timeout = 0.0
else:
# select.epoll.poll() has a resolution of 1 millisecond
# but luckily takes seconds so we don't need a wrapper
# like PollSelector. Just for better rounding.
timeout = math.ceil(timeout * 1e3) * 1e-3
timeout = float(timeout)
else:
timeout = -1.0 # epoll.poll() must have a float.
# We always want at least 1 to ensure that select can be called
# with no file descriptors registered. Otherwise will fail.
max_events = max(len(self._fd_to_key), 1)
ready = []
fd_events = _syscall_wrapper(self._epoll.poll, True,
timeout=timeout,
maxevents=max_events)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.EPOLLIN:
events |= EVENT_WRITE
if event_mask & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key
def select(self, timeout=None):
if timeout is not None:
if timeout <= 0:
timeout = 0.0
else:
# select.epoll.poll() has a resolution of 1 millisecond
# but luckily takes seconds so we don't need a wrapper
# like PollSelector. Just for better rounding.
timeout = math.ceil(timeout * 1e3) * 1e-3
timeout = float(timeout)
else:
timeout = -1.0 # epoll.poll() must have a float.
# We always want at least 1 to ensure that select can be called
# with no file descriptors registered. Otherwise will fail.
max_events = max(len(self._fd_to_key), 1)
ready = []
fd_events = _syscall_wrapper(self._epoll.poll, True,
timeout=timeout,
maxevents=max_events)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.EPOLLIN:
events |= EVENT_WRITE
if event_mask & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key