How would I use concurrent.futures and queues in a real-time scenario?

Posted on 2021-01-29 17:50:50

Doing parallel work with Python 3's concurrent.futures module is very easy, as shown below.

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()

Inserting items into and retrieving them from a queue is also very convenient.

q = queue.Queue()
for task in tasks:
    q.put(task)
while not q.empty():
    q.get()
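In a threaded setting the snippet above is usually paired with `task_done()` and `join()` so the main thread can tell when every queued item has been handled. A minimal producer/consumer sketch (the `None` sentinel and the doubling "work" are illustrative, not from the question):

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    # Pull tasks until a sentinel arrives; task_done() lets q.join() track progress
    while True:
        task = q.get()
        if task is None:  # sentinel: no more work
            q.task_done()
            break
        results.append(task * 2)  # stand-in for real work
        q.task_done()

t = threading.Thread(target=worker)
t.start()

for task in [1, 2, 3]:
    q.put(task)
q.put(None)  # signal the worker to exit

q.join()  # blocks until every put() has a matching task_done()
t.join()
print(results)  # [2, 4, 6]
```

Because there is a single worker, items come out in insertion order; with several workers the ordering of `results` would no longer be guaranteed.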

I have a script running in the background that listens for updates. Now, hypothetically, as those updates come in, I would queue them up and process them concurrently with a ThreadPoolExecutor.

Now, individually, each of these components works on its own and makes sense, but how do I use them together? I don't know whether it's possible to feed a ThreadPoolExecutor work from a queue in real time, rather than having the data to work on determined in advance.

In short, all I want to do is receive updates at, say, 4 messages per second, push them onto a queue, and have concurrent.futures work on them as they arrive. If I can't, I'm stuck with a slow sequential approach.

Let's take the canonical example from the Python documentation below:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

The list URLS is fixed. Is it possible to feed this list in real time and have the workers process items as they come in, perhaps from a queue for management purposes? I'm a bit confused about whether my approach is even feasible.

1 Answer
  • 面试哥 2021-01-29

    The example from the Python docs has been expanded to take its work from a queue. A change to note is that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, in order to allow new work to be started while waiting for other work to complete.
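Before the full example, here is a tiny standalone sketch of the `wait` semantics being relied on: with `return_when=FIRST_COMPLETED`, the call returns as soon as any future finishes, handing back the still-pending futures in `not_done` so the loop can submit new work in the meantime (the `job` function and delays are illustrative):

```python
import concurrent.futures
import time

def job(delay):
    # Stand-in for real work of varying duration
    time.sleep(delay)
    return delay

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(job, d) for d in (0.1, 1.0)]
    # Returns once the fast job finishes; the slow one stays in not_done
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED)
    print(len(done), len(not_done))  # 1 1
```

`as_completed`, by contrast, is a generator over a fixed set of futures, which is why it cannot absorb work that arrives after iteration has started.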

    import concurrent.futures
    import urllib.request
    import time
    import queue
    
    q = queue.Queue()
    
    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']
    
    def feed_the_workers(spacing):
        """ Simulate outside actors sending in work to do, request each url twice """
        for url in URLS + URLS:
            time.sleep(spacing)
            q.put(url)
        return "DONE FEEDING"
    
    def load_url(url, timeout):
        """ Retrieve a single page and report the URL and contents """
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()
    
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    
        # start a future for a thread which sends work in through the queue
        future_to_url = {
            executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}
    
        while future_to_url:
            # check for status of the futures which are currently working
            done, not_done = concurrent.futures.wait(
                future_to_url, timeout=0.25,
                return_when=concurrent.futures.FIRST_COMPLETED)
    
            # if there is incoming work, start a new future
            while not q.empty():
    
                # fetch a url from the queue
                url = q.get()
    
                # Start the load operation and mark the future with its URL
                future_to_url[executor.submit(load_url, url, 60)] = url
    
            # process any completed futures
            for future in done:
                url = future_to_url[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (url, exc))
                else:
                    if url == 'FEEDER DONE':
                        print(data)
                    else:
                        print('%r page is %d bytes' % (url, len(data)))
    
                # remove the now completed future
                del future_to_url[future]
    

    Output from fetching each url twice:

    'http://www.foxnews.com/' page is 67574 bytes
    'http://www.cnn.com/' page is 136975 bytes
    'http://www.bbc.co.uk/' page is 193780 bytes
    'http://some-made-up-domain.com/' page is 896 bytes
    'http://www.foxnews.com/' page is 67574 bytes
    'http://www.cnn.com/' page is 136975 bytes
    DONE FEEDING
    'http://www.bbc.co.uk/' page is 193605 bytes
    'http://some-made-up-domain.com/' page is 896 bytes
    'http://europe.wsj.com/' page is 874649 bytes
    'http://europe.wsj.com/' page is 874649 bytes
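A common variation on the answer above, not part of it, avoids polling `q.empty()` on a timeout: submit a fixed number of long-running workers that each block on `q.get()`, and stop them with one sentinel per worker. A sketch under those assumptions (the integer "messages" and counting stand in for real processing such as `load_url`):

```python
import concurrent.futures
import queue

q = queue.Queue()
NUM_WORKERS = 3
SENTINEL = None  # hypothetical stop marker, one per worker

def worker():
    # Block on the queue until a sentinel arrives
    processed = 0
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        processed += 1  # stand-in for real work on item
    return processed

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    futures = [executor.submit(worker) for _ in range(NUM_WORKERS)]
    for item in range(10):        # simulated incoming messages
        q.put(item)
    for _ in range(NUM_WORKERS):  # one sentinel per worker
        q.put(SENTINEL)
    total = sum(f.result() for f in futures)

print(total)  # 10
```

The trade-off: workers here do not return per-item futures, so per-item error handling must happen inside `worker` itself, whereas the `wait`-based loop above keeps one future per url.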
    

