def _process_data(self):
    """
    Subscribe to the feed at ``self.addr`` and queue incoming messages.

    Opens a websocket connection, sends a ``subscribe`` request for
    ``self.pairs`` and then, for as long as ``self.running`` is truthy,
    decodes every received frame as JSON. Messages carrying a
    ``product_id`` key are put on ``self.data_q`` as
    ``('order_book', product_id, message, timestamp)``.

    When receiving times out or the connection is reset, ``'restart'`` is
    put on ``self._controller_q`` and the iteration is skipped.
    ``self.conn`` is set back to ``None`` once the loop ends.

    :return:
    """
    self.conn = create_connection(self.addr, timeout=4)
    payload = json.dumps({'type': 'subscribe', 'product_ids': self.pairs})
    self.conn.send(payload)
    while self.running:
        try:
            data = json.loads(self.conn.recv())
        except (WebSocketTimeoutException, ConnectionResetError):
            self._controller_q.put('restart')
            # Nothing was received in this iteration: without skipping,
            # `data` would be unbound (first pass) or would still hold the
            # previous message, which would then be queued a second time.
            continue
        if 'product_id' in data:
            self.data_q.put(('order_book', data['product_id'],
                             data, time.time()))
    self.conn = None
# Python source code examples using the class WebSocketTimeoutException()
def start(self):
    """
    Bring up the websocket connection and the client's worker threads.

    Calls the parent ``start()``, retries ``create_connection`` until
    ``self.conn`` is set, starts the receiver and processing threads unless
    the corresponding attribute is already populated, and finally calls
    ``self.setup_subscriptions()``.

    :return:
    """
    super(BitfinexWSS, self).start()

    log.info("BitfinexWSS.start(): Initializing Websocket connection..")
    # Keep trying until a connection object has been obtained; only a
    # connect timeout is retried, any other error propagates.
    while self.conn is None:
        try:
            self.conn = create_connection(self.addr, timeout=10)
        except WebSocketTimeoutException:
            self.conn = None
            print("Couldn't create websocket connection - retrying!")

    log.info("BitfinexWSS.start(): Initializing receiver thread..")
    if self.receiver_thread:
        log.info("BitfinexWSS.start(): Thread not started! "
                 "self.receiver_thread is populated!")
    else:
        self.receiver_thread = Thread(target=self.receive,
                                      name='Receiver Thread')
        self.receiver_thread.start()

    log.info("BitfinexWSS.start(): Initializing processing thread..")
    if self.processing_thread:
        log.info("BitfinexWSS.start(): Thread not started! "
                 "self.processing_thread is populated!")
    else:
        self.processing_thread = Thread(target=self.process,
                                        name='Processing Thread')
        self.processing_thread.start()

    self.setup_subscriptions()
def receive(self):
    """
    Receive incoming websocket messages and put them on the receiver queue.

    Runs while ``self.running`` is truthy. Each pass tries to take
    ``self._receiver_lock`` without blocking; if the lock is busy the loop
    sleeps for half a second. With the lock held, one frame is read from
    ``self.conn``, decoded as JSON and put on ``self.receiver_q`` as a
    ``(timestamp, message)`` tuple.

    The lock is always released before the next pass, whichever way the
    current one ends.

    :return:
    """
    while self.running:
        if not self._receiver_lock.acquire(blocking=False):
            # The receiver_lock was locked, idling until available
            time.sleep(0.5)
            continue
        try:
            try:
                raw = self.conn.recv()
            except WebSocketTimeoutException:
                continue
            except WebSocketConnectionClosedException:
                # this needs to restart the client, while keeping track
                # of the currently subscribed channels!
                self.conn = None
                self._controller_q.put('restart')
                # No frame was read: skip decoding, otherwise `raw` is
                # unbound (or stale) and the lock would never be released.
                continue
            except AttributeError:
                # self.conn is None, idle loop until shutdown of thread
                continue
            msg = time.time(), json.loads(raw)
            log.debug("receiver Thread: Data Received: %s", msg)
            self.receiver_q.put(msg)
        finally:
            # Runs for the normal path, every `continue` above and any
            # unexpected exception, so the lock cannot be leaked.
            self._receiver_lock.release()
def _subscription_thread(self, endpoint):
    """
    Thread Method, running the connection for each endpoint.

    Connects to ``self.addr + endpoint``; if connecting times out the
    endpoint is put on ``self.restart_q`` and the thread returns. Otherwise
    messages are read while ``self.threads_running[endpoint]`` is truthy and
    put on ``self.data_q`` as ``(ep, pair, msg, timestamp)``, where ``ep``
    and ``pair`` are the two ``'/'``-separated parts of ``endpoint``.

    A receive timeout puts the endpoint on ``self._controller_q`` and skips
    the iteration. The connection is closed when the loop ends, including
    when it ends because of an exception.

    :param endpoint: endpoint path of the form ``'<ep>/<pair>'``
    :return:
    """
    # Function-scope import so the block does not depend on the file header.
    from queue import Full

    try:
        conn = create_connection(self.addr + endpoint, timeout=5)
    except WebSocketTimeoutException:
        self.restart_q.put(endpoint)
        return

    try:
        while self.threads_running[endpoint]:
            try:
                msg = conn.recv()
            except WebSocketTimeoutException:
                self._controller_q.put(endpoint)
                # Nothing was received; falling through would use an
                # unbound or stale `msg`.
                continue
            log.debug("%s, %s", endpoint, msg)
            ep, pair = endpoint.split('/')
            log.debug("_subscription_thread(): Putting data on q..")
            try:
                self.data_q.put((ep, pair, msg, time.time()), timeout=1)
            except (Full, TimeoutError):
                # NOTE(review): assumes data_q is a queue.Queue-like object,
                # whose put(timeout=...) signals a full queue with
                # queue.Full; TimeoutError is still caught as before.
                continue
            finally:
                log.debug("_subscription_thread(): Data Processed, looping back..")
    finally:
        # Close the socket even when the loop is left through an exception.
        conn.close()
    log.debug("_subscription_thread(): Thread Loop Ended.")
def run1(self, payload):
    """
    Run one job over a fresh websocket connection to ``self._browser_url``.

    The socket timeout is ``payload['sockettimeout']`` when set and truthy,
    otherwise ``self._socket_timeout``. On success the result of
    ``self.run1_core`` is returned.

    On a websocket timeout:
      * if ``payload['retried']`` was already truthy, a critical error
        record (code -6) is passed to ``self.crawl_info`` and its result is
        returned;
      * otherwise the function sleeps ``payload['retry_sleep']`` seconds
        (default 3), raises ``sockettimeout`` and ``loadtimeout`` in
        ``payload`` by ``retry_extra`` (default 10), marks the payload as
        retried and calls ``self.run1_core`` once more.

    Any other exception yields a critical error record with code -7.
    The connection, if one was opened, is always closed.

    :param payload: mutable mapping describing the job; updated in place
        on retry
    :return: result of ``run1_core`` or of ``crawl_info``
    """
    browser = None
    begin_time = datetime.datetime.now()
    already_retried = payload.get('retried', False)

    def _report_critical(code, exc):
        # Build the error record for `exc` and hand it to crawl_info.
        failure = {
            'state': 'critical',
            'error_code': code,
            'error_desc': str(type(exc)) + ': ' + str(exc)
        }
        return self.crawl_info(failure, payload, begin_time)

    try:
        socket_timeout = payload.get('sockettimeout') or self._socket_timeout
        browser = websocket.create_connection(self._browser_url,
                                              timeout=socket_timeout)
        return self.run1_core(payload, browser, begin_time)
    except websocket.WebSocketTimeoutException as e:
        if already_retried:
            return _report_critical(-6, e)

        sleep(payload.get('retry_sleep', 3))
        # Both timeouts fall back to self._socket_timeout when unset.
        for key in ('sockettimeout', 'loadtimeout'):
            payload[key] = (int(payload.get(key) or self._socket_timeout)
                            + payload.get('retry_extra', 10))
        payload['retried'] = True
        # NOTE(review): the retry reuses `browser` as-is (it is still None
        # if create_connection itself timed out) and runs outside the
        # handlers above, so its exceptions propagate - confirm intended.
        return self.run1_core(payload, browser=browser, begin_time=begin_time)
    except Exception as e:
        return _report_critical(-7, e)
    finally:
        if browser is not None:
            browser.close()
def _ws_events(ws_conn, message, snapshot, since, on_message, on_error):
    """Send a subscription and dispatch replies until the stream ends.

    The subscription sent is ``{'since': since, 'snapshot': snapshot}``
    updated with ``message``. Each non-empty reply is decoded as JSON:

    * a reply containing ``'_error'`` is passed to ``on_error`` (if given)
      and ends the loop;
    * any other reply is passed to ``on_message`` (if given); a falsy
      return value ends the loop.

    An empty reply also ends the loop. A receive timeout triggers a ping and
    the loop carries on. If the connection is closed, ``_RetryError`` is
    raised carrying the last seen ``'when'`` value (or the time the reply
    was handled when the key is absent; initially ``since``).

    The connection is always closed on exit.
    """
    # Pylint complains too many nested blocks.
    #
    # pylint: disable=R0101
    last_timestamp = since
    request = dict(since=since, snapshot=snapshot)
    request.update(message)

    try:
        ws_conn.send(json.dumps(request))
        while True:
            try:
                reply = ws_conn.recv()
                if not reply:
                    break

                event = json.loads(reply)
                if '_error' in event:
                    if on_error:
                        on_error(event)
                    break

                last_timestamp = event.get('when', time.time())
                if on_message and not on_message(event):
                    break
            except ws_client.WebSocketTimeoutException:
                # Idle connection: ping and keep listening.
                ws_conn.ping()
    except ws_client.WebSocketConnectionClosedException as err:
        _LOGGER.debug('ws connection closed, will retry: %s.', str(err))
        raise _RetryError(last_timestamp)
    finally:
        ws_conn.close()
def ws_loop(wsapi, message, snapshot, on_message, on_error=None,
            timeout=_DEFAULT_TIMEOUT):
    """Connect to a websocket API endpoint and process its events.

    Repeatedly resolves the endpoint list with
    ``context.GLOBAL.ws_api(wsapi)`` and tries each endpoint in turn. The
    first one that connects is handed to ``_ws_events`` and that call's
    result is returned. Connection timeouts, proxy errors and socket errors
    are logged and the next endpoint is tried. A ``_RetryError`` updates
    ``since`` so the next subscription resumes from there.

    Raises ``WSConnectionError`` when a full pass over the endpoints ends
    and no connection has ever been established.
    """
    since = 0
    ws_conn = None

    while True:
        for api in context.GLOBAL.ws_api(wsapi):
            try:
                _LOGGER.debug('Connecting to %s, [timeout: %s]', api, timeout)
                ws_conn = ws_client.create_connection(api, timeout=timeout)
                _LOGGER.debug('Connected.')
                return _ws_events(ws_conn, message, snapshot, since,
                                  on_message, on_error)
            except ws_client.WebSocketTimeoutException as to_err:
                _LOGGER.debug('Connection timeout: %s, %s', api, str(to_err))
            except ws_client.WebSocketProxyException as proxy_err:
                _LOGGER.debug('Websocket connection error: %s, %s', api,
                              str(proxy_err))
            except socket.error:
                _LOGGER.debug('Connection failed: %s', api)
            except _RetryError as retry_err:
                # Resume from where the dropped stream left off.
                since = retry_err.since

        # NOTE(review): ws_conn keeps its value from earlier rounds, so
        # after one successful connect this check no longer fires - confirm
        # that retrying indefinitely is intended.
        if not ws_conn:
            raise WSConnectionError()
def recv_packet(self):
    """Receive one packet from the connection and yield it parsed.

    Generator yielding a single ``(engineIO_packet_type,
    engineIO_packet_data)`` tuple obtained from ``parse_packet_text``.

    Raises ``TimeoutError`` when the receive times out and
    ``ConnectionError`` on an SSL error, a closed websocket or a socket
    error. (Whether these are builtins or project exceptions depends on the
    file's imports, which are not visible here.)
    """
    try:
        raw_text = self._connection.recv()
    except websocket.WebSocketTimeoutException as e:
        raise TimeoutError('recv timed out (%s)' % e)
    except websocket.SSLError as e:
        raise ConnectionError('recv disconnected by SSL (%s)' % e)
    except (websocket.WebSocketConnectionClosedException, socket.error) as e:
        # Both cases are reported to the caller with the same message.
        raise ConnectionError('recv disconnected (%s)' % e)

    packet_type, packet_data = parse_packet_text(six.b(raw_text))
    yield packet_type, packet_data
def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
    """Format a packet with ``format_packet_text`` and send it.

    Raises ``TimeoutError`` when the send times out and ``ConnectionError``
    on a socket error or a closed websocket. (Whether these are builtins or
    project exceptions depends on the file's imports, which are not visible
    here.)
    """
    outgoing = format_packet_text(engineIO_packet_type, engineIO_packet_data)
    try:
        self._connection.send(outgoing)
    except websocket.WebSocketTimeoutException as e:
        raise TimeoutError('send timed out (%s)' % e)
    except (socket.error, websocket.WebSocketConnectionClosedException) as e:
        # Both cases are reported to the caller with the same message.
        raise ConnectionError('send disconnected (%s)' % e)