def poll2(timeout=0.0, map=None):
    """Run a single select.poll() pass over the channels in ``map``.

    Args:
        timeout: Maximum wait in seconds (converted to the milliseconds
            poll() expects). ``None`` is handed to poll() unchanged.
        map: Mapping of file descriptor -> channel object; the module-level
            ``socket_map`` is used when this is ``None``.

    Channels are registered for input when ``readable()`` is true and for
    output when ``writable()`` is true and they are not ``accepting``.
    Each reported event whose descriptor is still in the map is forwarded
    to ``readwrite()``. An empty map means nothing is polled.
    """
    channels = socket_map if map is None else map
    if timeout is not None:
        # Seconds -> milliseconds.
        timeout = int(timeout * 1000)
    poller = select.poll()
    if not channels:
        return

    # Exceptional conditions are only requested alongside read/write interest.
    error_mask = select.POLLERR | select.POLLHUP | select.POLLNVAL
    for fileno, channel in list(channels.items()):
        wanted = (select.POLLIN | select.POLLPRI) if channel.readable() else 0
        # Accepting sockets should not be treated as writable.
        if channel.writable() and not channel.accepting:
            wanted |= select.POLLOUT
        if wanted:
            poller.register(fileno, wanted | error_mask)

    try:
        events = poller.poll(timeout)
    except select.error as err:
        # An interrupted call counts as "no events"; anything else is fatal.
        if err.args[0] != EINTR:
            raise
        events = []

    for fileno, flags in events:
        channel = channels.get(fileno)
        if channel is not None:
            readwrite(channel, flags)
# Python example source code using select.POLLPRI
def initialize(self):
    """Create the poll object and reset the read bookkeeping.

    Sets ``self._poller`` to a fresh ``select.poll()`` object, ``self.READ``
    to the combined POLLIN/POLLPRI/POLLHUP mask, and
    ``self.readables_fd_map`` to an empty dict.
    """
    poller = select.poll()
    read_events = select.POLLHUP | select.POLLPRI | select.POLLIN
    self._poller = poller
    self.READ = read_events
    self.readables_fd_map = dict()
def _readline(self, timeout=0):
if not self.proc:
return None
p = select.poll()
flag_err = select.POLLERR | select.POLLHUP
rfile = self.proc.stdout
p.register(rfile, flag_err | select.POLLIN | select.POLLPRI)
buf = StringIO.StringIO()
while self.playing:
lst = p.poll(timeout)
if not lst:
line = buf.getvalue()
buf.close()
self.log.debug("Time out!: %r" % line)
return line
for fd, flags in lst:
if flags & flag_err:
self.log.debug("Error reading MPlayer: %s, flags=%x" %
(fd, flags))
self.emit("error", "Problems reading MPlayer output")
return None
c = rfile.read(1)
buf.write(c)
if c == "\n" or c == "\r":
line = buf.getvalue()
buf.close()
return line
# _readline()
def poll2(timeout=0.0, map=None):
    """Run a single select.poll() pass over the channels in ``map``.

    Args:
        timeout: Maximum wait in seconds, converted to the milliseconds
            that poll() expects. ``None`` is passed through unchanged.
        map: Mapping of file descriptor -> channel object. Falls back to
            the module-level ``socket_map`` when ``None``.

    Each channel is registered for input when ``readable()`` is true and
    for output when ``writable()`` is true and it is not ``accepting``.
    Every event reported for a descriptor still present in ``map`` is
    handed to ``readwrite()``. Nothing is polled when the map is empty.

    Raises:
        select.error: if poll() fails for any reason other than EINTR.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if map:
        # Iterate over a snapshot so the map may change while the
        # readable()/writable() callbacks run.
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                # Only check for exceptions if object was either readable
                # or writable.
                flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
                pollster.register(fd, flags)
        try:
            r = pollster.poll(timeout)
        except select.error as err:
            # "except X as e" parses on Python 2.6+ and 3.x alike.
            if err.args[0] != EINTR:
                raise
            r = []
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)
def poll2(timeout=0.0, map=None):
    """Run a single select.poll() pass over the channels in ``map``.

    Args:
        timeout: Maximum wait in seconds, converted to the milliseconds
            that poll() expects. ``None`` is passed through unchanged.
        map: Mapping of file descriptor -> channel object. Falls back to
            the module-level ``socket_map`` when ``None``.

    Each channel is registered for input when ``readable()`` is true and
    for output when ``writable()`` is true and it is not ``accepting``.
    Every event reported for a descriptor still present in ``map`` is
    handed to ``readwrite()``. Nothing is polled when the map is empty.

    Raises:
        select.error: if poll() fails for any reason other than EINTR.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if map:
        # Iterate over a snapshot so the map may change while the
        # readable()/writable() callbacks run.
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                # Only check for exceptions if object was either readable
                # or writable.
                flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
                pollster.register(fd, flags)
        try:
            r = pollster.poll(timeout)
        except select.error as err:
            # "except X as e" parses on Python 2.6+ and 3.x alike.
            if err.args[0] != EINTR:
                raise
            r = []
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)
def hasMorePollPri(fd):
    """Report whether ``fd`` has a priority or error event pending right now.

    Args:
        fd: An integer file descriptor, or an object exposing ``fileno()``.

    Returns:
        True if a zero-timeout epoll reports POLLPRI or POLLERR for ``fd``,
        False otherwise.
    """
    wanted = select.POLLPRI | select.POLLERR
    # poll() always reports plain integer descriptors, so normalise first.
    fileno = fd if isinstance(fd, int) else fd.fileno()
    p = epoll()
    try:
        p.register(fileno, wanted)
        # Timeout 0: just sample the current state, never block.
        ready = p.poll(0)
    finally:
        # Each epoll object owns a descriptor of its own; do not leak it.
        p.close()
    # poll() yields (fd, eventmask) pairs, so a bare "fd in ready" can never
    # match; compare the descriptor and check the requested events instead.
    return any(ready_fd == fileno and bool(events & wanted)
               for ready_fd, events in ready)
def poll2(timeout=0.0, map=None):
    """Run a single select.poll() pass over the channels in ``map``.

    Args:
        timeout: Maximum wait in seconds, converted to the milliseconds
            that poll() expects. ``None`` is passed through unchanged.
        map: Mapping of file descriptor -> channel object. Falls back to
            the module-level ``socket_map`` when ``None``.

    Each channel is registered for input when ``readable()`` is true and
    for output when ``writable()`` is true and it is not ``accepting``.
    Every event reported for a descriptor still present in ``map`` is
    handed to ``readwrite()``. Nothing is polled when the map is empty.

    Raises:
        select.error: if poll() fails for any reason other than EINTR.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if map:
        # Iterate over a snapshot so the map may change while the
        # readable()/writable() callbacks run.
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                # Only check for exceptions if object was either readable
                # or writable.
                flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
                pollster.register(fd, flags)
        try:
            r = pollster.poll(timeout)
        except select.error as err:
            # "except X as e" parses on Python 2.6+ and 3.x alike.
            if err.args[0] != EINTR:
                raise
            r = []
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)
def register_read(self, f):
    """Register ``f`` via ``self._register`` with the POLLIN | POLLPRI mask."""
    read_events = select.POLLPRI | select.POLLIN
    self._register(f, read_events)
def poll2(timeout=0.0, map=None):
    """Run a single select.poll() pass over the channels in ``map``.

    Args:
        timeout: Maximum wait in seconds, converted to the milliseconds
            that poll() expects. ``None`` is passed through unchanged.
        map: Mapping of file descriptor -> channel object. Falls back to
            the module-level ``socket_map`` when ``None``.

    Each channel is registered for input when ``readable()`` is true and
    for output when ``writable()`` is true and it is not ``accepting``.
    Every event reported for a descriptor still present in ``map`` is
    handed to ``readwrite()``. Nothing is polled when the map is empty.

    Raises:
        select.error: if poll() fails for any reason other than EINTR.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if map:
        # Iterate over a snapshot so the map may change while the
        # readable()/writable() callbacks run.
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                # Only check for exceptions if object was either readable
                # or writable.
                flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
                pollster.register(fd, flags)
        try:
            r = pollster.poll(timeout)
        except select.error as err:
            # "except X as e" parses on Python 2.6+ and 3.x alike.
            if err.args[0] != EINTR:
                raise
            r = []
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)
def poll2(timeout=0.0, map=None):
    """Run a single select.poll() pass over the channels in ``map``.

    Args:
        timeout: Maximum wait in seconds (converted to milliseconds for
            poll()); ``None`` is forwarded unchanged.
        map: Mapping of file descriptor -> channel object; defaults to the
            module-level ``socket_map`` when ``None``.

    Channels are registered for input when ``readable()`` is true and for
    output when ``writable()`` is true and they are not ``accepting``.
    Reported events are passed to ``readwrite()`` for descriptors still in
    the map. An interrupted poll is treated as having returned no events.
    """
    if map is None:
        map = socket_map
    # poll() works in milliseconds; callers supply seconds.
    poll_timeout = timeout if timeout is None else int(timeout * 1000)
    poller = select.poll()
    if not map:
        return

    for fileno, channel in list(map.items()):
        interest = 0
        if channel.readable():
            interest = select.POLLIN | select.POLLPRI
        # Accepting sockets should not be treated as writable.
        if channel.writable() and not channel.accepting:
            interest |= select.POLLOUT
        if interest:
            poller.register(fileno, interest)

    try:
        ready = poller.poll(poll_timeout)
    except InterruptedError:
        ready = []

    for fileno, events in ready:
        channel = map.get(fileno)
        if channel is not None:
            readwrite(channel, events)
def poll2(timeout=0.0, map=None):
    """Run a single select.poll() pass over the channels in ``map``.

    Args:
        timeout: Maximum wait in seconds, converted to the milliseconds
            that poll() expects. ``None`` is passed through unchanged.
        map: Mapping of file descriptor -> channel object. Falls back to
            the module-level ``socket_map`` when ``None``.

    Each channel is registered for input when ``readable()`` is true and
    for output when ``writable()`` is true and it is not ``accepting``.
    Every event reported for a descriptor still present in ``map`` is
    handed to ``readwrite()``. Nothing is polled when the map is empty.

    Raises:
        select.error: if poll() fails for any reason other than EINTR.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if map:
        # Iterate over a snapshot so the map may change while the
        # readable()/writable() callbacks run.
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                # Only check for exceptions if object was either readable
                # or writable.
                flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
                pollster.register(fd, flags)
        try:
            r = pollster.poll(timeout)
        except select.error as err:
            # "except X as e" parses on Python 2.6+ and 3.x alike.
            if err.args[0] != EINTR:
                raise
            r = []
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)
def poll2(timeout=0.0, map=None):
    """Run a single select.poll() pass over the channels in ``map``.

    Args:
        timeout: Maximum wait in seconds, converted to the milliseconds
            that poll() expects. ``None`` is passed through unchanged.
        map: Mapping of file descriptor -> channel object. Falls back to
            the module-level ``socket_map`` when ``None``.

    Each channel is registered for input when ``readable()`` is true and
    for output when ``writable()`` is true and it is not ``accepting``.
    Every event reported for a descriptor still present in ``map`` is
    handed to ``readwrite()``. Nothing is polled when the map is empty.

    Raises:
        select.error: if poll() fails for any reason other than EINTR.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if map:
        # Iterate over a snapshot so the map may change while the
        # readable()/writable() callbacks run.
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                # Only check for exceptions if object was either readable
                # or writable.
                flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
                pollster.register(fd, flags)
        try:
            r = pollster.poll(timeout)
        except select.error as err:
            # "except X as e" parses on Python 2.6+ and 3.x alike.
            if err.args[0] != EINTR:
                raise
            r = []
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)
def poll2(timeout=0.0, map=None):
    """Poll every channel in ``map`` once and dispatch the resulting events.

    Args:
        timeout: Seconds to wait at most; multiplied by 1000 and truncated
            to an int for poll(). ``None`` is forwarded as-is.
        map: File descriptor -> channel mapping. ``None`` selects the
            module-level ``socket_map``.

    A channel is watched for input if ``readable()`` is true and for output
    if ``writable()`` is true while it is not ``accepting``. Events for
    descriptors still present in the map go to ``readwrite()``. If poll()
    is interrupted, the pass ends with no events dispatched.
    """
    registry = socket_map if map is None else map
    if timeout is not None:
        timeout = int(timeout * 1000)  # poll() counts in milliseconds
    pollster = select.poll()
    if not registry:
        return

    read_mask = select.POLLIN | select.POLLPRI
    for descriptor, handler in list(registry.items()):
        mask = read_mask if handler.readable() else 0
        # Accepting sockets should not be treated as writable.
        if handler.writable() and not handler.accepting:
            mask |= select.POLLOUT
        if mask:
            pollster.register(descriptor, mask)

    try:
        results = pollster.poll(timeout)
    except InterruptedError:
        results = []

    for descriptor, mask in results:
        handler = registry.get(descriptor)
        if handler is None:
            continue
        readwrite(handler, mask)
def register(self, fd, eventmask=None):
    """Register a file descriptor with the fake polling object.

    Stores ``eventmask`` under ``fd`` in ``self.poll_events``. When no mask
    is given (``None``), POLLIN | POLLOUT | POLLPRI is stored instead.
    """
    if eventmask is not None:
        self.poll_events[fd] = eventmask
        return
    default_mask = select_lib.POLLIN | select_lib.POLLOUT | select_lib.POLLPRI
    self.poll_events[fd] = default_mask
def __init__(self):
    """Select a notification backend and set up the interrupt channel.

    The first mechanism the ``select`` module offers is used, checked in
    this order: epoll, kqueue, devpoll, poll, and finally a select-based
    notifier. The event constants (``_Read``, ``_Write``, ``_Hangup``,
    ``_Error``, ``_Block``) are stored on the ``_AsyncPoller`` class itself,
    so they are shared by every instance.

    A command channel obtained from ``_cmd_read_write_fds`` is registered
    for read events; ``self.interrupt()`` writes a single byte to it.
    """
    # NOTE(review): presumably scales timeouts to the backend's unit
    # (devpoll/poll use 1000) - confirm against the code that consumes it.
    self.timeout_multiplier = 1
    if hasattr(select, 'epoll'):
        self._poller_name = 'epoll'
        self._poller = select.epoll()
        _AsyncPoller._Read = select.EPOLLIN | select.EPOLLPRI
        _AsyncPoller._Write = select.EPOLLOUT
        _AsyncPoller._Hangup = select.EPOLLHUP
        _AsyncPoller._Error = select.EPOLLERR
        _AsyncPoller._Block = -1
    elif hasattr(select, 'kqueue'):
        self._poller_name = 'kqueue'
        self._poller = _KQueueNotifier()
        # kqueue filter values are negative numbers so using
        # them as flags won't work, so define them as necessary
        _AsyncPoller._Read = 0x01
        _AsyncPoller._Write = 0x02
        _AsyncPoller._Hangup = 0x04
        _AsyncPoller._Error = 0x08
        _AsyncPoller._Block = None
    elif hasattr(select, 'devpoll'):
        self._poller_name = 'devpoll'
        self._poller = select.devpoll()
        _AsyncPoller._Read = select.POLLIN | select.POLLPRI
        _AsyncPoller._Write = select.POLLOUT
        _AsyncPoller._Hangup = select.POLLHUP
        _AsyncPoller._Error = select.POLLERR
        _AsyncPoller._Block = -1
        self.timeout_multiplier = 1000
    elif hasattr(select, 'poll'):
        self._poller_name = 'poll'
        self._poller = select.poll()
        _AsyncPoller._Read = select.POLLIN | select.POLLPRI
        _AsyncPoller._Write = select.POLLOUT
        _AsyncPoller._Hangup = select.POLLHUP
        _AsyncPoller._Error = select.POLLERR
        _AsyncPoller._Block = -1
        self.timeout_multiplier = 1000
    else:
        self._poller_name = 'select'
        self._poller = _SelectNotifier()
        _AsyncPoller._Read = 0x01
        _AsyncPoller._Write = 0x02
        _AsyncPoller._Hangup = 0x04
        _AsyncPoller._Error = 0x08
        _AsyncPoller._Block = None

    self._fds = {}
    self._events = {}
    self._timeouts = []
    self.cmd_read, self.cmd_write = _AsyncPoller._cmd_read_write_fds(self)
    # The interrupt payload is a bytes literal: socket.send() and os.write()
    # reject str on Python 3, while b'I' is the same object type as 'I' on
    # Python 2.
    if hasattr(self.cmd_write, 'getsockname'):
        # Socket-based channel: wrap the read side and drain via recv().
        self.cmd_read = AsyncSocket(self.cmd_read)
        self.cmd_read._read_fn = lambda: self.cmd_read._rsock.recv(128)
        self.interrupt = lambda: self.cmd_write.send(b'I')
    else:
        self.interrupt = lambda: os.write(self.cmd_write._fileno, b'I')
    self.add(self.cmd_read, _AsyncPoller._Read)
def __init__(self):
    """Select a notification backend and set up the interrupt channel.

    Backends are tried in the order epoll, kqueue, devpoll, poll, with a
    select-based notifier as the last resort. The chosen event constants
    are written to the ``_AsyncPoller`` class attributes ``_Read``,
    ``_Write``, ``_Hangup``, ``_Error`` and ``_Block``. The command channel
    from ``_cmd_read_write_fds`` is then registered for read events, and
    ``self.interrupt`` is bound to a callable that writes ``b'I'`` to it.
    """
    poller_cls = _AsyncPoller
    # Stand-in bit flags for backends without usable native masks, plus the
    # matching _Block value.
    generic_masks = (0x01, 0x02, 0x04, 0x08, None)

    self.timeout_multiplier = 1
    if hasattr(select, 'epoll'):
        self._poller_name = 'epoll'
        self._poller = select.epoll()
        masks = (select.EPOLLIN | select.EPOLLPRI, select.EPOLLOUT,
                 select.EPOLLHUP, select.EPOLLERR, -1)
    elif hasattr(select, 'kqueue'):
        self._poller_name = 'kqueue'
        self._poller = _KQueueNotifier()
        # kqueue filter values are negative numbers, so they cannot be
        # combined as bit flags; the generic ones are used instead.
        masks = generic_masks
    elif hasattr(select, 'devpoll'):
        self._poller_name = 'devpoll'
        self._poller = select.devpoll()
        masks = (select.POLLIN | select.POLLPRI, select.POLLOUT,
                 select.POLLHUP, select.POLLERR, -1)
        self.timeout_multiplier = 1000
    elif hasattr(select, 'poll'):
        self._poller_name = 'poll'
        self._poller = select.poll()
        masks = (select.POLLIN | select.POLLPRI, select.POLLOUT,
                 select.POLLHUP, select.POLLERR, -1)
        self.timeout_multiplier = 1000
    else:
        self._poller_name = 'select'
        self._poller = _SelectNotifier()
        masks = generic_masks

    (poller_cls._Read, poller_cls._Write, poller_cls._Hangup,
     poller_cls._Error, poller_cls._Block) = masks

    self._fds = {}
    self._events = {}
    self._timeouts = []
    self.cmd_read, self.cmd_write = poller_cls._cmd_read_write_fds(self)
    if not hasattr(self.cmd_write, 'getsockname'):
        self.interrupt = lambda: os.write(self.cmd_write._fileno, b'I')
    else:
        # Socket-based channel: wrap the read side and drain via recv().
        self.cmd_read = AsyncSocket(self.cmd_read)
        self.cmd_read._read_fn = lambda: self.cmd_read._rsock.recv(128)
        self.interrupt = lambda: self.cmd_write.send(b'I')
    self.add(self.cmd_read, poller_cls._Read)
def _communicate_with_poll(self, input):
stdout = None # Return
stderr = None # Return
fd2file = {}
fd2output = {}
poller = select.poll()
def register_and_append(file_obj, eventmask):
poller.register(file_obj.fileno(), eventmask)
fd2file[file_obj.fileno()] = file_obj
def close_unregister_and_remove(fd):
poller.unregister(fd)
fd2file[fd].close()
fd2file.pop(fd)
if self.stdin and input:
register_and_append(self.stdin, select.POLLOUT)
select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
if self.stdout:
register_and_append(self.stdout, select_POLLIN_POLLPRI)
fd2output[self.stdout.fileno()] = stdout = []
if self.stderr:
register_and_append(self.stderr, select_POLLIN_POLLPRI)
fd2output[self.stderr.fileno()] = stderr = []
input_offset = 0
while fd2file:
try:
ready = poller.poll()
except select.error, e:
if e.args[0] == errno.EINTR:
continue
raise
for fd, mode in ready:
if mode & select.POLLOUT:
chunk = input[input_offset : input_offset + _PIPE_BUF]
try:
input_offset += os.write(fd, chunk)
except OSError as e:
if e.errno == errno.EPIPE:
close_unregister_and_remove(fd)
else:
raise
else:
if input_offset >= len(input):
close_unregister_and_remove(fd)
elif mode & select_POLLIN_POLLPRI:
data = os.read(fd, 4096)
if not data:
close_unregister_and_remove(fd)
fd2output[fd].append(data)
else:
# Ignore hang up or errors.
close_unregister_and_remove(fd)
return (stdout, stderr)
def read(self, timeout=0):
    '''
    Read the contents of the spool. This may cause reads on the registered
    file descriptors.

    ``timeout`` is a budget in milliseconds; a value of 0 (or less) polls
    without waiting. Returns everything read during this call as one
    string.

    Raises a plain ``Exception`` when no descriptors are registered
    ('no file descriptors registered'), when nothing becomes ready in time
    ('time out'), or when poll reports 'POLLERR', 'POLLHUP' or 'POLLNVAL'
    for a descriptor (which is unregistered first).
    '''
    if not self.fds:
        raise Exception('no file descriptors registered')

    if timeout > 0:
        start = datetime.now()

    io = StringIO()

    stop = False
    while (not stop):
        stop = True
        if timeout > 0:
            # timedelta.microseconds is only the sub-second component, so
            # it wraps every second; total_seconds() gives the real
            # elapsed time.
            elapsed_ms = (datetime.now() - start).total_seconds() * 1000
            limit = timeout - elapsed_ms
            if limit < 0:
                raise Exception('time out')
        else:
            limit = 0

        events = self.fds.poll(limit)
        if not events:
            raise Exception('time out')

        for e in events:
            if (e[1] & select.POLLIN
                    or e[1] & select.POLLPRI):
                tmp = os.read(e[0], 4096)
                if tmp:
                    io.write(tmp)
                    if self.log:
                        self.log.write(tmp)
                # NOTE(review): the original indentation was lost; this
                # assumes a readable event is fully handled here, so the
                # error checks below only run for non-readable events.
                # Confirm against the upstream source.
                continue

            if e[1] & select.POLLERR:
                self.fds.unregister(e[0])
                raise Exception('POLLERR')

            if e[1] & select.POLLHUP:
                self.fds.unregister(e[0])
                raise Exception('POLLHUP')

            if e[1] & select.POLLNVAL:
                self.fds.unregister(e[0])
                raise Exception('POLLNVAL')

    if self.log:
        self.log.flush()

    return io.getvalue()
def _communicate_with_poll(self, input):
stdout = None # Return
stderr = None # Return
fd2file = {}
fd2output = {}
poller = select.poll()
def register_and_append(file_obj, eventmask):
poller.register(file_obj.fileno(), eventmask)
fd2file[file_obj.fileno()] = file_obj
def close_unregister_and_remove(fd):
poller.unregister(fd)
fd2file[fd].close()
fd2file.pop(fd)
if self.stdin and input:
register_and_append(self.stdin, select.POLLOUT)
select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
if self.stdout:
register_and_append(self.stdout, select_POLLIN_POLLPRI)
fd2output[self.stdout.fileno()] = stdout = []
if self.stderr:
register_and_append(self.stderr, select_POLLIN_POLLPRI)
fd2output[self.stderr.fileno()] = stderr = []
input_offset = 0
while fd2file:
try:
ready = poller.poll()
except select.error, e:
if e.args[0] == errno.EINTR:
continue
raise
for fd, mode in ready:
if mode & select.POLLOUT:
chunk = input[input_offset : input_offset + _PIPE_BUF]
try:
input_offset += os.write(fd, chunk)
except OSError as e:
if e.errno == errno.EPIPE:
close_unregister_and_remove(fd)
else:
raise
else:
if input_offset >= len(input):
close_unregister_and_remove(fd)
elif mode & select_POLLIN_POLLPRI:
data = os.read(fd, 4096)
if not data:
close_unregister_and_remove(fd)
fd2output[fd].append(data)
else:
# Ignore hang up or errors.
close_unregister_and_remove(fd)
return (stdout, stderr)
def _read_until_with_poll(self, match, timeout):
"""Read until a given string is encountered or until timeout.
This method uses select.poll() to implement the timeout.
"""
n = len(match)
call_timeout = timeout
if timeout is not None:
from time import time
time_start = time()
self.process_rawq()
i = self.cookedq.find(match)
if i < 0:
poller = select.poll()
poll_in_or_priority_flags = select.POLLIN | select.POLLPRI
poller.register(self, poll_in_or_priority_flags)
while i < 0 and not self.eof:
try:
# Poll takes its timeout in milliseconds.
ready = poller.poll(None if timeout is None
else 1000 * call_timeout)
except select.error as e:
if e.errno == errno.EINTR:
if timeout is not None:
elapsed = time() - time_start
call_timeout = timeout-elapsed
continue
raise
for fd, mode in ready:
if mode & poll_in_or_priority_flags:
i = max(0, len(self.cookedq)-n)
self.fill_rawq()
self.process_rawq()
i = self.cookedq.find(match, i)
if timeout is not None:
elapsed = time() - time_start
if elapsed >= timeout:
break
call_timeout = timeout-elapsed
poller.unregister(self)
if i >= 0:
i = i + n
buf = self.cookedq[:i]
self.cookedq = self.cookedq[i:]
return buf
return self.read_very_lazy()