def register(self, fd):
    """
    Add ``fd`` to the poller so that later polls watch it for
    input (EPOLLIN) and priority input (EPOLLPRI).

    An IOError raised by the poller while registering is ignored.
    """
    read_mask = select.EPOLLIN | select.EPOLLPRI
    try:
        self.poller.register(fd, read_mask)
    except IOError:
        # Best effort: a descriptor the poller refuses is simply not watched.
        return
# Example source code using Python's select.EPOLLPRI
def poll(self):
    """
    Polls once and yields each ready-to-be-read
    file-descriptor

    Descriptors reporting a hang-up or an error are yielded too, so that
    the caller's next read on them observes the EOF or the error instead
    of the condition being silently dropped.
    """
    # Test the event with ``&``: OR-ing the flags together is non-zero for
    # every event and therefore would not filter anything.
    wanted = (select.EPOLLIN | select.EPOLLPRI |
              select.EPOLLHUP | select.EPOLLERR)
    events = self.poller.poll(timeout=self.timeout)
    for fd, event in events:
        if event & wanted:
            yield fd
def register(self, fd):
    """
    Add ``fd`` to the poller so that later polls watch it for
    input (EPOLLIN) and priority input (EPOLLPRI).

    An IOError raised by the poller while registering is ignored.
    """
    read_mask = select.EPOLLIN | select.EPOLLPRI
    try:
        self.poller.register(fd, read_mask)
    except IOError:
        # Best effort: a descriptor the poller refuses is simply not watched.
        return
def poll(self):
    """
    Polls once and yields each ready-to-be-read
    file-descriptor

    Descriptors reporting a hang-up or an error are yielded too, so that
    the caller's next read on them observes the EOF or the error instead
    of the condition being silently dropped.
    """
    # Test the event with ``&``: OR-ing the flags together is non-zero for
    # every event and therefore would not filter anything.
    wanted = (select.EPOLLIN | select.EPOLLPRI |
              select.EPOLLHUP | select.EPOLLERR)
    events = self.poller.poll(timeout=self.timeout)
    for fd, event in events:
        if event & wanted:
            yield fd
def __init__(self, popenToWrap):
    """
    Wrap ``popenToWrap`` (an object exposing ``stdout``, ``stderr`` and
    ``stdin`` with ``fileno()``).

    Creates one StringIO buffer per standard stream, registers the
    process's stdout/stderr descriptors with an epoll object for
    EPOLLIN | EPOLLPRI and its stdin descriptor with an empty event mask,
    and exposes buffered ``stdout``/``stderr``/``stdin`` objects built on
    ``self._streamWrapper``.
    """
    self._streamLock = threading.Lock()
    self._proc = popenToWrap

    # In-memory buffers, one per standard stream.
    self._stdout = StringIO()
    self._stderr = StringIO()
    self._stdin = StringIO()

    out_fd = self._proc.stdout.fileno()
    err_fd = self._proc.stderr.fileno()
    self._fdin = self._proc.stdin.fileno()
    self._closedfds = []

    # stdin starts with an empty mask; _processStreams enables EPOLLOUT
    # only once there is buffered data to push.
    read_mask = select.EPOLLIN | select.EPOLLPRI
    self._poller = select.epoll()
    for fd, mask in ((out_fd, read_mask),
                     (err_fd, read_mask),
                     (self._fdin, 0)):
        self._poller.register(fd, mask)

    # Descriptor -> buffer lookup.
    self._fdMap = {
        out_fd: self._stdout,
        err_fd: self._stderr,
        self._fdin: self._stdin,
    }

    out_wrapper = self._streamWrapper(self, self._stdout, out_fd)
    self.stdout = io.BufferedReader(out_wrapper, BUFFSIZE)
    err_wrapper = self._streamWrapper(self, self._stderr, err_fd)
    self.stderr = io.BufferedReader(err_wrapper, BUFFSIZE)
    in_wrapper = self._streamWrapper(self, self._stdin, self._fdin)
    self.stdin = io.BufferedWriter(in_wrapper, BUFFSIZE)

    self._returncode = None
    self.blocking = False
def _run(self):
    """
    Polling loop: while ``self._running`` is true, poll ``self._poll``
    with ``EPOLL_TIMEOUT`` and, for every event carrying EPOLLPRI or
    EPOLLET, pass the result of ``self.read()`` to ``self.changed()``.
    """
    wanted = select.EPOLLPRI | select.EPOLLET
    while self._running:
        for _fd, flags in self._poll.poll(EPOLL_TIMEOUT):
            if flags & wanted:
                self.changed(self.read())
def register(self, fd):
    """
    Add ``fd`` to the poller so that later polls watch it for
    input (EPOLLIN) and priority input (EPOLLPRI).

    An IOError raised by the poller while registering is ignored.
    """
    read_mask = select.EPOLLIN | select.EPOLLPRI
    try:
        self.poller.register(fd, read_mask)
    except IOError:
        # Best effort: a descriptor the poller refuses is simply not watched.
        return
def poll(self):
    """
    Polls once and yields each ready-to-be-read
    file-descriptor

    An IOError raised by the poller is treated as "nothing ready".
    Descriptors reporting a hang-up or an error are yielded too, so that
    the caller's next read on them observes the EOF or the error instead
    of the condition being silently dropped.
    """
    # Test the event with ``&``: OR-ing the flags together is non-zero for
    # every event and therefore would not filter anything.
    wanted = (select.EPOLLIN | select.EPOLLPRI |
              select.EPOLLHUP | select.EPOLLERR)
    try:
        events = self.poller.poll(timeout=self.timeout)
    except IOError:
        events = []
    for fd, event in events:
        if event & wanted:
            yield fd
def register(self, fd):
    """
    Add ``fd`` to the poller so that later polls watch it for
    input (EPOLLIN) and priority input (EPOLLPRI).

    An IOError raised by the poller while registering is ignored.
    """
    read_mask = select.EPOLLIN | select.EPOLLPRI
    try:
        self.poller.register(fd, read_mask)
    except IOError:
        # Best effort: a descriptor the poller refuses is simply not watched.
        return
def poll(self):
    """
    Polls once and yields each ready-to-be-read
    file-descriptor

    An IOError raised by the poller is treated as "nothing ready".
    Descriptors reporting a hang-up or an error are yielded too, so that
    the caller's next read on them observes the EOF or the error instead
    of the condition being silently dropped.
    """
    # Test the event with ``&``: OR-ing the flags together is non-zero for
    # every event and therefore would not filter anything.
    wanted = (select.EPOLLIN | select.EPOLLPRI |
              select.EPOLLHUP | select.EPOLLERR)
    try:
        events = self.poller.poll(timeout=self.timeout)
    except IOError:
        events = []
    for fd, event in events:
        if event & wanted:
            yield fd
def run(self):
    """
    Watch the pin's sysfs value descriptor for edges until
    ``self._finished`` becomes true.

    The edge trigger is set to ``self._trigger`` on entry and reset to
    ``NONE`` on exit. The result of the very first poll is discarded;
    every later non-empty poll sets ``self._event_detected`` and calls
    ``self.notify_callbacks()`` while holding ``self._lock``.

    Any exception (BaseException included) is stored in ``self.exc``
    rather than propagated; ``self.exc`` is ``None`` otherwise.
    """
    self.exc = None
    try:
        sysfs.edge(self._pin, self._trigger)
        with sysfs.value_descriptor(self._pin) as fd:
            poller = select.epoll()
            poller.register(fd, EPOLLIN | EPOLLET | EPOLLPRI)
            try:
                first_poll_done = False
                while not self._finished:
                    ready = poller.poll(0.1, maxevents=1)
                    if not first_poll_done:
                        # The first poll result is never reported.
                        first_poll_done = True
                        continue
                    if ready:
                        with self._lock:
                            self._event_detected = True
                            self.notify_callbacks()
            finally:
                poller.unregister(fd)
                poller.close()
    except BaseException as err:
        self.exc = err
    finally:
        sysfs.edge(self._pin, NONE)
def blocking_wait_for_edge(pin, trigger, timeout=-1):
    """
    Block until an edge of kind ``trigger`` is seen on ``pin``.

    ``trigger`` must be one of RISING, FALLING or BOTH (checked with an
    assert). ``timeout`` is divided by 1000.0 before being handed to
    ``epoll.poll`` -- presumably it is expressed in milliseconds.

    Returns ``pin`` when the second poll reports an event and ``None``
    when it reports nothing. Raises RuntimeError when ``pin`` is already
    present in ``_threads``. The edge setting is reset to ``NONE`` on the
    way out.
    """
    assert trigger in [RISING, FALLING, BOTH]
    if pin in _threads:
        raise RuntimeError("Conflicting edge detection events already exist for this GPIO channel")

    try:
        sysfs.edge(pin, trigger)
        with sysfs.value_descriptor(pin) as fd:
            poller = select.epoll()
            poller.register(fd, EPOLLIN | EPOLLET | EPOLLPRI)
            try:
                # TODO: implement bouncetime
                # The result of the first poll is discarded; only the
                # second poll decides the return value.
                poller.poll(timeout / 1000.0, maxevents=1)
                ready = poller.poll(timeout / 1000.0, maxevents=1)
                return None if len(ready) == 0 else pin
            finally:
                poller.unregister(fd)
                poller.close()
    finally:
        sysfs.edge(pin, NONE)
def __init__(self):
    """
    Pick the first polling mechanism the ``select`` module offers
    (epoll, kqueue, devpoll, poll, then plain select) and set up the
    command channel used to interrupt the poller.

    The event flag values are stored as class attributes on
    ``_AsyncPoller`` (``_Read``, ``_Write``, ``_Hangup``, ``_Error``,
    ``_Block``). ``timeout_multiplier`` is 1000 for devpoll/poll and 1
    otherwise.

    ``self.interrupt`` is a callable that writes the single byte ``I``
    to the write side of the command channel.
    """
    self.timeout_multiplier = 1
    if hasattr(select, 'epoll'):
        self._poller_name = 'epoll'
        self._poller = select.epoll()
        _AsyncPoller._Read = select.EPOLLIN | select.EPOLLPRI
        _AsyncPoller._Write = select.EPOLLOUT
        _AsyncPoller._Hangup = select.EPOLLHUP
        _AsyncPoller._Error = select.EPOLLERR
        _AsyncPoller._Block = -1
    elif hasattr(select, 'kqueue'):
        self._poller_name = 'kqueue'
        self._poller = _KQueueNotifier()
        # kqueue filter values are negative numbers so using
        # them as flags won't work, so define them as necessary
        _AsyncPoller._Read = 0x01
        _AsyncPoller._Write = 0x02
        _AsyncPoller._Hangup = 0x04
        _AsyncPoller._Error = 0x08
        _AsyncPoller._Block = None
    elif hasattr(select, 'devpoll'):
        self._poller_name = 'devpoll'
        self._poller = select.devpoll()
        _AsyncPoller._Read = select.POLLIN | select.POLLPRI
        _AsyncPoller._Write = select.POLLOUT
        _AsyncPoller._Hangup = select.POLLHUP
        _AsyncPoller._Error = select.POLLERR
        _AsyncPoller._Block = -1
        self.timeout_multiplier = 1000
    elif hasattr(select, 'poll'):
        self._poller_name = 'poll'
        self._poller = select.poll()
        _AsyncPoller._Read = select.POLLIN | select.POLLPRI
        _AsyncPoller._Write = select.POLLOUT
        _AsyncPoller._Hangup = select.POLLHUP
        _AsyncPoller._Error = select.POLLERR
        _AsyncPoller._Block = -1
        self.timeout_multiplier = 1000
    else:
        self._poller_name = 'select'
        self._poller = _SelectNotifier()
        _AsyncPoller._Read = 0x01
        _AsyncPoller._Write = 0x02
        _AsyncPoller._Hangup = 0x04
        _AsyncPoller._Error = 0x08
        _AsyncPoller._Block = None

    self._fds = {}
    self._events = {}
    self._timeouts = []

    self.cmd_read, self.cmd_write = _AsyncPoller._cmd_read_write_fds(self)
    # The interrupt payload is a bytes literal: socket.send() and
    # os.write() reject text strings on Python 3, while b'I' is the very
    # same object type as 'I' on Python 2.
    if hasattr(self.cmd_write, 'getsockname'):
        # Command channel looks like a socket pair.
        self.cmd_read = AsyncSocket(self.cmd_read)
        self.cmd_read._read_fn = lambda: self.cmd_read._rsock.recv(128)
        self.interrupt = lambda: self.cmd_write.send(b'I')
    else:
        self.interrupt = lambda: os.write(self.cmd_write._fileno, b'I')
    self.add(self.cmd_read, _AsyncPoller._Read)
def __init__(self):
    """
    Pick the first polling mechanism the ``select`` module offers
    (epoll, kqueue, devpoll, poll, then plain select) and set up the
    command channel used to interrupt the poller.

    The event flag values are stored as class attributes on
    ``_AsyncPoller`` (``_Read``, ``_Write``, ``_Hangup``, ``_Error``,
    ``_Block``). ``timeout_multiplier`` is 1000 for devpoll/poll and 1
    otherwise.

    ``self.interrupt`` is a callable that writes the single byte ``I``
    to the write side of the command channel.
    """
    self.timeout_multiplier = 1

    # Each branch yields (read, write, hangup, error, block) flag values.
    if hasattr(select, 'epoll'):
        self._poller_name = 'epoll'
        self._poller = select.epoll()
        flags = (select.EPOLLIN | select.EPOLLPRI, select.EPOLLOUT,
                 select.EPOLLHUP, select.EPOLLERR, -1)
    elif hasattr(select, 'kqueue'):
        self._poller_name = 'kqueue'
        self._poller = _KQueueNotifier()
        # kqueue filter values are negative numbers so using
        # them as flags won't work, so define them as necessary
        flags = (0x01, 0x02, 0x04, 0x08, None)
    elif hasattr(select, 'devpoll'):
        self._poller_name = 'devpoll'
        self._poller = select.devpoll()
        flags = (select.POLLIN | select.POLLPRI, select.POLLOUT,
                 select.POLLHUP, select.POLLERR, -1)
        self.timeout_multiplier = 1000
    elif hasattr(select, 'poll'):
        self._poller_name = 'poll'
        self._poller = select.poll()
        flags = (select.POLLIN | select.POLLPRI, select.POLLOUT,
                 select.POLLHUP, select.POLLERR, -1)
        self.timeout_multiplier = 1000
    else:
        self._poller_name = 'select'
        self._poller = _SelectNotifier()
        flags = (0x01, 0x02, 0x04, 0x08, None)

    (_AsyncPoller._Read,
     _AsyncPoller._Write,
     _AsyncPoller._Hangup,
     _AsyncPoller._Error,
     _AsyncPoller._Block) = flags

    self._fds = {}
    self._events = {}
    self._timeouts = []

    self.cmd_read, self.cmd_write = _AsyncPoller._cmd_read_write_fds(self)
    if not hasattr(self.cmd_write, 'getsockname'):
        self.interrupt = lambda: os.write(self.cmd_write._fileno, b'I')
    else:
        # Command channel looks like a socket pair.
        self.cmd_read = AsyncSocket(self.cmd_read)
        self.cmd_read._read_fn = lambda: self.cmd_read._rsock.recv(128)
        self.interrupt = lambda: self.cmd_write.send(b'I')
    self.add(self.cmd_read, _AsyncPoller._Read)
def _processStreams(self):
    """
    Run one polling pass that moves data between the wrapped process's
    descriptors and the in-memory stream buffers.

    Returns immediately once all three descriptors are recorded in
    ``self._closedfds``. If another caller holds ``self._streamLock``,
    waits for it to be released and returns without polling.
    """
    if len(self._closedfds) == 3:
        return

    if not self._streamLock.acquire(False):
        # Someone else is already processing: wait for them to finish
        # and rely on the work they did.
        self._streamLock.acquire()
        self._streamLock.release()
        return

    try:
        if self._stdin.len > 0 and self._stdin.pos == 0:
            # Polling stdin is redundant if there is nothing to write
            # turn on only if data is waiting to be pushed
            self._poller.modify(self._fdin, select.EPOLLOUT)

        for fd, event in NoIntrPoll(self._poller.poll, 1):
            stream = self._fdMap[fd]
            if event & select.EPOLLOUT and self._stdin.len > 0:
                chunk = self._stdin.read(BUFFSIZE)
                sent = os.write(fd, chunk)
                # Rewind over whatever part of the chunk was not written.
                stream.pos -= len(chunk) - sent
                if stream.pos == stream.len:
                    # Buffer fully flushed: empty it and stop watching
                    # for writability.
                    stream.truncate(0)
                    self._poller.modify(fd, 0)
            elif event & (select.EPOLLIN | select.EPOLLPRI):
                incoming = os.read(fd, BUFFSIZE)
                # Append at the end while keeping the read position.
                saved_pos = stream.pos
                stream.pos = stream.len
                stream.write(incoming)
                stream.pos = saved_pos
            elif event & (select.EPOLLHUP | select.EPOLLERR):
                self._poller.unregister(fd)
                self._closedfds.append(fd)
                # I don't close the fd because the original Popen
                # will do it.

        stdin_needs_closing = (self.stdin.closed and
                               self._fdin not in self._closedfds)
        if stdin_needs_closing:
            self._poller.unregister(self._fdin)
            self._closedfds.append(self._fdin)
            self._proc.stdin.close()
    finally:
        self._streamLock.release()
def __init__(self, number, direction=INPUT, callback=None, edge=None, active_low=0):
    """
    Export the pin through sysfs when needed, open its value file and
    configure direction, edge and active_low.

    @type  number: int
    @param number: The pin number
    @type  direction: int
    @param direction: Pin direction, enumerated by C{Direction}
                      (written verbatim to the sysfs direction file)
    @type  callback: callable
    @param callback: Method be called when pin changes state
    @type  edge: int
    @param edge: The edge transition that triggers callback,
                 enumerated by C{Edge} (written verbatim to the sysfs
                 edge file)
    @type active_low: int
    @param active_low: Indicator of whether this pin uses inverted
                       logic for HIGH-LOW transitions.
    @raise Exception: when C{callback} is given without C{edge}, or when
                      a truthy C{active_low} is not in ACTIVE_LOW_MODES.
    """
    # Validate the arguments before touching sysfs, so that a bad call
    # does not leave the pin exported, the value file open or a polling
    # thread running behind a half-constructed object.
    if callback and not edge:
        raise Exception('You must supply a edge to trigger callback on')
    if active_low and active_low not in ACTIVE_LOW_MODES:
        raise Exception('You must supply a value for active_low which is either 0 or 1.')

    self._number = number
    self._direction = direction
    self._callback = callback
    self._active_low = active_low

    if not os.path.isdir(self._sysfs_gpio_value_path()):
        with open(SYSFS_EXPORT_PATH, 'w') as export:
            export.write('%d' % number)
    else:
        Logger.debug("SysfsGPIO: Pin %d already exported" % number)

    self._fd = open(self._sysfs_gpio_value_path(), 'r+')

    with open(self._sysfs_gpio_direction_path(), 'w') as fsdir:
        fsdir.write(direction)

    if edge:
        with open(self._sysfs_gpio_edge_path(), 'w') as fsedge:
            fsedge.write(edge)
        # The edge file is closed (and so flushed) before polling starts.
        # NOTE(review): nesting of the poller/thread setup under
        # ``if edge:`` is assumed -- confirm against the original layout.
        self._poll = select.epoll()
        self._poll.register(self, (select.EPOLLPRI | select.EPOLLET))
        self.thread = Thread(target=self._run)
        self.thread.daemon = True
        self._running = True
        # NOTE(review): relies on a ``start`` method defined elsewhere on
        # this class -- presumably it starts ``self.thread``; verify.
        self.start()

    if active_low:
        with open(self._sysfs_gpio_active_low_path(), 'w') as fsactive_low:
            fsactive_low.write(str(active_low))
def _epoll_select(self, rlist, wlist, xlist, timeout=None):
"""epoll-based drop-in replacement for select to overcome select
limitation on a maximum filehandle value
"""
if timeout is None:
timeout = -1
eventmasks = defaultdict(int)
rfd2obj = defaultdict(list)
wfd2obj = defaultdict(list)
xfd2obj = defaultdict(list)
read_evmask = select.EPOLLIN | select.EPOLLPRI # Just in case
def store_evmasks(obj_list, evmask, fd2obj):
for obj in obj_list:
fileno = _to_fileno(obj)
eventmasks[fileno] |= evmask
fd2obj[fileno].append(obj)
store_evmasks(rlist, read_evmask, rfd2obj)
store_evmasks(wlist, select.EPOLLOUT, wfd2obj)
store_evmasks(xlist, select.EPOLLERR, xfd2obj)
poller = select.epoll()
for fileno in eventmasks:
poller.register(fileno, eventmasks[fileno])
try:
events = poller.poll(timeout)
revents = []
wevents = []
xevents = []
for fileno, event in events:
if event & read_evmask:
revents += rfd2obj.get(fileno, [])
if event & select.EPOLLOUT:
wevents += wfd2obj.get(fileno, [])
if event & select.EPOLLERR:
xevents += xfd2obj.get(fileno, [])
finally:
poller.close()
return revents, wevents, xevents