integration_tests.py 文件源码

python
阅读 39 收藏 0 点赞 0 评论 0

项目:django-channels-router 作者: Monadical-SAS 项目源码 文件源码
def recv_json(socket, timeout=TIMEOUT):
    """
    block until a message is received [timeout] seconds, returns None 
    if nothing is received
    """

    signal.alarm(0)
    signal.signal(
        signal.SIGALRM, 
        lambda s, f: timeout_handler(s, f, f'receiving ({timeout}s)')
    )
    signal.alarm(timeout)
    try:
        result = json.loads(socket.recv())
        signal.alarm(0)
        return result
    except TimeOutException:
        signal.alarm(0)
        return None
    except Exception:
        signal.alarm(0)
        raise
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号