test_server.py 文件源码

python
阅读 31 收藏 0 点赞 0 评论 0

项目:aiotools 作者: achimnol 项目源码 文件源码
def test_server_singleproc(restore_signal):

    started = mp.Value('i', 0)
    terminated = mp.Value('i', 0)

    def interrupt():
        os.kill(0, signal.SIGINT)

    @aiotools.actxmgr
    async def myserver(loop, proc_idx, args):
        nonlocal started, terminated
        assert proc_idx == 0
        assert len(args) == 0
        await asyncio.sleep(0)
        with started.get_lock():
            started.value += 1
        loop.call_later(0.2, interrupt)

        yield

        await asyncio.sleep(0)
        with terminated.get_lock():
            terminated.value += 1

    aiotools.start_server(myserver)

    assert started.value == 1
    assert terminated.value == 1
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号