def connect_upstream(self, tries=1, max_attempts=7):
    """Connect this proxy to the upstream rewarder websocket, with retries.

    If the client connection is already closed, this only logs and
    returns. Otherwise it builds a websocket client factory for the
    address in ``self.factory.rewarder_address`` (falling back to
    ``localhost:15900``), forwards the client's ``authorization`` header
    (plus ``openai-observer`` when it is set), and starts connecting.

    On success the upstream protocol is stored in ``self.client`` and
    ``self.begin_recording()`` is called when ``self.factory.logfile_dir``
    is truthy. On failure the connect is rescheduled after
    ``1.5 ** tries`` seconds until ``max_attempts`` is reached, at which
    point the client connection is dropped.

    Args:
        tries: 1-based number of the current attempt.
        max_attempts: Total number of attempts before giving up.
    """
    if self._closed:
        logger.info("[RewardProxyServer] [%d] Attempted to connect upstream although client connection is already closed. Aborting",
                    self.id)
        return

    remote = getattr(self.factory, 'rewarder_address', 'localhost:15900')
    endpoint = endpoints.clientFromString(reactor, 'tcp:' + remote)
    client_factory = websocket.WebSocketClientFactory('ws://' + remote)

    # Subscript rather than .get(): a request without an authorization
    # header fails right here (KeyError for a plain dict) instead of
    # connecting upstream unauthenticated.
    headers = {'authorization': self._request.headers['authorization']}
    observer = self._request.headers.get('openai-observer')
    if observer:
        headers['openai-observer'] = observer

    client_factory.headers = headers
    client_factory.protocol = RewardServerClient
    client_factory.proxy_server = self
    client_factory.endpoint = endpoint

    logger.info("[RewardProxyServer] [%d] Connecting to upstream %s (try %d/%d)", self.id, remote, tries, max_attempts)

    def _connect_callback(client):
        if self._closed:
            # The client went away while the connect was in flight. Do not
            # adopt the upstream connection or start recording for a dead
            # session; close the upstream side so it is not leaked.
            logger.info('[RewardProxyServer] [%d] Upstream connection %s established after client connection closed; dropping it', self.id, remote)
            client.transport.loseConnection()
            return
        logger.info('[RewardProxyServer] [%d] Upstream connection %s established', self.id, remote)
        self.client = client
        if self.factory.logfile_dir:
            self.begin_recording()

    def _connect_errback(reason):
        if tries < max_attempts:
            # Somewhat arbitrary exponential backoff: should be
            # pretty rare, and indicate that we're just starting
            # up.
            delay = 1.5 ** tries
            logger.info('[RewardProxyServer] [%d] Connection to %s failed: %s. Try %d/%d; going to retry in %fs', self.id, remote, reason, tries, max_attempts, delay)
            # The rescheduled call re-checks self._closed on entry, so a
            # client that disconnects during the delay stops the retries.
            reactor.callLater(
                delay, self.connect_upstream,
                tries=tries+1, max_attempts=max_attempts)
        else:
            logger.error('[RewardProxyServer] [%d] Connection to %s failed: %s. Completed %d/%d attempts; disconnecting.', self.id, remote, reason, tries, max_attempts)
            self.transport.loseConnection()

    endpoint.connect(client_factory).addCallbacks(_connect_callback, _connect_errback)
评论列表
文章目录