integration_tests.py source code

python

Project: django-channels-router  Author: Monadical-SAS
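import json
import signal

# Assumption: create_connection comes from the websocket-client package.
from websocket import create_connection

# ROUTING_KEY is defined elsewhere in the project and is not shown in this excerpt.


def run(self):
    # Cancel any pending SIGALRM before opening the connection.
    signal.alarm(0)
    self.ws = create_connection(self.url, **self.socket_options)
    self.ws.send(json.dumps(self.get_message()))
    # Two frames are read here; only the second one is validated.
    resp = self.ws.recv()
    resp = self.ws.recv()
    assert resp and self.check_response(json.loads(resp)), \
        'Failed to get expected response from backend.'
    # Report readiness, then block until the test driver releases this worker.
    self.started.set()
    self.should_start.wait()
    while self.keep_running:
        try:
            msg = self.get_message()
            self.ws.send(json.dumps(msg))
            resp = json.loads(self.ws.recv())
            if self.verbose:
                print('sent:', msg[ROUTING_KEY],
                      '  recv:', resp[ROUTING_KEY])
            assert resp and self.check_response(resp), \
                'Failed to get expected response from backend.'
            self.round_trips += 1
        except Exception:
            # Errors raised during shutdown are ignored; re-raise only while running.
            if self.keep_running:
                raise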
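
The run method above relies on two events (started, should_start) and a keep_running flag that some driver outside this excerpt must manage. The following is a minimal sketch of how such a driver might look, not the project's actual test harness. Worker, EchoSocket, run_benchmark, and the value of ROUTING_KEY are all hypothetical names introduced for illustration, and the WebSocket is replaced by an in-memory echo object so the example runs without a backend.

```python
import json
import threading
import time

ROUTING_KEY = 'type'  # assumed value; the real constant is defined in the project


class EchoSocket:
    """Hypothetical in-memory stand-in for a WebSocket: recv() returns the last send()."""

    def __init__(self):
        self._last = None

    def send(self, payload):
        self._last = payload

    def recv(self):
        return self._last


class Worker(threading.Thread):
    """Hypothetical worker with the same start-up handshake as run() above."""

    def __init__(self):
        super().__init__(daemon=True)
        self.started = threading.Event()       # set by the worker once warmed up
        self.should_start = threading.Event()  # set by the driver to release it
        self.keep_running = True
        self.round_trips = 0
        self.ws = EchoSocket()

    def get_message(self):
        return {ROUTING_KEY: 'PING'}

    def check_response(self, resp):
        return resp.get(ROUTING_KEY) == 'PING'

    def round_trip(self):
        self.ws.send(json.dumps(self.get_message()))
        resp = json.loads(self.ws.recv())
        assert self.check_response(resp), 'unexpected response'

    def run(self):
        self.round_trip()          # warm-up exchange, not counted
        self.started.set()
        self.should_start.wait()
        while self.keep_running:
            self.round_trip()
            self.round_trips += 1


def run_benchmark(n_workers=2, duration=0.2):
    """Start workers, release them together, stop after `duration` seconds."""
    workers = [Worker() for _ in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.started.wait()           # every worker has finished its warm-up
    for w in workers:
        w.should_start.set()       # release all workers at roughly the same time
    time.sleep(duration)
    for w in workers:
        w.keep_running = False
    for w in workers:
        w.join()
    return [w.round_trips for w in workers]
```

Calling run_benchmark(4, 1.0) would return one round-trip count per worker. Waiting on every started event before setting should_start keeps connection setup out of the measured interval, which appears to be the purpose of the two-event handshake in the original method.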