multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

Posted 2021-01-29 19:34:37

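multiprocessing.Pool is driving me crazy...
I want to upgrade many packages, and for each one I have to check whether a newer version is available. This is done by the check_one function.
The main code is in the Updater.update method: that is where I create the Pool object and call its map() method.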

Here is the code:

def check_one(args):
    res, total, package, version = args
    i = res.qsize()
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
        i / float(total), package, i, total, addn=False)
    try:
        json = PyPIJson(package).retrieve()
        new_version = Version(json['info']['version'])
    except Exception as e:
        logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
        return
    if new_version > version:
        res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

    # __init__ and other methods...

    def update(self):    
        logger.info('Searching for updates')
        packages = Queue.Queue()
        data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
            for dist in self.working_set)
        pool = multiprocessing.Pool()
        pool.map(check_one, data)
        pool.close()
        pool.join()
        while True:
            try:
                package, version, new_version, json = packages.get_nowait()
            except Queue.Empty:
                break
            txt = 'A new release is available for {0}: {1!s} (old {2}), update'.format(package,
                                                                                      new_version,
                                                                                      version)
            u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
            if u:
                self.upgrade(package, json, new_version)
            else:
                logger.info('{0} has not been upgraded', package)
        self._clean()
        logger.success('Updating finished successfully')

When I run it, I get this strange error:

Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
1 Answer
  • 面试哥
    Answered 2021-01-29

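    multiprocessing passes tasks (which include check_one and data) to the worker processes through an mp.SimpleQueue. Unlike Queue.Queue objects, everything put into an mp.SimpleQueue must be picklable. Queue.Queue objects are not picklable: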

    import multiprocessing as mp
    import Queue
    
    def foo(queue):
        pass
    
    pool=mp.Pool()
    q=Queue.Queue()
    
    pool.map(foo,(q,))
    

    This raises the following exception:

    UnpickleableError: Cannot pickle <type 'thread.lock'> objects
    

    Your data includes packages, which is a Queue.Queue. That is probably the source of the problem.
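    One way to confirm which part of a task is the culprit is to try pickling each piece by hand. The sketch below is only an illustration: the helper name is_picklable is hypothetical, and the import shim is an assumption added so that the snippet runs on both Python 2 and Python 3.

```python
import pickle

try:
    import Queue as queue  # Python 2 module name
except ImportError:
    import queue           # Python 3 module name


def is_picklable(obj):
    """Return True if pickle.dumps() accepts obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except Exception:
        # The exact exception type differs between Python versions,
        # so this hypothetical helper catches broadly.
        return False
    return True


# Plain data, such as the (total, name, version) part of a task, pickles fine.
plain_task = (10, 'project-0', '1.0')
# A thread queue holds lock objects internally and cannot be pickled.
queue_task = (queue.Queue(), 10, 'project-0', '1.0')

plain_ok = is_picklable(plain_task)
queue_ok = is_picklable(queue_task)
```

    Here plain_ok is expected to be true and queue_ok false, which matches the diagnosis that the queue inside each task tuple is what the pool fails to send.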


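    Here is a possible workaround. The Queue is used for two purposes:

    1. Finding out the approximate size (by calling qsize).
    2. Storing results for later retrieval.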

    Instead of calling qsize, we could share a value between multiple processes by using an mp.Value.
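    A minimal sketch of such a shared counter follows. It assumes the default lock that mp.Value creates, and the helper name next_index is hypothetical. Since `+=` on a Value is a read followed by a write, the increment is done while holding the lock returned by get_lock() so that two processes cannot interleave.

```python
import multiprocessing as mp


def next_index(counter):
    """Increment the shared counter under its lock and return the new value."""
    with counter.get_lock():
        counter.value += 1
        return counter.value


# 'i' is the typecode for a C signed int; the counter starts at 0.
counter = mp.Value('i', 0)
first = next_index(counter)
second = next_index(counter)
```

    The snippet only exercises the counter within a single process; it shows the locking pattern, not how the counter reaches the workers.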

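    Instead of storing results in a queue, we can (and should) simply return values from the calls to check_one. pool.map collects the results in a queue of its own making and returns them as its return value.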

    For example:

    import multiprocessing as mp
    import random
    import logging
    
    # logger=mp.log_to_stderr(logging.DEBUG)
    logger = logging.getLogger(__name__)
    
    
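    # Shared counter visible to the worker processes (replaces Queue.qsize()).
    qsize = mp.Value('i', 1)
    def check_one(args):
        total, package, version = args
        # "+=" on a Value is a read followed by a write, so hold the
        # Value's lock while reading and incrementing the counter.
        with qsize.get_lock():
            i = qsize.value
            qsize.value += 1
        logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
            i / float(total), package, i, total))
        # Random stand-in for the real version lookup.
        new_version = random.randrange(0,100)
        if new_version > version:
            return (package, version, new_version, None)
        else:
            return None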
    
    def update():    
        logger.info('Searching for updates')
        set_len=10
        data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
                 for i in range(set_len) )
        pool = mp.Pool()
        results = pool.map(check_one, data)
        pool.close()
        pool.join()
        for result in results:
            if result is None: continue
            package, version, new_version, json = result
            txt = 'A new release is available for {0}: {1!s} (old {2}), update'.format(
                package, new_version, version)
            logger.info(txt)
        logger.info('Updating finished successfully')
    
    if __name__=='__main__':
        logging.basicConfig(level=logging.DEBUG)
        update()
    

