async def handle_message(self):
    """Receive one websocket message and apply it to the order book.

    The message is read with ``self._recv()``. Messages carrying a
    ``product_id`` are checked against the last sequence number stored in
    ``self._sequences`` for that product before being dispatched to
    ``add`` / ``remove`` / ``match`` / ``change``.

    Returns:
        The message dict when it was applied, or when it was ignored
        because its sequence number is not newer than the stored one.
        ``None`` when the message is a ``subscriptions`` message, or when
        the websocket was re-initialized (server disconnect or a gap in
        sequence numbers).

    Raises:
        OrderBookError: If the message has type ``error`` or an unknown
            type.
    """
    try:
        message = await self._recv()
    except aiohttp.ServerDisconnectedError as exc:
        logging.error(
            'Error: Exception: %s. Re-initializing websocket.', exc)
        await self.__aexit__(None, None, None)
        await self.__aenter__()
        return None

    msg_type = message['type']
    if msg_type == 'error':
        raise OrderBookError(f'Error: {message["message"]}')
    if msg_type == 'subscriptions':
        # Must filter out here because the subscriptions message does not
        # have a product_id key.
        return None

    product_id = message['product_id']
    last_sequence = self._sequences[product_id]
    # NOTE(review): presumably the sequence is set when the book for this
    # product is initialized -- confirm against the initialization code.
    assert last_sequence is not None
    sequence = message['sequence']

    if sequence <= last_sequence:
        # Ignore older messages (e.g. before order book initialization
        # from getProductOrderBook).
        return message
    if sequence > last_sequence + 1:
        # At least one message was skipped, so the local book can no
        # longer be trusted; reconnect instead of applying this message.
        logging.error(
            'Error: messages missing (%s - %s). Re-initializing websocket.',
            sequence, last_sequence)
        await self.__aexit__(None, None, None)
        await self.__aenter__()
        return None

    if msg_type == 'open':
        self.add(product_id, message)
    elif msg_type == 'done' and 'price' in message:
        self.remove(product_id, message)
    elif msg_type == 'match':
        self.match(product_id, message)
    elif msg_type == 'change':
        self.change(product_id, message)
    elif msg_type in ('heartbeat', 'received', 'done'):
        # No book update needed; 'done' reaches here only when the
        # message carries no price.
        pass
    else:
        raise OrderBookError(f'unknown message type {msg_type}')

    self._sequences[product_id] = sequence
    return message
评论列表
文章目录