def allow_interruption(*callbacks):
    """Forward Ctrl+C to event-loop callbacks while the generator is suspended.

    This is a generator with a single ``yield``; it is presumably wrapped by
    ``contextlib.contextmanager`` outside the visible code (TODO confirm) and
    used as ``with allow_interruption((loop, cb), ...):``.

    Args:
        *callbacks: ``(loop, cb)`` pairs. On interruption every ``cb`` is
            scheduled with ``loop.call_soon_threadsafe(cb)``.

    Yields:
        None, once, after the interrupt hook has been installed.

    Raises:
        OSError: On Windows, when ``SetConsoleCtrlHandler`` fails to register
            or unregister the console handler.
    """

    def notify():
        # call_soon_threadsafe: on Windows the console control handler is
        # invoked on a separate thread, and on POSIX the signal handler may
        # interrupt the loop at an arbitrary point.
        for loop, cb in callbacks:
            loop.call_soon_threadsafe(cb)

    on_windows = sys.platform == 'win32'

    if on_windows:
        import ctypes
        from ctypes import WINFUNCTYPE
        from ctypes.wintypes import BOOL, DWORD

        # use_last_error lets a failure be reported with the real Win32
        # error code instead of an empty exception.
        kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
        phandler_routine = WINFUNCTYPE(BOOL, DWORD)
        setconsolectrlhandler = kernel32.SetConsoleCtrlHandler
        setconsolectrlhandler.argtypes = (phandler_routine, BOOL)
        setconsolectrlhandler.restype = BOOL

        # `shutdown` must stay referenced for as long as it is registered;
        # it lives in this generator's frame until the finally clause runs.
        @phandler_routine
        def shutdown(event):
            if event == 0:  # CTRL_C_EVENT
                notify()
                return 1  # handled; stop further handler processing
            return 0  # let the next handler deal with other events

        if setconsolectrlhandler(shutdown, 1) == 0:
            raise ctypes.WinError(ctypes.get_last_error())
    else:
        def handler(*args):
            notify()

        # Remember whatever was installed before so it can be put back.
        previous = signal.signal(signal.SIGINT, handler)

    try:
        yield
    finally:
        if on_windows:
            if setconsolectrlhandler(shutdown, 0) == 0:
                raise ctypes.WinError(ctypes.get_last_error())
        else:
            # signal.signal() returns None when the previous handler was not
            # installed from Python; SIG_DFL is the closest safe fallback.
            signal.signal(
                signal.SIGINT,
                signal.SIG_DFL if previous is None else previous,
            )
评论列表
文章目录