def _producer_multi_processes(queue_task,
                              queue_product,
                              threads_per_process,
                              worker_function):
    """
    Drive a pool of worker threads fed from, and reporting to, shared queues.

    The function builds two thread-local queues: a task queue bounded to
    ``threads_per_process`` entries and an unbounded product queue. It then
    starts ``threads_per_process`` daemon threads running
    ``_producer_multi_threads(local_tasks, local_products, worker_function)``
    and two daemon threads running ``_subprocesses_queue_transfer``: one with
    ``(queue_task, local_tasks)`` and one with
    ``(local_products, queue_product)``. Finally it blocks until every worker
    thread has been joined, emitting a debug log line per joined thread and
    one more when all are done.

    NOTE(review): the log messages suggest this runs as the entry point of a
    subprocess, and the helper name suggests it moves items from its first
    queue argument to its second -- confirm against the helper definitions.
    NOTE(review): the two transfer threads are daemons and are not joined
    here; whether pending products are fully forwarded before this function
    returns depends on the helpers and the caller -- confirm.

    :type queue_task: multiprocessing.JoinableQueue
    :type queue_product: multiprocessing.JoinableQueue
    :type threads_per_process: int
    :type worker_function: Callable[[Any], Any]
    """
    local_tasks = queue.Queue(maxsize=threads_per_process)
    local_products = queue.Queue()

    # Construct every worker before starting any of them.
    workers = [
        threading.Thread(
            target=_producer_multi_threads,
            args=(local_tasks, local_products, worker_function),
        )
        for _ in range(threads_per_process)
    ]
    for worker in workers:
        worker.daemon = True
        worker.start()

    # One transfer thread per direction: shared -> local for tasks,
    # local -> shared for products.
    transfer_routes = (
        (queue_task, local_tasks),
        (local_products, queue_product),
    )
    for source, destination in transfer_routes:
        mover = threading.Thread(target=_subprocesses_queue_transfer,
                                 args=(source, destination))
        mover.daemon = True
        mover.start()

    # Wait for the worker threads (only the workers, not the transfer threads).
    for worker in workers:
        worker.join()
        stopped_message = "subthread {} of {} stopped".format(
            worker.name, multiprocessing.current_process().name)
        logger.debug(stopped_message)

    completed_message = "subprocess {} completed".format(
        multiprocessing.current_process().name)
    logger.debug(completed_message)
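# Comment list (web-page navigation label; not part of the program)
# Article table of contents (web-page navigation label; not part of the program)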