def _setup_fetch_workers(self):
    """Spawn the fetcher workers and return them.

    Each worker repeatedly calls ``self.fetch()`` followed by a short
    ``self._cluster.handler.sleep`` until ``self._running`` becomes falsy,
    the weak proxy to this object raises ``ReferenceError``, or any other
    exception escapes (which is recorded in ``self._worker_exception``).

    :returns: list of the objects returned by ``self._cluster.handler.spawn``,
        one per configured fetcher (``self._num_consumer_fetchers``).
    """
    # NB this gets overridden in rdkafka.RdKafkaSimpleConsumer
    # From here on `self` is only a weak proxy, so the closure below does not
    # hold a strong reference to this object.
    self = weakref.proxy(self)

    def fetcher():
        # Every exit condition ends the loop, so one try block around the
        # whole loop is enough.
        try:
            while self._running:
                self.fetch()
                self._cluster.handler.sleep(.0001)
        except ReferenceError:
            # the proxied object no longer exists; just stop
            pass
        except Exception:
            # surface all exceptions to the main thread
            self._worker_exception = sys.exc_info()
        log.debug("Fetcher thread exiting")

    log.info("Starting %s fetcher threads", self._num_consumer_fetchers)
    workers = []
    for _ in range(self._num_consumer_fetchers):
        worker = self._cluster.handler.spawn(
            fetcher, name="pykafka.SimpleConsumer.fetcher")
        workers.append(worker)
    return workers
评论列表
文章目录