Example source code for Python's Pool() class

nifti_viewer.py (project: cortex, author: rdevon)
def save_images(nifti_files, anat, roi_dict, out_dir, **kwargs):
    '''Saves multiple nifti images using multiprocessing.

    Uses `multiprocessing`.

    Args:
        nifti_files (list): list of nifti file paths.
        anat (nipy.core.api.image.image.Image): anatomical image.
        roi_dict (dict): dictionary of cluster dictionaries.
        out_dir (str): output directory path.
        **kwargs: extra keyword arguments.

    '''
    p = mp.Pool(30)
    idx = [int(f.split('/')[-1].split('.')[0]) for f in nifti_files]
    # zip builds one argument tuple per file (the original used itertools.izip on Python 2)
    args_iter = zip(nifti_files,
                    itertools.repeat(anat),
                    [roi_dict[i] for i in idx],
                    [path.join(out_dir, '%d.png' % i) for i in idx],
                    idx)

    p.map(save_helper, args_iter)
    p.close()
    p.join()
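
The worker save_helper is defined elsewhere in nifti_viewer.py; a minimal sketch of its expected shape, assuming save_image is the project's single-image routine (that name is hypothetical here):

def save_helper(args):
    # pool.map() delivers each tuple from args_iter as a single argument,
    # so the worker unpacks it manually
    nifti_file, anat, rois, out_file, i = args
    save_image(nifti_file, anat, rois, out_file, i)  # hypothetical single-image saver
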
utils.py (project: census-loader, author: minus34)
def multiprocess_csv_import(work_list, settings, logger):
    pool = multiprocessing.Pool(processes=settings['max_concurrent_processes'])

    num_jobs = len(work_list)

    results = pool.imap_unordered(run_csv_import_multiprocessing, [[w, settings] for w in work_list])

    pool.close()
    pool.join()

    result_list = list(results)
    num_results = len(result_list)

    if num_jobs > num_results:
        logger.warning("\t- A MULTIPROCESSING PROCESS FAILED WITHOUT AN ERROR\nACTION: Check the record counts")

    for result in result_list:
        if result != "SUCCESS":
            logger.info(result)
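
run_csv_import_multiprocessing is not shown here; a hedged sketch of the worker contract this function relies on, where each worker receives a [work_item, settings] pair and returns "SUCCESS" or an error string (import_one_csv is a hypothetical stand-in):

def import_one_csv(work_item, settings):
    # hypothetical stand-in for the real COPY/INSERT logic
    pass

def run_csv_import_multiprocessing(args):
    # imap_unordered delivers each [work_item, settings] pair as one argument
    work_item, settings = args
    try:
        import_one_csv(work_item, settings)
        return "SUCCESS"
    except Exception as e:
        return "IMPORT FAILED: {0} - {1}".format(work_item, e)
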
utils.py (project: census-loader, author: minus34)
def multiprocess_list(mp_type, work_list, settings, logger):
    pool = multiprocessing.Pool(processes=settings['max_concurrent_processes'])

    num_jobs = len(work_list)

    if mp_type == "sql":
        results = pool.imap_unordered(run_sql_multiprocessing, [[w, settings] for w in work_list])
    else:
        results = pool.imap_unordered(run_command_line, work_list)

    pool.close()
    pool.join()

    result_list = list(results)
    num_results = len(result_list)

    if num_jobs > num_results:
        logger.warning("\t- A MULTIPROCESSING PROCESS FAILED WITHOUT AN ERROR\nACTION: Check the record counts")

    for result in result_list:
        if result != "SUCCESS":
            logger.info(result)
utils.py (project: census-loader, author: minus34)
def multiprocess_shapefile_load(work_list, settings, logger):
    pool = multiprocessing.Pool(processes=settings['max_concurrent_processes'])

    num_jobs = len(work_list)

    results = pool.imap_unordered(intermediate_shapefile_load_step, [[w, settings] for w in work_list])

    pool.close()
    pool.join()

    result_list = list(results)
    num_results = len(result_list)

    if num_jobs > num_results:
        logger.warning("\t- A MULTIPROCESSING PROCESS FAILED WITHOUT AN ERROR\nACTION: Check the record counts")

    for result in result_list:
        if result != "SUCCESS":
            logger.info(result)
__init__.py (project: indexed_gzip, author: pauldmccarthy)
def check_data_valid(data, startval, endval=None):
    if endval is None:
        endval = len(data)

    chunksize = 10000000

    startval = int(startval)
    endval   = int(endval)

    offsets = np.arange(0, len(data), chunksize)
    args = []
    for offset in offsets:
        s      = startval + offset
        e      = min(s + chunksize, endval)
        nelems = e - s
        test_chunk = data[offset:offset + nelems]
        args.append((s, e, test_chunk))

    pool = mp.Pool()
    result = all(pool.map(_check_chunk, args))
    pool.terminate()

    return result
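
_check_chunk lives elsewhere in the test module; a plausible sketch, assuming each chunk is expected to hold the consecutive values start..end-1:

import numpy as np

def _check_chunk(args):
    # args is one (start, end, chunk) tuple from the list built above
    s, e, chunk = args
    return bool(np.all(chunk == np.arange(s, e)))
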
rm.py (project: dlcli, author: outlyerapp)
def metrics(period, resolution, tag, threads, metricsfile):
    try:
        pool = Pool(processes=threads)
        period_seconds = period * 3600
        resolution_seconds = resolution * 3600

        if metricsfile:
            with open(metricsfile) as fp:
                m = json.loads(fp.read().replace('][', ','))
        else:
            m = metrics_api.get_tag_metrics(tag_name=tag, **context.settings)

        click.echo(click.style('Found: %s metrics', fg='green') % (len(m)))

        expire = partial(_expire_metric_path, period_seconds, resolution_seconds, tag)
        expired_paths = tqdm(pool.imap_unordered(expire, m))
        expired_paths = sum(filter(None, expired_paths))
        click.echo(click.style('Expired: %s metric paths', fg='green') % expired_paths)

    except Exception as e:
        print('Cleanup metrics failed. %s' % e)

    finally:
        pool.terminate()
        pool.join()
cmdprox.py (project: proxmox-tools, author: FredHutch)
def easy_par(f, sequence):    
    from multiprocessing import Pool
    poolsize=len(sequence)
    if poolsize > 16:
        poolsize = 16
    pool = Pool(processes=poolsize)
    try:
        # f is given sequence. guaranteed to be in order
        cleaned=False
        result = pool.map(f, sequence)
        cleaned = [x for x in result if not x is None]
        #cleaned = asarray(cleaned)
        # not optimal but safe
    except KeyboardInterrupt:
        pool.terminate()
    except Exception as e:
        print('got exception: %r' % (e,))
        if not args.force:
            print("Terminating the pool")
            pool.terminate()
    finally:
        pool.close()
        pool.join()
        return cleaned
utils.py (project: siggi, author: rieck)
def load_bundle(filename, regex=r"^\d+", chunk=None):
    """ Load graphs from zip archive """

    pool = Pool()
    archive = zf.ZipFile(filename)

    # Determine entries and select subset if requested
    entries = archive.namelist()
    if chunk:
        entries = list(set(entries) & set(chunk))
    entries = [(archive, entry) for entry in entries]

    # Load entries in parallel
    func = partial(load_bundle_entry, regex=re.compile(regex))
    items = pool.map(func, entries)
    # tuple-unpacking lambdas are Python 2 only; drop entries with no graph
    items = [(g, l) for g, l in items if g is not None]
    graphs, labels = zip(*items)

    archive.close()
    pool.close()
    pool.join()

    return graphs, labels
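
load_bundle_entry is the per-entry worker; a rough sketch under the assumption that the regex pulls a numeric label out of the entry name (parse_graph stands in for the project's actual graph deserializer):

def parse_graph(raw_bytes):
    # hypothetical stand-in: deserialize one graph from the archived bytes
    return raw_bytes

def load_bundle_entry(args, regex):
    archive, entry = args
    match = regex.search(entry)
    label = int(match.group()) if match else None
    graph = parse_graph(archive.read(entry))
    return graph, label
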
fun_page.py (project: facebook_group_crawler, author: spartakos87)
def fun_page(page_id,onoma):
    pp = Pool(50)
    mega_list = []
    start = time.time()
    pst = p(page_id, access_token)
    n = 50
    group_post = group(pst, n)
    temp = 0
    for j in group_post:
        temp += len(j)
        print(str(temp) + '/' + str(len(pst)))
        res = pp.map(pros, list(j))  # 'res' rather than 're', which shadows the regex module
        mega_list.extend(res)
    pp.close()
    pp.join()
    duration = (time.time() - start) / float(60)
    print("Time:" + str(duration) + 'min')
    with open(onoma, 'w') as f:
        json.dump(mega_list, f)
    return mega_list
docsim.py (project: paragraph2vec, author: thunlp)
def query_shards(self, query):
        """
        Return the result of applying shard[query] for each shard in self.shards,
        as a sequence.

        If PARALLEL_SHARDS is set, the shards are queried in parallel, using
        the multiprocessing module.
        """
        args = list(zip([query] * len(self.shards), self.shards))  # list() so len() works on Python 3
        if PARALLEL_SHARDS and PARALLEL_SHARDS > 1:
            logger.debug("spawning %i query processes" % PARALLEL_SHARDS)
            pool = multiprocessing.Pool(PARALLEL_SHARDS)
            result = pool.imap(query_shard, args, chunksize=1 + len(args) // PARALLEL_SHARDS)
        else:
            # serial processing, one shard after another
            pool = None
            result = map(query_shard, args)
        return pool, result
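
A hedged usage sketch: the caller drains the lazy result and is responsible for tearing down whatever pool query_shards created:

pool, result = similarity_index.query_shards(query)  # 'similarity_index' and 'query' are illustrative
merged = list(result)    # drain the per-shard results
if pool is not None:
    pool.terminate()     # free the worker processes once drained
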
process_usage.py (project: base_function, author: Rockyzsu)
def process_pool():
    p = Pool(10)
    start = time.time()
    manager = Manager()
    q = manager.Queue()  # only a Manager queue can be shared with pool workers
    print("main start ", start)
    for i in range(10):
        p.apply_async(sub_pool, args=(q,))
    p.close()
    p.join()
    end = time.time()

    print("process done at ", end)
    print(q.get())
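
sub_pool is the worker; a minimal sketch consistent with the q.get() above, in which each worker pushes one message through the shared Manager queue:

import os

def sub_pool(q):
    # runs in a worker process; the Manager proxy makes the queue shareable
    q.put("worker %d done" % os.getpid())
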
process_usage.py (project: base_function, author: Rockyzsu)
def pool_map():
    x = [i for i in range(50) if i % 2 == 0]

    start = time.time()
    # serial baseline, for comparison:
    #   for i in x:
    #       single(i)
    #   print("time used ", time.time() - start)
    p = Pool(2)  # two worker processes
    s = p.map(single, x)
    p.close()
    p.join()
    print(s)
    print(len(s))
    print("end. Time used: ", time.time() - start)
tts_files.py (project: tts-stray, author: tweetyf)
def transTest():
    # the worker count should roughly match the number of CPU cores
    numprocs = 8
    pool = multiprocessing.Pool(processes=numprocs)
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("BBCHeadline.txt","BBCHeadline.wav","en"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("1984.txt","1984.wav","en"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("emma.txt","emma.wav","en"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("Home.2009.eng.txt","Home.2009.eng.wav","en"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("StrayBirds.txt","StrayBirds.wav","en"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("????.txt","????.wav","zh"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("???????.txt","???????.wav","zh"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("????????.txt","????????.wav","zh"))
    pool.apply_async(func =tts_baidu.fileToVoice , args=("????.txt","????.wav","zh"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("????????.txt","????????.wav","zh"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("???????.txt","???????.wav","zh"))
    #pool.apply_async(func =tts_baidu.fileToVoice , args=("??????.txt","??????.wav","zh"))
    pool.close()
    pool.join()
tts_files.py (project: tts-stray, author: tweetyf)
def trans_novs():
    # 249 novels in total
    # the worker count should roughly match the number of CPU cores
    count = 0
    numprocs = 8
    pool = multiprocessing.Pool(processes=numprocs)
    files = os.listdir("./nov")
    for filename in files:
        if not os.path.isdir(filename):
            if filename.endswith("txt"):
                fname = "./nov/"+filename
                #print fname
                pool.apply_async(func =tts_baidu.fileToVoice , args=(fname,fname+".wav","zh"))
    pool.close()
    pool.join()
    #print "finish all ,total handle TTS GET() :",count
gaeDataExport.py (project: GAEDataExport, author: nikhilsaraf)
def main():
    input_dir, output_dir = getDirs()
    table_list = listFiles(input_dir)

    concurrency = cpu_count()
    print('Using {0:d} Processes'.format(concurrency))
    pool = Pool(concurrency)

    # perform the passed in write action (function) for each csv row
    time_capture = TimeCapture(time.time())
    results = pool.map(
        multiprocess,
        zip(repeat(output_dir),  # itertools.izip on Python 2
            [copy.deepcopy(time_capture) for i in range(len(table_list))],
            table_list,
            repeat(write)))
    time_capture.end(1)

    pool.close()
    pool.join()

    print('Finished Successfully!')
    displayResults(results, time_capture.total_time)
fuzzy_c_means.py (project: HSISeg, author: HSISeg)
def get_dissimilarity_matrix(U,V,X,n,error_list,beta,alpha_w,alpha_e_avg_t,alpha_n0,maxconn):
    row_size = X.shape[0]
    col_size = X.shape[1]
    channel_count = X.shape[2]
    alpha = get_alpha(n,error_list,alpha_w,alpha_e_avg_t,alpha_n0)
    cluster_number = V.shape[0]
    D = np.zeros((row_size,col_size,cluster_number)) 
    index_arr = np.array([[k, l] for k in range(row_size) for l in range(col_size)], dtype='int32')
    U_new = U.reshape(row_size * col_size, cluster_number, order='F')
    data_inputs = [0 for i in range(0, row_size * col_size)]
    for i in range(0, row_size * col_size):
        x = index_arr[i][0]
        y = index_arr[i][1]
        data_inputs[i] = [U_new, V, X[x][y], x, y, alpha, beta[x * row_size + y, :]]
    pool = Pool(maxconn) 
    outputs = pool.map(compute_cluster_distances_pool, data_inputs)
    pool.close()
    pool.join()
    for i in range(0, row_size * col_size):
        x = index_arr[i][0]
        y = index_arr[i][1]
        D[x][y] = outputs[i]
    return D
dataset.py (project: kaggle-seizure-prediction, author: sics-lm)
def load_files_parallel(feature_files, load_function, processes, **kwargs):
    """
    Function for loading feature files in parallel.
    :param feature_files: The collection of files to load.
    :param load_function: The function used to load the objects.
    :param processes: The number of processes to use for loading the feature files.
    :param kwargs: Keyword arguments which will be sent to the load function.
    :return: A list of loaded feature data frames or numpy arrays.
    """
    logging.info("Reading files in parallel")
    pool = multiprocessing.Pool(processes)
    try:
        #Create a partial function with the keyword arguments set. This is done since the parallel map from
        # multiprocessing expects a function which takes a single argument.
        partial_load_and_pivot = partial(load_function, **kwargs)
        segment_frames = pool.map(partial_load_and_pivot, feature_files)
    finally:
        pool.close()
    return segment_frames
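
A hedged usage example: keyword arguments given here are frozen into the partial and forwarded to every load_function call (the loader name and the sep argument are illustrative):

frames = load_files_parallel(feature_files,
                             load_function=load_segment,  # hypothetical loader
                             processes=4,
                             sep=',')  # forwarded to load_segment via **kwargs
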
Similarity Metrics - In Parallel.py (project: Parallel-Processing-Nadig, author: madhug-nadig)
def parallel_jaccard_similarity(self,x,y):

        p = 16
        pool = mp.Pool(processes= p)

        chunk_X = []
        chunk_Y = []

        # split the inputs into consecutive chunks of p elements
        for i in range(0, len(x), p):
            chunk_X.append(x[i:i + p])
            chunk_Y.append(y[i:i + p])

        s = time.perf_counter()  # time.clock() was removed in Python 3.8

        intersection_cardinality = sum(pool.starmap(self.interc_card_locl, zip(chunk_X,chunk_Y)))
        union_cardinality = sum(pool.starmap(self.union_card_locl, zip(chunk_X,chunk_Y)))
        print(intersection_cardinality, union_cardinality)
        e = time.perf_counter()
        print("Parallel Jaccard Exec Time: ", e-s)
        return intersection_cardinality/float(union_cardinality)
boot.py (project: pylspm, author: lseman)
def boot(self):
        p = Pool(self.cores)
        result = p.map(self.do_work, range(self.br))
        p.close()
        p.join()
        return result
boot.py (project: pylspm, author: lseman)
def jk(self):
        p = Pool(self.cores)

        base = np.arange(0, len(self.data))
        self.indices = list(np.delete(base, i) for i in base)

        result = p.map(self.do_work_jk, range(self.br))
        p.close()
        p.join()
        return result
debug_snapshot.py (project: ceph-lcm, author: Mirantis)
def get_container_id_mapping(pool, compose_cmd):
    service_names = subprocess.check_output(
        compose_cmd + ["config", "--services"]
    )
    service_names = service_names.strip().decode("utf-8").split("\n")
    id_mapping = {
        name: pool.apply_async(pool_container_id, (name, compose_cmd))
        for name in service_names
    }

    while not all(future.ready() for future in id_mapping.values()):
        time.sleep(0.1)
    for name, future in list(id_mapping.items()):
        if not future.successful():
            raise RuntimeError("Cannot get ID of service {0}".format(name))
        id_mapping[name] = future.get()

    return id_mapping
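
The pool argument must supply apply_async() returning AsyncResult-style futures (ready()/successful()/get()), i.e. a multiprocessing pool; a hedged usage sketch with an illustrative compose command:

from multiprocessing.pool import ThreadPool

compose_cmd = ["docker-compose", "-f", "docker-compose.yml"]  # illustrative
pool = ThreadPool(processes=4)
try:
    id_mapping = get_container_id_mapping(pool, compose_cmd)
finally:
    pool.close()
    pool.join()
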
_test_multiprocessing.py (project: ouroboros, author: pybee)
def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
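
The sqr helper comes from the shared test module; a sketch matching the (i, 0.3) calls above, where the second argument is a delay that keeps tasks in flight while the pool is closed:

import time

def sqr(x, wait=0.0):
    time.sleep(wait)  # simulate work so tasks outlive close()
    return x * x
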

#
# Test of creating a customized manager class
#
pool.py (project: pytorch-dist, author: apaszke)
def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            # changed worker -> clean_worker
            args = (self._inqueue, self._outqueue,
                    self._initializer,
                    self._initargs, self._maxtasksperchild)
            if hasattr(self, '_wrap_exception'):
                args += (self._wrap_exception,)
            w = self.Process(target=clean_worker, args=args)
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')
pool.py (project: planetplanet, author: rodluger)
def __init__(self, comm=None, loadbalance=False, debug=False,
                 wait_on_start = True, exit_on_end = True, 
                 cores_per_task = 1, **kwargs):
        if MPI is None:
            raise ImportError("Please install mpi4py")

        self.comm = MPI.COMM_WORLD if comm is None else comm
        self.rank = self.comm.Get_rank()
        if cores_per_task > 1:
            self.size = max(1, self.comm.Get_size() // cores_per_task)
        else:
            self.size = self.comm.Get_size() - 1
        self.function = _error_function
        self.loadbalance = loadbalance
        self.debug = debug
        if self.size == 0:
            raise ValueError("Tried to create an MPI pool, but there "
                             "was only one MPI process available. "
                             "Need at least two.")
        self.exit_on_end = exit_on_end

        # Enter main loop for workers?
        if wait_on_start:
            if self.is_worker():
                self.wait()
pool.py (project: planetplanet, author: rodluger)
def Pool(pool = 'AnyPool', **kwargs):
    '''
    Chooses between the different pools.
    If ``pool == 'AnyPool'``, chooses based on availability.

    '''

    if pool == 'MPIPool':
        return MPIPool(**kwargs)  
    elif pool == 'MultiPool':
        return MultiPool(**kwargs)
    elif pool == 'SerialPool':
        return SerialPool(**kwargs)
    elif pool == 'AnyPool':
        if MPIPool.enabled():
            return MPIPool(**kwargs)  
        elif MultiPool.enabled():
            return MultiPool(**kwargs)
        else:
            return SerialPool(**kwargs)
    else:
        raise ValueError('Invalid pool ``%s``.' % pool)
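
A hedged usage sketch; keyword arguments are passed through to whichever pool class is chosen:

pool = Pool('AnyPool')             # picks MPI, multiprocessing, or serial
results = pool.map(work_fn, jobs)  # 'work_fn' and 'jobs' are illustrative
pool.close()
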
wordembed.py (project: pytorch-skipthoughts, author: kaniblu)
def load_embeddings_mp(path, word_dim, processes=None):

    if processes is None:
        processes = multiprocessing.cpu_count()

    pool = mp.Pool(processes, initializer=_mp_initialize,
                   initargs=(word_dim,))

    with open(path, "r") as f:
        iterator = chunks(f, n=processes,
                          k=processes * 10000)
        ret = {}
        for batches in iterator:
            results = pool.map_async(_mp_process, batches)
            results = results.get()
            results = aggregate_dicts(*results)

            ret.update(results)

        return ret
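
_mp_initialize and _mp_process are defined elsewhere; a hedged sketch of the initializer pattern used here, where per-worker state is stashed in a module-level global once instead of being re-pickled with every task:

_WORD_DIM = None

def _mp_initialize(word_dim):
    # runs once in each worker process when the pool starts
    global _WORD_DIM
    _WORD_DIM = word_dim
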
utils.py (project: isar, author: ilbers)
def process_profilelog(fn, pout = None):
    # Either call with a list of filenames and set pout or a filename and optionally pout.
    if not pout:
        pout = fn + '.processed'
    pout = open(pout, 'w')

    import pstats
    if isinstance(fn, list):
        p = pstats.Stats(*fn, stream=pout)
    else:
        p = pstats.Stats(fn, stream=pout)
    p.sort_stats('time')
    p.print_stats()
    p.print_callers()
    p.sort_stats('cumulative')
    p.print_stats()

    pout.flush()
    pout.close()  

#
# Was present to work around multiprocessing pool bugs in python < 2.7.3
#
utils.py (project: isar, author: ilbers)
def multiprocessingpool(*args, **kwargs):

    import multiprocessing.pool
    #import multiprocessing.util
    #multiprocessing.util.log_to_stderr(10)
    # Deal with a multiprocessing bug where signals to the processes would be delayed until the work
    # completes. Putting in a timeout means the signals (like SIGINT/SIGTERM) get processed.
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    multiprocessing.pool.IMapIterator.next = wrapper(multiprocessing.pool.IMapIterator.next)

    return multiprocessing.Pool(*args, **kwargs)
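
A usage sketch: the wrapper is a drop-in replacement for multiprocessing.Pool, so iterating imap results now honors Ctrl-C promptly (worker and items are illustrative names):

pool = multiprocessingpool(processes=4)
try:
    for res in pool.imap(worker, items):
        print(res)
finally:
    pool.close()
    pool.join()
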
pool.py (project: pytorch, author: tylergenter)
def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            # changed worker -> clean_worker
            args = (self._inqueue, self._outqueue,
                    self._initializer,
                    self._initargs, self._maxtasksperchild)
            if hasattr(self, '_wrap_exception'):
                args += (self._wrap_exception,)
            w = self.Process(target=clean_worker, args=args)
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')
test_multiprocessing.py (project: oil, author: oilshell)
def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, next, it)

