def receive(self, handler=None, block=True):
    """
    Receive one message from the underlying receiver and decode it.

    If no handler is given, the header of the incoming message is read and the
    handler is looked up in ``receive_handlers`` by the header's ``htype`` value.

    :param handler: Reference to a specific message handler function to use for interpreting
                    the message to be received. It is called with the receiver as its only argument.
    :param block:   Blocking receive call (stored on ``self.receiver.block``)
    :return:        ``Message`` built from the receiver statistics and the decoded data, or
                    ``None`` if the header could not be read, the htype is not supported,
                    decoding failed, or the handler returned nothing
    """
    # Set blocking flag in receiver
    self.receiver.block = block

    if not handler:
        try:
            # Dynamically select handler
            htype = self.receiver.header()["htype"]
        except zmq.Again:
            # Presumably nothing was available on a non-blocking receive.
            # Not clear if the flush is needed here - kept from the original behavior.
            self.receiver.flush(False)
            return None
        except Exception:
            # KeyboardInterrupt / SystemExit are not Exception subclasses and
            # therefore propagate to the caller untouched.
            logger.exception('Unable to read header - skipping')
            # Clear remaining sub-messages if exist
            self.receiver.flush(False)
            return None

        try:
            handler = receive_handlers[htype]
        except Exception as err:
            logger.debug(err)
            # Lazy %-formatting so that a non-string htype cannot raise here
            logger.warning('htype - %s - not supported', htype)
            # Without a handler there is nothing to decode: drop the rest of
            # the message instead of failing later on a call to None.
            self.receiver.flush(False)
            return None

    message = None
    receive_is_successful = False
    try:
        data = handler(self.receiver)
        # as an extra safety margin
        if data:
            # Flag is set before building the Message so that the flush below
            # still reports success if the Message constructor fails.
            receive_is_successful = True
            message = Message(self.receiver.statistics, data)
    except Exception:
        logger.exception('Unable to decode message - skipping')

    # Clear remaining sub-messages if exist
    self.receiver.flush(receive_is_successful)
    return message
评论列表
文章目录