def _start_producers(self, result_queue):
jobs = Queue()
n_workers = self.n_producers
batch_count = 0
# Flag used for keeping values in queue in order
last_queued_job = Value('i', -1)
chunks = util.chunks(self.X,self.batch_size)
# Add jobs to queue
for job_index, X_batch in enumerate(chunks):
batch_count += 1
jobs.put( (job_index,X_batch) )
# Add poison pills to queue (to signal workers to stop)
for i in xrange(n_workers):
jobs.put((-1,None))
# Define producer function
produce = partial(_produce_helper,
generator=self.generator,
jobs=jobs,
result_queue=result_queue,
last_queued_job=last_queued_job,
ordered=self.ordered)
# Start worker processes or threads
for i in xrange(n_workers):
name = "ParallelBatchIterator worker {0}".format(i)
if self.multiprocess:
p = Process(target=produce, args=(i,), name=name)
else:
p = Thread(target=produce, args=(i,), name=name)
# Make the process daemon, so the main process can die without these finishing
#p.daemon = True
p.start()
return batch_count, jobs
评论列表
文章目录