test_multiprocessing.py 文件源码

python
阅读 31 收藏 0 点赞 0 评论 0

项目:annotated-py-sanic 作者: hhstore 项目源码 文件源码
def skip_test_multiprocessing():
    """Serve a one-route app with two workers and check the JSON it returns.

    The app is started through ``app.serve_multiple`` with ``workers=2``.
    Once the server is up, the ``after_start`` hook requests ``/``, copies the
    response body into a shared buffer and sets ``stop_event``. A separate
    watchdog process sets the same event after five seconds so the run still
    ends if the hook never fires.

    NOTE(review): the ``skip_`` prefix presumably keeps the test runner from
    collecting this function -- confirm before renaming.

    Raises:
        ValueError: if the captured response body cannot be parsed as JSON.
        AssertionError: if the parsed body does not carry ``"test": true``.
    """
    app = Sanic('test_json')

    # Fixed-size (50) char buffer used to carry the response body out of the
    # after_start hook; presumably multiprocessing.Array, so it is visible
    # across processes -- confirm against the file's imports.
    response = Array('c', 50)

    @app.route('/')
    async def handler(request):
        return json({"test": True})

    stop_event = Event()

    async def after_start(*args, **kwargs):
        # Hit the freshly started server, stash the body, then ask it to stop.
        http_response = await local_request('get', '/')
        response.value = http_response.text.encode()
        stop_event.set()

    def rescue_crew():
        # Watchdog: force shutdown if after_start never sets the event.
        sleep(5)
        stop_event.set()

    rescue_process = Process(target=rescue_crew)
    rescue_process.start()

    try:
        app.serve_multiple({
            'host': HOST,
            'port': PORT,
            'after_start': after_start,
            'request_handler': app.handle_request,
            'request_max_size': 100000,
        }, workers=2, stop_event=stop_event)
    finally:
        # Always stop and reap the watchdog, even when serving raised, so no
        # stray process outlives the test.
        rescue_process.terminate()
        rescue_process.join()

    try:
        results = json_loads(response.value)
    except (TypeError, ValueError) as exc:
        # Report the captured payload itself (not the shared-array wrapper)
        # and keep the parser's error as the cause.
        raise ValueError(
            "Expected JSON response but got {!r}".format(response.value)
        ) from exc

    # Identity check: only a real JSON ``true`` passes, not a truthy value.
    assert results.get('test') is True
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号