def handle_in(self):
    """Receive one message from ``self.socket``, handle it and reply.

    Steps, in order:

    1. Read a multipart message from ``self.socket``. If that raises
       ``zmq.Again`` the method returns without doing anything.
    2. Require exactly two frames, ``(sender, msg_buf)``; anything else is
       logged as critical and dropped.
    3. Decode ``msg_buf`` with ``msg_factory`` and look the sender up in
       ``self.controllers``; messages from unknown senders are logged as
       critical and dropped.
    4. Stamp the sender's ``last_seen`` entry with the current time.
    5. Broadcast a ``BusyMessage`` to all controllers, run
       ``self.handle(msg)``, send the result back to the sender when it is
       truthy, then broadcast a ``DoneMessage``.

    Any ``Exception`` raised by ``self.handle`` is converted into an
    ``ErrorMessage`` whose ``'payload'`` is the formatted traceback; that
    error message is logged and sent to the sender instead of a result.

    Returns:
        None.
    """
    try:
        frames = self.socket.recv_multipart()
    except zmq.Again:
        return

    if len(frames) != 2:
        self.logger.critical('Received a msg with len != 2, something seriously wrong. ')
        return

    sender, msg_buf = frames
    msg = msg_factory(msg_buf)

    data = self.controllers.get(sender)
    if not data:
        self.logger.critical('Received a msg from %s - this is an unknown sender', sender)
        return
    data['last_seen'] = time.time()

    # TODO Notify Controllers that we are busy, no more messages to be sent
    # The above busy notification is not perfect as other messages might be on their way already
    # but for long-running queries it will at least ensure other controllers
    # don't try and overuse this node by filling up a queue
    self.send_to_all(BusyMessage())

    try:
        reply = self.handle(msg)
    except Exception:
        # Report the failure back to the sender rather than letting it
        # propagate out of the receive path.
        reply = ErrorMessage(msg)
        reply['payload'] = traceback.format_exc()
        self.logger.exception(reply['payload'])

    if reply:
        self.send(sender, reply)

    # NOTE(review): if self.send() raises, the DoneMessage below is never
    # sent -- confirm whether controllers recover from that on their own.
    # Send a DoneMessage to all controllers, this flags you as 'Done'.
    self.send_to_all(DoneMessage())
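# NOTE: stray web-page navigation text ("comment list", "article contents")
# followed the code here; it is not part of the program.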