def _handle_socket_read(handler, sock):
    """Pump bytes from ``sock`` into ``handler`` until the connection ends.

    Incoming data is accumulated and offered to ``handler.consume`` until
    the handler cannot make further progress. While the socket has nothing
    to read, ``handler.handle_inbox()`` is called instead.

    The loop ends when:

    * the peer closes the connection (``recv`` returns no data),
    * ``handler.consume`` returns ``None`` (protocol asks to disconnect), or
    * ``select``/``recv`` (or the handler) raises an OS-level error.

    Afterwards the socket is closed and ``handler.on_connection_lost()`` is
    called.

    :param handler: object providing ``consume(data)``, ``handle_inbox()``
        and ``on_connection_lost()``. ``consume`` must return the bytes it
        did not consume, the unchanged input when it needs more data, or
        ``None`` to request a disconnect.
    :param sock: connected socket to read from.
    """
    collected = b''
    connected = True
    while connected:
        try:
            # Zero timeout: poll without blocking, so an idle socket gives
            # the handler a chance to process its inbox.
            # NOTE(review): with a plain blocking ``select`` this is a busy
            # loop; presumably ``handle_inbox`` yields/sleeps -- confirm.
            readable, _, _ = select.select([sock], [], [], 0.0)
            if not readable:
                handler.handle_inbox()
                continue

            data = sock.recv(4096)
            if not data:
                # A readable socket that yields no bytes was closed by the
                # peer; without this check the loop would spin forever.
                break
            collected += data

            # Try to consume repeatedly in case several messages arrived
            # in the same packet.
            while True:
                remaining = handler.consume(collected)
                if remaining is None:
                    print("Protocol requested to disconnect the socket")
                    connected = False  # leave the outer loop as well
                    break
                if remaining == collected:
                    break  # could not consume any more
                collected = remaining
        except (select.error, IOError, OSError):
            # Disconnected probably or another error. IOError/OSError also
            # cover socket errors on Python 2, where they are distinct from
            # select.error.
            break

    sock.close()
    handler.on_connection_lost()
# Example source code for Python's select()
def select(self, *args, **kwargs):
    """Forward every argument to ``gevent.select.select`` and return its result."""
    gevent_select = gevent.select.select
    return gevent_select(*args, **kwargs)
def __call__(self, environ, start_response):
    """Perform the uWSGI websocket handshake, then run ``self.app``.

    ``environ`` is stored on the instance; ``start_response`` is accepted
    but not used here. After the handshake one of two send strategies is
    prepared:

    * if ``uwsgi`` exposes ``request_context``, its result is kept in
      ``self._req_ctx``;
    * otherwise ``self._req_ctx`` stays ``None`` and a gevent ``Event``,
      a ``Queue`` and a background greenlet that watches the connection
      file descriptor are created.

    Finally ``self.app`` is called with this object as its only argument.
    """
    self.environ = environ
    uwsgi.websocket_handshake()

    self._req_ctx = None
    if not hasattr(uwsgi, 'request_context'):
        # No cross-greenlet API access: fall back to an event and a queue
        # for sending messages.
        from gevent.event import Event
        from gevent.queue import Queue
        from gevent.select import select

        self._event = Event()
        self._send_queue = Queue()

        def select_greenlet_runner(fd, event):
            """Sets event when data becomes available to read on fd."""
            watching = True
            while watching:
                event.set()
                try:
                    select([fd], [], [])
                except ValueError:
                    # select() rejected the descriptor: stop watching.
                    watching = False

        # Spawn the greenlet that runs the select loop above.
        connection_fd = uwsgi.connection_fd()
        self._select_greenlet = gevent.spawn(
            select_greenlet_runner, connection_fd, self._event)
    else:
        # uWSGI >= 2.1.x with support for api access across-greenlets
        self._req_ctx = uwsgi.request_context()

    self.app(self)
def select(self, *args, **kwargs):
    """Delegate to ``gevent.select.select`` with the given arguments."""
    outcome = gevent.select.select(*args, **kwargs)
    return outcome
def runShell(script, env=None, cwd="/", user=None, base64_encode=False):
    """Run ``script`` through bash and return ``(status, output)``.

    The command writes to a pseudo-terminal (stderr is redirected to
    stdout), and its output is collected until the process has exited and
    nothing is left to read.

    :param script: shell source to execute; text or bytes. When
        ``base64_encode`` is true it is the base64 encoding of the source.
    :param env: environment passed to ``Popen`` (``None`` inherits).
    :param cwd: working directory passed to ``Popen``.
    :param user: account to run as. When empty or equal to the current
        user the script runs via ``bash -c``; otherwise via
        ``su <user> -c <script> -s /bin/bash``.
    :param base64_encode: decode ``script`` from base64 before running.
    :returns: ``(0, "")`` when ``script`` is empty, ``(1, message)`` when
        the command could not be prepared or started, otherwise the
        process return code and its captured output.
    :raises: the select error is re-raised if waiting for output fails for
        a reason other than an interrupted system call.
    """
    import errno
    import pty
    import pwd

    if not script:
        return 0, ""

    master_fd = slave_fd = None
    try:
        # b64decode accepts both text and bytes and exists on every Python
        # version (base64.decodestring was removed in Python 3.9).
        if base64_encode:
            decode_script = base64.b64decode(script)
        else:
            decode_script = script
        logger.info(decode_script)
        # Hand the script to the child as UTF-8 bytes; input that is
        # already bytes is passed through untouched.
        if not isinstance(decode_script, bytes):
            decode_script = decode_script.encode('utf8')

        cur_user = pwd.getpwuid(os.getuid())[0]
        if not user or user == cur_user:
            shell_list = ["bash", "-c", decode_script]
        else:
            shell_list = ["su", user, "-c", decode_script, "-s", "/bin/bash"]

        master_fd, slave_fd = pty.openpty()
        proc = Popen(shell_list, shell=False, universal_newlines=True, bufsize=1,
                     stdout=slave_fd, stderr=STDOUT, env=env, cwd=cwd, close_fds=True)
    except Exception as e:
        logger.error(e)
        # Do not leak the pty if it was opened before the failure.
        for fd in (master_fd, slave_fd):
            if fd is not None:
                os.close(fd)
        return (1, str(e))

    timeout = .1
    chunks = []
    try:
        while True:
            try:
                ready, _, _ = gs.select([master_fd], [], [], timeout)
            except gs.error as ex:
                # Retry when the wait was merely interrupted by a signal.
                if ex.args and ex.args[0] == errno.EINTR:
                    continue
                raise
            if ready:
                data = os.read(master_fd, 512)
                if not data:
                    break
                chunks.append(data)
            elif proc.poll() is not None:
                # Process finished and no output is pending.
                break
    finally:
        # Close the pty even when the select error is re-raised.
        os.close(slave_fd)
        os.close(master_fd)

    proc.wait()
    status = proc.returncode

    # Join once at the end so multi-byte characters split across reads are
    # decoded correctly. On Python 2 ``bytes`` is ``str`` and no decoding
    # takes place.
    outputs = b''.join(chunks)
    if not isinstance(outputs, str):
        outputs = outputs.decode('utf8', 'replace')
    return (status, outputs)