def check_wakeup(self, test_body, *signals, ordered=True):
    """Run *test_body* in a child interpreter and check the wakeup-fd bytes.

    A script is generated and handed to ``assert_python_ok('-c', ...)``.
    The script installs a no-op ``SIGALRM`` handler, creates a pipe, makes
    both ends non-blocking, registers the write end with
    ``signal.set_wakeup_fd()``, calls ``test()`` and finally compares the
    bytes read back from the pipe with *signals*, raising ``Exception`` in
    the child on a mismatch.

    Args:
        test_body: Source text spliced into the generated script; it must
            define a callable named ``test``.  Its first line is placed at
            the script's 4-space indentation level (inside ``if 1:``), so
            any following lines must be indented more deeply than that.
        *signals: Signal numbers expected in the pipe, one byte each.  They
            are converted with ``int()`` before being embedded, so enum
            members such as ``signal.SIGALRM`` are accepted.
        ordered: When true (the default) the bytes read must equal
            *signals* in order; when false both sides are compared as sets.
    """
    # use a subprocess to have only one thread
    #
    # The leading "if 1:" lets the whole script be written indented.  The
    # child reads at most len(signals)+1 bytes, so one unexpected extra byte
    # also makes the comparison fail.
    code = """if 1:
    import fcntl
    import os
    import signal
    import struct
    signals = {!r}
    def handler(signum, frame):
        pass
    def check_signum(signals):
        data = os.read(read, len(signals)+1)
        raised = struct.unpack('%uB' % len(data), data)
        if not {!r}:
            raised = set(raised)
            signals = set(signals)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))
    {}
    signal.signal(signal.SIGALRM, handler)
    read, write = os.pipe()
    for fd in (read, write):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
    signal.set_wakeup_fd(write)
    test()
    check_signum(signals)
    os.close(read)
    os.close(write)
    """.format(tuple(map(int, signals)), ordered, test_body)
    # repr() of a signal.Signals member ("<Signals.SIGALRM: 14>") is not
    # valid source, hence the int() conversion above.
    assert_python_ok('-c', code)
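# NOTE: stray navigation text from the web page this snippet was copied from
# ("comment list", "article table of contents"); not part of the code.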