def run(self):
    request = cherami.ReceiveMessageBatchRequest(destinationPath=self.path,
                                                 consumerGroupName=self.consumer_group_name,
                                                 maxNumberOfMessages=self.msg_batch_size,
                                                 receiveTimeout=max(1, self.timeout_seconds - 1)
                                                 )
    while not self.stop_signal.is_set():
        # possible optimization: if we don't have enough capacity in the queue,
        # back off for a bit before pulling from Cherami again
        try:
            result = util.execute_output_host(tchannel=self.tchannel,
                                              headers=self.headers,
                                              hostport=self.hostport,
                                              timeout=self.timeout_seconds,
                                              method_name='receiveMessageBatch',
                                              request=request)
            util.stats_count(self.tchannel.name,
                             'receiveMessageBatch.messages',
                             self.hostport,
                             len(result.messages))
            for msg in result.messages:
                # if the queue is full, keep trying until there's a free slot,
                # or the thread has been shut down
                while not self.stop_signal.is_set():
                    try:
                        self.msg_queue.put((util.create_delivery_token(msg.ackId, self.hostport), msg),
                                           block=True,
                                           timeout=5)
                        util.stats_count(self.tchannel.name,
                                         'consumer_msg_queue.enqueue',
                                         self.hostport,
                                         1)
                        break
                    except Full:
                        pass
        except Exception as e:
            self.logger.info({
                'msg': 'error receiving msg from output host',
                'hostport': self.hostport,
                'traceback': traceback.format_exc(),
                'exception': str(e)
            })
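
# A minimal, standalone sketch of the "possible optimization" mentioned in the
# comment above: before issuing the next receiveMessageBatch call, check whether
# the local queue has room for a full batch and back off briefly if it does not.
# The helper name wait_for_capacity, the backoff interval, and the demo values
# are illustrative assumptions, not part of the Cherami client API.
import threading
from queue import Queue  # Python 2: from Queue import Queue


def wait_for_capacity(msg_queue, batch_size, stop_signal, backoff_seconds=0.1):
    """Block until msg_queue can absorb a full batch, or until stop_signal is set."""
    while not stop_signal.is_set():
        if msg_queue.maxsize <= 0:
            return True  # unbounded queue: always has capacity
        if msg_queue.maxsize - msg_queue.qsize() >= batch_size:
            return True  # enough free slots for the next batch
        stop_signal.wait(backoff_seconds)  # back off, waking early on shutdown
    return False  # shutting down; caller should exit its pull loop


if __name__ == '__main__':
    queue = Queue(maxsize=8)
    stop = threading.Event()
    print(wait_for_capacity(queue, batch_size=4, stop_signal=stop))  # True: 8 free slots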
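
# A minimal, standalone sketch of the enqueue/shutdown pattern used in the inner
# loop above: retry Queue.put with a short timeout and re-check a threading.Event
# between attempts, so a full queue can never block shutdown indefinitely. The
# helper name enqueue_or_stop, the 1-second timeout, and the demo values are
# illustrative assumptions only.
import threading
from queue import Full, Queue  # Python 2: from Queue import Full, Queue


def enqueue_or_stop(msg_queue, item, stop_signal, put_timeout=1):
    """Keep trying to enqueue item until it fits or stop_signal is set; return success."""
    while not stop_signal.is_set():
        try:
            msg_queue.put(item, block=True, timeout=put_timeout)
            return True
        except Full:
            continue  # queue still full; loop around and re-check the stop signal
    return False


if __name__ == '__main__':
    queue = Queue(maxsize=1)
    stop = threading.Event()
    print(enqueue_or_stop(queue, 'msg-1', stop))  # True: the single slot is free
    stop.set()
    print(enqueue_or_stop(queue, 'msg-2', stop))  # False: queue full and stop requested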