def run(self):
    """Run the epoll event loop forever.

    Every descriptor reported by ``self.epoll_sock.poll()`` is looked up in
    ``self.conn_state``.  A hang-up or error event marks that entry's
    ``state`` as "closing"; the descriptor is then handed to
    ``self.state_machine`` regardless of the event type.
    """
    closing_mask = select.EPOLLHUP | select.EPOLLERR
    while True:
        # poll() is called without a timeout.
        for fd, events in self.epoll_sock.poll():
            logs.dblog("epoll: epoll find fd(%s) have signal" % fd)
            sock_state = self.conn_state[fd]
            # Hang-up and I/O error are treated the same way.
            if events & closing_mask:
                sock_state.state = "closing"
            logs.dblog("epoll: use state_machine process fd(%s)" % fd)
            self.state_machine(fd)
python类EPOLLHUP的实例源码
def run(self):
    """Run the epoll event loop forever.

    Each ready descriptor has its ``self.conn_state`` entry switched to
    "closing" when epoll reports a hang-up or an error, and is then passed
    to ``self.state_machine``.
    """
    while True:
        # poll() is called without a timeout.
        ready = self.epoll_sock.poll()
        for fd, events in ready:
            sock_state = self.conn_state[fd]
            hung_up = select.EPOLLHUP & events
            if hung_up or select.EPOLLERR & events:
                sock_state.state = "closing"
            self.state_machine(fd)
#}}}
#{{{fork_processes
def run(self):
    """Dispatch epoll events to the connection state machine, forever.

    For every ``(fd, events)`` pair returned by ``self.epoll_sock.poll()``
    the matching ``self.conn_state`` entry is marked "closing" on hang-up
    or error, then ``self.state_machine(fd)`` is invoked.
    """
    fatal_events = select.EPOLLHUP | select.EPOLLERR
    while True:
        for fd, events in self.epoll_sock.poll():
            logs.dblog("epoll: epoll find fd(%s) have signal" % fd)
            sock_state = self.conn_state[fd]
            # Either a hang-up or an I/O error ends the connection.
            if fatal_events & events:
                sock_state.state = "closing"
            logs.dblog("epoll: use state_machine process fd(%s)" % fd)
            self.state_machine(fd)
def run(self):
    """Dispatch epoll events to the connection state machine, forever.

    A hang-up or error event flips the descriptor's ``self.conn_state``
    entry to "closing" before ``self.state_machine(fd)`` runs.
    """
    fatal_events = select.EPOLLHUP | select.EPOLLERR
    while True:
        for fd, events in self.epoll_sock.poll():
            sock_state = self.conn_state[fd]
            if fatal_events & events:
                sock_state.state = "closing"
            self.state_machine(fd)
#}}}
#{{{fork_processes
def register(self, sock, events, callback, *args, **kwargs):
    """Register ``sock`` with epoll for the requested events.

    The epoll mask always contains EPOLLERR and EPOLLHUP.  EPOLLIN and
    EPOLLOUT are added when ``sock`` is already present in
    ``self.rd_socks`` / ``self.wr_socks`` or when ``events`` requests
    EV_READ / EV_WRITE.  A socket already present in either table is
    modified in place; otherwise it is newly registered and recorded in
    ``self.fd2socks``.

    Returns False when ``epoll.register`` raises IOError, True otherwise
    (after delegating to the parent class ``register``).
    """
    reading = sock in self.rd_socks
    writing = sock in self.wr_socks
    wants_read = events & EV_READ
    wants_write = events & EV_WRITE

    mask = select.EPOLLERR | select.EPOLLHUP
    if reading or wants_read:
        mask |= select.EPOLLIN
    if writing or wants_write:
        mask |= select.EPOLLOUT

    fd = sock.fileno()
    if reading or writing:
        # Already known to epoll: update the interest mask.
        self.epoll.modify(fd, mask)
    else:
        try:
            self.epoll.register(fd, mask)
        except IOError:
            return False
        self.fd2socks[fd] = sock
    super(Epoll, self).register(sock, events, callback, *args, **kwargs)
    return True
def unregister(self, sock, events=EV_READ | EV_WRITE):
    """Stop watching ``sock`` for the given events.

    With the default ``events`` (EV_READ | EV_WRITE) the socket is removed
    from epoll and from ``self.fd2socks``; the return value tells whether
    it was present in ``self.fd2socks``.

    With a partial ``events`` the epoll mask is rebuilt from the
    directions the socket was actually registered for (membership in
    ``self.rd_socks`` / ``self.wr_socks`` before the parent call), minus
    the ones being removed.  The previous implementation started from
    IN | OUT unconditionally, which could enable a direction that was never
    registered.  Returns True in that case.
    """
    # Snapshot before the parent call, which presumably edits these tables
    # -- TODO confirm against the base class.
    was_reading = sock in self.rd_socks
    was_writing = sock in self.wr_socks
    super(Epoll, self).unregister(sock, events)

    if events == EV_READ | EV_WRITE:
        self.epoll.unregister(sock)
        return bool(self.fd2socks.pop(sock.fileno(), None))

    ev = select.EPOLLERR | select.EPOLLHUP
    if was_reading and not events & EV_READ:
        ev |= select.EPOLLIN
    if was_writing and not events & EV_WRITE:
        ev |= select.EPOLLOUT
    self.epoll.modify(sock.fileno(), ev)
    return True
def run(self):
    """Run the event loop forever.

    Each iteration calls ``self.check_timer()``, polls epoll for at most
    ``self.MIN_INTERVAL`` and dispatches every ready descriptor:

    * EPOLLERR / EPOLLHUP -> ``self.err_callback`` (a
      ``(func, args, kwargs)`` triple), if set;
    * EPOLLIN  -> the ``(callback, args, kwargs)`` entry in ``self.rd_socks``;
    * EPOLLOUT -> the ``(callback, args, kwargs)`` entry in ``self.wr_socks``.

    Descriptors without a socket in ``self.fd2socks`` or without a
    registered entry are skipped.  Any ``Exception`` raised during an
    iteration is printed with its traceback and the loop continues.
    """
    while True:
        try:
            self.check_timer()
            events = self.epoll.poll(self.MIN_INTERVAL)
            for fd, event in events:
                sock = self.fd2socks.get(fd)
                if not sock:
                    continue
                if event & select.EPOLLERR or event & select.EPOLLHUP:
                    if self.err_callback:
                        self.err_callback[0](sock, *self.err_callback[1],
                                             **self.err_callback[2])
                    continue
                if event & select.EPOLLIN:
                    entry = self.rd_socks.get(sock)
                elif event & select.EPOLLOUT:
                    entry = self.wr_socks.get(sock)
                else:
                    continue
                # get() returns None for a socket with no handler; unpacking
                # it directly raised TypeError and dropped the whole batch.
                if not entry:
                    continue
                callback, args, kwargs = entry
                if callback:
                    callback(sock, *args, **kwargs)
        except Exception as e:
            print("exception, %s\n%s" % (e, traceback.format_exc()))
def run(self):
    """Run the epoll event loop forever, with debug tracing.

    Before each poll the state of every entry in ``self.conn_state`` is
    dumped via ``printState()``.  Every ready descriptor is marked
    "closing" on hang-up or error and then passed to
    ``self.state_machine``.
    """
    while True:
        dbgPrint("\n -- run func loop")
        # Iterate the mapping directly: iterkeys() exists only on Python 2.
        for i in self.conn_state:
            dbgPrint("\n -- state of fd: %d" % i)
            self.conn_state[i].printState()
        epoll_list = self.epoll_sock.poll()
        for fd, events in epoll_list:
            dbgPrint("\n-- run epoll return fd: %d, event: %s" % (fd, events))
            sock_state = self.conn_state[fd]
            if select.EPOLLHUP & events:
                dbgPrint("events EPOLLHUP")
                sock_state.state = "closing"
            elif select.EPOLLERR & events:
                dbgPrint("EPOLLERROR")
                sock_state.state = "closing"
            self.state_machine(fd)
def run(self):
    """Run the epoll event loop forever.

    Every ready descriptor is traced with ``dbgPrint``, marked "closing"
    in ``self.conn_state`` on hang-up or error, and then passed to
    ``self.state_machine``.
    """
    while True:
        # poll() is called without a timeout.
        for fd, events in self.epoll_sock.poll():
            dbgPrint("\n-- run epoll return fd: %d, event: %s" % (fd, events))
            sock_state = self.conn_state[fd]
            # Hang-up takes precedence over error when both are reported.
            if select.EPOLLHUP & events:
                closing_reason = "events EPOLLHUP"
            elif select.EPOLLERR & events:
                closing_reason = "EPOLLERROR"
            else:
                closing_reason = None
            if closing_reason is not None:
                dbgPrint(closing_reason)
                sock_state.state = "closing"
            self.state_machine(fd)
def run(self):
    """Run the epoll event loop forever.

    Every ready descriptor is traced with ``dbgPrint``, marked "closing"
    in ``self.conn_state`` on hang-up or error, and then passed to
    ``self.state_machine``.
    """
    while True:
        ready = self.epoll_sock.poll()
        for fd, events in ready:
            dbgPrint("\n-- run epoll return fd: %d, event: %s" % (fd, events))
            sock_state = self.conn_state[fd]
            # Hang-up is checked first; error only when no hang-up is set.
            trace = None
            if select.EPOLLHUP & events:
                trace = "events EPOLLHUP"
            elif select.EPOLLERR & events:
                trace = "EPOLLERROR"
            if trace is not None:
                dbgPrint(trace)
                sock_state.state = "closing"
            self.state_machine(fd)
2_4_simple_web_server_with_epoll.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def run(self):
    """Executes epoll server operation.

    Accepts connections on ``self.sock``, reads each request until an
    end-of-headers marker (EOL1 / EOL2) is seen, then writes
    SERVER_RESPONSE and shuts the connection down.  Per-connection state
    is kept in three local dicts keyed by file descriptor.

    Runs until an exception propagates; the listening socket and the
    epoll object are always released on the way out.
    """
    connections = {}
    requests = {}
    responses = {}

    def drop(fileno):
        # Forget a client completely so no per-connection state lingers.
        self.epoll.unregister(fileno)
        connections[fileno].close()
        del connections[fileno]
        requests.pop(fileno, None)
        responses.pop(fileno, None)

    try:
        while True:
            events = self.epoll.poll(1)
            for fileno, event in events:
                if fileno == self.sock.fileno():
                    connection, address = self.sock.accept()
                    connection.setblocking(0)
                    self.epoll.register(connection.fileno(), select.EPOLLIN)
                    connections[connection.fileno()] = connection
                    requests[connection.fileno()] = b''
                    responses[connection.fileno()] = SERVER_RESPONSE
                elif event & select.EPOLLIN:
                    chunk = connections[fileno].recv(1024)
                    if not chunk:
                        # Peer closed before finishing the request.  Without
                        # this the fd stays readable forever and the loop
                        # spins on empty reads.
                        drop(fileno)
                        continue
                    requests[fileno] += chunk
                    if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                        self.epoll.modify(fileno, select.EPOLLOUT)
                        print('-'*40 + '\n' + requests[fileno].decode()[:-2])
                elif event & select.EPOLLOUT:
                    byteswritten = connections[fileno].send(responses[fileno])
                    responses[fileno] = responses[fileno][byteswritten:]
                    if len(responses[fileno]) == 0:
                        # Nothing left to send: stop watching and shut down;
                        # the resulting EPOLLHUP triggers the cleanup below.
                        self.epoll.modify(fileno, 0)
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLHUP:
                    drop(fileno)
    finally:
        self.epoll.unregister(self.sock.fileno())
        self.epoll.close()
        self.sock.close()
def __convert_epoll_events(self, events):
    """
    Convert epoll events to standard events.

    ``events`` is an iterable of ``(fileno, epoll_mask)`` pairs.  Returns a
    list of ``(fileno, std_mask, user_data)`` tuples where ``std_mask``
    combines the EV_TYPE_* flags matching the epoll bits that are set and
    ``user_data`` is the ``self.__users_data`` entry for the descriptor
    (None when absent).
    """
    flag_pairs = (
        (select.EPOLLIN, EV_TYPE_READ),
        (select.EPOLLOUT, EV_TYPE_WRITE),
        (select.EPOLLHUP, EV_TYPE_HUP),
        (select.EPOLLERR, EV_TYPE_ERR),
    )
    std_events = []
    for fileno, event in events:
        std_event = 0
        for epoll_flag, std_flag in flag_pairs:
            if (event & epoll_flag) == epoll_flag:
                std_event |= std_flag
        user_data = self.__users_data.get(fileno, None)
        std_events.append((fileno, std_event, user_data))
    return std_events
def poll(self):
    """Run one iteration of the epoll loop.

    Sleeps for a second and returns when there are no connections.
    Otherwise the epoll mask of every non-server descriptor is refreshed
    (``self._rw`` when its ``outbuffer`` is non-empty, else ``self._ro``),
    epoll is polled with a one second timeout, and each event is
    dispatched: server descriptors get ``handle_connection()``,
    connections get ``_in()`` / ``_out()`` / ``close()`` depending on the
    event bits.  A connection whose handler raises is closed.
    """
    time.sleep(0.0000000001)  # give epoll.modify a chance
    if not self._connections:
        time.sleep(1)
        return
    for fileno in self._connections:
        if fileno in self._servers:
            continue
        if self._connections[fileno].outbuffer:
            self._epoll.modify(fileno, self._rw)
        else:
            self._epoll.modify(fileno, self._ro)
    for fileno, event in self._epoll.poll(timeout=1):
        if fileno in self._servers:
            self._servers[fileno].handle_connection()
            continue
        # Resolve the connection once, outside any handler try-block.  The
        # lookup used to sit inside the try, so a missing descriptor made
        # the except clause close whatever `con` was left over from a
        # previous iteration.
        try:
            con = self._connections[fileno]
        except KeyError:
            continue
        if event & select.EPOLLIN:
            try:
                con._in()
            except Exception:
                con.close()
                continue
        if event & select.EPOLLOUT:
            try:
                con._out()
            except Exception:
                con.close()
                continue
        if event & (select.EPOLLHUP | select.EPOLLERR):
            try:
                con.close()
            except Exception:
                # Best effort: the peer is already gone.
                pass
def run(self):
    """Connect to ``(self.dstaddr, self.dstport)`` and pump I/O until stopped.

    Repeats until ``self.e_stop`` is set: open a TCP socket, connect with a
    120 second timeout, then poll the socket once per second, calling
    ``handle_read`` on EPOLLIN, ``handle_close`` on EPOLLHUP and
    ``handle_write`` after every poll, until ``self.state`` becomes
    "closed".  After a session ends the loop waits five seconds before
    reconnecting.
    """
    while not self.e_stop.isSet():
        self.last_sent = 0
        self.state = "idle"
        self.state = "connecting"
        print("connecting %s" % self.dstaddr)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.settimeout(120)
            self.socket.connect((self.dstaddr, self.dstport))
        except Exception:
            # Was a bare except, which also swallowed KeyboardInterrupt.
            self.handle_close()
        # NOTE(review): handle_connect() still runs after a failed connect,
        # as in the original -- confirm that is intended.
        self.handle_connect()
        epoll = select.epoll()
        try:
            epoll.register(self.socket.fileno(), select.EPOLLIN)
            while self.state != "closed":
                if self.e_stop.isSet():
                    break
                events = epoll.poll(timeout=1)
                for fd, ev in events:
                    if ev & select.EPOLLIN:
                        self.handle_read()
                    elif ev & select.EPOLLHUP:
                        self.handle_close()
                self.handle_write()
        finally:
            # A fresh epoll object is created per reconnect; close it so
            # its file descriptor is not leaked.
            epoll.close()
        self.handle_close()
        if not self.e_stop.isSet():
            time.sleep(5)
            print("reconnect")
02_04_simple_web_server_with_epoll.py 文件源码
项目:011_python_network_programming_cookbook_demo
作者: jerry-0824
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def run(self):
    """ Executes epoll server operation.

    Accepts connections on ``self.sock``, reads each request until an
    end-of-headers marker (EOL1 / EOL2) is seen, then writes
    SERVER_RESPONSE and shuts the connection down.  Per-connection state
    is kept in three local dicts keyed by file descriptor.

    Runs until an exception propagates; the listening socket and the
    epoll object are always released on the way out.
    """
    connections = {}
    requests = {}
    responses = {}

    def discard(fileno):
        # Release the socket and every piece of state tied to it.
        self.epoll.unregister(fileno)
        connections[fileno].close()
        del connections[fileno]
        requests.pop(fileno, None)
        responses.pop(fileno, None)

    try:
        while True:
            events = self.epoll.poll(1)
            for fileno, event in events:
                if fileno == self.sock.fileno():
                    connection, address = self.sock.accept()
                    connection.setblocking(0)
                    self.epoll.register(connection.fileno(), select.EPOLLIN)
                    connections[connection.fileno()] = connection
                    requests[connection.fileno()] = b''
                    responses[connection.fileno()] = SERVER_RESPONSE
                elif event & select.EPOLLIN:
                    received = connections[fileno].recv(1024)
                    if not received:
                        # Empty read means the peer closed; keeping the fd
                        # registered would make EPOLLIN fire endlessly.
                        discard(fileno)
                        continue
                    requests[fileno] += received
                    if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                        self.epoll.modify(fileno, select.EPOLLOUT)
                        print('-'*40 + '\n' + requests[fileno].decode()[:-2])
                elif event & select.EPOLLOUT:
                    byteswritten = connections[fileno].send(responses[fileno])
                    responses[fileno] = responses[fileno][byteswritten:]
                    if len(responses[fileno]) == 0:
                        # Response fully sent: stop watching and shut down;
                        # cleanup happens on the following EPOLLHUP.
                        self.epoll.modify(fileno, 0)
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLHUP:
                    discard(fileno)
    finally:
        self.epoll.unregister(self.sock.fileno())
        self.epoll.close()
        self.sock.close()
def __init__(self):
    """Pick the best available polling mechanism and set up the poller.

    Preference order is epoll, kqueue, devpoll, poll, then select.  The
    chosen mechanism's event flags are published as class attributes on
    ``_AsyncPoller`` (``_Read``, ``_Write``, ``_Hangup``, ``_Error``,
    ``_Block``).  ``timeout_multiplier`` is 1000 for devpoll/poll and 1
    otherwise.

    A command channel (``cmd_read`` / ``cmd_write``) is created and its
    read end is added to the poller; ``self.interrupt()`` writes one byte
    to the write end.
    """
    self.timeout_multiplier = 1
    if hasattr(select, 'epoll'):
        self._poller_name = 'epoll'
        self._poller = select.epoll()
        _AsyncPoller._Read = select.EPOLLIN | select.EPOLLPRI
        _AsyncPoller._Write = select.EPOLLOUT
        _AsyncPoller._Hangup = select.EPOLLHUP
        _AsyncPoller._Error = select.EPOLLERR
        _AsyncPoller._Block = -1
    elif hasattr(select, 'kqueue'):
        self._poller_name = 'kqueue'
        self._poller = _KQueueNotifier()
        # kqueue filter values are negative numbers so using
        # them as flags won't work, so define them as necessary
        _AsyncPoller._Read = 0x01
        _AsyncPoller._Write = 0x02
        _AsyncPoller._Hangup = 0x04
        _AsyncPoller._Error = 0x08
        _AsyncPoller._Block = None
    elif hasattr(select, 'devpoll'):
        self._poller_name = 'devpoll'
        self._poller = select.devpoll()
        _AsyncPoller._Read = select.POLLIN | select.POLLPRI
        _AsyncPoller._Write = select.POLLOUT
        _AsyncPoller._Hangup = select.POLLHUP
        _AsyncPoller._Error = select.POLLERR
        _AsyncPoller._Block = -1
        self.timeout_multiplier = 1000
    elif hasattr(select, 'poll'):
        self._poller_name = 'poll'
        self._poller = select.poll()
        _AsyncPoller._Read = select.POLLIN | select.POLLPRI
        _AsyncPoller._Write = select.POLLOUT
        _AsyncPoller._Hangup = select.POLLHUP
        _AsyncPoller._Error = select.POLLERR
        _AsyncPoller._Block = -1
        self.timeout_multiplier = 1000
    else:
        self._poller_name = 'select'
        self._poller = _SelectNotifier()
        _AsyncPoller._Read = 0x01
        _AsyncPoller._Write = 0x02
        _AsyncPoller._Hangup = 0x04
        _AsyncPoller._Error = 0x08
        _AsyncPoller._Block = None

    self._fds = {}
    self._events = {}
    self._timeouts = []

    self.cmd_read, self.cmd_write = _AsyncPoller._cmd_read_write_fds(self)
    # The interrupt byte is a bytes literal: socket.send() and os.write()
    # reject str on Python 3, and b'I' equals 'I' on Python 2.
    if hasattr(self.cmd_write, 'getsockname'):
        # Socket-like command channel.
        self.cmd_read = AsyncSocket(self.cmd_read)
        self.cmd_read._read_fn = lambda: self.cmd_read._rsock.recv(128)
        self.interrupt = lambda: self.cmd_write.send(b'I')
    else:
        self.interrupt = lambda: os.write(self.cmd_write._fileno, b'I')
    self.add(self.cmd_read, _AsyncPoller._Read)
def __init__(self):
    """Select a polling backend and initialise poller state.

    Backends are tried in the order epoll, kqueue, devpoll, poll, select.
    The backend's flag values are stored on the ``_AsyncPoller`` class as
    ``_Read``, ``_Write``, ``_Hangup``, ``_Error`` and ``_Block``;
    ``timeout_multiplier`` becomes 1000 for devpoll/poll and stays 1
    otherwise.  The read end of the command channel is registered for
    read events and ``self.interrupt()`` writes ``b'I'`` to the write end.
    """
    self.timeout_multiplier = 1
    # Each branch yields (read, write, hangup, error, block).
    if hasattr(select, 'epoll'):
        self._poller_name = 'epoll'
        self._poller = select.epoll()
        flags = (select.EPOLLIN | select.EPOLLPRI, select.EPOLLOUT,
                 select.EPOLLHUP, select.EPOLLERR, -1)
    elif hasattr(select, 'kqueue'):
        self._poller_name = 'kqueue'
        self._poller = _KQueueNotifier()
        # kqueue filter values are negative numbers so using
        # them as flags won't work, so define them as necessary
        flags = (0x01, 0x02, 0x04, 0x08, None)
    elif hasattr(select, 'devpoll'):
        self._poller_name = 'devpoll'
        self._poller = select.devpoll()
        flags = (select.POLLIN | select.POLLPRI, select.POLLOUT,
                 select.POLLHUP, select.POLLERR, -1)
        self.timeout_multiplier = 1000
    elif hasattr(select, 'poll'):
        self._poller_name = 'poll'
        self._poller = select.poll()
        flags = (select.POLLIN | select.POLLPRI, select.POLLOUT,
                 select.POLLHUP, select.POLLERR, -1)
        self.timeout_multiplier = 1000
    else:
        self._poller_name = 'select'
        self._poller = _SelectNotifier()
        flags = (0x01, 0x02, 0x04, 0x08, None)
    (_AsyncPoller._Read, _AsyncPoller._Write, _AsyncPoller._Hangup,
     _AsyncPoller._Error, _AsyncPoller._Block) = flags

    self._fds = {}
    self._events = {}
    self._timeouts = []

    self.cmd_read, self.cmd_write = _AsyncPoller._cmd_read_write_fds(self)
    if not hasattr(self.cmd_write, 'getsockname'):
        self.interrupt = lambda: os.write(self.cmd_write._fileno, b'I')
    else:
        # Socket-like command channel.
        self.cmd_read = AsyncSocket(self.cmd_read)
        self.cmd_read._read_fn = lambda: self.cmd_read._rsock.recv(128)
        self.interrupt = lambda: self.cmd_write.send(b'I')
    self.add(self.cmd_read, _AsyncPoller._Read)
def run_forever():
    """Serve minion connections on ``master_connector.tcp_port`` forever.

    Listens on all interfaces, registers the listening socket with
    ``master_connector.epoll_fd`` and then loops over epoll events:

    * new connection -> accepted, made non-blocking, registered
      edge-triggered for reads, recorded in ``master_connector.conn`` and
      a GRE connection is built to the peer address;
    * readable -> the socket is drained until EAGAIN, the collected bytes
      go to ``do_message_response`` and the reply is queued for writing;
    * writable -> the queued reply is sent in full, then the descriptor is
      switched back to reading;
    * hang-up -> the connection is closed.

    On exit the ``ovs-master`` bridge is deleted.
    """
    listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_fd.bind(('', master_connector.tcp_port))
    listen_fd.listen(master_connector.max_minions)
    master_connector.epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
    datalist = {}
    master_connector.establish_vswitch('master')
    try:
        while True:
            epoll_list = master_connector.epoll_fd.poll()
            for fd, events in epoll_list:
                if fd == listen_fd.fileno():
                    fileno, addr = listen_fd.accept()
                    fileno.setblocking(0)
                    master_connector.epoll_fd.register(fileno.fileno(), select.EPOLLIN | select.EPOLLET)
                    master_connector.conn[fileno.fileno()] = (fileno, addr[0])
                    master_connector.build_gre_conn('master', addr[0])
                elif select.EPOLLIN & events:
                    datas = b''
                    while True:
                        try:
                            data = master_connector.conn[fd][0].recv(10)
                            if not data:
                                # Empty read means the peer closed.  This
                                # used to be handled only when nothing had
                                # been received yet; EOF after partial data
                                # kept appending b'' forever.
                                master_connector.close_connection(fd)
                                break
                            datas += data
                        except socket.error as msg:
                            if msg.errno == errno.EAGAIN:
                                # Socket drained: process the message and
                                # switch the descriptor to writing.
                                try:
                                    datalist[fd] = master_connector.do_message_response(datas)
                                    master_connector.epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                                except Exception:
                                    master_connector.close_connection(fd)
                            else:
                                master_connector.close_connection(fd)
                            break
                elif select.EPOLLOUT & events:
                    sendLen = 0
                    while True:
                        sendLen += master_connector.conn[fd][0].send(datalist[fd][sendLen:])
                        if sendLen == len(datalist[fd]):
                            break
                    master_connector.epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
                elif select.EPOLLHUP & events:
                    master_connector.close_connection(fd)
                else:
                    continue
    finally:
        os.system('ovs-vsctl del-br ovs-master >/dev/null 2>&1')
def _processStreams(self):
    """Move pending data between the child's pipes and the local buffers.

    Returns immediately once three descriptors have been recorded as
    closed.  If another caller holds ``self._streamLock`` this call waits
    for it to finish and returns without polling.  Otherwise one poll
    (1 second) is performed: writable events push buffered stdin data to
    the descriptor, readable events append to the descriptor's stream
    while preserving its read position, and hang-up/error events
    unregister the descriptor.
    """
    if len(self._closedfds) == 3:
        return

    got_lock = self._streamLock.acquire(False)
    if not got_lock:
        # Someone else is processing: wait for them, then leave.
        self._streamLock.acquire()
        self._streamLock.release()
        return

    try:
        if self._stdin.len > 0 and self._stdin.pos == 0:
            # Polling stdin is redundant if there is nothing to write
            # turn on only if data is waiting to be pushed
            self._poller.modify(self._fdin, select.EPOLLOUT)

        ready = NoIntrPoll(self._poller.poll, 1)
        for fd, event in ready:
            stream = self._fdMap[fd]
            if event & select.EPOLLOUT and self._stdin.len > 0:
                chunk = self._stdin.read(BUFFSIZE)
                sent = os.write(fd, chunk)
                # Rewind by whatever the pipe did not accept.
                stream.pos -= len(chunk) - sent
                if stream.pos == stream.len:
                    stream.truncate(0)
                    self._poller.modify(fd, 0)
            elif event & (select.EPOLLIN | select.EPOLLPRI):
                incoming = os.read(fd, BUFFSIZE)
                saved_pos = stream.pos
                # Append at the end, then restore the reader's position.
                stream.pos = stream.len
                stream.write(incoming)
                stream.pos = saved_pos
            elif event & (select.EPOLLHUP | select.EPOLLERR):
                self._poller.unregister(fd)
                self._closedfds.append(fd)
                # I don't close the fd because the original Popen
                # will do it.

        if self.stdin.closed and self._fdin not in self._closedfds:
            self._poller.unregister(self._fdin)
            self._closedfds.append(self._fdin)
            self._proc.stdin.close()
    finally:
        self._streamLock.release()
def run():
    """Run a minimal epoll-based TCP server on 127.0.0.1:18080.

    Accepted connections are tracked in the ``connections``, ``requests``
    and ``responses`` mappings, which are not defined in this function
    (presumably module-level -- verify).  Readable sockets have up to 1024
    bytes appended to their request buffer and are switched to EPOLLOUT;
    writable sockets send any pending response; hang-ups unregister and
    close the socket.

    Any ``Exception`` ends the loop after its traceback is printed; the
    listening socket and the epoll object are always released.
    """
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('127.0.0.1', 18080))
    serversocket.listen(5)
    serversocket.setblocking(0)  # non-blocking listening socket
    epoll = select.epoll()
    # Watch the listening socket for incoming connections.
    epoll.register(serversocket.fileno(), select.EPOLLIN)
    try:
        while True:
            events = epoll.poll()
            for fileno, event in events:
                if fileno == serversocket.fileno():
                    connection, address = serversocket.accept()
                    connection.setblocking(0)
                    epoll.register(connection.fileno(), select.EPOLLIN)
                    connections[connection.fileno()] = connection
                    requests[connection.fileno()] = b''
                    responses[connection.fileno()] = b""
                    print("new conn.fileno is %s" % connection.fileno())
                elif event & select.EPOLLIN:
                    print("read event is happing")
                    requests[fileno] += connections[fileno].recv(1024)
                    epoll.modify(fileno, select.EPOLLOUT)
                    print('-' * 40 + '\n' + requests[fileno].decode()[:-2])
                elif event & select.EPOLLOUT:
                    # NOTE(review): nesting reconstructed from an
                    # indentation-stripped source -- confirm the inner
                    # `if` belongs under the outer one.
                    if responses[fileno]:
                        byteswritten = connections[fileno].send(responses[fileno])
                        responses[fileno] = responses[fileno][byteswritten:]
                        if len(responses[fileno]) == 0:
                            # NOTE(review): re-arms EPOLLOUT, which is
                            # already the current mask -- confirm intent.
                            epoll.modify(fileno, select.EPOLLOUT)
                            print("change event to write")
                elif event & select.EPOLLHUP:
                    epoll.unregister(fileno)
                    connections[fileno].close()
                    del connections[fileno]
                    print("event is HUP ===%s" % fileno)
    except Exception:
        # print_exc() writes the traceback itself and returns None, so
        # wrapping it in print emitted a stray "None" line.
        traceback.print_exc()
    finally:
        epoll.unregister(serversocket.fileno())
        epoll.close()
        serversocket.close()
        print("finally")