_test_asyncio.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:zanph 作者: zanph 项目源码 文件源码
def can_connect(self, server, client):
        """Check if client can connect to server using tcp transport"""
        @asyncio.coroutine
        def go():
            result = False
            iface = 'tcp://127.0.0.1'
            port = server.bind_to_random_port(iface)
            client.connect("%s:%i" % (iface, port))
            msg = [b"Hello World"]
            yield from server.send_multipart(msg)
            if (yield from client.poll(1000)):
                rcvd_msg = yield from client.recv_multipart()
                self.assertEqual(rcvd_msg, msg)
                result = True
            return result
        return self.loop.run_until_complete(go())
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号