def _run(self):
# in_cpubound_thread is sentinel to prevent double thread dispatch
thread_ctx = threading.local()
thread_ctx.in_cpubound_thread = True
try:
self.in_async = gevent.get_hub().loop.async()
self.in_q_has_data = gevent.event.Event()
self.in_async.start(self.in_q_has_data.set)
while not self.stopping:
if not self.in_q:
# wait for more work
self.in_q_has_data.clear()
self.in_q_has_data.wait()
continue
# arbitrary non-preemptive service discipline can go here
# FIFO for now, but we should experiment with others
jobid, func, args, kwargs = self.in_q.popleft()
start_time = arrow.now()
try:
with db.cleanup_session():
self.results[jobid] = func(*args, **kwargs)
except Exception as e:
log.exception("Exception raised in cpubound_thread:")
self.results[jobid] = self._Caught(e)
finished_time = arrow.now()
run_delta = finished_time - start_time
log.d("Function - '{}'\n".format(func.__name__),
"\tRunning time: {}\n".format(run_delta),
"\tJobs left:", len(self.in_q),
)
self.out_q.append(jobid)
self.out_async.send()
except BaseException:
self._error()
# this may always halt the server process
评论列表
文章目录