How to parallelize file downloads?
I can download files one at a time:
import urllib.request
urls = ['foo.com/bar.gz', 'foobar.com/barfoo.gz', 'bar.com/foo.gz']
for u in urls:
    urllib.request.urlretrieve(u)
I can try it with subprocess like this:
import subprocess
import os
def parallelized_commandline(command, files, max_processes=2):
    processes = set()
    for name in files:
        processes.add(subprocess.Popen([command, name]))
        if len(processes) >= max_processes:
            os.wait()
            processes.difference_update(
                [p for p in processes if p.poll() is not None])
    # Check if all the child processes were closed
    for p in processes:
        if p.poll() is None:
            p.wait()

urls = ['http://www.statmt.org/wmt15/training-monolingual-nc-v10/news-commentary-v10.en.gz',
        'http://www.statmt.org/wmt15/training-monolingual-nc-v10/news-commentary-v10.cs.gz',
        'http://www.statmt.org/wmt15/training-monolingual-nc-v10/news-commentary-v10.de.gz']
parallelized_commandline('wget', urls)
Is there a way to parallelize urlretrieve without "cheating" by using os.system or subprocess?
Given that I have to resort to the "cheat" for now, is subprocess.Popen the right way to download the data?
When I use parallelized_commandline() above, wget runs multi-threaded but not multi-core. Is that normal? Is there a way to make it multi-core instead of multi-threaded?
-
You could use a thread pool to download files in parallel:
#!/usr/bin/env python3
from multiprocessing.dummy import Pool  # use threads for I/O bound tasks
from urllib.request import urlretrieve

urls = [...]
result = Pool(4).map(urlretrieve, urls)  # download 4 files at a time
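
Two details of the one-liner are worth knowing: urlretrieve() called without a filename saves to a temporary file with a generated name, and a single failed download makes map() raise, so the per-URL outcome is lost. Below is a minimal sketch of the same thread-pool idea using concurrent.futures from the standard library. The names download_all and fetch, the dest_dir parameter, and the rule "save under the last path component of the URL" are assumptions made for illustration and are not part of the answer above.

```python
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlsplit
from urllib.request import urlretrieve


def download_all(urls, dest_dir='.', max_workers=4):
    """Download urls concurrently.

    Returns two dicts: {url: local_path} for successes and
    {url: exception} for failures.
    """
    def fetch(url):
        # Assumed naming rule: last path component of the URL.
        name = os.path.basename(urlsplit(url).path) or 'index.html'
        path = os.path.join(dest_dir, name)
        urlretrieve(url, path)
        return path

    done, failed = {}, {}
    # Threads are enough here: the work is I/O bound.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                done[url] = future.result()
            except Exception as exc:  # e.g. URLError, OSError
                failed[url] = exc
    return done, failed
```

Calling done, failed = download_all(urls, dest_dir='data') would then leave the successfully downloaded files in the data directory (which must already exist) and report the remaining URLs together with the exception each one raised.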
You could also use asyncio to download multiple files at once in a single thread:
#!/usr/bin/env python3
import asyncio
import logging

import aiohttp  # $ pip install aiohttp

async def download(url, session, semaphore, chunk_size=1 << 15):
    async with semaphore:  # limit number of concurrent downloads
        filename = url2filename(url)
        logging.info('downloading %s', filename)
        async with session.get(url) as response:
            with open(filename, 'wb') as file:
                while True:  # save file
                    chunk = await response.content.read(chunk_size)
                    if not chunk:
                        break
                    file.write(chunk)
        logging.info('done %s', filename)
    return filename, (response.status, tuple(response.headers.items()))

async def main(urls):
    semaphore = asyncio.Semaphore(4)
    async with aiohttp.ClientSession() as session:
        download_tasks = (download(url, session, semaphore) for url in urls)
        return await asyncio.gather(*download_tasks)

urls = [...]
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
result = asyncio.run(main(urls))
Here, url2filename() is a helper defined separately; it derives a local filename from the URL.
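
Since the definition itself is not shown in this thread, here is a minimal sketch of what such a helper might look like. It is a hypothetical stand-in, not the answer's original function: it assumes the filename should be the percent-decoded last path component of the URL, and it rejects names that are empty or that would point outside the current directory.

```python
import os
import posixpath
from urllib.parse import unquote, urlsplit


def url2filename(url):
    """Return the last path component of url as a local filename.

    Hypothetical stand-in for the helper referenced above.
    Raises ValueError if no safe filename can be derived.
    """
    urlpath = urlsplit(url).path  # drops scheme, host, query, fragment
    basename = unquote(posixpath.basename(urlpath))
    # Reject empty names, '.' and '..', and names containing a path
    # separator after decoding (e.g. '%2F', or '%5C' on Windows).
    if (not basename
            or basename in ('.', '..')
            or os.path.basename(basename) != basename):
        raise ValueError('cannot derive a filename from %r' % url)
    return basename
```

For the URLs in the question this yields names such as news-commentary-v10.en.gz. Raising on suspicious names rather than silently rewriting them is a design choice made here so that a crafted URL cannot write outside the working directory.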