与工作线程“线程”等效的asyncio.Queues

发布于 2021-01-29 15:09:53

我试图弄清楚如何移植要使用的线程程序asyncio。我有很多代码可以围绕几个标准库进行同步Queues,基本上是这样的:

import queue, random, threading, time

q = queue.Queue()

def produce():
    while True:
        time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds
        q.put(random.random())

def consume():
    while True: 
        value = q.get(block=True)
        print("Consumed", value)

threading.Thread(target=produce).start()
threading.Thread(target=consume).start()

一个线程创建值(可能是用户输入),而另一个线程对它们执行某些操作。关键是这些线程在出现新数据之前一直处于空闲状态,此时它们将唤醒并对其进行处理。

我正在尝试使用asyncio实现此模式,但是我似乎无法弄清楚如何使其“运行”。

我的尝试或多或少看起来像这样(根本不做任何事情)。

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)

# do something here to start the coroutines. asyncio.Task()?

loop = asyncio.get_event_loop()
loop.run_forever()

我尝试过使用协程,不使用协程,在Tasks中包装内容,试图使它们创建或返回期货等的变体。

我开始认为我对应该如何使用asyncio有错误的想法(也许应该以我不知道的其他方式来实现此模式)。任何指针将不胜感激。

关注者
0
被浏览
85
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    对,就是这样。任务是您的朋友:

    import asyncio, random
    
    q = asyncio.Queue()
    
    @asyncio.coroutine
    def produce():
        while True:
            yield from q.put(random.random())
            yield from asyncio.sleep(0.5 + random.random())
    
    @asyncio.coroutine
    def consume():
        while True:
            value = yield from q.get()
            print("Consumed", value)
    
    
    loop = asyncio.get_event_loop()
    loop.create_task(produce())
    loop.create_task(consume())
    loop.run_forever()
    

    asyncio.ensure_future 也可以用于任务创建。

    并且请记住:q.put()协程 ,因此您应该使用yield from q.put(value)

    UPD

    转自asyncio.Task()/asyncio.async()新品牌的APIloop.create_task()asyncio.ensure_future()示例所示。



知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看