def process_message(self, topic, payload, *args):
log.debug('Bus receive: topic={topic}, payload={payload}', topic=topic, payload=payload)
# TODO: filter by realm/topic
# decode message
if type(payload) is types.DictionaryType:
message = payload.copy()
elif type(payload) is types.ListType:
message = OrderedDict(payload)
else:
raise TypeError('Unable to handle data type "{}" from bus'.format(type(payload)))
# compute storage location from topic and message
storage_location = self.storage_location(message)
log.debug('Storage location: {storage_location}', storage_location=dict(storage_location))
# store data
self.store_message(storage_location, message)
评论列表
文章目录