def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
    """Temporarily route SIGINT through a custom handler.

    This is a generator with a single ``yield`` on every path; it is
    presumably wrapped with ``contextlib.contextmanager`` where it is
    defined or used (the decorator is not visible here -- confirm).

    Args:
        deliver_cb: Zero-argument callable invoked from the signal handler
            when the interrupt must be deferred instead of raised.
        restrict_keyboard_interrupt_to_checkpoints: When true, SIGINT is
            always handed to ``deliver_cb`` and ``KeyboardInterrupt`` is
            never raised directly from the handler.

    Yields:
        None, exactly once. While suspended at the yield, the custom SIGINT
        handler is installed (unless the pass-through path was taken).
    """
    # Signal handlers can only be installed from the main thread, and we do
    # not want to clobber a handler somebody else has already installed.
    # In either case, act as a no-op wrapper.
    if (
        threading.current_thread() is not threading.main_thread()
        or signal.getsignal(signal.SIGINT) is not signal.default_int_handler
    ):
        yield
        return

    def handler(signum, frame):
        # This handler is only ever registered for SIGINT below.
        assert signum == signal.SIGINT
        protection_enabled = ki_protection_enabled(frame)
        if protection_enabled or restrict_keyboard_interrupt_to_checkpoints:
            # The interrupted frame must not see KeyboardInterrupt right
            # now; let the callback deliver it.
            deliver_cb()
        else:
            raise KeyboardInterrupt

    signal.signal(signal.SIGINT, handler)
    try:
        yield
    finally:
        # Only restore the default if our handler is still the active one;
        # if someone replaced it in the meantime, leave theirs alone.
        if signal.getsignal(signal.SIGINT) is handler:
            signal.signal(signal.SIGINT, signal.default_int_handler)
评论列表
文章目录