def test_timeout():
    """Check that a hard timeout is routed through the system-exception hook.

    The service is created with ``hard_timeout=1`` and exposes one API that
    sleeps for 2 seconds. The hook registered with
    ``handle_system_exception`` wraps the exception value it receives in
    ``UnknownException``, so calling the API is expected to raise
    ``UnknownException`` whose ``exc`` attribute is the timeout error.
    """
    # Dependencies are imported locally, as in the original test; pytest is
    # imported here too so the test does not depend on a module-level import.
    from takumi.service import ServiceHandler, ApiMap, Context
    import gevent
    import pytest

    # NOTE(review): soft_timeout=0 presumably disables the soft timeout so
    # only the hard timeout fires -- confirm against takumi's documentation.
    app = ServiceHandler('TestService', soft_timeout=0, hard_timeout=1)

    class UnknownException(Exception):
        """Wrapper that keeps the original exception value on ``exc``."""

        def __init__(self, exc):
            self.exc = exc

    @app.handle_system_exception
    def system_exception(tp, value, tb):
        # Replace the exception type and value with the wrapper while keeping
        # the traceback that was passed in.
        wrapped = UnknownException(value)
        return UnknownException, wrapped, tb

    @app.api
    def timeout():
        # Sleep longer than hard_timeout (1) so the call cannot finish in time.
        gevent.sleep(2)

    api_map = ApiMap(app, Context({'client_addr': 'localhost', 'meta': {}}))

    with pytest.raises(UnknownException) as exc_info:
        api_map.timeout()

    # Must run after the ``with`` block: inside it, this line would never be
    # reached once the call above raises.
    assert str(exc_info.value.exc) == 'Timeout after 1 seconds'
评论列表
文章目录