def test_invalid_fd(self):
fd = test_support.make_bad_fd()
self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
python类set_wakeup_fd()的实例源码
def setUp(self):
import fcntl
self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
self.read, self.write = os.pipe()
flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
flags = flags | os.O_NONBLOCK
fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
self.old_wakeup = signal.set_wakeup_fd(self.write)
def tearDown(self):
signal.set_wakeup_fd(self.old_wakeup)
os.close(self.read)
os.close(self.write)
signal.signal(signal.SIGALRM, self.alrm)
def remove_signal_handler(self, sig):
"""Remove a handler for a signal. UNIX only.
Return True if a signal handler was removed, False if not.
"""
self._check_signal(sig)
try:
del self._signal_handlers[sig]
except KeyError:
return False
if sig == signal.SIGINT:
handler = signal.default_int_handler
else:
handler = signal.SIG_DFL
try:
signal.signal(sig, handler)
except OSError as exc:
if exc.errno == errno.EINVAL:
raise RuntimeError('sig {} cannot be caught'.format(sig))
else:
raise
if not self._signal_handlers:
try:
signal.set_wakeup_fd(-1)
except (ValueError, OSError) as exc:
logger.info('set_wakeup_fd(-1) failed: %s', exc)
return True
def test_invalid_fd(self):
fd = support.make_bad_fd()
self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
def check_wakeup(self, test_body, *signals, ordered=True):
# use a subprocess to have only one thread
code = """if 1:
import fcntl
import os
import signal
import struct
signals = {!r}
def handler(signum, frame):
pass
def check_signum(signals):
data = os.read(read, len(signals)+1)
raised = struct.unpack('%uB' % len(data), data)
if not {!r}:
raised = set(raised)
signals = set(signals)
if raised != signals:
raise Exception("%r != %r" % (raised, signals))
{}
signal.signal(signal.SIGALRM, handler)
read, write = os.pipe()
for fd in (read, write):
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
flags = flags | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
signal.set_wakeup_fd(write)
test()
check_signum(signals)
os.close(read)
os.close(write)
""".format(signals, ordered, test_body)
assert_python_ok('-c', code)
def test_invalid_fd(self):
fd = test_support.make_bad_fd()
self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
def setUp(self):
import fcntl
self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
self.read, self.write = os.pipe()
flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
flags = flags | os.O_NONBLOCK
fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
self.old_wakeup = signal.set_wakeup_fd(self.write)
def tearDown(self):
signal.set_wakeup_fd(self.old_wakeup)
os.close(self.read)
os.close(self.write)
signal.signal(signal.SIGALRM, self.alrm)
def _init_signals(self):
self._signal_sets = defaultdict(list)
self._default_signals = {}
old_fd = signal.set_wakeup_fd(self._notify_sock.fileno())
assert old_fd < 0, 'Signals already initialized %d' % old_fd
def _shutdown_resources(self):
log.debug('Kernel %r shutting down', self)
if self._selector:
self._selector.close()
self._selector = None
if self._notify_sock:
self._notify_sock.close()
self._notify_sock = None
self._wait_sock.close()
self._wait_sock = None
if self._signal_sets:
signal.set_wakeup_fd(-1)
self._signal_sets = None
self._default_signals = None
if self._thread_pool:
self._thread_pool.shutdown()
self._thread_pool = None
if self._process_pool:
self._process_pool.shutdown()
self._process_pool = None
if self._monitor:
self._monitor.close()
# Main Kernel Loop
# ----------
def add_signal_handler(self, sig, callback, *args):
"""Add a handler for a signal. UNIX only.
Raise ValueError if the signal number is invalid or uncatchable.
Raise RuntimeError if there is a problem setting up the handler.
"""
self._check_signal(sig)
try:
# set_wakeup_fd() raises ValueError if this is not the
# main thread. By calling it early we ensure that an
# event loop running in another thread cannot add a signal
# handler.
signal.set_wakeup_fd(self._csock.fileno())
except (ValueError, OSError) as exc:
raise RuntimeError(str(exc))
handle = events.Handle(callback, args, self)
self._signal_handlers[sig] = handle
try:
# Register a dummy signal handler to ask Python to write the signal
# number in the wakup file descriptor. _process_self_data() will
# read signal numbers from this file descriptor to handle signals.
signal.signal(sig, _sighandler_noop)
# Set SA_RESTART to limit EINTR occurrences.
signal.siginterrupt(sig, False)
except OSError as exc:
del self._signal_handlers[sig]
if not self._signal_handlers:
try:
signal.set_wakeup_fd(-1)
except (ValueError, OSError) as nexc:
logger.info('set_wakeup_fd(-1) failed: %s', nexc)
if exc.errno == errno.EINVAL:
raise RuntimeError('sig {} cannot be caught'.format(sig))
else:
raise
def remove_signal_handler(self, sig):
"""Remove a handler for a signal. UNIX only.
Return True if a signal handler was removed, False if not.
"""
self._check_signal(sig)
try:
del self._signal_handlers[sig]
except KeyError:
return False
if sig == signal.SIGINT:
handler = signal.default_int_handler
else:
handler = signal.SIG_DFL
try:
signal.signal(sig, handler)
except OSError as exc:
if exc.errno == errno.EINVAL:
raise RuntimeError('sig {} cannot be caught'.format(sig))
else:
raise
if not self._signal_handlers:
try:
signal.set_wakeup_fd(-1)
except (ValueError, OSError) as exc:
logger.info('set_wakeup_fd(-1) failed: %s', exc)
return True
def test_invalid_fd(self):
fd = support.make_bad_fd()
self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
def check_wakeup(self, test_body, *signals, ordered=True):
# use a subprocess to have only one thread
code = """if 1:
import fcntl
import os
import signal
import struct
signals = {!r}
def handler(signum, frame):
pass
def check_signum(signals):
data = os.read(read, len(signals)+1)
raised = struct.unpack('%uB' % len(data), data)
if not {!r}:
raised = set(raised)
signals = set(signals)
if raised != signals:
raise Exception("%r != %r" % (raised, signals))
{}
signal.signal(signal.SIGALRM, handler)
read, write = os.pipe()
for fd in (read, write):
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
flags = flags | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
signal.set_wakeup_fd(write)
test()
check_signum(signals)
os.close(read)
os.close(write)
""".format(signals, ordered, test_body)
assert_python_ok('-c', code)
def set_sigchld_handler(self):
# TODO: find out whether set_wakeup_fd still works if the default
# signal handler is used (I'm pretty sure it doesn't work if the
# signal is ignored).
signal.signal(signal.SIGCHLD, self.handle_sigchld)
# This should keep reads and writes from getting EINTR.
if hasattr(signal, 'siginterrupt'):
signal.siginterrupt(signal.SIGCHLD, False)
def handle_sigchld(self, number, frame):
"""Apparently we need a sigchld handler to make set_wakeup_fd work."""
# Write to the signal pipe (only for Python <2.5, where the
# set_wakeup_fd method doesn't exist).
if self.iomap.wakeup_writefd:
os.write(self.iomap.wakeup_writefd, '\0')
for task in self.running:
if task.proc:
task.proc.poll()
# Apparently some UNIX systems automatically reset the SIGCHLD
# handler to SIG_DFL. Reset it just in case.
self.set_sigchld_handler()
def __init__(self):
self.readmap = {}
self.writemap = {}
# Setup the wakeup file descriptor to avoid hanging on lost signals.
wakeup_readfd, wakeup_writefd = os.pipe()
self.register_read(wakeup_readfd, self.wakeup_handler)
# TODO: remove test when we stop supporting Python <2.5
if hasattr(signal, 'set_wakeup_fd'):
signal.set_wakeup_fd(wakeup_writefd)
self.wakeup_writefd = None
else:
self.wakeup_writefd = wakeup_writefd
def add_signal_handler(self, sig, callback, *args):
"""Add a handler for a signal. UNIX only.
Raise ValueError if the signal number is invalid or uncatchable.
Raise RuntimeError if there is a problem setting up the handler.
"""
if (coroutines.iscoroutine(callback)
or coroutines.iscoroutinefunction(callback)):
raise TypeError("coroutines cannot be used "
"with add_signal_handler()")
self._check_signal(sig)
self._check_closed()
try:
# set_wakeup_fd() raises ValueError if this is not the
# main thread. By calling it early we ensure that an
# event loop running in another thread cannot add a signal
# handler.
signal.set_wakeup_fd(self._csock.fileno())
except (ValueError, OSError) as exc:
raise RuntimeError(str(exc))
handle = events.Handle(callback, args, self)
self._signal_handlers[sig] = handle
try:
# Register a dummy signal handler to ask Python to write the signal
# number in the wakup file descriptor. _process_self_data() will
# read signal numbers from this file descriptor to handle signals.
signal.signal(sig, _sighandler_noop)
# Set SA_RESTART to limit EINTR occurrences.
signal.siginterrupt(sig, False)
except OSError as exc:
del self._signal_handlers[sig]
if not self._signal_handlers:
try:
signal.set_wakeup_fd(-1)
except (ValueError, OSError) as nexc:
logger.info('set_wakeup_fd(-1) failed: %s', nexc)
if exc.errno == errno.EINVAL:
raise RuntimeError('sig {} cannot be caught'.format(sig))
else:
raise
def main():
writer, reader = socket.socketpair()
writer.setblocking(False)
reader.setblocking(False)
signal.set_wakeup_fd(writer.fileno())
# Keep trying until we lose the race...
for attempt in itertools.count():
print(f"Attempt {attempt}: start")
# Make sure the socket is empty
drained = drain(reader)
if drained:
print(f"Attempt {attempt}: ({drained} residual bytes discarded)")
# Arrange for SIGINT to be delivered 1 second from now
thread = threading.Thread(target=raise_SIGINT_soon)
thread.start()
# Fake an IO loop that's trying to sleep for 10 seconds (but will
# hopefully get interrupted after just 1 second)
start = time.monotonic()
target = start + 10
try:
select_calls = 0
drained = 0
while True:
now = time.monotonic()
if now > target:
break
select_calls += 1
r, _, _ = select.select([reader], [], [], target - now)
if r:
# In theory we should loop to fully drain the socket but
# honestly there's 1 byte in there at most and it'll be
# fine.
drained += drain(reader)
except KeyboardInterrupt:
pass
else:
print(f"Attempt {attempt}: no KeyboardInterrupt?!")
# We expect a successful run to take 1 second, and a failed run to
# take 10 seconds, so 2 seconds is a reasonable cutoff to distinguish
# them.
duration = time.monotonic() - start
if duration < 2:
print(f"Attempt {attempt}: OK, trying again "
f"(select_calls = {select_calls}, drained = {drained})")
else:
print(f"Attempt {attempt}: FAILED, took {duration} seconds")
print(f"select_calls = {select_calls}, drained = {drained}")
break
thread.join()