common.py 文件源码

python
阅读 17 收藏 0 点赞 0 评论 0

项目:STAR-SEQR 作者: ExpressionAnalysis 项目源码 文件源码
def _init_worker():
    '''Pool initializer: make each worker ignore SIGINT so that Ctrl-C is handled only by the parent process.

    Kept at module level (rather than nested in pandas_parallel) so it can be pickled when the pool
    is created with the "spawn" start method.'''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def pandas_parallel(df, func, nthreads, mp_type, split, *opts):
    '''Apply func to chunks of a dataframe in a multiprocessing pool and concatenate the results.

    Args:
        df: dataframe to process.
        func: callable run on every chunk. With no *opts it receives the chunk itself; otherwise it
            receives a single tuple (chunk, extra) that it must unpack, where extra is opts[0] when
            exactly one option was given and the whole opts tuple when several were given.
        nthreads: number of worker processes in the pool.
        mp_type: name of the Pool method to use: "map", "map_async", "imap" or "imap_unordered".
        split: '' to cut the dataframe into min(nthreads, number of rows) contiguous pieces of
            near-equal size, or a column name to build one chunk per unique value of that column.
        *opts: optional extra arguments forwarded to func (see func above).

    Returns:
        The per-chunk results concatenated along axis 0 with pd.concat.

    Raises:
        ValueError: if mp_type is not supported, if the dataframe cannot be split (e.g. it has no
            rows), or, through raise_, for any exception raised while the pool is running.
        KeyboardInterrupt: re-raised after the pool has been terminated.
    '''
    supported = ("map", "map_async", "imap", "imap_unordered")
    if mp_type not in supported:
        # Fail early: any other Pool attribute would previously leave the result unbound.
        raise ValueError("Unsupported mp_type %r; expected one of %s" % (mp_type, ", ".join(supported)))

    # functools.partial objects and other callables may not define __name__.
    func_name = str(getattr(func, '__name__', repr(func)))

    logger.info("Begin multiprocessing of function %s in a pool of %s workers using %s protocol",
                func_name, nthreads, mp_type)
    if split == '':
        logger.debug("*The dataframe will be split evenly across the %s workers", nthreads)
        n_rows = len(df.index)
        # Split row positions rather than the dataframe itself: np.array_split on a DataFrame goes
        # through DataFrame.swapaxes, which recent pandas versions deprecate. Every piece is a
        # non-empty contiguous run because the number of sections never exceeds n_rows.
        pieces = np.array_split(np.arange(n_rows), min(nthreads, n_rows))
        df_split = [df.iloc[int(pos[0]):int(pos[-1]) + 1] for pos in pieces]
    else:
        logger.debug('*The dataframe will be split on the column %s', split)
        # Lazy generator: one boolean-mask selection per unique value of the split column.
        df_split = (df.loc[df[split] == sub, :] for sub in df[split].unique())

    # Pack params for processing
    if not opts:
        params = df_split
    else:
        extra = opts[0] if len(opts) == 1 else opts
        params = zip(df_split, repeat(extra))

    init = time.time()
    pool = None  # stays None if pool creation itself fails, so cleanup can be skipped safely
    try:
        logger.debug("*Initializing a %s pool with %s workers", mp_type, nthreads)
        pool = mp.Pool(nthreads, _init_worker)  # Initialize the pool
        pool_func = getattr(pool, mp_type)  # set the function (map, imap, etc)
        these_res = pool_func(func, params)
        if mp_type == "map_async":
            these_res = these_res.get()  # block until the AsyncResult is ready
        out = pd.concat(these_res, axis=0)  # this works even for iterables
        pool.close()
    except KeyboardInterrupt:
        logger.error("Error: Keyboard interrupt")
        if pool is not None:
            pool.terminate()
        raise
    except Exception as e:
        logger.error("Exception: " + str(e))
        if pool is not None:
            pool.terminate()
        tb = sys.exc_info()[2]
        # Re-raise as ValueError while keeping the original traceback.
        raise_(ValueError, e, tb)
    finally:
        if pool is not None:
            pool.join()
    logger.debug("*Time to run pandas_parallel on %s took %g seconds", func_name, time.time() - init)
    return out
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号