integration_tests.py 文件源码

python
阅读 34 收藏 0 点赞 0 评论 0

项目:django-channels-router 作者: Monadical-SAS 项目源码 文件源码
def recv_all_json(socket, timeout=TIMEOUT):
    """
    block for [timeout] seconds, and return a list of all received 
    messages in that period
    """
    results = []
    try:
        last_result = True
        while last_result:

            signal.alarm(0)
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(timeout)
            try:
                last_result = json.loads(socket.recv())
            except TimeOutException:
                last_result = None
            signal.alarm(0)

            if last_result:
                results.append(last_result)

        return results
    except TimeOutException:
        return results
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号