pipeline.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:pipeline 作者: alexlemann 项目源码 文件源码
def pipeline(stages, initial_data):
    monitors = Group()
    # Make sure items in initial_data are iterable.
    if not isinstance(initial_data, types.GeneratorType):
        try:
            iter(initial_data)
        except:
            raise TypeError('initial_data must be iterable')
    # The StopIteration will bubble through the queues as it is reached.
    #   Once a stage monitor sees it, it indicates that the stage will read
    #   no more data and the monitor can wait for the current work to complete
    #   and clean up.
    if hasattr(initial_data, 'append'):
        initial_data.append(StopIteration)
    if not stages:
        return PipelineResult(monitors, [])
    # chain stage queue io
    #  Each stage shares an output queue with the next stage's input.
    qs = [initial_data] + [Queue() for _ in range(len(stages))]
    for stage, in_q, out_q in zip(stages, qs[:-1], qs[1:]):
        stage.in_q = in_q
        stage.out_q = out_q
        monitors.spawn(stage_monitor, stage)
    gevent.sleep(0)
    return PipelineResult(monitors, stages[-1].out_q)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号