testlib.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:txamqp 作者: txamqp 项目源码 文件源码
def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None,
                heartbeat=None, clientClass=None):
        host = host or self.host
        port = port or self.port
        spec = spec or self.spec
        user = user or self.user
        password = password or self.password
        vhost = vhost or self.vhost
        heartbeat = heartbeat or self.heartbeat
        clientClass = clientClass or self.clientClass

        delegate = TestDelegate()
        on_connect = Deferred()
        p = clientClass(delegate, vhost, txamqp.spec.load(spec), heartbeat=heartbeat)
        f = protocol._InstanceFactory(reactor, p, on_connect)
        c = reactor.connectTCP(host, port, f)

        def errb(thefailure):
            thefailure.trap(error.ConnectionRefusedError)
            print("failed to connect to host: %s, port: %s; These tests are designed to run against a running instance" \
                  " of the %s AMQP broker on the given host and port.  failure: %r" % (host, port, self.broker, thefailure,))
            thefailure.raiseException()
        on_connect.addErrback(errb)

        self.connectors.append(c)
        client = yield on_connect

        yield self.authenticate(client, user, password)
        returnValue(client)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号