def __init__(self, sock, logic):
    """Build the epoll-driven server state around an existing socket.

    Args:
        sock: socket object to watch; its file descriptor is registered
            with a freshly created epoll object for ``EPOLLIN``.
        logic: stored unchanged on ``self.logic``.
    """
    # Per-connection state table; created before setFd() runs
    # (presumably setFd() records the socket's state here -- confirm).
    self.conn_state = {}
    self.setFd(sock)

    # Watch the socket for readability.  For the meaning of the event mask
    # see https://docs.python.org/2.7/library/select.html#select.poll.register
    self.epoll_sock = poller = select.epoll()
    poller.register(sock.fileno(), select.EPOLLIN)

    self.logic = logic

    # Dispatch table: state name -> bound handler.
    self.sm = dict(
        accept=self.accept,
        read=self.read,
        write=self.write,
        process=self.process,
        closing=self.close,
    )
# Python: example source code using EPOLLIN (python类EPOLLIN的实例源码)
def accept(self, fd):
    """Accept one pending connection on the listening socket behind *fd*.

    The new connection is made non-blocking, registered with epoll for
    ``EPOLLIN``, handed to ``self.setFd`` and put into the ``"read"`` state.

    Raises:
        socket.error: for any accept failure other than ``ECONNABORTED``
            or ``EAGAIN``; those two are swallowed and the method returns.
    """
    # ECONNABORTED: the peer reset the connection before we accepted it.
    # EAGAIN: nothing is pending on the non-blocking listener right now.
    ignorable = (errno.ECONNABORTED, errno.EAGAIN)
    try:
        listener = self.conn_state[fd].sock_obj
        client, _peer = listener.accept()
        client.setblocking(0)
        self.epoll_sock.register(client.fileno(), select.EPOLLIN)
        # setFd() must run before the state lookup below.
        self.setFd(client)
        self.conn_state[client.fileno()].state = "read"
    except socket.error as err:
        if err.args[0] not in ignorable:
            raise
def accept(self, fd):
    """Accept one pending connection on the listening socket behind *fd*.

    The accepted socket is switched to non-blocking mode, registered with
    epoll for ``EPOLLIN``, passed to ``self.setFd`` and marked ``"read"``.

    Raises:
        socket.error: re-raised unless its first argument is
            ``ECONNABORTED`` or ``EAGAIN`` (both are silently ignored).
    """
    try:
        state = self.conn_state[fd]
        listening_sock = state.sock_obj
        new_conn, _address = listening_sock.accept()
        new_conn.setblocking(0)
        self.epoll_sock.register(new_conn.fileno(), select.EPOLLIN)
        # Record the connection first; the lookup below relies on it.
        self.setFd(new_conn)
        self.conn_state[new_conn.fileno()].state = "read"
    except socket.error as error:
        code = error.args[0]
        # ECONNABORTED: connection was reset before accept() completed.
        # EAGAIN: no connection is currently pending.
        if code == errno.ECONNABORTED or code == errno.EAGAIN:
            return
        raise
def select(self, timeout=None):
    """Poll epoll once and return the ready ``(key, events)`` pairs.

    Args:
        timeout: seconds to wait; ``None`` blocks indefinitely, values
            ``<= 0`` poll without blocking, positive values are rounded up
            to the next millisecond.

    Returns:
        A list of ``(key, events)`` tuples, where ``events`` is restricted
        to the events the key was registered for.
    """
    if timeout is None:
        timeout = -1.0  # epoll.poll() must have a float.
    elif timeout <= 0:
        timeout = 0.0
    else:
        # epoll.poll() has millisecond resolution but takes seconds, so
        # only round up; no unit conversion is needed.
        timeout = float(math.ceil(timeout * 1e3) * 1e-3)

    # maxevents must be at least 1, even with nothing registered.
    max_events = max(len(self._fd_to_key), 1)
    fd_events = _syscall_wrapper(self._epoll.poll, True,
                                 timeout=timeout,
                                 maxevents=max_events)

    ready = []
    for fd, event_mask in fd_events:
        # Any flag other than EPOLLIN counts as writable and any flag other
        # than EPOLLOUT counts as readable, so error/hang-up flags wake both.
        events = EVENT_WRITE if event_mask & ~select.EPOLLIN else 0
        if event_mask & ~select.EPOLLOUT:
            events |= EVENT_READ

        key = self._key_from_fd(fd)
        if key:
            ready.append((key, events & key.events))
    return ready
def register(self, sock, events, callback, *args, **kwargs):
    """Register *callback* for *events* on *sock* and arm epoll accordingly.

    If the socket is already present in ``self.rd_socks`` or
    ``self.wr_socks`` its epoll registration is modified (keeping the
    existing directions); otherwise it is registered from scratch.

    Returns:
        ``True`` on success, ``False`` if a fresh ``epoll.register`` raised
        ``IOError`` (in which case the base-class register is not called).
    """
    has_reader = sock in self.rd_socks
    has_writer = sock in self.wr_socks

    # Errors and hang-ups are always of interest.
    ev = select.EPOLLERR | select.EPOLLHUP
    if has_reader or events & EV_READ:
        ev |= select.EPOLLIN
    if has_writer or events & EV_WRITE:
        ev |= select.EPOLLOUT

    if has_reader or has_writer:
        self.epoll.modify(sock.fileno(), ev)
    else:
        try:
            self.epoll.register(sock.fileno(), ev)
        except IOError:
            return False
        # Only remember the fd once epoll has accepted it.
        self.fd2socks[sock.fileno()] = sock

    super(Epoll, self).register(sock, events, callback, *args, **kwargs)
    return True
def unregister(self, sock, events=EV_READ | EV_WRITE):
    """Stop watching *sock* for *events*.

    Args:
        sock: the socket to unregister.
        events: ``EV_READ``, ``EV_WRITE`` or both (the default).

    Returns:
        When both directions are removed: ``True`` if the socket was known
        to ``self.fd2socks``, else ``False``.  For a partial unregister:
        always ``True``.
    """
    super(Epoll, self).unregister(sock, events)

    if events == EV_READ | EV_WRITE:
        self.epoll.unregister(sock.fileno())
        return bool(self.fd2socks.pop(sock.fileno(), None))

    # Partial unregister: keep only directions that still have a callback
    # AND were not just removed.  Starting from IN|OUT and clearing the
    # removed bit would arm a direction that was never registered, and the
    # event loop would then be woken for a socket without a handler.
    ev = select.EPOLLERR | select.EPOLLHUP
    if sock in self.rd_socks and not events & EV_READ:
        ev |= select.EPOLLIN
    if sock in self.wr_socks and not events & EV_WRITE:
        ev |= select.EPOLLOUT
    self.epoll.modify(sock.fileno(), ev)
    return True
def run(self):
    """Run the event loop forever.

    Each iteration runs ``self.check_timer()``, polls epoll for up to
    ``self.MIN_INTERVAL`` and dispatches every event: error/hang-up events
    go to ``self.err_callback`` (if set), readable events to the entry in
    ``self.rd_socks`` and writable events to the entry in ``self.wr_socks``.
    Any ``Exception`` raised during an iteration is printed and the loop
    continues.
    """
    while True:
        try:
            self.check_timer()
            for fd, event in self.epoll.poll(self.MIN_INTERVAL):
                sock = self.fd2socks.get(fd)
                if not sock:
                    continue

                if event & (select.EPOLLERR | select.EPOLLHUP):
                    if self.err_callback:
                        self.err_callback[0](sock, *self.err_callback[1],
                                             **self.err_callback[2])
                    continue

                if event & select.EPOLLIN:
                    entry = self.rd_socks.get(sock)
                elif event & select.EPOLLOUT:
                    entry = self.wr_socks.get(sock)
                else:
                    entry = None

                # A socket may be polled for a direction that has no handler
                # entry; skip it instead of failing to unpack None, which
                # would abort the rest of this batch of events.
                if not entry:
                    continue
                callback, args, kwargs = entry
                if callback:
                    callback(sock, *args, **kwargs)
        except Exception as e:
            print("exception, %s\n%s" % (e, traceback.format_exc()))
def register(self, fileobj, events, data=None):
    """Register *fileobj* with the selector and with the epoll object.

    ``EVENT_READ`` maps to ``EPOLLIN`` and ``EVENT_WRITE`` to ``EPOLLOUT``.

    Returns:
        The key returned by the base-class ``register``.
    """
    key = super(EpollSelector, self).register(fileobj, events, data)
    read_bits = select.EPOLLIN if events & EVENT_READ else 0
    write_bits = select.EPOLLOUT if events & EVENT_WRITE else 0
    _syscall_wrapper(self._epoll.register, False, key.fd,
                     read_bits | write_bits)
    return key
def write2read(self, fd):
    """Run one write step for *fd* and act on its outcome.

    ``self.write(fd)`` is called and its return value interpreted:

    * ``"writemore"``: nothing to do here.
    * ``"writecomplete"``: reset the connection via ``self.setFd``, switch
      it to the ``"read"`` state and re-arm epoll for ``EPOLLIN``.
    * ``"closing"`` (also used when ``write`` raises ``socket.error``): log
      the error, mark the connection ``"closing"`` and run the state machine.
    """
    # Pre-bound so the "closing" branch can log even when write() returned
    # "closing" without raising.
    err = None
    try:
        write_ret = self.write(fd)
    except socket.error as exc:
        # Keep the exception under a second name: Python 3 unbinds the
        # ``as`` target when the except block ends.
        err = exc
        write_ret = "closing"

    if write_ret == "writecomplete":
        conn = self.conn_state[fd].sock_obj
        self.setFd(conn)
        self.conn_state[fd].state = "read"
        self.epoll_sock.modify(fd, select.EPOLLIN)
    elif write_ret == "closing":
        dbgPrint(err)
        self.conn_state[fd].state = "closing"
        self.state_machine(fd)
    # "writemore" falls through untouched.
#}}}
def __init__(self, addr, port, logic):
    """Create the listening socket, the epoll object and the state table.

    Args:
        addr: address to bind the listening TCP socket to.
        port: port to bind the listening TCP socket to.
        logic: stored unchanged on ``self.logic``.
    """
    dbgPrint("\n-- __init__: start!")
    self.conn_state = {}

    # IPv4 TCP listener with SO_REUSEADDR set and a backlog of 10.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    self.listen_sock = listener
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((addr, port))
    listener.listen(10)
    self.setFd(listener)

    # Watch the listener for readability.
    self.epoll_sock = poller = select.epoll()
    poller.register(listener.fileno(), select.EPOLLIN)

    self.logic = logic

    # Dispatch table: state name -> bound handler.
    handlers = {}
    handlers['accept'] = self.accept2read
    handlers["read"] = self.read2process
    handlers["write"] = self.write2read
    handlers["process"] = self.process
    handlers["closing"] = self.close
    self.sm = handlers
def write2read(self, fd):
    """Perform a write for *fd* and move the connection to its next state.

    The return value of ``self.write(fd)`` decides what happens:

    * ``"writemore"``: leave everything as it is.
    * ``"writecomplete"``: call ``self.setFd`` on the connection, set its
      state to ``"read"`` and switch epoll interest to ``EPOLLIN``.
    * ``"closing"``: log the error (if any), set the state to ``"closing"``
      and run the state machine.  A ``socket.error`` raised by ``write``
      is handled as ``"closing"``.
    """
    # Initialised up front: write() may return "closing" without raising,
    # and the logging call below must still have something to print.
    error = None
    try:
        write_ret = self.write(fd)
    except socket.error as exc:
        # Python 3 deletes the ``as`` name after the handler, so copy it.
        error = exc
        write_ret = "closing"

    if write_ret == "writemore":
        return
    if write_ret == "writecomplete":
        sock_state = self.conn_state[fd]
        self.setFd(sock_state.sock_obj)
        self.conn_state[fd].state = "read"
        self.epoll_sock.modify(fd, select.EPOLLIN)
    elif write_ret == "closing":
        dbgPrint(error)
        self.conn_state[fd].state = "closing"
        self.state_machine(fd)
#}}}
def flags(self):
    """Return the epoll event mask matching the currently waiting tasks.

    Returns:
        ``EPOLLIN`` and/or ``EPOLLOUT`` depending on whether ``read_task``
        and ``write_task`` are set, or ``None`` when neither is.
    """
    mask = 0
    if self.read_task is not None:
        mask |= select.EPOLLIN
    if self.write_task is not None:
        mask |= select.EPOLLOUT
    # Deliberately not adding EPOLLEXCLUSIVE: it is unclear whether it is
    # safe together with level triggering, see
    # https://stackoverflow.com/questions/41582560/how-does-epolls-epollexclusive-mode-interact-with-level-triggering
    # EPOLLONESHOT was used here once but dropped: it cannot be combined
    # with EPOLLEXCLUSIVE, and it only disables the registration rather
    # than deleting it, so re-arming needs a modify instead of an add.
    # https://lkml.org/lkml/2016/2/4/541
    return mask or None
def handle_io(self, timeout):
    """Poll epoll once and reschedule the tasks whose fds became ready.

    Args:
        timeout: passed straight to ``epoll.poll``.
    """
    # epoll rejects max_events <= 0, so ask for at least one.
    capacity = max(1, len(self._registered))
    for fd, flags in self._epoll.poll(timeout, capacity):
        waiters = self._registered[fd]
        # Same trick as selectors.EpollSelector: anything other than
        # EPOLLIN wakes the writer and anything other than EPOLLOUT wakes
        # the reader, so EPOLLHUP / EPOLLERR wake both.
        if flags & ~select.EPOLLIN:
            if waiters.write_task is not None:
                _core.reschedule(waiters.write_task)
                waiters.write_task = None
        if flags & ~select.EPOLLOUT:
            if waiters.read_task is not None:
                _core.reschedule(waiters.read_task)
                waiters.read_task = None
        self._update_registrations(fd, True)
def test_fromfd(self):
    """An epoll built with ``fromfd`` shares the original's descriptor.

    Registrations made through the second object are reported by both, and
    closing the first makes polling the second fail with ``EBADF``.
    """
    server, client = self._connected_pair()

    primary = select.epoll(2)
    alias = select.epoll.fromfd(primary.fileno())

    interest = select.EPOLLIN | select.EPOLLOUT
    for endpoint in (server, client):
        alias.register(endpoint.fileno(), interest)

    primary_events = primary.poll(1, 4)
    alias_events = alias.poll(0.9, 4)
    self.assertEqual(len(primary_events), 2)
    self.assertEqual(len(alias_events), 2)

    # Closing the original invalidates the shared descriptor.
    primary.close()
    try:
        alias.poll(1, 4)
    except IOError as err:
        self.assertEqual(err.args[0], errno.EBADF, err)
    else:
        self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
    """An epoll built with ``fromfd`` shares the original's descriptor.

    Both sockets are registered through the second epoll object; both
    objects must then report two events.  After the first object is
    closed, polling the second must fail with ``EBADF``.
    """
    server, client = self._connected_pair()

    ep = select.epoll(2)
    ep2 = select.epoll.fromfd(ep.fileno())

    ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
    ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)

    events = ep.poll(1, 4)
    events2 = ep2.poll(0.9, 4)
    self.assertEqual(len(events), 2)
    self.assertEqual(len(events2), 2)

    ep.close()
    try:
        ep2.poll(1, 4)
    # ``except X as e`` is valid on Python 2.6+ and 3; the comma form is
    # a syntax error on Python 3.
    except IOError as e:
        self.assertEqual(e.args[0], errno.EBADF, e)
    else:
        self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
    """``epoll.fromfd`` must wrap the same descriptor as the original.

    Registrations made via the wrapper are visible through both objects
    (two events each), and once the original is closed the wrapper's
    ``poll`` must raise with ``EBADF``.
    """
    server, client = self._connected_pair()

    ep = select.epoll(2)
    ep2 = select.epoll.fromfd(ep.fileno())

    ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
    ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)

    events = ep.poll(1, 4)
    events2 = ep2.poll(0.9, 4)
    self.assertEqual(len(events), 2)
    self.assertEqual(len(events2), 2)

    ep.close()
    try:
        ep2.poll(1, 4)
    # The ``as`` form parses on Python 2.6+ and Python 3 alike; the old
    # comma form does not parse on Python 3.
    except IOError as e:
        self.assertEqual(e.args[0], errno.EBADF, e)
    else:
        self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
    """Check that ``epoll.fromfd`` yields a second handle on the same epoll.

    Events registered through the second handle show up in both, and the
    second handle fails with ``EBADF`` once the first has been closed.
    """
    server, client = self._connected_pair()

    original = select.epoll(2)
    clone = select.epoll.fromfd(original.fileno())

    both_ways = select.EPOLLIN | select.EPOLLOUT
    clone.register(server.fileno(), both_ways)
    clone.register(client.fileno(), both_ways)

    seen_by_original = original.poll(1, 4)
    seen_by_clone = clone.poll(0.9, 4)
    self.assertEqual(len(seen_by_original), 2)
    self.assertEqual(len(seen_by_clone), 2)

    # The clone does not own a descriptor of its own.
    original.close()
    try:
        clone.poll(1, 4)
    except IOError as exc:
        self.assertEqual(exc.args[0], errno.EBADF, exc)
    else:
        self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
    """Verify that ``epoll.fromfd`` shares the underlying descriptor.

    Two sockets are registered through the ``fromfd`` object; polling
    either object must return two events.  Closing the original must make
    the ``fromfd`` object's ``poll`` fail with ``EBADF``.
    """
    server, client = self._connected_pair()

    ep = select.epoll(2)
    ep2 = select.epoll.fromfd(ep.fileno())

    ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
    ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)

    events = ep.poll(1, 4)
    events2 = ep2.poll(0.9, 4)
    self.assertEqual(len(events), 2)
    self.assertEqual(len(events2), 2)

    ep.close()
    try:
        ep2.poll(1, 4)
    # Use the ``as`` spelling: it works on Python 2.6+ and 3, whereas
    # ``except IOError, e`` is rejected by the Python 3 parser.
    except IOError as e:
        self.assertEqual(e.args[0], errno.EBADF, e)
    else:
        self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
    """An epoll built with ``fromfd`` shares the original's descriptor.

    Registrations made through the second object are reported by both, and
    closing the first makes polling the second fail with ``EBADF``.
    """
    server, client = self._connected_pair()

    primary = select.epoll(2)
    alias = select.epoll.fromfd(primary.fileno())

    interest = select.EPOLLIN | select.EPOLLOUT
    for endpoint in (server, client):
        alias.register(endpoint.fileno(), interest)

    primary_events = primary.poll(1, 4)
    alias_events = alias.poll(0.9, 4)
    self.assertEqual(len(primary_events), 2)
    self.assertEqual(len(alias_events), 2)

    # Closing the original invalidates the shared descriptor.
    primary.close()
    try:
        alias.poll(1, 4)
    except OSError as err:
        self.assertEqual(err.args[0], errno.EBADF, err)
    else:
        self.fail("epoll on closed fd didn't raise EBADF")
def test_close(self):
    """Closing an epoll object is idempotent and disables every operation."""
    handle = open(__file__, "rb")
    self.addCleanup(handle.close)
    fd = handle.fileno()
    poller = select.epoll()

    # Before close(): fileno() is an int and ``closed`` is false.
    self.assertIsInstance(poller.fileno(), int)
    self.assertFalse(poller.closed)

    # After close(): ``closed`` flips and fileno() is refused.
    poller.close()
    self.assertTrue(poller.closed)
    self.assertRaises(ValueError, poller.fileno)

    # A second close() must be harmless.
    poller.close()

    # Every operation must now fail with
    # ValueError("I/O operation on closed ...").
    closed_calls = (
        (poller.modify, (fd, select.EPOLLIN)),
        (poller.poll, (1.0,)),
        (poller.register, (fd, select.EPOLLIN)),
        (poller.unregister, (fd,)),
    )
    for operation, arguments in closed_calls:
        self.assertRaises(ValueError, operation, *arguments)
def test_fromfd(self):
    """``epoll.fromfd`` must produce an object backed by the same fd.

    After registering both sockets through the ``fromfd`` object, each
    epoll object must report two events.  Once the original is closed,
    polling the ``fromfd`` object must raise with ``EBADF``.
    """
    server, client = self._connected_pair()

    ep = select.epoll(2)
    ep2 = select.epoll.fromfd(ep.fileno())

    ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
    ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)

    events = ep.poll(1, 4)
    events2 = ep2.poll(0.9, 4)
    self.assertEqual(len(events), 2)
    self.assertEqual(len(events2), 2)

    ep.close()
    try:
        ep2.poll(1, 4)
    # ``as`` instead of the comma form, which Python 3 cannot parse;
    # ``as`` itself is accepted from Python 2.6 onwards.
    except IOError as e:
        self.assertEqual(e.args[0], errno.EBADF, e)
    else:
        self.fail("epoll on closed fd didn't raise EBADF")
def test_fromfd(self):
    """Check that ``epoll.fromfd`` yields a second handle on the same epoll.

    Events registered through the second handle show up in both, and the
    second handle fails with ``EBADF`` once the first has been closed.
    """
    server, client = self._connected_pair()

    original = select.epoll(2)
    clone = select.epoll.fromfd(original.fileno())

    both_ways = select.EPOLLIN | select.EPOLLOUT
    clone.register(server.fileno(), both_ways)
    clone.register(client.fileno(), both_ways)

    seen_by_original = original.poll(1, 4)
    seen_by_clone = clone.poll(0.9, 4)
    self.assertEqual(len(seen_by_original), 2)
    self.assertEqual(len(seen_by_clone), 2)

    # The clone does not own a descriptor of its own.
    original.close()
    try:
        clone.poll(1, 4)
    except OSError as exc:
        self.assertEqual(exc.args[0], errno.EBADF, exc)
    else:
        self.fail("epoll on closed fd didn't raise EBADF")
def test_close(self):
    """A closed epoll object reports ``closed`` and rejects all operations."""
    source_file = open(__file__, "rb")
    self.addCleanup(source_file.close)
    fd = source_file.fileno()
    ep = select.epoll()

    # Open object: fileno() works and ``closed`` is false.
    self.assertIsInstance(ep.fileno(), int)
    self.assertFalse(ep.closed)

    # First close(): state flips, fileno() raises.
    ep.close()
    self.assertTrue(ep.closed)
    self.assertRaises(ValueError, ep.fileno)

    # close() may be called repeatedly.
    ep.close()

    # All remaining operations must raise
    # ValueError("I/O operation on closed ...").
    for method_name, call_args in (
        ("modify", (fd, select.EPOLLIN)),
        ("poll", (1.0,)),
        ("register", (fd, select.EPOLLIN)),
        ("unregister", (fd,)),
    ):
        self.assertRaises(ValueError, getattr(ep, method_name), *call_args)
def __init__(self, sock, logic):
    """Initialise the server state around an existing socket.

    Args:
        sock: socket object to watch; its file descriptor is registered
            with a new epoll object for ``EPOLLIN``.
        logic: stored unchanged on ``self.logic``.
    """
    # Per-connection state table; created before setFd() is called
    # (presumably setFd() stores the socket's state here -- confirm).
    self.conn_state = {}
    logs.dblog("init: init listen socket ")
    self.setFd(sock)

    # New epoll object watching the socket for readability.
    self.epoll_sock = poller = select.epoll()
    poller.register(sock.fileno(), select.EPOLLIN)

    self.logic = logic
def write(self, fd):
    """Send the next unsent part of the buffered response for *fd*.

    Sends ``buff_write`` starting at ``have_write`` and updates the
    ``have_write`` / ``need_write`` counters by the number of bytes sent.
    When nothing is left to send (and something was sent), the connection
    is reset through ``self.setFd``, put into the ``"read"`` state and
    epoll is re-armed for ``EPOLLIN``.

    A would-block error returns immediately so the write is retried on the
    next epoll notification; any other ``socket.error`` marks the
    connection ``"closing"`` and runs the state machine.
    """
    import errno  # local import: this block is the only user of the name

    sock_state = self.conn_state[fd]
    conn = sock_state.sock_obj
    # Offset of the first byte that has not been sent yet.
    last_have_send = sock_state.have_write
    try:
        # send() returns how many bytes were actually written.
        have_send = conn.send(sock_state.buff_write[last_have_send:])
        sock_state.have_write += have_send
        sock_state.need_write -= have_send
        if sock_state.need_write == 0 and sock_state.have_write != 0:
            # Everything went out: reset counters/state and wait for the
            # next request.
            self.setFd(conn)
            self.conn_state[fd].state = "read"
            self.epoll_sock.modify(fd, select.EPOLLIN)
    except socket.error as msg:
        # epoll can report writability before the socket can really take
        # data ("Resource temporarily unavailable"); just wait for the
        # next notification.  Symbolic names replace the literal 11.
        if msg.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
            return
        sock_state.state = "closing"
        # NOTE(review): the flattened source does not show whether this
        # dispatch also ran after a successful send; it is kept on the
        # error path only -- confirm against the original file.
        self.state_machine(fd)
def __init__(self, sock, logic):
    """Prepare the connection table and epoll object for *sock*.

    Args:
        sock: socket object whose file descriptor is registered with a
            newly created epoll object for ``EPOLLIN``.
        logic: kept as-is on ``self.logic``.
    """
    # The state table has to exist before setFd() runs (setFd() presumably
    # fills in the entry for this socket -- confirm).
    self.conn_state = {}
    logs.dblog("init: init listen socket ")
    self.setFd(sock)

    # Create the epoll object, then register the socket for read events.
    watcher = select.epoll()
    self.epoll_sock = watcher
    watcher.register(sock.fileno(), select.EPOLLIN)

    self.logic = logic
def write(self, fd):
    """Write the pending response data for *fd* to its socket.

    The slice of ``buff_write`` beginning at ``have_write`` is sent, and
    ``have_write`` / ``need_write`` are adjusted by the byte count that
    ``send`` reports.  Once ``need_write`` reaches zero after at least one
    byte was sent, the connection is reset with ``self.setFd``, its state
    becomes ``"read"`` and epoll interest is switched to ``EPOLLIN``.

    If ``send`` would block, the method returns and waits for the next
    epoll notification.  Any other ``socket.error`` sets the state to
    ``"closing"`` and runs the state machine.
    """
    import errno  # local import: this block is the only user of the name

    sock_state = self.conn_state[fd]
    conn = sock_state.sock_obj
    # Where the previous send stopped.
    last_have_send = sock_state.have_write
    try:
        # send() may write only part of the data; it returns the count.
        have_send = conn.send(sock_state.buff_write[last_have_send:])
        sock_state.have_write += have_send
        sock_state.need_write -= have_send
        if sock_state.need_write == 0 and sock_state.have_write != 0:
            # Response fully sent: reset and go back to reading.
            self.setFd(conn)
            self.conn_state[fd].state = "read"
            self.epoll_sock.modify(fd, select.EPOLLIN)
    except socket.error as msg:
        # The socket may not be writable yet although epoll said so
        # ("Resource temporarily unavailable"); retry on the next event.
        # errno constants are used instead of the hard-coded value 11.
        if msg.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
            return
        sock_state.state = "closing"
        # NOTE(review): indentation was lost in the source, so it is not
        # certain this call belonged only to the error path -- confirm.
        self.state_machine(fd)
def register(self, fileobj, events, data=None):
    """Add *fileobj* to the selector and mirror the interest set in epoll.

    The selector-level ``EVENT_READ`` / ``EVENT_WRITE`` bits are converted
    to ``EPOLLIN`` / ``EPOLLOUT`` before calling ``epoll.register``.

    Returns:
        The key produced by the base-class ``register``.
    """
    key = super(EpollSelector, self).register(fileobj, events, data)

    epoll_mask = 0
    for selector_bit, epoll_bit in ((EVENT_READ, select.EPOLLIN),
                                    (EVENT_WRITE, select.EPOLLOUT)):
        if events & selector_bit:
            epoll_mask |= epoll_bit

    _syscall_wrapper(self._epoll.register, False, key.fd, epoll_mask)
    return key
def select(self, timeout=None):
    """Wait for registered file objects to become ready.

    Args:
        timeout: seconds to wait.  ``None`` waits without limit, a value
            ``<= 0`` returns immediately, and a positive value is rounded
            up to a whole number of milliseconds.

    Returns:
        A list of ``(key, events)`` tuples; ``events`` only contains bits
        the key was registered for.
    """
    if timeout is None:
        wait = -1.0  # epoll.poll() must have a float.
    elif timeout <= 0:
        wait = 0.0
    else:
        # poll() already takes seconds; rounding up to its 1 ms resolution
        # is all that is needed.
        wait = float(math.ceil(timeout * 1e3) * 1e-3)

    # Ask for at least one event so polling works with nothing registered.
    limit = max(len(self._fd_to_key), 1)
    fd_events = _syscall_wrapper(self._epoll.poll, True,
                                 timeout=wait,
                                 maxevents=limit)

    ready = []
    for fd, event_mask in fd_events:
        # Flags besides EPOLLIN imply "writable" and flags besides EPOLLOUT
        # imply "readable"; error and hang-up flags therefore signal both.
        events = 0
        if event_mask & ~select.EPOLLIN:
            events |= EVENT_WRITE
        if event_mask & ~select.EPOLLOUT:
            events |= EVENT_READ

        key = self._key_from_fd(fd)
        if not key:
            continue
        ready.append((key, events & key.events))
    return ready
def register(self, fileobj, events, data=None):
    """Register *fileobj* for *events*, both in the selector and in epoll.

    ``EVENT_READ`` is translated to ``EPOLLIN`` and ``EVENT_WRITE`` to
    ``EPOLLOUT`` for the underlying ``epoll.register`` call.

    Returns:
        The key returned by the base-class ``register``.
    """
    key = super(EpollSelector, self).register(fileobj, events, data)
    wants_read = events & EVENT_READ
    wants_write = events & EVENT_WRITE
    events_mask = ((select.EPOLLIN if wants_read else 0)
                   | (select.EPOLLOUT if wants_write else 0))
    _syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
    return key