def pipeline(stages, initial_data):
    """Chain ``stages`` together with queues and spawn a monitor per stage.

    Each stage is given two attributes: ``in_q`` (where it reads from) and
    ``out_q`` (where it writes to). The first stage reads from
    ``initial_data``; every later stage reads from the queue the previous
    stage writes to. One ``stage_monitor`` is spawned per stage inside a
    shared ``Group``.

    Args:
        stages: Stage objects in processing order. Must support iteration
            and ``stages[-1]``.
        initial_data: Iterable feeding the first stage. If it exposes an
            ``append`` method (e.g. a list), ``StopIteration`` is appended
            to it IN PLACE as the end-of-input sentinel -- this happens even
            when ``stages`` is empty. Inputs without ``append`` (generators,
            tuples, ...) are passed through untouched.
            NOTE(review): presumably ``stage_monitor`` copes with such
            inputs simply running out -- confirm.

    Returns:
        ``PipelineResult(monitors, out)`` where ``out`` is the last stage's
        ``out_q``, or an empty list when ``stages`` is empty.

    Raises:
        TypeError: If ``initial_data`` is not iterable.
    """
    # Validate before allocating anything. iter() is the authoritative
    # iterability check; on a generator it returns the generator itself
    # without consuming it, so generators need no special case. Only
    # TypeError is translated, so KeyboardInterrupt/SystemExit and genuine
    # errors raised by a custom __iter__ are not masked.
    try:
        iter(initial_data)
    except TypeError:
        raise TypeError('initial_data must be iterable')

    monitors = Group()

    # The StopIteration will bubble through the queues as it is reached.
    # Once a stage monitor sees it, it indicates that the stage will read
    # no more data and the monitor can wait for the current work to complete
    # and clean up.
    if hasattr(initial_data, 'append'):
        initial_data.append(StopIteration)

    if not stages:
        return PipelineResult(monitors, [])

    # Chain stage queue I/O: qs[i] is stage i's input and qs[i + 1] its
    # output, so each stage shares its output queue with the next stage's
    # input.
    qs = [initial_data] + [Queue() for _ in stages]
    for stage, in_q, out_q in zip(stages, qs[:-1], qs[1:]):
        stage.in_q = in_q
        stage.out_q = out_q
        monitors.spawn(stage_monitor, stage)

    # Yield control once so the freshly spawned monitors get a chance to run
    # before the result is handed back.
    gevent.sleep(0)
    return PipelineResult(monitors, stages[-1].out_q)
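# Comment list (web-page navigation residue, not part of this module)
# Article table of contents (web-page navigation residue, not part of this module)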