def _worker_main(worker_actxmgr, threaded, proc_idx, args):
    """Run a single worker's event loop from start-up to shutdown.

    A fresh asyncio event loop is created and installed as the current
    event loop.  ``worker_actxmgr(loop, proc_idx, args)`` is entered as an
    async context manager, the loop then runs until it is stopped, and
    finally the context manager is exited and the loop is closed.

    Args:
        worker_actxmgr: Callable invoked as
            ``worker_actxmgr(loop, proc_idx, args)``; its return value is
            used with ``async with``.
        threaded: When true, the new loop is appended to
            ``_children_loops`` (under ``_children_lock``) and no signal
            handling is configured here.  When false, a SIGINT handler
            that stops the loop is installed and SIGINT is unblocked
            while the worker runs, then blocked again before returning.
        proc_idx: Passed through unchanged to ``worker_actxmgr``.
        args: Passed through unchanged to ``worker_actxmgr``.

    Raises:
        RuntimeError: If the internal async generator yields a second
            time instead of finishing (not expected to happen).
        Exception: Whatever the worker context manager raises while
            exiting is propagated, after the loop has been closed.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    interrupted = False
    if threaded:
        with _children_lock:
            _children_loops.append(loop)

    def _handle_term_signal():
        # Stop the loop only on the first delivery; repeated signals are
        # ignored so that shutdown is triggered exactly once.
        nonlocal interrupted
        if not interrupted:
            loop.stop()
            interrupted = True

    async def _work():
        # Async generator split around ``yield``: the first __anext__()
        # enters the worker context manager, the second one exits it.
        async with worker_actxmgr(loop, proc_idx, args):
            yield

    if not threaded:
        loop.add_signal_handler(signal.SIGINT, _handle_term_signal)
        signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT})

    # Creating the generator object runs no user code, so it sits outside
    # the try block; ``task`` is therefore always bound during shutdown.
    task = _work()
    try:
        loop.run_until_complete(task.__anext__())
    except Exception:
        log.exception("Unexpected error during worker initialization!")
        # interrupt the main loop.
        os.killpg(os.getpgid(0), signal.SIGINT)
    try:
        # even when the previous __anext__() call has errored,
        # we need to run the loop so that we can receive the SIGINT
        # sent by the main loop first and then terminate.
        loop.run_forever()
    except (SystemExit, KeyboardInterrupt):
        pass
    finally:
        try:
            try:
                # Resume the generator past its ``yield`` so that the
                # worker context manager is exited.
                loop.run_until_complete(task.__anext__())
            except StopAsyncIteration:
                loop.run_until_complete(loop.shutdown_asyncgens())
            else:
                raise RuntimeError('should not happen')  # pragma: no cover
        finally:
            # Always release the loop and restore the signal mask, even
            # when the worker's teardown raised; the exception still
            # propagates to the caller afterwards.
            if not threaded:
                # Prevent multiple delivery of signals and too early
                # termination of the worker process before multiprocessing
                # handles the termination.
                # Without this line, it often generates:
                # Exception ignored when trying to write to the signal wakeup fd:
                # BrokenPipeError: [Errno 32] Broken pipe
                signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
            loop.close()
评论列表
文章目录