def unregister(self, fileobj):
    """Unregister ``fileobj`` and delete its kqueue filters.

    The selector key is obtained from the parent class' ``unregister``.
    For each of EVENT_READ / EVENT_WRITE present in ``key.events`` a
    ``KQ_EV_DELETE`` kevent is submitted for the matching filter. A
    ``SelectorError`` raised while deleting a filter is ignored.

    Returns the key produced by the parent class.
    """
    key = super(KqueueSelector, self).unregister(fileobj)

    def _delete_filter(kq_filter):
        # Best-effort removal: a SelectorError here is deliberately swallowed.
        removal = select.kevent(key.fd, kq_filter, select.KQ_EV_DELETE)
        try:
            _syscall_wrapper(self._kqueue.control, False, [removal], 0, 0)
        except SelectorError:
            pass

    if key.events & EVENT_READ:
        _delete_filter(select.KQ_FILTER_READ)
    if key.events & EVENT_WRITE:
        _delete_filter(select.KQ_FILTER_WRITE)
    return key
# Example source code for Python's select() (python类select()的实例源码)
def DefaultSelector():
    """Return a new instance of the best selector available.

    On the first call the selector class is chosen by probing, in order,
    'kqueue', 'epoll' and 'poll' via ``_can_allocate`` and finally falling
    back to ``select.select``. The chosen class is cached in the module
    global ``_DEFAULT_SELECTOR`` and reused by later calls.

    Raises ValueError when none of the mechanisms is available.
    """
    global _DEFAULT_SELECTOR
    if _DEFAULT_SELECTOR is not None:
        return _DEFAULT_SELECTOR()

    if _can_allocate('kqueue'):
        chosen = KqueueSelector
    elif _can_allocate('epoll'):
        chosen = EpollSelector
    elif _can_allocate('poll'):
        chosen = PollSelector
    elif hasattr(select, 'select'):
        chosen = SelectSelector
    else:  # Platform-specific: AppEngine
        raise ValueError('Platform does not have a selector')

    _DEFAULT_SELECTOR = chosen
    return _DEFAULT_SELECTOR()
def recv(self, *args, **kwargs):
    """Receive data from the wrapped connection.

    All arguments are forwarded to ``self.connection.recv``.

    * ``SysCallError`` with args ``(-1, 'Unexpected EOF')`` yields ``b''``
      when ``self.suppress_ragged_eofs`` is set; any other ``SysCallError``
      is re-raised as ``SocketError``.
    * ``ZeroReturnError`` yields ``b''`` if the connection reports
      ``RECEIVED_SHUTDOWN``; otherwise it propagates.
    * ``WantReadError`` waits with ``select`` for the socket to become
      readable (using the socket's timeout) and retries; ``timeout`` is
      raised if the wait expires.
    """
    try:
        data = self.connection.recv(*args, **kwargs)
    except OpenSSL.SSL.SysCallError as exc:
        if not (self.suppress_ragged_eofs
                and exc.args == (-1, 'Unexpected EOF')):
            raise SocketError(str(exc))
        return b''
    except OpenSSL.SSL.ZeroReturnError:
        if self.connection.get_shutdown() != OpenSSL.SSL.RECEIVED_SHUTDOWN:
            raise
        return b''
    except OpenSSL.SSL.WantReadError:
        readable, _, _ = select.select(
            [self.socket], [], [], self.socket.gettimeout())
        if readable:
            return self.recv(*args, **kwargs)
        raise timeout('The read operation timed out')
    return data
def recv_into(self, *args, **kwargs):
    """Receive data into a buffer via the wrapped connection.

    All arguments are forwarded to ``self.connection.recv_into`` and its
    result is returned.

    * ``SysCallError`` with args ``(-1, 'Unexpected EOF')`` yields ``0``
      when ``self.suppress_ragged_eofs`` is set; any other ``SysCallError``
      is re-raised as ``SocketError``.
    * ``ZeroReturnError`` yields ``0`` if the connection reports
      ``RECEIVED_SHUTDOWN``; otherwise it propagates.
    * ``WantReadError`` waits with ``select`` for the socket to become
      readable (using the socket's timeout) and retries; ``timeout`` is
      raised if the wait expires.
    """
    try:
        return self.connection.recv_into(*args, **kwargs)
    except OpenSSL.SSL.SysCallError as exc:
        if not (self.suppress_ragged_eofs
                and exc.args == (-1, 'Unexpected EOF')):
            raise SocketError(str(exc))
        return 0
    except OpenSSL.SSL.ZeroReturnError:
        if self.connection.get_shutdown() != OpenSSL.SSL.RECEIVED_SHUTDOWN:
            raise
        return 0
    except OpenSSL.SSL.WantReadError:
        readable, _, _ = select.select(
            [self.socket], [], [], self.socket.gettimeout())
        if readable:
            return self.recv_into(*args, **kwargs)
        raise timeout('The read operation timed out')
def __serve(self, client, server):
    """Relay data between ``client`` and ``server`` until both sides hit EOF.

    Each socket is mapped to its peer. Whenever a socket is readable, up to
    ``self.BUFFERSIZE`` bytes are read and forwarded to the peer. An empty
    read (EOF) half-closes the peer for writing, shuts the source down for
    reading and stops watching that source.

    Both sockets are always closed on exit, including when an exception
    escapes from ``select``/``recv``/``sendall``/``shutdown``; the exception
    itself still propagates to the caller.
    """
    pairs = {client: server, server: client}
    try:
        while pairs:
            # list() gives select a stable snapshot of the watched sockets.
            readable, _, _ = _select.select(list(pairs), [], [])
            for source in readable:
                chunk = source.recv(self.BUFFERSIZE)
                if chunk:
                    pairs[source].sendall(chunk)
                else:
                    # EOF from source: propagate the half-close to the peer.
                    pairs[source].shutdown(_socket.SHUT_WR)
                    source.shutdown(_socket.SHUT_RD)
                    del pairs[source]
    finally:
        # Previously skipped on error, which leaked both descriptors.
        client.close()
        server.close()
################################################################################
def _setup_input_pipes(input_pipes):
    """
    Split a mapping of input pipes into descriptors and named pipes.

    Integer keys are treated as already-open file descriptors and are
    collected into a list (intended for ``select`` as writeable
    descriptors). Every other key is treated as a filesystem path that must
    exist and be a FIFO; those are collected into a dictionary mapping the
    path to its adapter.

    Returns a ``(descriptors, fifos)`` tuple. Raises ``Exception`` if a path
    does not exist or is not a FIFO.
    """
    descriptors = []
    named_pipes = {}
    for pipe, adapter in six.viewitems(input_pipes):
        if isinstance(pipe, int):
            # This is assumed to be an open system-level file descriptor
            descriptors.append(pipe)
            continue
        if not os.path.exists(pipe):
            raise Exception('Input pipe does not exist: %s' % pipe)
        mode = os.stat(pipe).st_mode
        if not stat.S_ISFIFO(mode):
            raise Exception('Input pipe must be a fifo object: %s' % pipe)
        named_pipes[pipe] = adapter
    return descriptors, named_pipes
def __init__(self):
    """Pick an event backend and initialise loop state.

    Preference order is ``select.epoll``, then ``KqueueLoop`` (when
    ``select.kqueue`` exists), then ``SelectLoop`` (when ``select.select``
    exists). Raises ``Exception`` if none is available. The chosen model
    name is logged at debug level.
    """
    if hasattr(select, 'epoll'):
        impl, model = select.epoll(), 'epoll'
    elif hasattr(select, 'kqueue'):
        impl, model = KqueueLoop(), 'kqueue'
    elif hasattr(select, 'select'):
        impl, model = SelectLoop(), 'select'
    else:
        raise Exception(
            'can not find any available functions in select package')
    self._impl = impl
    self._fdmap = {}  # (f, handler)
    self._last_time = time.time()
    self._periodic_callbacks = []
    self._stopping = False
    logging.debug('using event model: %s', model)
def keep_reading(self):
    """Output thread method for the process.

    Loops until ``self.stop`` is set. Each pass polls ``self.process``
    (setting ``self.stop`` once it has exited), waits up to 5 seconds for
    ``self.master`` to become readable, reads up to 1024 bytes, decodes them
    as UTF-8 and hands the text to ``self.output_transcoder.decode``.

    Decoding is incremental: a multi-byte character split across two reads
    is buffered and emitted whole instead of being turned into replacement
    characters. Undecodable bytes are still replaced (``errors='replace'``).
    """
    import codecs

    # Keeps trailing partial UTF-8 sequences between reads.
    decoder = codecs.getincrementaldecoder('UTF-8')(errors='replace')
    while not self.stop:
        ret = self.process.poll()
        if ret is not None:
            # Process has exited: do one last read pass, then leave the loop.
            self.stop = True
        readable, _writable, _exceptional = select.select(
            [self.master], [], [], 5)
        if readable:
            # We read the new content
            data = os.read(self.master, 1024)
            text = decoder.decode(data)
            log_debug("RAW", repr(text))
            log_debug("PID", os.getenv('BASHPID'))
            self.output_transcoder.decode(text)
            # log_debug("{} >> {}".format(int(time.time()), repr(text)))
    # Flush bytes of an unfinished sequence still held by the decoder.
    tail = decoder.decode(b'', final=True)
    if tail:
        self.output_transcoder.decode(tail)
def run(self):
    """Pump data between ``self.messages`` and the ``self.master`` descriptor.

    Each pass:

    * if nothing is waiting to be written, takes the next queue item. The
      string ``'exit'`` ends the loop; anything else is expected to be a
      ``(input, encoding)`` tuple, e.g. ``('input', None)``.
    * polls ``self.master`` (non-blocking) for readability, and for
      writability when there is pending input.
    * emits whatever was read through ``self.RECV_LINE``.
    * writes pending input followed by a newline.

    A dequeued message is kept until it has been written completely, and
    ``task_done`` is called only then. Text input is encoded with the
    tuple's encoding (UTF-8 if that is ``None``); bytes are written as-is.
    """
    pending = None  # bytes still to be written for the current message
    while True:
        if pending is None and not self.messages.empty():
            item = self.messages.get()
            if item == 'exit':
                self.messages.task_done()
                break
            payload, encoding = item
            if not isinstance(payload, bytes):
                # os.write needs bytes; encode text input first.
                payload = payload.encode(encoding or 'utf-8')
            pending = payload + b"\n"
        writefd = [self.master] if pending is not None else []
        # NOTE: timeout 0 makes this a polling loop, as in the original.
        r, w, _ = select.select([self.master], writefd, [], 0)
        if r:
            # Read when the binary has new output for us (sometimes this
            # came from us writing). Reads up to a kilobyte at once.
            line = os.read(self.master, 1024)
            self.RECV_LINE.emit(line)
        if w:
            written = os.write(self.master, pending)
            pending = pending[written:]
            if not pending:
                # Whole message delivered; only now is the task complete.
                pending = None
                self.messages.task_done()
def loop_read(self, max_packets=1):
    """Process read network events.

    Use in place of calling loop() if you wish to handle your client reads
    as part of your own application. Use socket() to obtain the client
    socket to call select() or equivalent on.

    Do not use if you are using the threaded interface loop_start().

    The ``max_packets`` argument is not used as passed: the number of read
    attempts is the combined length of the outgoing and incoming message
    queues, with a minimum of one.

    Returns MQTT_ERR_NO_CONN when there is neither a socket nor an SSL
    object, the result of ``_loop_rc_handle`` for a positive return code
    from ``_packet_read``, and MQTT_ERR_SUCCESS otherwise.
    """
    if self._sock is None and self._ssl is None:
        return MQTT_ERR_NO_CONN

    max_packets = max(1, len(self._out_messages) + len(self._in_messages))
    for _ in range(max_packets):
        rc = self._packet_read()
        if rc > 0:
            return self._loop_rc_handle(rc)
        if rc == MQTT_ERR_AGAIN:
            return MQTT_ERR_SUCCESS
    return MQTT_ERR_SUCCESS
def loop_write(self, max_packets=1):
    """Process write network events.

    Use in place of calling loop() if you wish to handle your client writes
    as part of your own application. Use socket() to obtain the client
    socket to call select() or equivalent on. Use want_write() to determine
    if there is data waiting to be written.

    Do not use if you are using the threaded interface loop_start().

    The ``max_packets`` argument is not used as passed: the number of write
    attempts is ``len(self._out_packet) + 1``.

    Returns MQTT_ERR_NO_CONN when there is neither a socket nor an SSL
    object, the result of ``_loop_rc_handle`` for a positive return code
    from ``_packet_write``, and MQTT_ERR_SUCCESS otherwise.
    """
    if self._sock is None and self._ssl is None:
        return MQTT_ERR_NO_CONN

    max_packets = max(1, len(self._out_packet) + 1)
    for _ in range(max_packets):
        rc = self._packet_write()
        if rc > 0:
            return self._loop_rc_handle(rc)
        if rc == MQTT_ERR_AGAIN:
            return MQTT_ERR_SUCCESS
    return MQTT_ERR_SUCCESS
def recv(self, *args, **kwargs):
    """Read from ``self.connection``, translating pyOpenSSL errors.

    Arguments are passed straight through to ``self.connection.recv``.

    Returns the received data, or ``b''`` when the peer ended the stream
    (a suppressed ``(-1, 'Unexpected EOF')`` ``SysCallError``, or a
    ``ZeroReturnError`` with the connection in ``RECEIVED_SHUTDOWN``).

    Raises ``SocketError`` for other ``SysCallError`` cases and ``timeout``
    when, after a ``WantReadError``, the socket does not become readable
    within its configured timeout. If it does become readable, the read is
    retried.
    """
    try:
        data = self.connection.recv(*args, **kwargs)
    except OpenSSL.SSL.SysCallError as exc:
        if not (self.suppress_ragged_eofs
                and exc.args == (-1, 'Unexpected EOF')):
            raise SocketError(str(exc))
        return b''
    except OpenSSL.SSL.ZeroReturnError:
        if self.connection.get_shutdown() != OpenSSL.SSL.RECEIVED_SHUTDOWN:
            raise
        return b''
    except OpenSSL.SSL.WantReadError:
        ready, _, _ = select.select(
            [self.socket], [], [], self.socket.gettimeout())
        if ready:
            return self.recv(*args, **kwargs)
        raise timeout('The read operation timed out')
    return data
def recv_into(self, *args, **kwargs):
    """Read into a buffer from ``self.connection``, translating errors.

    Arguments are passed straight through to
    ``self.connection.recv_into`` and its result is returned.

    Returns ``0`` when the peer ended the stream (a suppressed
    ``(-1, 'Unexpected EOF')`` ``SysCallError``, or a ``ZeroReturnError``
    with the connection in ``RECEIVED_SHUTDOWN``).

    Raises ``SocketError`` for other ``SysCallError`` cases and ``timeout``
    when, after a ``WantReadError``, the socket does not become readable
    within its configured timeout. If it does become readable, the read is
    retried.
    """
    try:
        return self.connection.recv_into(*args, **kwargs)
    except OpenSSL.SSL.SysCallError as exc:
        if not (self.suppress_ragged_eofs
                and exc.args == (-1, 'Unexpected EOF')):
            raise SocketError(str(exc))
        return 0
    except OpenSSL.SSL.ZeroReturnError:
        if self.connection.get_shutdown() != OpenSSL.SSL.RECEIVED_SHUTDOWN:
            raise
        return 0
    except OpenSSL.SSL.WantReadError:
        ready, _, _ = select.select(
            [self.socket], [], [], self.socket.gettimeout())
        if ready:
            return self.recv_into(*args, **kwargs)
        raise timeout('The read operation timed out')
def select_folder_and_parse(self, folder, readonly=False):
    """Select ``folder`` on the server and return the parsed response.

    Issues a ``select`` command for the normalised folder name (passing
    ``readonly`` through), then converts the connection's untagged
    responses with ``from_bytes`` and returns the result of
    ``_process_select_response`` on them.

    Per the original documentation, the returned dictionary contains at
    least the ``EXISTS``, ``FLAGS`` and ``RECENT`` keys. An example::

        {'EXISTS': 3,
         'FLAGS': ('\\Answered', '\\Flagged', '\\Deleted', ... ),
         'RECENT': 0,
         'PERMANENTFLAGS': ('\\Answered', '\\Flagged', '\\Deleted', ... ),
         'READ-WRITE': True,
         'UIDNEXT': 11,
         'UIDVALIDITY': 1239278212}
    """
    normalised = self._normalise_folder(folder)
    self._command_and_check('select', normalised, readonly)
    responses = from_bytes(self._imap.untagged_responses)
    return self._process_select_response(responses)
def recv(self, *args, **kwargs):
    """Receive bytes through the wrapped pyOpenSSL connection.

    Positional and keyword arguments go unchanged to
    ``self.connection.recv``.

    Error handling:
      - ``SysCallError``: ``b''`` if ``self.suppress_ragged_eofs`` is set
        and the args are ``(-1, 'Unexpected EOF')``; otherwise raised again
        as ``SocketError``.
      - ``ZeroReturnError``: ``b''`` if the connection's shutdown state is
        ``RECEIVED_SHUTDOWN``; otherwise re-raised.
      - ``WantReadError``: ``select`` on ``self.socket`` with the socket's
        timeout, then retry; ``timeout`` is raised if nothing is readable.
    """
    try:
        data = self.connection.recv(*args, **kwargs)
    except OpenSSL.SSL.SysCallError as exc:
        if not (self.suppress_ragged_eofs
                and exc.args == (-1, 'Unexpected EOF')):
            raise SocketError(str(exc))
        return b''
    except OpenSSL.SSL.ZeroReturnError:
        if self.connection.get_shutdown() != OpenSSL.SSL.RECEIVED_SHUTDOWN:
            raise
        return b''
    except OpenSSL.SSL.WantReadError:
        can_read, _, _ = select.select(
            [self.socket], [], [], self.socket.gettimeout())
        if can_read:
            return self.recv(*args, **kwargs)
        raise timeout('The read operation timed out')
    return data
def recv_into(self, *args, **kwargs):
    """Receive into a buffer through the wrapped pyOpenSSL connection.

    Positional and keyword arguments go unchanged to
    ``self.connection.recv_into``; its result is returned.

    Error handling:
      - ``SysCallError``: ``0`` if ``self.suppress_ragged_eofs`` is set and
        the args are ``(-1, 'Unexpected EOF')``; otherwise raised again as
        ``SocketError``.
      - ``ZeroReturnError``: ``0`` if the connection's shutdown state is
        ``RECEIVED_SHUTDOWN``; otherwise re-raised.
      - ``WantReadError``: ``select`` on ``self.socket`` with the socket's
        timeout, then retry; ``timeout`` is raised if nothing is readable.
    """
    try:
        return self.connection.recv_into(*args, **kwargs)
    except OpenSSL.SSL.SysCallError as exc:
        if not (self.suppress_ragged_eofs
                and exc.args == (-1, 'Unexpected EOF')):
            raise SocketError(str(exc))
        return 0
    except OpenSSL.SSL.ZeroReturnError:
        if self.connection.get_shutdown() != OpenSSL.SSL.RECEIVED_SHUTDOWN:
            raise
        return 0
    except OpenSSL.SSL.WantReadError:
        can_read, _, _ = select.select(
            [self.socket], [], [], self.socket.gettimeout())
        if can_read:
            return self.recv_into(*args, **kwargs)
        raise timeout('The read operation timed out')
def __init__(self, iocp_notifier):
    """Set up the ``select``-based poller and start its polling thread.

    Initialises the descriptor/event maps, the read/write/exception sets,
    a non-blocking command socket pair obtained from
    ``_AsyncPoller._socketpair()``, and a re-entrant lock. Stores
    ``iocp_notifier`` and finally starts a daemon thread running
    ``self.poll``.
    """
    self._poller_name = 'select'
    self._fds, self._events = {}, {}
    self._terminate = False
    self.rset, self.wset, self.xset = set(), set(), set()
    self.iocp_notifier = iocp_notifier
    self.cmd_rsock, self.cmd_wsock = _AsyncPoller._socketpair()
    for cmd_sock in (self.cmd_rsock, self.cmd_wsock):
        cmd_sock.setblocking(0)
    self.poller = select.select
    self._polling = False
    self._lock = threading.RLock()
    worker = threading.Thread(target=self.poll)
    worker.daemon = True
    self.poll_thread = worker
    # Started last so the thread sees fully initialised state.
    worker.start()
def __init__(self):
    """Create a kqueue object as ``self.poller`` unless one is already set."""
    if hasattr(self, 'poller'):
        return
    self.poller = select.kqueue()
def register(self, fid, event):
    """Add read and write kqueue filters for ``fid``.

    Both filters are always added (``KQ_EV_ADD``). The read filter is
    enabled when ``event`` has the ``_AsyncPoller._Read`` bit and disabled
    otherwise; likewise the write filter with ``_AsyncPoller._Write``.
    """
    def _apply(kq_filter, wanted):
        state = select.KQ_EV_ENABLE if wanted else select.KQ_EV_DISABLE
        change = select.kevent(fid, filter=kq_filter,
                               flags=select.KQ_EV_ADD | state)
        self.poller.control([change], 0)

    _apply(select.KQ_FILTER_READ, event & _AsyncPoller._Read)
    _apply(select.KQ_FILTER_WRITE, event & _AsyncPoller._Write)
def unregister(self, fid):
    """Delete the read filter and then the write filter for ``fid``."""
    for kq_filter in (select.KQ_FILTER_READ, select.KQ_FILTER_WRITE):
        removal = select.kevent(fid, filter=kq_filter,
                                flags=select.KQ_EV_DELETE)
        self.poller.control([removal], 0)