def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None,
            heartbeat=None, clientClass=None):
    """Open a TCP connection to the broker, authenticate, and produce the client.

    Every argument that is omitted (or otherwise falsy) is replaced by the
    attribute of the same name on ``self``.

    The body is a generator: it yields the connection Deferred, then the
    result of ``self.authenticate(client, user, password)``, and finally
    hands the client back through ``returnValue``.
    NOTE(review): presumably wrapped by an ``inlineCallbacks``-style
    decorator that is outside this view -- confirm.

    The connector returned by ``reactor.connectTCP`` is appended to
    ``self.connectors``.

    The errback attached to the connection Deferred calls ``trap`` with
    ``error.ConnectionRefusedError`` first, then prints a hint naming the
    host, port and ``self.broker``, and re-raises the failure.
    """
    # Fall back to the instance-level settings for anything not supplied.
    if not host:
        host = self.host
    if not port:
        port = self.port
    if not spec:
        spec = self.spec
    if not user:
        user = self.user
    if not password:
        password = self.password
    if not vhost:
        vhost = self.vhost
    if not heartbeat:
        heartbeat = self.heartbeat
    if not clientClass:
        clientClass = self.clientClass

    test_delegate = TestDelegate()
    connected = Deferred()
    amqp_spec = txamqp.spec.load(spec)
    proto = clientClass(test_delegate, vhost, amqp_spec, heartbeat=heartbeat)
    factory = protocol._InstanceFactory(reactor, proto, connected)
    connector = reactor.connectTCP(host, port, factory)

    def report_refused(failure):
        # Only the connection-refused case gets the explanatory message.
        failure.trap(error.ConnectionRefusedError)
        template = (
            "failed to connect to host: %s, port: %s; These tests are designed to run against a running instance"
            " of the %s AMQP broker on the given host and port. failure: %r"
        )
        print(template % (host, port, self.broker, failure))
        failure.raiseException()

    connected.addErrback(report_refused)
    self.connectors.append(connector)

    client = yield connected
    yield self.authenticate(client, user, password)
    returnValue(client)
# Comment list
# Table of contents