def run(self):
    """Run the connection's main loop until the peer is disconnected.

    Connects first when no socket exists yet (``self.s is None``) and
    returns immediately unless the resulting status is ``'connected'``.
    The socket is then switched to non-blocking mode and, when
    ``self.server`` is false, a ``message.Version`` is queued for sending.

    Each loop iteration:

    1. reads from the socket into ``self.buffer_receive``;
    2. when the socket has nothing to read and the connection is fully
       established, calls ``_request_objects()`` and ``_send_objects()``;
    3. calls ``_process_buffer_receive()``, ``_process_queue()`` and
       ``_send_data()``;
    4. applies the inactivity timeouts and queues a ``pong`` message when
       nothing has been sent for 300 seconds;
    5. closes the socket and leaves the loop when the peer closed the
       connection, a socket error occurred, the status became
       ``'disconnecting'`` or ``shared.shutting_down`` is set; otherwise
       sleeps 0.2 seconds and repeats.

    Returns:
        None. On exit from the loop ``self.status`` is ``'disconnected'``
        and the socket has been closed.
    """
    if self.s is None:
        self._connect()
    if self.status != 'connected':
        return
    # A zero timeout puts the socket in non-blocking mode: recv() raises
    # instead of waiting when no data is pending.
    self.s.settimeout(0)
    if not self.server:
        self.send_queue.put(message.Version(self.host, self.port))
    while True:
        if (self.on_connection_fully_established_scheduled
                and not (self.buffer_send or self.buffer_receive)):
            self._on_connection_fully_established()
        # `data` doubles as the "keep going" flag: it stays truthy unless
        # recv() returns b'' (peer closed) or an error clears it.
        data = True
        # Set when recv() reports that nothing is available right now.
        socket_idle = False
        try:
            if self.status == 'fully_established':
                data = self.s.recv(4096)
                self.buffer_receive += data
                # Keep draining the socket before processing, until it
                # runs dry (exception), closes (b'') or the buffer
                # reaches 4,000,000 bytes.
                if data and len(self.buffer_receive) < 4000000:
                    continue
            else:
                # Before the connection is fully established, read only
                # what is missing to reach next_message_size.
                data = self.s.recv(self.next_message_size - len(self.buffer_receive))
                self.buffer_receive += data
        except ssl.SSLWantReadError:
            socket_idle = True
        except ConnectionResetError:
            # Must precede the socket.error handler: socket.error is
            # OSError, of which ConnectionResetError is a subclass, so a
            # later handler would never be reached.
            logging.debug('Disconnecting from {}:{}. Reason: ConnectionResetError'.format(self.host, self.port))
            self.status = 'disconnecting'
        except socket.error as e:
            # e.errno is None (never an IndexError) for errors created
            # without an errno, which then fall through to disconnect.
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                socket_idle = True
            else:
                logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host, self.port, e))
                data = None
        if socket_idle and self.status == 'fully_established':
            self._request_objects()
            self._send_objects()
        self._process_buffer_receive()
        self._process_queue()
        self._send_data()
        if time.time() - self.last_message_received > shared.timeout:
            logging.debug(
                'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > shared.timeout'.format(
                    self.host, self.port))
            self.status = 'disconnecting'
        # Connections that are not fully established get a shorter,
        # 30-second limit on silence.
        if (time.time() - self.last_message_received > 30
                and self.status != 'fully_established'
                and self.status != 'disconnecting'):
            logging.debug(
                'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > 30 and self.status != \'fully_established\''.format(
                    self.host, self.port))
            self.status = 'disconnecting'
        if time.time() - self.last_message_sent > 300 and self.status == 'fully_established':
            self.send_queue.put(message.Message(b'pong', b''))
        if self.status == 'disconnecting' or shared.shutting_down:
            data = None
        if not data:
            self.status = 'disconnected'
            self.s.close()
            logging.info('Disconnected from {}:{}'.format(self.host, self.port))
            break
        time.sleep(0.2)
评论列表
文章目录