def skip_test_multiprocessing():
app = Sanic('test_json')
response = Array('c', 50)
@app.route('/')
async def handler(request):
return json({"test": True})
stop_event = Event()
async def after_start(*args, **kwargs):
http_response = await local_request('get', '/')
response.value = http_response.text.encode()
stop_event.set()
def rescue_crew():
sleep(5)
stop_event.set()
rescue_process = Process(target=rescue_crew)
rescue_process.start()
app.serve_multiple({
'host': HOST,
'port': PORT,
'after_start': after_start,
'request_handler': app.handle_request,
'request_max_size': 100000,
}, workers=2, stop_event=stop_event)
rescue_process.terminate()
try:
results = json_loads(response.value)
except:
raise ValueError("Expected JSON response but got '{}'".format(response))
assert results.get('test') == True
test_multiprocessing.py 文件源码
python
阅读 31
收藏 0
点赞 0
评论 0
评论列表
文章目录