def _run(self):
while self._running:
events = self._poll.poll(EPOLL_TIMEOUT)
for fd, event in events:
if not (event & (select.EPOLLPRI | select.EPOLLET)):
continue
self.changed(self.read())
python类EPOLLET的实例源码
def wait_edge(fd,pin,callback,debouncingtime):
debouncingtime=debouncingtime/1000
timestampprec=time.time()
counter=0
po = select.epoll()
po.register(fd,select.EPOLLET)
while True:
events = po.poll()
timestamp=time.time()
if (timestamp-timestampprec>debouncingtime) and counter>0:
callback(pin)
counter=counter+1
timestampprec=timestamp
def wait_edge(self,fd,callback,debouncingtime):
# Convert in millisecondi
debouncingtime=debouncingtime/1000.0
timestampprec=time.time()
counter=0
po = select.epoll()
po.register(fd,select.EPOLLET)
while True:
events = po.poll()
timestamp=time.time()
if (timestamp-timestampprec>debouncingtime) and counter>0:
callback()
counter=counter+1
timestampprec=timestamp
def run(self):
self.exc = None
try:
sysfs.edge(self._pin, self._trigger)
initial_edge = True
with sysfs.value_descriptor(self._pin) as fd:
e = select.epoll()
e.register(fd, EPOLLIN | EPOLLET | EPOLLPRI)
try:
while not self._finished:
events = e.poll(0.1, maxevents=1)
if initial_edge:
initial_edge = False
elif len(events) > 0:
with self._lock:
self._event_detected = True
self.notify_callbacks()
finally:
e.unregister(fd)
e.close()
except BaseException as e:
self.exc = e
finally:
sysfs.edge(self._pin, NONE)
def blocking_wait_for_edge(pin, trigger, timeout=-1):
assert trigger in [RISING, FALLING, BOTH]
if pin in _threads:
raise RuntimeError("Conflicting edge detection events already exist for this GPIO channel")
try:
sysfs.edge(pin, trigger)
finished = False
initial_edge = True
with sysfs.value_descriptor(pin) as fd:
e = select.epoll()
e.register(fd, EPOLLIN | EPOLLET | EPOLLPRI)
try:
while not finished:
# TODO: implement bouncetime
events = e.poll(timeout / 1000.0, maxevents=1)
if initial_edge:
initial_edge = False
else:
finished = True
n = len(events)
if n == 0:
return None
else:
return pin
finally:
e.unregister(fd)
e.close()
finally:
sysfs.edge(pin, NONE)
def run_forever():
listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_fd.bind(('', master_connector.tcp_port))
listen_fd.listen(master_connector.max_minions)
master_connector.epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
datalist = {}
master_connector.establish_vswitch('master')
try:
while True:
epoll_list = master_connector.epoll_fd.poll()
for fd, events in epoll_list:
if fd == listen_fd.fileno():
fileno, addr = listen_fd.accept()
fileno.setblocking(0)
master_connector.epoll_fd.register(fileno.fileno(), select.EPOLLIN | select.EPOLLET)
master_connector.conn[fileno.fileno()] = (fileno, addr[0])
master_connector.build_gre_conn('master', addr[0])
elif select.EPOLLIN & events:
datas = b''
while True:
try:
data = master_connector.conn[fd][0].recv(10)
if not data and not datas:
master_connector.close_connection(fd)
break
else:
datas += data
except socket.error as msg:
if msg.errno == errno.EAGAIN:
try:
datalist[fd] = master_connector.do_message_response(datas)
master_connector.epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
except:
master_connector.close_connection(fd)
else:
master_connector.close_connection(fd)
break
elif select.EPOLLOUT & events:
sendLen = 0
while True:
sendLen += master_connector.conn[fd][0].send(datalist[fd][sendLen:])
if sendLen == len(datalist[fd]):
break
master_connector.epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
elif select.EPOLLHUP & events:
master_connector.close_connection(fd)
else:
continue
finally:
os.system('ovs-vsctl del-br ovs-master >/dev/null 2>&1')
def test_control_and_wait(self):
client, server = self._connected_pair()
ep = select.epoll(16)
ep.register(server.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
ep.register(client.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.1, then - now)
events.sort()
expected = [(client.fileno(), select.EPOLLOUT),
(server.fileno(), select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
self.assertFalse(then - now > 0.01, then - now)
now = time.time()
events = ep.poll(timeout=2.1, maxevents=4)
then = time.time()
self.assertFalse(events)
client.send(b"Hello!")
server.send(b"world!!!")
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
events.sort()
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
ep.unregister(client.fileno())
ep.modify(server.fileno(), select.EPOLLOUT)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
expected = [(server.fileno(), select.EPOLLOUT)]
self.assertEqual(events, expected)
def test_control_and_wait(self):
client, server = self._connected_pair()
ep = select.epoll(16)
ep.register(server.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
ep.register(client.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.1, then - now)
events.sort()
expected = [(client.fileno(), select.EPOLLOUT),
(server.fileno(), select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
events = ep.poll(timeout=2.1, maxevents=4)
self.assertFalse(events)
client.send("Hello!")
server.send("world!!!")
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
events.sort()
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
ep.unregister(client.fileno())
ep.modify(server.fileno(), select.EPOLLOUT)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
expected = [(server.fileno(), select.EPOLLOUT)]
self.assertEqual(events, expected)
def test_control_and_wait(self):
client, server = self._connected_pair()
ep = select.epoll(16)
ep.register(server.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
ep.register(client.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.1, then - now)
events.sort()
expected = [(client.fileno(), select.EPOLLOUT),
(server.fileno(), select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
events = ep.poll(timeout=2.1, maxevents=4)
self.assertFalse(events)
client.send("Hello!")
server.send("world!!!")
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
events.sort()
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
ep.unregister(client.fileno())
ep.modify(server.fileno(), select.EPOLLOUT)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
expected = [(server.fileno(), select.EPOLLOUT)]
self.assertEqual(events, expected)
def __init__(self, number, direction=INPUT, callback=None, edge=None, active_low=0):
"""
@type number: int
@param number: The pin number
@type direction: int
@param direction: Pin direction, enumerated by C{Direction}
@type callback: callable
@param callback: Method be called when pin changes state
@type edge: int
@param edge: The edge transition that triggers callback,
enumerated by C{Edge}
@type active_low: int
@param active_low: Indicator of whether this pin uses inverted
logic for HIGH-LOW transitions.
"""
self._number = number
self._direction = direction
self._callback = callback
self._active_low = active_low
if not os.path.isdir(self._sysfs_gpio_value_path()):
with open(SYSFS_EXPORT_PATH, 'w') as export:
export.write('%d' % number)
else:
Logger.debug("SysfsGPIO: Pin %d already exported" % number)
self._fd = open(self._sysfs_gpio_value_path(), 'r+')
if callback and not edge:
raise Exception('You must supply a edge to trigger callback on')
with open(self._sysfs_gpio_direction_path(), 'w') as fsdir:
fsdir.write(direction)
if edge:
with open(self._sysfs_gpio_edge_path(), 'w') as fsedge:
fsedge.write(edge)
self._poll = select.epoll()
self._poll.register(self, (select.EPOLLPRI | select.EPOLLET))
self.thread = Thread(target=self._run)
self.thread.daemon = True
self._running = True
self.start()
if active_low:
if active_low not in ACTIVE_LOW_MODES:
raise Exception('You must supply a value for active_low which is either 0 or 1.')
with open(self._sysfs_gpio_active_low_path(), 'w') as fsactive_low:
fsactive_low.write(str(active_low))
def test_control_and_wait(self):
client, server = self._connected_pair()
ep = select.epoll(16)
ep.register(server.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
ep.register(client.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.1, then - now)
events.sort()
expected = [(client.fileno(), select.EPOLLOUT),
(server.fileno(), select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
self.assertFalse(then - now > 0.01, then - now)
now = time.time()
events = ep.poll(timeout=2.1, maxevents=4)
then = time.time()
self.assertFalse(events)
client.send(b"Hello!")
server.send(b"world!!!")
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
events.sort()
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
ep.unregister(client.fileno())
ep.modify(server.fileno(), select.EPOLLOUT)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
expected = [(server.fileno(), select.EPOLLOUT)]
self.assertEqual(events, expected)
def test_control_and_wait(self):
client, server = self._connected_pair()
ep = select.epoll(16)
ep.register(server.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
ep.register(client.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.1, then - now)
events.sort()
expected = [(client.fileno(), select.EPOLLOUT),
(server.fileno(), select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
events = ep.poll(timeout=2.1, maxevents=4)
self.assertFalse(events)
client.send("Hello!")
server.send("world!!!")
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
events.sort()
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
ep.unregister(client.fileno())
ep.modify(server.fileno(), select.EPOLLOUT)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
expected = [(server.fileno(), select.EPOLLOUT)]
self.assertEqual(events, expected)
def test_control_and_wait(self):
client, server = self._connected_pair()
ep = select.epoll(16)
ep.register(server.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
ep.register(client.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
now = time.monotonic()
events = ep.poll(1, 4)
then = time.monotonic()
self.assertFalse(then - now > 0.1, then - now)
events.sort()
expected = [(client.fileno(), select.EPOLLOUT),
(server.fileno(), select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
events = ep.poll(timeout=2.1, maxevents=4)
self.assertFalse(events)
client.send(b"Hello!")
server.send(b"world!!!")
now = time.monotonic()
events = ep.poll(1, 4)
then = time.monotonic()
self.assertFalse(then - now > 0.01)
events.sort()
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
ep.unregister(client.fileno())
ep.modify(server.fileno(), select.EPOLLOUT)
now = time.monotonic()
events = ep.poll(1, 4)
then = time.monotonic()
self.assertFalse(then - now > 0.01)
expected = [(server.fileno(), select.EPOLLOUT)]
self.assertEqual(events, expected)
def test_control_and_wait(self):
client, server = self._connected_pair()
ep = select.epoll(16)
ep.register(server.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
ep.register(client.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.1, then - now)
events.sort()
expected = [(client.fileno(), select.EPOLLOUT),
(server.fileno(), select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
self.assertFalse(then - now > 0.01, then - now)
now = time.time()
events = ep.poll(timeout=2.1, maxevents=4)
then = time.time()
self.assertFalse(events)
client.send("Hello!")
server.send("world!!!")
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
events.sort()
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
ep.unregister(client.fileno())
ep.modify(server.fileno(), select.EPOLLOUT)
now = time.time()
events = ep.poll(1, 4)
then = time.time()
self.assertFalse(then - now > 0.01)
expected = [(server.fileno(), select.EPOLLOUT)]
self.assertEqual(events, expected)
def test_control_and_wait(self):
client, server = self._connected_pair()
ep = select.epoll(16)
ep.register(server.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
ep.register(client.fileno(),
select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
now = time.monotonic()
events = ep.poll(1, 4)
then = time.monotonic()
self.assertFalse(then - now > 0.1, then - now)
events.sort()
expected = [(client.fileno(), select.EPOLLOUT),
(server.fileno(), select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
events = ep.poll(timeout=2.1, maxevents=4)
self.assertFalse(events)
client.send(b"Hello!")
server.send(b"world!!!")
now = time.monotonic()
events = ep.poll(1, 4)
then = time.monotonic()
self.assertFalse(then - now > 0.01)
events.sort()
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
expected.sort()
self.assertEqual(events, expected)
ep.unregister(client.fileno())
ep.modify(server.fileno(), select.EPOLLOUT)
now = time.monotonic()
events = ep.poll(1, 4)
then = time.monotonic()
self.assertFalse(then - now > 0.01)
expected = [(server.fileno(), select.EPOLLOUT)]
self.assertEqual(events, expected)