def _consume(self, msg):
    """Validate an incoming message and dispatch its JSON payload.

    A message whose ``content_type`` is not ``application/json``, or whose
    body cannot be decoded as JSON, is rejected without requeueing.
    Otherwise the decoded payload is handed to ``self._on_message`` and
    ``self._ack`` is registered on the IOLoop to be called with the
    resulting future and the original message once that future completes.

    Args:
        msg: Message object. This method reads its ``content_type`` and
            ``body`` attributes and may call ``reject(requeue=False)``.

    Returns:
        ``False`` when the content type is not acceptable; ``None`` in
        every other case (malformed JSON or successful dispatch).
    """
    if msg.content_type != 'application/json':
        LOG.warning('invalid content-type header.'
                    ' only json content is acceptable.'
                    ' message rejected.')
        msg.reject(requeue=False)
        return False

    try:
        data = json_decode(msg.body)
    except ValueError as e:
        msg.reject(requeue=False)
        # Lazy logger arguments: the message is only formatted if emitted.
        LOG.warning('malformed json message: %s. reason: %s '
                    'message rejected.', msg.body, e)
        # NOTE(review): this path returns None while the content-type
        # rejection returns False -- confirm whether callers care.
    else:
        # The loop is only needed on the success path, so look it up here.
        io_loop = tornado.ioloop.IOLoop.instance()
        # NOTE(review): if _on_message raises synchronously (before a
        # future exists), the exception propagates from here and the
        # message is neither acked nor rejected -- confirm this is intended.
        future = maybe_future(self._on_message(data))
        io_loop.add_future(future, lambda f: self._ack(f, msg))
评论列表
文章目录