consumer_thread.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:cherami-client-python 作者: uber 项目源码 文件源码
def run(self):
        """Receive message batches from the output host and enqueue them locally.

        Loops until ``self.stop_signal`` is set. Each iteration issues one
        ``receiveMessageBatch`` call against ``self.hostport`` and puts every
        returned message on ``self.msg_queue`` as a
        ``(delivery_token, message)`` tuple. Any exception raised while
        receiving or enqueueing is logged at info level and the loop simply
        moves on to the next iteration.
        """
        # The request is built once and reused for every poll.
        batch_request = cherami.ReceiveMessageBatchRequest(
            destinationPath=self.path,
            consumerGroupName=self.consumer_group_name,
            maxNumberOfMessages=self.msg_batch_size,
            # One second less than the call timeout, but never below 1.
            # NOTE(review): presumably so the server-side wait ends before
            # the client-side call times out -- confirm.
            receiveTimeout=max(1, self.timeout_seconds - 1),
        )

        while True:
            if self.stop_signal.is_set():
                break

            # possible optimization: if we don't have enough capacity in the
            # queue, backoff for a bit before pulling from Cherami again
            try:
                batch = util.execute_output_host(
                    tchannel=self.tchannel,
                    headers=self.headers,
                    hostport=self.hostport,
                    timeout=self.timeout_seconds,
                    method_name='receiveMessageBatch',
                    request=batch_request,
                )
                util.stats_count(
                    self.tchannel.name,
                    'receiveMessageBatch.messages',
                    self.hostport,
                    len(batch.messages),
                )

                for message in batch.messages:
                    # Retry the put for as long as the queue stays full; give
                    # up on this message only once the thread is told to stop.
                    while True:
                        if self.stop_signal.is_set():
                            break
                        try:
                            token = util.create_delivery_token(message.ackId, self.hostport)
                            self.msg_queue.put((token, message), block=True, timeout=5)
                            util.stats_count(
                                self.tchannel.name,
                                'consumer_msg_queue.enqueue',
                                self.hostport,
                                1,
                            )
                        except Full:
                            # No free slot within the timeout: try again.
                            continue
                        break
            except Exception as err:
                failure = {
                    'msg': 'error receiving msg from output host',
                    'hostport': self.hostport,
                    'traceback': traceback.format_exc(),
                    'exception': str(err),
                }
                self.logger.info(failure)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号