simpleconsumer.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:oa_qian 作者: sunqb 项目源码 文件源码
def _setup_fetch_workers(self):
        """Spawn the background fetcher workers and return them.

        One worker is started through ``self._cluster.handler.spawn`` for
        each of ``self._num_consumer_fetchers``. Every worker repeatedly
        calls ``fetch()`` followed by a very short handler sleep for as long
        as ``_running`` is truthy.

        :returns: list holding whatever ``handler.spawn`` returned for each
            worker, in spawn order
        """
        # NB this gets overridden in rdkafka.RdKafkaSimpleConsumer
        # The workers reach the consumer only through a weak proxy, so the
        # closure below never holds a strong reference to it.
        consumer = weakref.proxy(self)

        def fetcher():
            try:
                while consumer._running:
                    consumer.fetch()
                    consumer._cluster.handler.sleep(.0001)
            except ReferenceError:
                # the proxied consumer no longer exists: stop quietly
                pass
            except Exception:
                # surface all exceptions to the main thread
                consumer._worker_exception = sys.exc_info()
            log.debug("Fetcher thread exiting")

        log.info("Starting %s fetcher threads", consumer._num_consumer_fetchers)
        workers = []
        for _ in range(consumer._num_consumer_fetchers):
            workers.append(
                consumer._cluster.handler.spawn(
                    fetcher, name="pykafka.SimpleConsumer.fetcher"))
        return workers
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号