def recv_json(socket, timeout=TIMEOUT):
    """Receive one message from *socket* and decode it as JSON.

    Blocks in ``socket.recv()`` while a SIGALRM timer of *timeout* seconds
    is armed.  The installed handler forwards to ``timeout_handler``
    (presumably raising ``TimeOutException`` -- defined elsewhere in this
    file; confirm there).

    Side effects: any previously scheduled alarm is cancelled, and the
    SIGALRM handler installed here stays installed after the call returns.
    Relies on ``signal``, so it must run in the main thread.

    Args:
        socket: object whose ``recv()`` returns a JSON document accepted
            by ``json.loads``.
        timeout: alarm delay in seconds, passed to ``signal.alarm``
            (``0`` schedules no alarm).

    Returns:
        The decoded JSON value, or ``None`` if ``TimeOutException`` was
        raised before the alarm could be cancelled.

    Raises:
        Any other exception from ``socket.recv()`` or ``json.loads`` is
        propagated unchanged.
    """
    # Drop any alarm left over from an earlier call before swapping handlers.
    signal.alarm(0)
    signal.signal(
        signal.SIGALRM,
        lambda s, f: timeout_handler(s, f, f'receiving ({timeout}s)')
    )
    signal.alarm(timeout)
    try:
        try:
            return json.loads(socket.recv())
        finally:
            # Always disarm the timer -- including on KeyboardInterrupt or
            # SystemExit -- so it cannot fire later in unrelated code.
            signal.alarm(0)
    except TimeOutException:
        # The outer handler also covers an alarm that fires in the short
        # window between recv() returning and the timer being disarmed.
        return None
integration_tests.py 文件源码
python
阅读 39
收藏 0
点赞 0
评论 0
评论列表
文章目录