How would I use concurrent.futures and queues for a real-time scenario?
Doing parallel work with Python 3's concurrent.futures module is fairly easy, as shown below.
import concurrent.futures

# do_work and dictionary are placeholders for the real task function and inputs
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()
It is also very handy to insert items into, and retrieve them from, a queue.
import queue

q = queue.Queue()

# tasks is a placeholder for whatever iterable of work items you have
for task in tasks:
    q.put(task)

while not q.empty():
    q.get()
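One caveat about the snippet above: q.empty() only polls, so once several threads share the queue it can race with producers. The queue module's blocking get() together with task_done()/join() is the idiomatic consumer pattern. A minimal sketch, with handle standing in for the real work:

import queue
import threading

q = queue.Queue()

def handle(task):
    print('handled', task)        # placeholder for the real work

def worker():
    while True:
        task = q.get()            # blocks until an item is available
        try:
            handle(task)
        finally:
            q.task_done()         # tell join() this item is fully processed

threading.Thread(target=worker, daemon=True).start()

for task in range(10):
    q.put(task)

q.join()                          # wait until every queued item has been handled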
I have a script running in the background listening for updates. Now, in theory, as those updates arrive, I would queue them and process them concurrently with a ThreadPoolExecutor, roughly along the lines of the sketch below.
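A minimal sketch of that setup; listen_for_updates is a hypothetical stand-in for the real update source:

import queue
import threading
import time

q = queue.Queue()

def listen_for_updates():
    """ Hypothetical background listener: pushes each incoming update onto the queue """
    n = 0
    while True:
        time.sleep(0.25)              # pretend an update arrives every 250 ms
        n += 1
        q.put('update %d' % n)        # hand the update off to the consumer

# run the listener in the background so the main thread stays free
threading.Thread(target=listen_for_updates, daemon=True).start()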
Now, individually, all of these components work on their own and make sense, but how do I use them together? I'm not aware of whether a ThreadPoolExecutor can be fed work from a queue in real time, as opposed to working from data that is determined ahead of time.
In a nutshell, all I want to do is receive updates of, say, 4 messages a second, shove them onto a queue, and have my concurrent.futures work on them. If I don't, then I'm stuck with a slow, sequential way of working. Take the canonical example from the Python docs:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
Here the list URLS is fixed. Is it possible to feed this list in real time and have the workers process it as it comes, perhaps out of a queue for management purposes? I'm a little confused as to whether my approach is actually possible.
---
Here is the example from the Python docs, expanded to take its work from a queue. A change to note is that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, in order to allow new work to be started while waiting for other work to complete.

import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]
Output from fetching each url twice:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes
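If the updates never stop (as in the 4-messages-per-second scenario from the question), the same wait loop works with the feeder running indefinitely. A minimal sketch of one way to shut it down cleanly with a sentinel value; feed_forever and process are hypothetical stand-ins for the real message source and handler, and the feeder stops after a fixed count only so the demo terminates:

import concurrent.futures
import queue
import time

q = queue.Queue()
STOP = object()                         # sentinel marking the end of the stream

def feed_forever(n_messages, spacing):
    """ Hypothetical feeder; for the demo it stops after n_messages """
    for i in range(n_messages):
        time.sleep(spacing)             # spacing=0.25 approximates 4 messages/second
        q.put('message %d' % i)
    q.put(STOP)
    return 'DONE FEEDING'

def process(message):
    """ Placeholder for the real per-message work """
    return len(message)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(feed_forever, 20, 0.25): 'FEEDER'}
    feeding = True
    while futures or feeding:
        done, not_done = concurrent.futures.wait(
            futures, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # drain whatever arrived since the last pass
        while not q.empty():
            item = q.get()
            if item is STOP:
                feeding = False         # no more work will be submitted
            else:
                futures[executor.submit(process, item)] = item

        # harvest finished futures (result() re-raises any worker exception)
        for future in done:
            print(futures[future], '->', future.result())
            del futures[future]

The combination of FIRST_COMPLETED and a 0.25 second timeout is what keeps the loop responsive: concurrent.futures.wait returns as soon as any future finishes, and at least every quarter second regardless, so newly queued items never sit for long.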