Python asyncio:处理潜在的无限列表

发布于 2021-01-29 14:59:34

我有以下情况:

  • Python 3.6+
  • 从文件中逐行读取输入数据。
  • 协程将数据发送到API(使用aiohttp),并将调用结果保存到Mongo(使用motor)。因此,有很多IO正在进行。

该代码使用async/编写await,并且对于手动执行的单个调用也可以正常工作。

我不知道该怎么做,就是要大量使用输入数据。

asyncio我看到的所有示例都asyncio.wait通过发送有限列表作为参数进行了演示。但是我不能简单地向它发送任务列表,因为输入文件可能有数百万行。

我的情况是关于通过传送带将数据流传输到消费者。

我还可以做些什么?我希望程序使用它可以聚集的所有资源来处理文件中的数据,而又不会感到不知所措。

关注者
0
被浏览
86
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    我的情况是关于通过传送带将数据流传输到消费者。我还可以做些什么?

    您可以创建固定数量的任务,这些任务大致与传送带的容量相对应,然后将它们弹出队列。例如:

    async def consumer(queue):
        while True:
            line = await queue.get()
            # connect to API, Mongo, etc.
            ...
            queue.task_done()
    
    async def producer():
        N_TASKS = 10
        loop = asyncio.get_event_loop()
        queue = asyncio.Queue(N_TASKS)
        tasks = [loop.create_task(consume(queue)) for _ in range(N_TASKS)]
        try:
            with open('input') as f:
                for line in f:
                    await queue.put(line)
            await queue.join()
        finally:
            for t in tasks:
                t.cancel()
    

    由于与线程不同,任务是轻量级的,并且不占用操作系统资源,因此最好在创建“太多”任务时犯错。asyncio可以毫不费力地处理成千上万的任务,尽管这对于这些任务来说可能是过大的事-
    几十就足够了。



知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看