def _start_thread(self):
"""Run the request processor"""
# We pass a direct reference to `shared` into the worker, to avoid
# that thread holding a ref to `self`, which would prevent GC. A
# previous version of this used a weakref to `self`, but would
# potentially abort the thread before the requests queue was empty
shared = self.shared
def worker():
try:
while not shared.ending.is_set():
try:
# set a timeout so we check `ending` every so often
task = shared.requests.get(timeout=1)
except Empty:
continue
try:
shared.connection.request(task.request)
if task.future:
res = shared.connection.response()
task.future.set_response(res)
except Exception as e:
if task.future:
task.future.set_error(e)
finally:
shared.requests.task_done()
log.info("RequestHandler worker: exiting cleanly")
except:
# deal with interpreter shutdown in the same way that
# python 3.x's threading module does, swallowing any
# errors raised when core modules such as sys have
# already been destroyed
if _sys is None:
return
raise
name = "pykafka.RequestHandler.worker for {}:{}".format(
self.shared.connection.host, self.shared.connection.port)
return self.handler.spawn(worker, name=name)
评论列表
文章目录