def blocking_wait_for_edge(pin, trigger, timeout=-1):
    """Block until an edge is seen on ``pin`` or the timeout expires.

    The pin's edge mode is set to ``trigger`` through ``sysfs.edge`` for the
    duration of the call and is always reset to ``NONE`` before returning,
    whether the wait succeeds, times out or raises.

    Args:
        pin: GPIO channel to watch; passed unchanged to the ``sysfs`` helpers.
        trigger: One of ``RISING``, ``FALLING`` or ``BOTH``.
        timeout: Maximum wait for each poll, in milliseconds. Any negative
            value (the default) waits indefinitely.

    Returns:
        ``pin`` if an event was reported before the timeout, otherwise
        ``None``.

    Raises:
        AssertionError: If ``trigger`` is not ``RISING``, ``FALLING`` or
            ``BOTH`` (only when assertions are enabled).
        RuntimeError: If ``pin`` already has an entry in ``_threads``.
    """
    assert trigger in [RISING, FALLING, BOTH]

    if pin in _threads:
        raise RuntimeError("Conflicting edge detection events already exist for this GPIO channel")

    # epoll.poll() takes seconds. Normalise every negative value to the
    # documented "block forever" value instead of passing a tiny negative
    # fraction such as -0.001.
    poll_timeout = -1 if timeout < 0 else timeout / 1000.0

    try:
        sysfs.edge(pin, trigger)

        with sysfs.value_descriptor(pin) as fd:
            e = select.epoll()
            try:
                # Registering inside the try guarantees the epoll object is
                # closed even if register() itself fails.
                e.register(fd, EPOLLIN | EPOLLET | EPOLLPRI)
                try:
                    # TODO: implement bouncetime
                    # The first wake-up is discarded: presumably it reflects
                    # the descriptor's initial state rather than a real edge
                    # (NOTE(review): confirm against the sysfs GPIO docs).
                    events = e.poll(poll_timeout, maxevents=1)
                    if events:
                        # Initial notification consumed; now wait for the
                        # actual edge.
                        events = e.poll(poll_timeout, maxevents=1)
                    # If the first poll already came back empty the timeout
                    # has elapsed, so do not wait a second full timeout.
                    return pin if events else None
                finally:
                    e.unregister(fd)
            finally:
                e.close()
    finally:
        # Always switch edge detection back off for this pin.
        sysfs.edge(pin, NONE)
评论列表
文章目录