def _setup_autocommit_worker(self):
"""Start the autocommitter thread"""
self = weakref.proxy(self)
def autocommitter():
while True:
try:
if not self._running:
break
if self._auto_commit_enable:
self._auto_commit()
self._cluster.handler.sleep(self._auto_commit_interval_ms / 1000)
except ReferenceError:
break
except Exception:
# surface all exceptions to the main thread
self._worker_exception = sys.exc_info()
break
log.debug("Autocommitter thread exiting")
log.debug("Starting autocommitter thread")
return self._cluster.handler.spawn(autocommitter, name="pykafka.SimpleConsumer.autocommiter")
评论列表
文章目录