def send_json(socket, data: dict, timeout=TIMEOUT):
    """
    Send ``data`` over ``socket`` as a JSON string, guarded by a SIGALRM timer.

    A SIGALRM handler is installed that forwards to ``timeout_handler`` with
    the label ``'sending (<timeout>s)'``, and an alarm is armed for
    ``timeout`` seconds before the send starts. The intended contract is that
    an exception is thrown if the send takes longer than ``timeout`` seconds;
    that relies on ``timeout_handler`` raising (it is defined outside this
    block -- confirm there).

    Args:
        socket: object whose ``send`` method is called with the JSON text.
        data: dictionary serialised with ``json.dumps``.
        timeout: alarm delay in seconds, passed straight to ``signal.alarm``.

    Returns:
        Whatever ``socket.send`` returns.

    Raises:
        Anything raised by ``json.dumps``, ``socket.send`` or the installed
        alarm handler; exceptions are propagated unchanged.

    Note:
        The SIGALRM handler installed here stays in place after the call;
        only the pending alarm is cancelled.
    """
    signal.signal(
        signal.SIGALRM,
        lambda s, f: timeout_handler(s, f, f'sending ({timeout}s)')
    )
    signal.alarm(timeout)
    try:
        return socket.send(json.dumps(data))
    finally:
        # Always disarm the timer, including when a BaseException such as
        # KeyboardInterrupt escapes the send; otherwise a stale alarm would
        # fire later in unrelated code.
        signal.alarm(0)
integration_tests.py 文件源码
python
阅读 36
收藏 0
点赞 0
评论 0
评论列表
文章目录