How do I parallelize file downloads?

Posted on 2021-01-29 18:25:09

I can download files one at a time like this:

import urllib.request

urls = ['http://foo.com/bar.gz', 'http://foobar.com/barfoo.gz', 'http://bar.com/foo.gz']

for u in urls:
    urllib.request.urlretrieve(u, u.rsplit('/', 1)[-1])  # save under the URL's basename

I could try subprocess like this:

import subprocess
import os

def parallelized_commandline(command, files, max_processes=2):
    processes = set()
    for name in files:
        processes.add(subprocess.Popen([command, name]))
        if len(processes) >= max_processes:
            os.wait()
            processes.difference_update(
                [p for p in processes if p.poll() is not None])

    # Wait for any remaining child processes to finish
    for p in processes:
        if p.poll() is None:
            p.wait()

urls = ['http://www.statmt.org/wmt15/training-monolingual-nc-v10/news-commentary-v10.en.gz',
'http://www.statmt.org/wmt15/training-monolingual-nc-v10/news-commentary-v10.cs.gz', 
'http://www.statmt.org/wmt15/training-monolingual-nc-v10/news-commentary-v10.de.gz']

parallelized_commandline('wget', urls)

Is there no way to parallelize urlretrieve without "cheating" with os.system or subprocess?

Given that I currently have to resort to "cheating", is subprocess.Popen the right way to download the data?

When using parallelized_commandline() above, wget appears to run multi-threaded rather than multi-core. Is that normal? Is there a way to make it multi-core instead of multi-threaded?

1 Answer

  • 面试哥 2021-01-29

    You can use a thread pool to download the files in parallel:

    #!/usr/bin/env python3
    from multiprocessing.dummy import Pool # use threads for I/O bound tasks
    from urllib.request import urlretrieve
    
    urls = [...]
    result = Pool(4).map(urlretrieve, urls) # download 4 files at a time
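
    As an alternative sketch (not from the original answer), the stdlib concurrent.futures.ThreadPoolExecutor does the same job; the local_name() helper below is hypothetical, just deriving a target filename from each URL:

```python
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlsplit
from urllib.request import urlretrieve
import posixpath

def local_name(url):
    # hypothetical helper: last path component of the URL, e.g. 'foo.gz'
    return posixpath.basename(urlsplit(url).path)

def fetch(url):
    filename = local_name(url)
    urlretrieve(url, filename)  # blocking download; threads overlap the I/O waits
    return filename

urls = [...]  # same placeholder URL list as above

# requires network access:
# with ThreadPoolExecutor(max_workers=4) as executor:  # 4 downloads at a time
#     print(list(executor.map(fetch, urls)))
```

    executor.map preserves input order, so the returned filenames line up with urls.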
    

    You can also use asyncio to download several files concurrently within a single thread:

    #!/usr/bin/env python3
    import asyncio
    import logging

    import aiohttp  # $ pip install aiohttp

    async def download(url, session, semaphore, chunk_size=1 << 15):
        async with semaphore:  # limit the number of concurrent downloads
            filename = url2filename(url)
            logging.info('downloading %s', filename)
            async with session.get(url) as response:  # connection released on exit
                with open(filename, 'wb') as file:
                    async for chunk in response.content.iter_chunked(chunk_size):
                        file.write(chunk)  # save the file chunk by chunk
            logging.info('done %s', filename)
        return filename, (response.status, tuple(response.headers.items()))

    async def main(urls):
        semaphore = asyncio.Semaphore(4)
        async with aiohttp.ClientSession() as session:
            download_tasks = (download(url, session, semaphore) for url in urls)
            return await asyncio.gather(*download_tasks)

    urls = [...]
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
    result = asyncio.run(main(urls))
    

    where url2filename() is defined here.
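
    The linked definition is not reproduced on this page; a minimal sketch, assuming url2filename() should simply return the last path component of the URL:

```python
import os
from urllib.parse import urlsplit

def url2filename(url):
    """Map a URL to a local filename: the last component of its path."""
    return os.path.basename(urlsplit(url).path)

print(url2filename('http://example.com/a/b.gz'))  # -> b.gz
```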


