def _producer_multi_threads(queue_task, queue_product, worker_function):
    """
    Worker loop: consume tasks from ``queue_task`` and publish the results.

    Each task is fetched with ``queue_task.get()`` and handed to
    ``worker_function``: a dict is expanded as keyword arguments, a tuple or
    list as positional arguments, and anything else is passed as the single
    argument. The pair ``(task, result)`` is then put on ``queue_product``.
    The loop ends when a ``_QueueEndSignal`` instance is received.

    An ``Exception`` raised while handling a task is printed with
    ``traceback.print_exc()`` and the loop moves on to the next task; no
    product is queued for that task. ``KeyboardInterrupt`` and ``SystemExit``
    are not swallowed, so the worker can still be stopped.

    :type queue_task: multiprocessing.JoinableQueue
    :type queue_product: multiprocessing.JoinableQueue
    :type worker_function: Callable[[Any], Any]
    """
    while True:
        # Fetch outside the try block: task_done() must only be called for a
        # task that was actually retrieved from the queue.
        task = queue_task.get()
        try:
            if isinstance(task, _QueueEndSignal):  # end-of-work sentinel
                # The finally clause below still runs on break, so the
                # sentinel itself is also marked as done.
                break
            if isinstance(task, dict):
                result = worker_function(**task)
            elif isinstance(task, (tuple, list)):
                result = worker_function(*task)
            else:
                result = worker_function(task)
            queue_product.put((task, result))
        except Exception:
            # Best effort: report the failure and keep serving later tasks.
            traceback.print_exc()
        finally:
            queue_task.task_done()
评论列表
文章目录