def test_result_get(app):
@app.manager.task
def add(a, b):
return a + b
req = Request.blank('/push')
req.method = 'POST'
req.content_type = 'application/json'
req.body = bytestr(json.dumps({'queue': 'boo', 'name': 'add',
'args': (1, 2), 'keep_result': 100}))
res = req.get_response(app)
tid = res.json['id']
assert Request.blank('/result?id={}'.format(tid)).get_response(app).json == None
app.manager.process(app.manager.pop(['boo'], 1))
assert Request.blank('/result?id={}'.format(tid)).get_response(app).json == {'result': 3}
评论列表
文章目录