def _extra_main(extra_func, threaded, intr_event, proc_idx, args):
    if not threaded:
        _interrupted = False

        def raise_kbdintr(signum, frame):
            nonlocal _interrupted
            if not _interrupted:
                _interrupted = True
                raise KeyboardInterrupt

        # restore signal handler.
        signal.signal(signal.SIGINT, raise_kbdintr)
        signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT})
        intr_event = None
    try:
        extra_func(intr_event, proc_idx, args)
    except SystemExit:
        pass
    finally:
        if not threaded:
            # same as in _worker_main()
            signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
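
The function above unblocks SIGINT only while the extra function runs and re-blocks it on the way out. A minimal standalone sketch of the same idea, assuming a POSIX platform and a forked child whose parent blocked SIGINT before fork() (names and timing are illustrative, not from the source above):

import os
import signal
import time

def _child_work():
    # Hypothetical stand-in for extra_func(): make Ctrl-C deliverable only
    # while the work runs, then mask it again before the child exits.
    signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT})
    try:
        time.sleep(5)          # interruptible "work"
    except KeyboardInterrupt:
        pass
    finally:
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})

if __name__ == '__main__':
    # The mask set here is inherited across fork(), so the child starts
    # with SIGINT blocked, mirroring the state a parent process can set up
    # before spawning workers.
    signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
    pid = os.fork()
    if pid == 0:
        _child_work()
        os._exit(0)
    os.waitpid(pid, 0)
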
def test_sigpending(self):
    code = """if 1:
        import os
        import signal

        def handler(signum, frame):
            1/0

        signum = signal.SIGUSR1
        signal.signal(signum, handler)

        signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        os.kill(os.getpid(), signum)
        pending = signal.sigpending()
        if pending != {signum}:
            raise Exception('%s != {%s}' % (pending, signum))
        try:
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
    """
    assert_python_ok('-c', code)
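
The test drives the scenario through a subprocess; a standalone sketch of the same sequence (block, self-signal, inspect sigpending(), unblock), with a harmless handler instead of 1/0, assuming a POSIX platform:

import os
import signal

calls = []
signal.signal(signal.SIGUSR1, lambda signum, frame: calls.append(signum))

signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGUSR1])
os.kill(os.getpid(), signal.SIGUSR1)

# Delivery is deferred while the signal is blocked; it shows up as pending.
assert signal.SIGUSR1 in signal.sigpending()
assert calls == []

# Unblocking delivers the pending signal; CPython runs the Python-level
# handler before pthread_sigmask() returns, which is why the test expects
# ZeroDivisionError to be raised by this very call.
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGUSR1])
assert calls == [signal.SIGUSR1]
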
def _serve(self):
    if hasattr(signal, 'pthread_sigmask'):
        signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
    while 1:
        try:
            with self._listener.accept() as conn:
                msg = conn.recv()
                if msg is None:
                    break
                key, destination_pid = msg
                send, close = self._cache.pop(key)
                try:
                    send(conn, destination_pid)
                finally:
                    close()
        except:
            if not util.is_exiting():
                sys.excepthook(*sys.exc_info())
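
This method runs in a background thread of multiprocessing's resource sharer and blocks every signal for that thread, leaving asynchronous signals to the main thread. A reduced sketch of the same per-thread masking, with a dummy loop standing in for the listener (illustrative names, POSIX-only):

import signal
import threading
import time

def serve_forever():
    # The mask is per-thread: this blocks delivery to this thread only,
    # so process-directed signals get routed to the main thread instead.
    signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
    while True:
        time.sleep(1)      # stand-in for accept()/recv() work

t = threading.Thread(target=serve_forever, daemon=True)
t.start()
time.sleep(3)              # main thread stays free to handle signals
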
def test_pending(self):
    self.check_wakeup("""def test():
        signum1 = signal.SIGUSR1
        signum2 = signal.SIGUSR2

        signal.signal(signum1, handler)
        signal.signal(signum2, handler)

        signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
        os.kill(os.getpid(), signum1)
        os.kill(os.getpid(), signum2)
        # Unblocking the 2 signals calls the C signal handler twice
        signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
    """, signal.SIGUSR1, signal.SIGUSR2, ordered=False)
def test_sigwait_thread(self):
    # Check that calling sigwait() from a thread doesn't suspend the whole
    # process. A new interpreter is spawned to avoid problems when mixing
    # threads and fork(): only async-safe functions are allowed between
    # fork() and exec().
    assert_python_ok("-c", """if True:
        import os, threading, sys, time, signal

        # the default handler terminates the process
        signum = signal.SIGUSR1

        def kill_later():
            # wait until the main thread is waiting in sigwait()
            time.sleep(1)
            os.kill(os.getpid(), signum)

        # the signal must be blocked by all the threads
        signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        killer = threading.Thread(target=kill_later)
        killer.start()
        received = signal.sigwait([signum])
        if received != signum:
            print("sigwait() received %s, not %s" % (received, signum),
                  file=sys.stderr)
            sys.exit(1)
        killer.join()

        # unblock the signal, which should have been cleared by sigwait()
        signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    """)
def test_pthread_sigmask_arguments(self):
    self.assertRaises(TypeError, signal.pthread_sigmask)
    self.assertRaises(TypeError, signal.pthread_sigmask, 1)
    self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
    self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
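
Beyond argument validation, the return value is the previous mask, so passing an empty set is the usual way to inspect the current thread's mask without changing it (a small sketch):

import signal

current = signal.pthread_sigmask(signal.SIG_BLOCK, [])   # block nothing
print(current)   # set of currently blocked signals for this thread
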
def _give_terminal_to(pgid):
    # over-simplified version of:
    #    give_terminal_to from bash 4.3 source, jobs.c, line 4030
    # this will give the terminal to the process group pgid
    if _shell_tty is not None and os.isatty(_shell_tty):
        oldmask = signal.pthread_sigmask(signal.SIG_BLOCK, _block_when_giving)
        os.tcsetpgrp(_shell_tty, pgid)
        signal.pthread_sigmask(signal.SIG_SETMASK, oldmask)
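
The xonsh helper shows the canonical save/restore pattern: SIG_BLOCK returns the old mask and SIG_SETMASK puts it back exactly, which is safer than SIG_UNBLOCK when some of those signals may already have been blocked by a caller. A generic sketch of that pattern (the wrapped call in the comment is only illustrative):

import signal

def with_signals_blocked(signals, func, *args, **kwargs):
    # Block `signals` around a critical section, then restore whatever
    # mask was in effect before (not necessarily "unblocked").
    oldmask = signal.pthread_sigmask(signal.SIG_BLOCK, signals)
    try:
        return func(*args, **kwargs)
    finally:
        signal.pthread_sigmask(signal.SIG_SETMASK, oldmask)

# e.g. with_signals_blocked({signal.SIGTTOU, signal.SIGTTIN},
#                           os.tcsetpgrp, tty_fd, pgid)
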
def _worker_main(worker_actxmgr, threaded, proc_idx, args):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    interrupted = False
    if threaded:
        with _children_lock:
            _children_loops.append(loop)

    def _handle_term_signal():
        nonlocal interrupted
        if not interrupted:
            loop.stop()
            interrupted = True

    async def _work():
        async with worker_actxmgr(loop, proc_idx, args):
            yield

    if not threaded:
        loop.add_signal_handler(signal.SIGINT, _handle_term_signal)
        signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT})
    try:
        task = _work()
        loop.run_until_complete(task.__anext__())
    except Exception:
        log.exception("Unexpected error during worker initialization!")
        # interrupt the main loop.
        os.killpg(os.getpgid(0), signal.SIGINT)
    try:
        # even when the previous __anext__() call has errored,
        # we need to run the loop so that we can receive the SIGINT
        # sent by the main loop first and then terminate.
        loop.run_forever()
    except (SystemExit, KeyboardInterrupt):
        pass
    finally:
        try:
            loop.run_until_complete(task.__anext__())
        except StopAsyncIteration:
            loop.run_until_complete(loop.shutdown_asyncgens())
        else:
            raise RuntimeError('should not happen')  # pragma: no cover
        if not threaded:
            # Prevent multiple delivery of signals and too early
            # termination of the worker process before multiprocessing
            # handles the termination.
            # Without this line, it often generates:
            #   Exception ignored when trying to write to the signal wakeup fd:
            #   BrokenPipeError: [Errno 32] Broken pipe
            signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
        loop.close()
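
Stripped of the aiotools-specific setup, the SIGINT handling around the loop reduces to: install a handler on the loop, unblock SIGINT only while the loop runs, and re-block it before teardown so a late signal cannot hit the shutdown path. A minimal sketch, assuming a forked worker on a Unix event loop (names are illustrative):

import asyncio
import signal

def run_worker():
    loop = asyncio.new_event_loop()
    loop.add_signal_handler(signal.SIGINT, loop.stop)   # Unix-only API
    signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT})
    try:
        loop.run_forever()
    finally:
        # Mask SIGINT again before closing, so shutdown is not interrupted.
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
        loop.remove_signal_handler(signal.SIGINT)
        loop.close()
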
def wait_helper(self, blocked, test):
    """
    test: body of the "def test(signum):" function.
    blocked: number of the blocked signal
    """
    code = '''if 1:
    import signal
    import sys

    def handler(signum, frame):
        1/0

    %s

    blocked = %s
    signum = signal.SIGALRM

    # child: block and wait the signal
    try:
        signal.signal(signum, handler)
        signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

        # Do the tests
        test(signum)

        # The handler must not be called on unblock
        try:
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
        except ZeroDivisionError:
            print("the signal handler has been called",
                  file=sys.stderr)
            sys.exit(1)
    except BaseException as err:
        print("error: {}".format(err), file=sys.stderr)
        sys.stderr.flush()
        sys.exit(1)
    ''' % (test.strip(), blocked)
    # sig*wait* must be called with the signal blocked: since the current
    # process might have several threads running, use a subprocess to have
    # a single thread.
    assert_python_ok('-c', code)
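
The rule stated in the comment, that sig*wait*() must be called with the target signal already blocked, can be shown with sigtimedwait(), which reports the pending signal synchronously without ever invoking the Python handler (a sketch, POSIX-only):

import os
import signal

signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
os.kill(os.getpid(), signal.SIGALRM)

info = signal.sigtimedwait([signal.SIGALRM], 1.0)   # returns None on timeout
assert info is not None and info.si_signo == signal.SIGALRM

# The signal was consumed by sigtimedwait(), so nothing fires on unblock.
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
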
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter."""
    read_results = []

    def _read():
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
        s = os.read(r, 1)
        read_results.append(s)

    t = threading.Thread(target=_read)
    t.daemon = True
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the timer armed above. Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        signal.alarm(1)
        try:
            with self.assertRaises(ZeroDivisionError):
                wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
        finally:
            signal.alarm(0)
            t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        try:
            wio.close()
        except OSError as e:
            if e.errno != errno.EBADF:
                raise
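
The masking detail exercised in _read() above is that the reader thread blocks SIGALRM for itself, so the alarm is delivered to the main thread and interrupts the blocking call there rather than the thread's read. A minimal sketch of that routing (timing-based, POSIX-only, illustrative names):

import signal
import threading
import time

fired = threading.Event()
signal.signal(signal.SIGALRM, lambda signum, frame: fired.set())

def worker():
    # Keep SIGALRM away from this thread so the OS delivers it to the
    # main thread, which is also where CPython runs Python handlers.
    signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
    time.sleep(2)

threading.Thread(target=worker, daemon=True).start()
signal.alarm(1)
time.sleep(1.5)        # the main thread is the one interrupted by the alarm
assert fired.is_set()
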