具有有限CPU /端口的Python多线程处理

发布于 2021-01-29 15:01:27

我有一个要 并行 处理的文件夹名称字典。在每个文件夹,里面是文件名的数组,我想在加工 系列

folder_file_dict = {
         folder_name : {
                         file_names_key : [file_names_array]
                       }
        }

最终,我将创建一个名为folder_name的文件夹,其中包含名称为的文件len(folder_file_dict[folder_name][file_names_key])。我有这样的方法:

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"

udp_ports = [123, 456, 789]

请注意time_consuming_method()上面的内容,由于通过UDP端口进行的调用会花费很长时间。我也仅限于在上面的阵列中使用UDP端口。因此,我必须等待time_consuming_methodUDP端口完成操作,然后才能再次使用该UDP端口。这意味着我一次只能len(udp_ports)运行线程。

因此,我最终将len(folder_file_dict.keys())通过len(folder_file_dict.keys())调用来创建线程process_files_in_series。我也有MAX_THREAD个计数。我正在尝试使用QueueThreading模块,但是我不确定我需要哪种设计。如何使用队列和线程以及可能的条件来做到这一点?使用线程池的解决方案也可能会有所帮助。

注意

我没有试图提高读取/写入速度。我正在尝试并行调用time_consuming_methodunder
process_files_in_series。创建这些文件只是过程的一部分,而不是速率限制步骤。

另外,我要寻找一个解决方案,使用QueueThreading以及可能的Condition模块或相关于这些模块什么。线程池解决方案也可能会有所帮助。我不能使用进程,只能使用线程。

我也在寻找Python 2.7中的解决方案。

关注者
0
被浏览
66
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    使用线程池:

    #!/usr/bin/env python2
    from multiprocessing.dummy import Pool, Queue # thread pool
    
    folder_file_dict = {
        folder_name: {
            file_names_key: file_names_array
        }
    }
    
    def process_files_in_series(file_names_array, udp_port):
        for file_name in file_names_array:
             time_consuming_method(file_name, udp_port)
             # create "file_name"
             ...
    
    def mp_process(filenames):
        udp_port = free_udp_ports.get() # block until a free udp port is available
        args = filenames, udp_port
        try:
            return args, process_files_in_series(*args), None
        except Exception as e:
            return args, None, str(e)
        finally:
            free_udp_ports.put_nowait(udp_port)
    
    free_udp_ports = Queue() # in general, use initializer to pass it to children
    for port in udp_ports:
        free_udp_ports.put_nowait(port)
    pool = Pool(number_of_concurrent_jobs) #
    for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()):
        if error is not None:
           print args, error
    

    我认为如果不同文件名数组的处理时间可能不同,则不需要将线程数绑定到udp端口数。

    如果我folder_file_dict正确理解了结构,则生成文件名数组:

    def get_files_arrays(folder_file_dict=folder_file_dict):
        for folder_name_dict in folder_file_dict.itervalues():
            for filenames_array in folder_name_dict.itervalues():
                yield filenames_array
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看