def test_server_singleproc(restore_signal):
    """Check that one server worker runs its setup and teardown once each.

    Two shared integer counters record how many times the code before and
    after the ``yield`` in ``myserver`` has executed. The worker schedules
    a SIGINT 0.2 seconds after its setup half finishes; once
    ``aiotools.start_server`` returns, both counters must equal 1.

    Args:
        restore_signal: Not used in the body; requested only for its side
            effect. Presumably a pytest fixture that restores signal
            handlers after the test -- confirm against the conftest.
    """
    # Shared counters guarded by their own locks (see ``get_lock`` below).
    started = mp.Value('i', 0)
    terminated = mp.Value('i', 0)

    def interrupt():
        # A pid of 0 addresses every process in the caller's process group,
        # not only the current process.
        os.kill(0, signal.SIGINT)

    @aiotools.actxmgr
    async def myserver(loop, proc_idx, args):
        # The counters are mutated in place through ``.value`` and never
        # rebound, so no ``nonlocal`` declaration is required here.
        assert proc_idx == 0
        assert len(args) == 0
        await asyncio.sleep(0)
        with started.get_lock():
            started.value += 1
        # Schedule the interrupt that is expected to end the server.
        loop.call_later(0.2, interrupt)

        yield

        # NOTE(review): presumably resumed by aiotools when the server is
        # shutting down -- confirm against the aiotools documentation.
        await asyncio.sleep(0)
        with terminated.get_lock():
            terminated.value += 1

    aiotools.start_server(myserver)

    assert started.value == 1
    assert terminated.value == 1
评论列表
文章目录