def process(self, fd):
    '''Run the business logic on a buffered request and queue the reply.

    The reply produced by ``self.logic`` is framed as a 10-digit,
    zero-padded length header followed by the payload and stored in the
    connection's write buffer. The connection is then marked "write" and
    its epoll interest is switched to EPOLLOUT.
    '''
    logs.dblog("proces: proces start")
    conn = self.conn_state[fd]
    # Hand the data read so far to the logic callable.
    reply = self.logic(conn.buff_read)
    # Length-prefixed frame: fixed-width header, then the payload itself.
    conn.buff_write = "%010d%s" % (len(reply), reply)
    # Number of bytes that still have to be written out.
    conn.need_write = len(conn.buff_write)
    conn.state = "write"
    # Switch the fd's epoll interest to "writable".
    self.epoll_sock.modify(fd, select.EPOLLOUT)
    logs.dblog("***process: process end fd state change to write***")
    conn.state_log()
# Example source code for Python's epoll() class
def check_fd(self):
    '''Periodically close connections that have sat in "read" for too long.

    Loops forever. On every pass, each tracked connection that is in the
    "read" state, has a non-zero ``read_stime``, and has been waiting for at
    least ``read_itime`` seconds is switched to "closing" and handed to
    ``self.state_machine``. The loop then sleeps for 60 seconds.
    '''
    while True:
        # Iterate over a snapshot of the keys: state_machine() may remove
        # entries from conn_state while closing, and mutating a dict during
        # iteration raises RuntimeError on Python 3.
        for fd in list(self.conn_state):
            sock_state = self.conn_state.get(fd)
            if sock_state is None:
                # Entry vanished after the snapshot was taken; nothing to do.
                continue
            # Only connections stuck in "read" with a recorded start time
            # are candidates for a timeout.
            if sock_state.state == "read" and sock_state.read_stime \
                    and (time.time() - sock_state.read_stime) >= sock_state.read_itime:
                # Timed out: let the state machine close this fd.
                sock_state.state = "closing"
                self.state_machine(fd)
        # Wait before the next sweep.
        time.sleep(60)
#}}}
#{{{fork_processes
def __init__(self, sock, logic):
    '''Set up connection tracking, the epoll object and the state table.

    sock  -- socket that is recorded through ``self.setFd`` and registered
             with epoll for EPOLLIN.
    logic -- object stored as ``self.logic``.
    '''
    # Per-fd connection state, filled in through setFd().
    self.conn_state = {}
    self.setFd(sock)
    # Create the epoll object and watch the given socket for readability.
    poller = select.epoll()
    self.epoll_sock = poller
    poller.register(sock.fileno(), select.EPOLLIN)
    self.logic = logic
    # State name -> handler method used to drive each connection.
    self.sm = dict(
        accept=self.accept,
        read=self.read,
        write=self.write,
        process=self.process,
        closing=self.close,
    )
def accept(self, fd):
    '''Accept one pending connection on ``fd`` and start tracking it.

    The new socket is made non-blocking, registered with epoll for EPOLLIN,
    recorded through ``self.setFd`` and put into the "read" state.
    ECONNABORTED and EAGAIN raised by the accept path are ignored; any
    other ``socket.error`` propagates to the caller.
    '''
    try:
        # The socket object stored for this fd is the one we accept on.
        listener = self.conn_state[fd].sock_obj
        conn, _addr = listener.accept()
        conn.setblocking(0)
        # Watch the new connection for incoming data.
        self.epoll_sock.register(conn.fileno(), select.EPOLLIN)
        # Create the state entry for the new connection, then mark it "read".
        self.setFd(conn)
        self.conn_state[conn.fileno()].state = "read"
    except socket.error as msg:
        # Aborted connection or nothing to accept right now: just return.
        if msg.args[0] not in (errno.ECONNABORTED, errno.EAGAIN):
            raise
def process(self, fd):
    '''Run the business logic on a buffered request and start writing.

    The reply produced by ``self.logic`` is framed as a 10-digit,
    zero-padded length header followed by the payload and stored in the
    connection's write buffer. The connection is marked "write", its epoll
    interest is switched to EPOLLOUT, and the state machine is invoked for
    the fd straight away.
    '''
    conn = self.conn_state[fd]
    # Hand the data read so far to the logic callable.
    reply = self.logic(conn.buff_read)
    # Length-prefixed frame: fixed-width header, then the payload itself.
    conn.buff_write = "%010d%s" % (len(reply), reply)
    # Number of bytes that still have to be written out.
    conn.need_write = len(conn.buff_write)
    conn.state = "write"
    # Switch the fd's epoll interest to "writable".
    self.epoll_sock.modify(fd, select.EPOLLOUT)
    # Continue immediately with the (now "write") state.
    self.state_machine(fd)
def run(self):
    '''Main loop: wait on epoll and drive the state machine for ready fds.

    Runs forever. For every (fd, events) pair reported by epoll, the
    connection is switched to "closing" when the event mask carries
    EPOLLHUP or EPOLLERR, and ``self.state_machine`` is then called for
    that fd in every case.
    '''
    hangup_or_error = select.EPOLLHUP | select.EPOLLERR
    while True:
        # poll() is called without a timeout argument.
        for fd, events in self.epoll_sock.poll():
            conn = self.conn_state[fd]
            # A hang-up or an I/O error both mean the connection must go.
            if events & hangup_or_error:
                conn.state = "closing"
            self.state_machine(fd)
#}}}
#{{{fork_processes
def accept(self, fd):
    '''Accept one pending client on the socket tracked under ``fd``.

    Returns:
        (conn, host) -- the new non-blocking socket and the first element
                        of the peer address, on success.
        "retry"      -- when accept failed with EAGAIN/EWOULDBLOCK or
                        ECONNABORTED and should simply be tried again.
        None         -- for any other socket error (the error is logged).
    '''
    # Local import so the block does not depend on a file-level import.
    import errno

    logs.dblog("accept: accept client")
    try:
        # The socket object stored for this fd is the one we accept on.
        sock_state = self.conn_state[fd]
        sock = sock_state.sock_obj
        conn, addr = sock.accept()
        conn.setblocking(0)
        logs.dblog("accept: find new socket client fd(%s)" % conn.fileno())
        return conn, addr[0]
    except socket.error as msg:
        # Use the errno constants rather than the Linux-specific numbers
        # 11 and 103, whose values differ on other platforms.
        if msg.errno in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNABORTED):
            return "retry"
        # Any other error keeps the original "return None" contract, but is
        # no longer dropped without a trace.
        logs.dblog("accept: accept failed on fd(%s): %s" % (fd, msg))
        return None
def process(self, fd):
    '''Run the business logic on a buffered request and queue the reply.

    The reply produced by ``self.logic`` is framed as a 10-digit,
    zero-padded length header followed by the payload and stored in the
    connection's write buffer. The connection is then marked "write" and
    its epoll interest is switched to EPOLLOUT.
    '''
    logs.dblog("proces: proces start")
    conn = self.conn_state[fd]
    # Hand the data read so far to the logic callable.
    reply = self.logic(conn.buff_read)
    # Length-prefixed frame: fixed-width header, then the payload itself.
    conn.buff_write = "%010d%s" % (len(reply), reply)
    # Number of bytes that still have to be written out.
    conn.need_write = len(conn.buff_write)
    conn.state = "write"
    # Switch the fd's epoll interest to "writable".
    self.epoll_sock.modify(fd, select.EPOLLOUT)
    logs.dblog("***process: process end fd state change to write***")
    conn.state_log()
def run(self):
    '''Main loop: wait on epoll and drive the state machine for ready fds.

    Runs forever. For every (fd, events) pair reported by epoll, the
    connection is switched to "closing" when the event mask carries
    EPOLLHUP or EPOLLERR, and ``self.state_machine`` is then called for
    that fd in every case. Each step is traced through ``logs.dblog``.
    '''
    hangup_or_error = select.EPOLLHUP | select.EPOLLERR
    while True:
        # poll() is called without a timeout argument.
        for fd, events in self.epoll_sock.poll():
            logs.dblog("epoll: epoll find fd(%s) have signal" % fd)
            conn = self.conn_state[fd]
            # A hang-up or an I/O error both mean the connection must go.
            if events & hangup_or_error:
                conn.state = "closing"
            logs.dblog("epoll: use state_machine process fd(%s)" % fd)
            self.state_machine(fd)
def check_fd(self):
    '''Periodically close connections that have sat in "read" for too long.

    Loops forever. On every pass, each tracked connection that is in the
    "read" state, has a non-zero ``read_stime``, and has been waiting for at
    least ``read_itime`` seconds is switched to "closing" and handed to
    ``self.state_machine``. The loop then sleeps for 60 seconds.
    '''
    while True:
        # Iterate over a snapshot of the keys: state_machine() may remove
        # entries from conn_state while closing, and mutating a dict during
        # iteration raises RuntimeError on Python 3.
        for fd in list(self.conn_state):
            sock_state = self.conn_state.get(fd)
            if sock_state is None:
                # Entry vanished after the snapshot was taken; nothing to do.
                continue
            # Only connections stuck in "read" with a recorded start time
            # are candidates for a timeout.
            if sock_state.state == "read" and sock_state.read_stime \
                    and (time.time() - sock_state.read_stime) >= sock_state.read_itime:
                # Timed out: let the state machine close this fd.
                sock_state.state = "closing"
                self.state_machine(fd)
        # Wait before the next sweep.
        time.sleep(60)
#}}}
#{{{fork_processes
def __init__(self, sock, logic):
    '''Set up connection tracking, the epoll object and the state table.

    sock  -- socket that is recorded through ``self.setFd`` and registered
             with epoll for EPOLLIN.
    logic -- object stored as ``self.logic``.
    '''
    # Per-fd connection state, filled in through setFd().
    self.conn_state = {}
    self.setFd(sock)
    # Create the epoll object and watch the given socket for readability.
    poller = select.epoll()
    self.epoll_sock = poller
    poller.register(sock.fileno(), select.EPOLLIN)
    self.logic = logic
    # State name -> handler method used to drive each connection.
    self.sm = dict(
        accept=self.accept,
        read=self.read,
        write=self.write,
        process=self.process,
        closing=self.close,
    )
def accept(self, fd):
    '''Accept one pending connection on ``fd`` and start tracking it.

    The new socket is made non-blocking, registered with epoll for EPOLLIN,
    recorded through ``self.setFd`` and put into the "read" state.
    ECONNABORTED and EAGAIN raised by the accept path are ignored; any
    other ``socket.error`` propagates to the caller.
    '''
    try:
        # The socket object stored for this fd is the one we accept on.
        listener = self.conn_state[fd].sock_obj
        conn, _addr = listener.accept()
        conn.setblocking(0)
        # Watch the new connection for incoming data.
        self.epoll_sock.register(conn.fileno(), select.EPOLLIN)
        # Create the state entry for the new connection, then mark it "read".
        self.setFd(conn)
        self.conn_state[conn.fileno()].state = "read"
    except socket.error as msg:
        # Aborted connection or nothing to accept right now: just return.
        if msg.args[0] not in (errno.ECONNABORTED, errno.EAGAIN):
            raise
def process(self, fd):
    '''Run the business logic on a buffered request and start writing.

    The reply produced by ``self.logic`` is framed as a 10-digit,
    zero-padded length header followed by the payload and stored in the
    connection's write buffer. The connection is marked "write", its epoll
    interest is switched to EPOLLOUT, and the state machine is invoked for
    the fd straight away.
    '''
    conn = self.conn_state[fd]
    # Hand the data read so far to the logic callable.
    reply = self.logic(conn.buff_read)
    # Length-prefixed frame: fixed-width header, then the payload itself.
    conn.buff_write = "%010d%s" % (len(reply), reply)
    # Number of bytes that still have to be written out.
    conn.need_write = len(conn.buff_write)
    conn.state = "write"
    # Switch the fd's epoll interest to "writable".
    self.epoll_sock.modify(fd, select.EPOLLOUT)
    # Continue immediately with the (now "write") state.
    self.state_machine(fd)
def run(self):
    '''Main loop: wait on epoll and drive the state machine for ready fds.

    Runs forever. For every (fd, events) pair reported by epoll, the
    connection is switched to "closing" when the event mask carries
    EPOLLHUP or EPOLLERR, and ``self.state_machine`` is then called for
    that fd in every case.
    '''
    hangup_or_error = select.EPOLLHUP | select.EPOLLERR
    while True:
        # poll() is called without a timeout argument.
        for fd, events in self.epoll_sock.poll():
            conn = self.conn_state[fd]
            # A hang-up or an I/O error both mean the connection must go.
            if events & hangup_or_error:
                conn.state = "closing"
            self.state_machine(fd)
#}}}
#{{{fork_processes
def _can_allocate(struct):
    """ Report whether the named ``select`` structure really works here.

    The select module may advertise a structure that the operating system
    then refuses to allocate, so the structure is actually exercised.
    ``struct`` is the attribute name on the select module (for example
    'poll', 'epoll' or 'kqueue'). select() itself is not checked here.
    Returns True on success, False when the attribute is missing or the
    call raises OSError. """
    try:
        if struct != 'poll':
            # Everything except poll fails at allocation time, so creating
            # and immediately closing one is enough.
            getattr(select, struct)().close()
        else:
            # select.poll() objects won't fail until used.
            select.poll().poll(0)
    except (OSError, AttributeError):
        return False
    return True
# Choose the best implementation, roughly:
# kqueue == epoll > poll > select. Devpoll not supported. (See above)
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
def DefaultSelector():
    """ Return a new instance of the best selector usable on this platform.

    The choice is made on the first call and cached in the module-level
    ``_DEFAULT_SELECTOR``; doing the detection lazily lets it notice a
    select module that has been monkey-patched (eventlet, greenlet).
    Candidates are tried in the order kqueue, epoll, poll, select.
    Raises ValueError when none of them is available. """
    global _DEFAULT_SELECTOR
    if _DEFAULT_SELECTOR is not None:
        return _DEFAULT_SELECTOR()

    # Each class name is only looked up inside the branch that selects it.
    if _can_allocate('kqueue'):
        chosen = KqueueSelector
    elif _can_allocate('epoll'):
        chosen = EpollSelector
    elif _can_allocate('poll'):
        chosen = PollSelector
    elif hasattr(select, 'select'):
        chosen = SelectSelector
    else:  # Platform-specific: AppEngine
        raise ValueError('Platform does not have a selector')

    _DEFAULT_SELECTOR = chosen
    return chosen()
def _can_allocate(struct):
    """ Report whether the named ``select`` structure really works here.

    The select module may advertise a structure that the operating system
    then refuses to allocate, so the structure is actually exercised.
    ``struct`` is the attribute name on the select module (for example
    'poll', 'epoll' or 'kqueue'). select() itself is not checked here.
    Returns True on success, False when the attribute is missing or the
    call raises OSError. """
    try:
        if struct != 'poll':
            # Everything except poll fails at allocation time, so creating
            # and immediately closing one is enough.
            getattr(select, struct)().close()
        else:
            # select.poll() objects won't fail until used.
            select.poll().poll(0)
    except (OSError, AttributeError):
        return False
    return True
# Choose the best implementation, roughly:
# kqueue == epoll > poll > select. Devpoll not supported. (See above)
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
def DefaultSelector():
    """ Return a new instance of the best selector usable on this platform.

    The choice is made on the first call and cached in the module-level
    ``_DEFAULT_SELECTOR``; doing the detection lazily lets it notice a
    select module that has been monkey-patched (eventlet, greenlet).
    Candidates are tried in the order kqueue, epoll, poll, select.
    Raises ValueError when none of them is available. """
    global _DEFAULT_SELECTOR
    if _DEFAULT_SELECTOR is not None:
        return _DEFAULT_SELECTOR()

    # Each class name is only looked up inside the branch that selects it.
    if _can_allocate('kqueue'):
        chosen = KqueueSelector
    elif _can_allocate('epoll'):
        chosen = EpollSelector
    elif _can_allocate('poll'):
        chosen = PollSelector
    elif hasattr(select, 'select'):
        chosen = SelectSelector
    else:  # Platform-specific: AppEngine
        raise ValueError('Platform does not have a selector')

    _DEFAULT_SELECTOR = chosen
    return chosen()
def _can_allocate(struct):
    """ Report whether the named ``select`` structure really works here.

    The select module may advertise a structure that the operating system
    then refuses to allocate, so the structure is actually exercised.
    ``struct`` is the attribute name on the select module (for example
    'poll', 'epoll' or 'kqueue'). select() itself is not checked here.
    Returns True on success, False when the attribute is missing or the
    call raises OSError. """
    try:
        if struct != 'poll':
            # Everything except poll fails at allocation time, so creating
            # and immediately closing one is enough.
            getattr(select, struct)().close()
        else:
            # select.poll() objects won't fail until used.
            select.poll().poll(0)
    except (OSError, AttributeError):
        return False
    return True
# Choose the best implementation, roughly:
# kqueue == epoll > poll > select. Devpoll not supported. (See above)
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
def DefaultSelector():
    """ Return a new instance of the best selector usable on this platform.

    The choice is made on the first call and cached in the module-level
    ``_DEFAULT_SELECTOR``; doing the detection lazily lets it notice a
    select module that has been monkey-patched (eventlet, greenlet).
    Candidates are tried in the order kqueue, epoll, poll, select.
    Raises ValueError when none of them is available. """
    global _DEFAULT_SELECTOR
    if _DEFAULT_SELECTOR is not None:
        return _DEFAULT_SELECTOR()

    # Each class name is only looked up inside the branch that selects it.
    if _can_allocate('kqueue'):
        chosen = KqueueSelector
    elif _can_allocate('epoll'):
        chosen = EpollSelector
    elif _can_allocate('poll'):
        chosen = PollSelector
    elif hasattr(select, 'select'):
        chosen = SelectSelector
    else:  # Platform-specific: AppEngine
        raise ValueError('Platform does not have a selector')

    _DEFAULT_SELECTOR = chosen
    return chosen()
def __init__(self):
    """Pick a polling backend and reset the loop's bookkeeping.

    The first of epoll, kqueue and select that the select module exposes
    is used: ``select.epoll()`` directly, or the ``KqueueLoop`` /
    ``SelectLoop`` wrappers. Raises Exception when none is available.
    """
    # Find the first backend the select module advertises.
    model = None
    for candidate in ('epoll', 'kqueue', 'select'):
        if hasattr(select, candidate):
            model = candidate
            break
    if model is None:
        raise Exception('can not find any available functions in select '
                        'package')

    # Wrapper classes are only looked up for the backend actually chosen.
    if model == 'epoll':
        self._impl = select.epoll()
    elif model == 'kqueue':
        self._impl = KqueueLoop()
    else:
        self._impl = SelectLoop()

    self._fdmap = {}  # (f, handler)
    self._last_time = time.time()
    self._periodic_callbacks = []
    self._stopping = False
    logging.debug('using event model: %s', model)
def _can_allocate(struct):
    """ Report whether the named ``select`` structure really works here.

    The select module may advertise a structure that the operating system
    then refuses to allocate, so the structure is actually exercised.
    ``struct`` is the attribute name on the select module (for example
    'poll', 'epoll' or 'kqueue'). select() itself is not checked here.
    Returns True on success, False when the attribute is missing or the
    call raises OSError. """
    try:
        if struct != 'poll':
            # Everything except poll fails at allocation time, so creating
            # and immediately closing one is enough.
            getattr(select, struct)().close()
        else:
            # select.poll() objects won't fail until used.
            select.poll().poll(0)
    except (OSError, AttributeError):
        return False
    return True
# Choose the best implementation, roughly:
# kqueue == epoll > poll > select. Devpoll not supported. (See above)
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
def DefaultSelector():
    """ Return a new instance of the best selector usable on this platform.

    The choice is made on the first call and cached in the module-level
    ``_DEFAULT_SELECTOR``; doing the detection lazily lets it notice a
    select module that has been monkey-patched (eventlet, greenlet).
    Candidates are tried in the order kqueue, epoll, poll, select.
    Raises ValueError when none of them is available. """
    global _DEFAULT_SELECTOR
    if _DEFAULT_SELECTOR is not None:
        return _DEFAULT_SELECTOR()

    # Each class name is only looked up inside the branch that selects it.
    if _can_allocate('kqueue'):
        chosen = KqueueSelector
    elif _can_allocate('epoll'):
        chosen = EpollSelector
    elif _can_allocate('poll'):
        chosen = PollSelector
    elif hasattr(select, 'select'):
        chosen = SelectSelector
    else:  # Platform-specific: AppEngine
        raise ValueError('Platform does not have a selector')

    _DEFAULT_SELECTOR = chosen
    return chosen()
def _can_allocate(struct):
    """ Report whether the named ``select`` structure really works here.

    The select module may advertise a structure that the operating system
    then refuses to allocate, so the structure is actually exercised.
    ``struct`` is the attribute name on the select module (for example
    'poll', 'epoll' or 'kqueue'). select() itself is not checked here.
    Returns True on success, False when the attribute is missing or the
    call raises OSError. """
    try:
        if struct != 'poll':
            # Everything except poll fails at allocation time, so creating
            # and immediately closing one is enough.
            getattr(select, struct)().close()
        else:
            # select.poll() objects won't fail until used.
            select.poll().poll(0)
    except (OSError, AttributeError):
        return False
    return True
# Choose the best implementation, roughly:
# kqueue == epoll > poll > select. Devpoll not supported. (See above)
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
def DefaultSelector():
    """ Return a new instance of the best selector usable on this platform.

    The choice is made on the first call and cached in the module-level
    ``_DEFAULT_SELECTOR``; doing the detection lazily lets it notice a
    select module that has been monkey-patched (eventlet, greenlet).
    Candidates are tried in the order kqueue, epoll, poll, select.
    Raises ValueError when none of them is available. """
    global _DEFAULT_SELECTOR
    if _DEFAULT_SELECTOR is not None:
        return _DEFAULT_SELECTOR()

    # Each class name is only looked up inside the branch that selects it.
    if _can_allocate('kqueue'):
        chosen = KqueueSelector
    elif _can_allocate('epoll'):
        chosen = EpollSelector
    elif _can_allocate('poll'):
        chosen = PollSelector
    elif hasattr(select, 'select'):
        chosen = SelectSelector
    else:  # Platform-specific: AppEngine
        raise ValueError('Platform does not have a selector')

    _DEFAULT_SELECTOR = chosen
    return chosen()
def __init__(self):
    """Pick a polling backend and reset the loop's bookkeeping.

    The first of epoll, kqueue and select that the select module exposes
    is used: ``select.epoll()`` directly, or the ``KqueueLoop`` /
    ``SelectLoop`` wrappers. Raises Exception when none is available.
    """
    # Find the first backend the select module advertises.
    model = None
    for candidate in ('epoll', 'kqueue', 'select'):
        if hasattr(select, candidate):
            model = candidate
            break
    if model is None:
        raise Exception('can not find any available functions in select '
                        'package')

    # Wrapper classes are only looked up for the backend actually chosen.
    if model == 'epoll':
        self._impl = select.epoll()
    elif model == 'kqueue':
        self._impl = KqueueLoop()
    else:
        self._impl = SelectLoop()

    self._fdmap = {}  # (f, handler)
    self._last_time = time.time()
    self._periodic_callbacks = []
    self._stopping = False
    logging.debug('using event model: %s', model)
def select(self, timeout=None):
    """Poll epoll once and return the ready ``(key, events)`` pairs.

    timeout -- seconds to wait; None waits indefinitely (passed to epoll
               as -1.0), values <= 0 poll without blocking, and positive
               values are rounded up to a whole millisecond.

    Returns a list of ``(key, events)`` tuples where ``events`` is the
    detected readiness masked by the events the key was registered for.
    Fds for which ``self._key_from_fd`` returns a falsy value are skipped.
    """
    if timeout is None:
        timeout = -1.0  # epoll.poll() must have a float.
    elif timeout <= 0:
        timeout = 0.0
    else:
        # select.epoll.poll() has a resolution of 1 millisecond but takes
        # seconds, so only the rounding needs adjusting.
        timeout = float(math.ceil(timeout * 1e3) * 1e-3)

    # Always ask for at least one event so the call also works when no
    # file descriptors are registered.
    max_events = max(len(self._fd_to_key), 1)

    fd_events = _syscall_wrapper(self._epoll.poll, True,
                                 timeout=timeout,
                                 maxevents=max_events)
    ready = []
    for fd, event_mask in fd_events:
        # Any bit other than EPOLLIN is reported as writable, and any bit
        # other than EPOLLOUT as readable.
        writable = EVENT_WRITE if event_mask & ~select.EPOLLIN else 0
        readable = EVENT_READ if event_mask & ~select.EPOLLOUT else 0
        events = writable | readable

        key = self._key_from_fd(fd)
        if key:
            ready.append((key, events & key.events))
    return ready
def __init__(self):
    """Pick a polling backend and reset the loop's bookkeeping.

    The first of epoll, kqueue and select that the select module exposes
    is used: ``select.epoll()`` directly, or the ``KqueueLoop`` /
    ``SelectLoop`` wrappers. Raises Exception when none is available.
    """
    # Find the first backend the select module advertises.
    model = None
    for candidate in ('epoll', 'kqueue', 'select'):
        if hasattr(select, candidate):
            model = candidate
            break
    if model is None:
        raise Exception('can not find any available functions in select '
                        'package')

    # Wrapper classes are only looked up for the backend actually chosen.
    if model == 'epoll':
        self._impl = select.epoll()
    elif model == 'kqueue':
        self._impl = KqueueLoop()
    else:
        self._impl = SelectLoop()

    self._fdmap = {}  # (f, handler)
    self._last_time = time.time()
    self._periodic_callbacks = []
    self._stopping = False
    logging.debug('using event model: %s', model)
def register(self, sock, events, callback, *args, **kwargs):
    """Register ``sock`` with epoll for ``events`` and record the callback.

    The epoll mask always contains EPOLLERR and EPOLLHUP. EPOLLIN and
    EPOLLOUT are added for the requested events and for any direction the
    socket is already listed under in ``self.rd_socks`` /
    ``self.wr_socks``. A socket already listed is updated with
    ``epoll.modify``; otherwise it is newly registered and remembered in
    ``self.fd2socks``.

    Returns False when the initial epoll registration raises IOError (the
    base-class register is then not called); True otherwise.
    """
    already_reading = sock in self.rd_socks
    already_writing = sock in self.wr_socks

    mask = select.EPOLLERR | select.EPOLLHUP
    if already_reading or events & EV_READ:
        mask |= select.EPOLLIN
    if already_writing or events & EV_WRITE:
        mask |= select.EPOLLOUT

    if already_reading or already_writing:
        # The fd is already registered with epoll: only update its mask.
        self.epoll.modify(sock.fileno(), mask)
    else:
        try:
            self.epoll.register(sock.fileno(), mask)
        except IOError:
            return False
        self.fd2socks[sock.fileno()] = sock

    super(Epoll, self).register(sock, events, callback, *args, **kwargs)
    return True
def unregister(self, sock, events=EV_READ | EV_WRITE):
    """Stop watching ``sock`` for ``events``.

    The base-class unregister is always called first. When ``events`` is
    exactly ``EV_READ | EV_WRITE`` the socket is removed from epoll and
    from ``self.fd2socks``; the result is True if an entry was found there
    and False otherwise. For a partial unregistration the epoll mask is
    rewritten without the given direction(s) and True is returned.
    """
    super(Epoll, self).unregister(sock, events)

    if events != EV_READ | EV_WRITE:
        # Partial removal: start from the full mask and clear what was asked.
        # NOTE(review): this starts from IN|OUT regardless of what the
        # socket was registered for -- confirm that is intended.
        mask = (select.EPOLLERR | select.EPOLLHUP |
                select.EPOLLIN | select.EPOLLOUT)
        if events & EV_READ:
            mask ^= select.EPOLLIN
        if events & EV_WRITE:
            mask ^= select.EPOLLOUT
        self.epoll.modify(sock.fileno(), mask)
        return True

    # Full removal: drop the fd from epoll and from the fd -> socket map.
    self.epoll.unregister(sock)
    removed = self.fd2socks.pop(sock.fileno(), None)
    return bool(removed)