def connect_factory(host, port, factory, blob_storage, hash_to_process):
    """Connect ``factory`` to ``host:port`` and shut the reactor down when done.

    Callbacks are attached to ``factory.on_connection_lost_d`` and
    ``factory.on_connection_fail_d`` before the TCP connection is started.
    Every outcome handled here ends by firing the reactor's ``"shutdown"``
    system event, or by exiting the process if starting the connection raises.

    Args:
        host: Remote host to connect to; also forwarded to
            ``update_sent_blobs``.
        port: Remote TCP port.
        factory: Twisted client factory. It must expose the Deferreds
            ``on_connection_lost_d`` and ``on_connection_fail_d`` and a
            ``p.blob_hashes_sent`` attribute (presumably the protocol
            instance's record of sent hashes -- confirm against the factory).
        blob_storage: Opaque handle forwarded to ``update_sent_blobs``.
        hash_to_process: Hash being sent; used in log messages (it is sliced
            with ``[:8]``, so it is expected to be a string-like value).

    Raises:
        SystemExit: With status 0 if ``JobTimeoutException`` is raised while
            starting the connection, or status 1 on any other exception.
    """
    import os

    from twisted.internet import reactor

    @defer.inlineCallbacks
    def _record_and_shutdown():
        # Shared tail of on_finish/on_error. `connection` is assigned further
        # down, after the callbacks are registered; that is safe because this
        # only runs once on_connection_lost_d fires, i.e. after connectTCP
        # has returned.
        yield update_sent_blobs(factory.p.blob_hashes_sent, host, blob_storage)
        connection.disconnect()
        reactor.fireSystemEvent("shutdown")

    def on_finish(result):
        log.info("Finished sending %s", hash_to_process)
        return _record_and_shutdown()

    def on_error(error):
        log.error("Error when sending %s: %s. Hashes sent %s", hash_to_process, error,
                  factory.p.blob_hashes_sent)
        return _record_and_shutdown()

    def on_connection_fail(result):
        log.error("Failed to connect to %s:%s", host, port)
        reactor.fireSystemEvent("shutdown")

    def _error(failure):
        # Last-resort errback: catches failures raised by on_finish/on_error
        # themselves so the reactor is still stopped.
        log.error("Failed on_connection_lost_d callback: %s", failure)
        reactor.fireSystemEvent("shutdown")

    factory.on_connection_lost_d.addCallbacks(on_finish, on_error)
    factory.on_connection_lost_d.addErrback(_error)
    factory.on_connection_fail_d.addCallback(on_connection_fail)

    try:
        log.debug("Connecting factory to %s:%s", host, port)
        connection = reactor.connectTCP(host, port, factory, timeout=TCP_CONNECT_TIMEOUT)
    except JobTimeoutException:
        log.error("Failed to forward %s --> %s", hash_to_process[:8], host)
        return sys.exit(0)
    except Exception:
        # The message has a %s placeholder for the pid; supply it so the log
        # line is actually formatted instead of printing a literal "%s".
        log.exception("Job (pid %s) encountered unexpected error", os.getpid())
        return sys.exit(1)
评论列表
文章目录