def initialize_exception_listener(): # must be invoked in main thread in "geventless" runs in order for raise_in_main_thread to work
global REGISTERED_SIGNAL
if REGISTERED_SIGNAL:
# already registered
return
if threading.current_thread() is not threading.main_thread():
raise NotMainThread()
def handle_signal(sig, stack):
global LAST_ERROR
error = LAST_ERROR
LAST_ERROR = None
if error:
raise error
raise LastErrorEmpty(signal=sig)
custom_signal = signal.SIGUSR1
if signal.getsignal(custom_signal) in (signal.SIG_DFL, signal.SIG_IGN): # check if signal is already trapped
signal.signal(custom_signal, handle_signal)
REGISTERED_SIGNAL = custom_signal
else:
raise SignalAlreadyBound(signal=custom_signal)
评论列表
文章目录