def start_service(self):
"""
start speaker training service.
"""
# prevent signal from propagating to child process
handler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
if self.debug:
self.sprecog.debug = True
mp.log_to_stderr(logging.DEBUG)
self.sprecog.speaker_name = self.speaker_name
self.proc = mp.Process(name="watchdog", target=self.__run,
args=(self.event,))
self.proc.setDaemon = False
self.proc.start()
# restore signal
signal.signal(signal.SIGINT, handler)
python类getsignal()的实例源码
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def bug1():
# BUG:
# sys.stdin.readline() in `ask` dies with IOError and
# errno=EINTR when the terminal gets resized after curses has
# been de-initialized.
# 1: SIGWINCH is sent by the terminal when the screen has been
# resized.
# 2: curses forgot to restore the signal handling of SIGWINCH
# to the default of ignoring the signal.
# NOTE: signal.getsignal(signal.SIGWINCH) incorrectly
# returns signal.SIG_DFL. (Default is to be ignored.)
# 3: Python fails to handle EINTR when reading from stdin.
# REFERENCES:
# Issue 3949: https://bugs.python.org/issue3949
# PEP 0457: https://www.python.org/dev/peps/pep-0475/
def handler(foo, bar):
print("Resized")
signal.signal(signal.SIGWINCH, handler)
while True:
sys.stdin.readline()
def __call__(self, *args, **kwargs):
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def testInterruptCaught(self):
default_handler = signal.getsignal(signal.SIGINT)
result = unittest.TestResult()
unittest.installHandler()
unittest.registerResult(result)
self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
result.breakCaught = True
self.assertTrue(result.shouldStop)
try:
test(result)
except KeyboardInterrupt:
self.fail("KeyboardInterrupt not handled")
self.assertTrue(result.breakCaught)
def testSecondInterrupt(self):
# Can't use skipIf decorator because the signal handler may have
# been changed after defining this method.
if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
self.skipTest("test requires SIGINT to not be ignored")
result = unittest.TestResult()
unittest.installHandler()
unittest.registerResult(result)
def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
result.breakCaught = True
self.assertTrue(result.shouldStop)
os.kill(pid, signal.SIGINT)
self.fail("Second KeyboardInterrupt not raised")
try:
test(result)
except KeyboardInterrupt:
pass
else:
self.fail("Second KeyboardInterrupt not raised")
self.assertTrue(result.breakCaught)
def testTwoResults(self):
unittest.installHandler()
result = unittest.TestResult()
unittest.registerResult(result)
new_handler = signal.getsignal(signal.SIGINT)
result2 = unittest.TestResult()
unittest.registerResult(result2)
self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
result3 = unittest.TestResult()
def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
try:
test(result)
except KeyboardInterrupt:
self.fail("KeyboardInterrupt not handled")
self.assertTrue(result.shouldStop)
self.assertTrue(result2.shouldStop)
self.assertFalse(result3.shouldStop)
def _handleSignals(self):
"""Install the signal handlers for the Twisted event loop."""
try:
import signal
except ImportError:
log.msg("Warning: signal module unavailable -- not installing signal handlers.")
return
if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
# only handle if there isn't already a handler, e.g. for Pdb.
signal.signal(signal.SIGINT, self.sigInt)
signal.signal(signal.SIGTERM, self.sigTerm)
# Catch Ctrl-Break in windows
if hasattr(signal, "SIGBREAK"):
signal.signal(signal.SIGBREAK, self.sigBreak)
if platformType == 'posix':
signal.signal(signal.SIGCHLD, self._handleSigchld)
def testInterruptCaught(self):
default_handler = signal.getsignal(signal.SIGINT)
result = unittest2.TestResult()
unittest2.installHandler()
unittest2.registerResult(result)
self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
result.breakCaught = True
self.assertTrue(result.shouldStop)
try:
test(result)
except KeyboardInterrupt:
self.fail("KeyboardInterrupt not handled")
self.assertTrue(result.breakCaught)
def testSecondInterrupt(self):
# Can't use skipIf decorator because the signal handler may have
# been changed after defining this method.
if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
self.skipTest("test requires SIGINT to not be ignored")
result = unittest2.TestResult()
unittest2.installHandler()
unittest2.registerResult(result)
def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
result.breakCaught = True
self.assertTrue(result.shouldStop)
os.kill(pid, signal.SIGINT)
self.fail("Second KeyboardInterrupt not raised")
try:
test(result)
except KeyboardInterrupt:
pass
else:
self.fail("Second KeyboardInterrupt not raised")
self.assertTrue(result.breakCaught)
def testTwoResults(self):
unittest2.installHandler()
result = unittest2.TestResult()
unittest2.registerResult(result)
new_handler = signal.getsignal(signal.SIGINT)
result2 = unittest2.TestResult()
unittest2.registerResult(result2)
self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
result3 = unittest2.TestResult()
def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
try:
test(result)
except KeyboardInterrupt:
self.fail("KeyboardInterrupt not handled")
self.assertTrue(result.shouldStop)
self.assertTrue(result2.shouldStop)
self.assertFalse(result3.shouldStop)
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def test_context_manager_with_signal(self):
init_signals = get_signals(self.signals)
with signal_receiver(self.signals) as signals_received:
with self.handler:
should_be_42 = 42
send_signal(self.signals[0])
should_be_42 *= 10
# check execution stoped when the signal was sent
self.assertEqual(42, should_be_42)
# assert signals were caught
self.assertEqual([self.signals[0]], signals_received)
# assert the error handling function was just called once
self.init_func.assert_called_once_with(*self.init_args,
**self.init_kwargs)
for signum in self.signals:
self.assertEqual(init_signals[signum], signal.getsignal(signum))
def signals_to_exception(signals=(signal.SIGINT, signal.SIGTERM)):
"""Context manager that makes sure that converts system signals to exceptions.
This allows e.g. for a graceful exit after receiving SIGTERM (e.g. through
`kill` on UNIX systems).
Example:
>>> with signals_to_exception():
>>> try:
>>> # do something
>>> except SystemExit:
>>> # graceful exit even upon receiving interrupt signal
"""
def signal_to_exception(sig, frame):
raise SystemExit("received interrupt signal {}".format(sig))
old_signals = {}
for s in signals:
old_signals[s] = signal.getsignal(s)
signal.signal(s, signal_to_exception)
try:
yield
finally:
for s in signals:
signal.signal(s, old_signals[s])
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def test_catch_signals():
print = lambda *args: None
orig = signal.getsignal(signal.SIGILL)
print(orig)
with catch_signals([signal.SIGILL]) as queue:
# Raise it a few times, to exercise signal coalescing, both at the
# call_soon level and at the SignalQueue level
signal_raise(signal.SIGILL)
signal_raise(signal.SIGILL)
await _core.wait_all_tasks_blocked()
signal_raise(signal.SIGILL)
await _core.wait_all_tasks_blocked()
async for batch in queue: # pragma: no branch
assert batch == {signal.SIGILL}
break
signal_raise(signal.SIGILL)
async for batch in queue: # pragma: no branch
assert batch == {signal.SIGILL}
break
with pytest.raises(RuntimeError):
await queue.__anext__()
assert signal.getsignal(signal.SIGILL) is orig
def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
if (threading.current_thread() != threading.main_thread()
or signal.getsignal(signal.SIGINT) != signal.default_int_handler):
yield
return
def handler(signum, frame):
assert signum == signal.SIGINT
protection_enabled = ki_protection_enabled(frame)
if protection_enabled or restrict_keyboard_interrupt_to_checkpoints:
deliver_cb()
else:
raise KeyboardInterrupt
signal.signal(signal.SIGINT, handler)
try:
yield
finally:
if signal.getsignal(signal.SIGINT) is handler:
signal.signal(signal.SIGINT, signal.default_int_handler)