Example source code using Python's Pool() class

test_mp_speed.py (project: TensorArtist, author: vacancy)
def test_mp():
    pool = mppool.Pool(4)
    start_time = time.time()
    lengths = pool.map(worker, range(4))
    finish_time = time.time()
    print('Multiprocessing: total_length={}, time={:.2f}s.'.format(sum(lengths), finish_time - start_time))
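
The snippet above assumes `mppool` is the `multiprocessing.pool` module and that `worker` is a picklable top-level function returning a length; a minimal sketch of those missing pieces (the names and workload are assumptions, not the project's actual code):

import time
import multiprocessing.pool as mppool

def worker(n):
    # Hypothetical CPU-bound task standing in for the project's worker;
    # it must be defined at module level so the pool can pickle it.
    data = list(range((n + 1) * 100000))
    return len(data)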
preprocessing.py (project: speechT, author: timediv)
def store_samples(self, directory, preprocess_fnc):
    """
    Read audio files from `directory` and store the preprocessed version in preprocessed/`directory`

    Args:
      directory: the sub-directory to read from
      preprocess_fnc: The preprocessing function to use

    """

    out_directory = self._get_directory(preprocess_fnc, directory)

    if not os.path.exists(out_directory):
      os.makedirs(out_directory)

    audio_files = list(iglob_recursive(self._data_directory + '/' + directory, '*.flac'))

    with Pool(processes=multiprocessing.cpu_count()) as pool:

      transcript_dict = self._transcript_dict

      for audio_file in audio_files:
        audio_id = self._extract_audio_id(audio_file)
        transcript_entry = transcript_dict[audio_id]
        transform_args = (audio_file, preprocess_fnc, transcript_entry, out_directory)
        pool.apply_async(SpeechCorpusReader._transform_and_store_sample, transform_args,
                         error_callback=self._preprocessing_error_callback)

      pool.close()
      pool.join()
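
The `error_callback` passed to `apply_async` runs in the parent process with the exception raised by the worker; the project's actual handler is not shown in this excerpt, but a minimal sketch of such a method (hypothetical) could be:

import logging

def _preprocessing_error_callback(self, error):
    # Runs in the parent process; `error` is the exception from the worker.
    # Without an error_callback, apply_async failures pass silently unless
    # .get() is called on the returned AsyncResult.
    logging.error('Preprocessing of a sample failed: %s', error)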
__init__.py (project: empyrion-python-api, author: huhlig)
def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
    '''
    Returns a process pool object
    '''
    from multiprocessing.pool import Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
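
A quick usage sketch of this wrapper (the `square` function is hypothetical):

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:        # the wrapper defined above
        print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]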
trainset.py (project: talkbot, author: nimnull)
def gen_set():
    # flatten the per-directory file lists from os.walk
    img_files = [f for _, _, files in os.walk(BASEDIR) for f in files]

    with Pool(processes=pool_size) as pool:
        for idx, image in enumerate(img_files):
            image_path = os.path.join(BASEDIR, image)
            for sample_options in ALG:
                # apply_async() instead of the blocking apply(); the original
                # also misspelled save_training_sample
                pool.apply_async(save_training_sample,
                                 (image_path, idx, sample_options))
        pool.close()
        pool.join()
trainset.py (project: talkbot, author: nimnull)
def store_training(out):
    start = time.time()
    # flatten the per-directory file lists from os.walk
    img_files = [f for _, _, files in os.walk(TRAINDIR) for f in files]
    train_combs = itertools.combinations(img_files, 2)

    with Pool(processes=pool_size) as pool:
        vectors = pool.map(get_images_diff_vectors, train_combs)
    df = pd.DataFrame(vectors)
    df.to_csv(out, index=False)
    finished = time.time() - start
    click.echo("Finished in %s seconds" % finished)
run.py (project: hangul-utils, author: kaniblu)
def process_mp(texts, args, pool=None):
    if pool is None:
        pool = mp.Pool(args.n_processes,
                       initializer=mp_initialize,
                       initargs=(args, ))

    iterator = chunks(enumerate(texts),
                      n=args.n_processes,
                      k=args.n_processes * 1000)

    if args.progress:
        t = tqdm.tqdm()
    else:
        t = None

    results = []

    for batches in iterator:
        n_items = sum(len(x) for x in batches)

        result = pool.map_async(process, batches)
        result = result.get()
        result = [i for batch in result for i in batch]
        result.sort(key=lambda x: x[0])

        idx, result = zip(*result)
        results.extend(result)

        if args.progress:
            t.update(n_items)

    return results
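
`chunks` is not shown in this excerpt; given how it is consumed (each yielded group is a list of batches handed to `pool.map_async`, with `n_items` summed per batch), a hypothetical implementation could be:

import itertools

def chunks(iterable, n, k):
    # Hypothetical helper: yield groups of at most n batches,
    # covering at most k items per group.
    it = iter(iterable)
    while True:
        group = list(itertools.islice(it, k))
        if not group:
            return
        size = (len(group) + n - 1) // n
        yield [group[i:i + size] for i in range(0, len(group), size)]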
malshare_downloader.py (project: PD_update, author: liebesu)
def pool(md5s):
    pool = Pool(processes=10)
    pool.map(downloader, md5s)
    pool.close()
    pool.join()
tasks.py (project: SoS, author: vatlab)
def kill_tasks(tasks, tags=None):
    #
    import glob
    from multiprocessing.pool import ThreadPool as Pool
    if not tasks:
        tasks = glob.glob(os.path.join(os.path.expanduser('~'), '.sos', 'tasks', '*.task'))
        all_tasks = [os.path.basename(x)[:-5] for x in tasks]
    else:
        all_tasks = []
        for t in tasks:
            matched = glob.glob(os.path.join(os.path.expanduser('~'), '.sos', 'tasks', f'{t}*.task'))
            matched = [os.path.basename(x)[:-5] for x in matched]
            if not matched:
                env.logger.warning(f'{t} does not match any existing task')
            else:
                all_tasks.extend(matched)
    if tags:
        all_tasks = [x for x in all_tasks if any(tag in tags for tag in taskTags(x).split(' '))]

    if not all_tasks:
        env.logger.warning('No task to kill')
        return
    all_tasks = sorted(list(set(all_tasks)))
    p = Pool(len(all_tasks))
    killed = p.map(kill_task, all_tasks)
    for s, t in zip(killed, all_tasks):
        print(f'{t}\t{s}')
multiprocessing.py (project: python-pool-performance, author: JohnStarich)
def init_pool(self, worker_count):
        return Pool(worker_count)
train.py (project: hh-page-classifier, author: TeamHG-Memex)
def add_extracted_text(xs):
    with Pool() as pool:
        for doc, features in zip(
                xs, pool.imap(extract_features, xs, chunksize=10)):
            doc.update(features)
abstract_db.py (project: ntee, author: studio-ousia)
def build(in_dir, out_file, pool_size):
        with closing(AbstractDB(out_file, protocol=-1)) as db:
            target_files = [f for f in sorted(os.listdir(in_dir)) if f.endswith('ttl.gz')]
            with closing(Pool(pool_size)) as pool:
                f = partial(_process_file, in_dir=in_dir)
                for ret in pool.imap(f, target_files):
                    for (key, obj) in ret:
                        db[key] = obj
mpindexer.py (project: snovault, author: ENCODE-DCC)
def pool(self):
        return Pool(
            processes=self.processes,
            initializer=initializer,
            initargs=self.initargs,
            maxtasksperchild=self.maxtasks,
            context=get_context('forkserver'),
        )
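
The `context=get_context('forkserver')` argument makes workers spawn from a clean forkserver process rather than fork()-ing the (possibly multi-threaded) parent; a self-contained sketch of the same construction:

from multiprocessing import get_context
from multiprocessing.pool import Pool

if __name__ == '__main__':
    # forkserver workers inherit only what the forkserver process imported,
    # avoiding deadlocks caused by fork()-ing a multi-threaded parent.
    pool = Pool(processes=2, context=get_context('forkserver'))
    print(pool.map(abs, [-1, -2]))  # [1, 2]
    pool.close()
    pool.join()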
bench_stats.py (project: composability_bench, author: IntelPython)
def run_pp(n, body):
    """Process Pool.map"""
    from multiprocessing.pool import Pool
    global reused_pool, numthreads
    global args
    if 'reused_pool' not in globals():
        log.debug("Creating Pool(%s)" % numthreads)
        reused_pool = Pool(int(numthreads))
    reused_pool.map(body, n)
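
Caching the pool in a module-level global means repeated benchmark runs measure `map` throughput rather than pool startup cost; the same pattern in isolation (a sketch, names assumed):

import math
from multiprocessing.pool import Pool

_pool = None

def get_pool(n=4):
    global _pool
    if _pool is None:   # created once, reused by every later call
        _pool = Pool(n)
    return _pool

if __name__ == '__main__':
    print(get_pool().map(math.sqrt, [1, 4, 9]))  # [1.0, 2.0, 3.0]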
convert.py (project: melanoma-transfer, author: learningtitans)
def main(directory, convert_directory, test, crop_size, extension):

    try:
        os.mkdir(convert_directory)
    except OSError:
        pass

    filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory)
                 for f in fn if f.endswith('jpeg') or f.endswith('tiff')]
    filenames = sorted(filenames)

    if test:
        names = data.get_names(filenames)
        y = data.get_labels(names)
        for f, level in zip(filenames, y):
            if level == 1:
                try:
                    img = convert(f, crop_size)
                    img.show()
                    Image.open(f).show()
                    real_raw_input = vars(__builtins__).get('raw_input', input)
                    real_raw_input('enter for next')
                except KeyboardInterrupt:
                    exit(0)

    print("Resizing images in {} to {}, this takes a while."
          "".format(directory, convert_directory))

    n = len(filenames)
    # process in batches, sometimes weird things happen with Pool on my machine
    batchsize = 500
    batches = n // batchsize + 1
    pool = Pool(N_PROC)

    args = []

    for f in filenames:
        args.append((convert, (directory, convert_directory, f, crop_size,
                               extension)))

    for i in range(batches):
        print("batch {:>2} / {}".format(i + 1, batches))
        pool.map(process, args[i * batchsize: (i + 1) * batchsize])

    pool.close()

    print('done')
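
`process` is not shown in this excerpt; since each element of `args` is a `(function, argument-tuple)` pair, it is presumably a small unpacking wrapper along these lines (a sketch, not the project's verbatim code):

def process(fn_args):
    # Unpack a (function, argument-tuple) pair and apply it.
    fn, fargs = fn_args
    return fn(*fargs)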
query.py (project: twitterscraper, author: taspinar)
def query_all_tweets(query):
    """
    Queries *all* tweets in the history of twitter for the given query. This
    will run in parallel for each ~10 days.

    :param query: A twitter advanced search query.
    :return: A list of tweets.
    """
    year = 2006
    month = 3

    limits = []
    while date(year=year, month=month, day=1) < date.today():
        nextmonth = month + 1 if month < 12 else 1
        nextyear = year + 1 if nextmonth == 1 else year

        limits.append(
            (date(year=year, month=month, day=1),
             date(year=year, month=month, day=10))
        )
        limits.append(
            (date(year=year, month=month, day=10),
             date(year=year, month=month, day=20))
        )
        limits.append(
            (date(year=year, month=month, day=20),
             date(year=nextyear, month=nextmonth, day=1))
        )
        year, month = nextyear, nextmonth

    queries = ['{} since:{} until:{}'.format(query, since, until)
               for since, until in reversed(limits)]

    pool = Pool(20)
    all_tweets = []
    try:
        for new_tweets in pool.imap_unordered(query_tweets_once, queries):
            all_tweets.extend(new_tweets)
            logging.info("Got {} tweets ({} new).".format(
                len(all_tweets), len(new_tweets)))
    except KeyboardInterrupt:
        logging.info("Program interrupted by user. Returning all tweets "
                     "gathered so far.")

    return sorted(all_tweets)
convert.py (project: tefla, author: openAGI)
def main(directory, convert_directory, test, crop_size, extension):
    try:
        os.mkdir(convert_directory)
    except OSError:
        pass

    supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif'])

    filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory)
                 for f in fn if f.split('.')[-1].lower() in supported_extensions]
    filenames = sorted(filenames)

    if test:
        names = data.get_names(filenames)
        y = data.get_labels(names)
        for f, level in zip(filenames, y):
            if level == 1:
                try:
                    img = convert(f, crop_size)
                    img.show()
                    Image.open(f).show()
                    real_raw_input = vars(__builtins__).get('raw_input', input)
                    real_raw_input('enter for next')
                except KeyboardInterrupt:
                    exit(0)

    print("Resizing images in {} to {}, this takes a while."
          "".format(directory, convert_directory))

    n = len(filenames)
    # process in batches, sometimes weird things happen with Pool on my machine
    batchsize = 500
    batches = n // batchsize + 1
    pool = Pool(N_PROC)

    args = []

    for f in filenames:
        args.append((convert, (directory, convert_directory, f, crop_size,
                               extension)))

    for i in range(batches):
        print("batch {:>2} / {}".format(i + 1, batches))
        pool.map(process, args[i * batchsize: (i + 1) * batchsize])

    pool.close()

    print('done')
convert_seg.py (project: tefla, author: openAGI)
def main(directory, convert_directory, test, crop_size, extension):
    try:
        os.mkdir(convert_directory)
    except OSError:
        pass

    supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif'])

    # filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory)
    # for f in fn if f.split('.')[-1].lower() in supported_extensions]
    filenames = [each for each in os.listdir(directory)
                 if each.endswith('.jpg')]
    filenames = [os.path.join(directory, filename.strip('\n'))
                 for filename in filenames]
    # with open('/home/artelus_server/data/segment_artelus/train.txt', 'r') as f:
    #    filenames = f.readlines()
    # filenames = [os.path.join(directory, filename.strip(
    #    '\n') + '.jpg') for filename in filenames]
    filenames = sorted(filenames)

    if test:
        names = data.get_names(filenames)
        y = data.get_labels(names)
        for f, level in zip(filenames, y):
            if level == 1:
                try:
                    img = convert(f, crop_size)
                    img.show()
                    Image.open(f).show()
                    real_raw_input = vars(__builtins__).get('raw_input', input)
                    real_raw_input('enter for next')
                except KeyboardInterrupt:
                    exit(0)

    print("Resizing images in {} to {}, this takes a while."
          "".format(directory, convert_directory))

    n = len(filenames)
    # process in batches, sometimes weird things happen with Pool on my machine
    batchsize = 500
    batches = n // batchsize + 1
    pool = Pool(N_PROC)

    args = []

    for f in filenames:
        label_f = f[:-4] + '_final_mask.png'
        args.append((convert, (directory, convert_directory, f, label_f, crop_size,
                               extension)))

    for i in range(batches):
        print("batch {:>2} / {}".format(i + 1, batches))
        pool.map(process, args[i * batchsize: (i + 1) * batchsize])

    pool.close()

    print('done')
quantifier.py (project: semeval2016-task4, author: aesuli)
def fit(self, X, y):
        labels = list(set(y))
        if len(labels) != 2:
            raise Exception("A binary setup is required")

        min_count = X.shape[0]
        self._min_label = None
        for label in labels:
            count = list(y).count(label)
            if count <= min_count:
                min_count = count
                self._min_label = label

        if self._reference_label is None:
            self._reference_label = self._min_label

        if self._reference_label not in labels:
            raise Exception("Reference label does not appear in training data")

        if min_count >= self._n_folds:
            cv = cross_validation.StratifiedKFold(y, n_folds=min(X.shape[0], self._n_folds), shuffle=True,
                                                  random_state=self._seed)
        else:
            cv = cross_validation.KFold(X.shape[0], n_folds=min(X.shape[0], self._n_folds), shuffle=True,
                                        random_state=self._seed)

        tp = 0
        fp = 0
        ptp = 0
        pfn = 0
        pfp = 0
        ptn = 0

        pool = Pool(processes=10)
        requests = list()
        for train_cv, test_cv in cv:
            requests.append((X, y, train_cv, test_cv))

        for f_tp, f_fp, f_ptp, f_pfn, f_pfp, f_ptn in pool.map(self._fit_fold, requests):
            # the original reused the accumulator names as its loop variables,
            # which doubled each fold's counts and mixed ptn into pfn
            tp += f_tp
            fp += f_fp
            ptp += f_ptp
            pfn += f_pfn
            pfp += f_pfp
            ptn += f_ptn

        pool.close()

        positives = min_count
        negatives = X.shape[0] - positives
        self._tpr = tp / positives
        self._fpr = fp / negatives
        self._ptpr = ptp / (ptp + pfn)
        self._pfpr = pfp / (pfp + ptn)
        self._clf.fit(X, y)
        if self._clf.classes_[0] == self._min_label:
            self._pos_idx = 0
            self._neg_idx = 1
        else:
            self._neg_idx = 0
            self._pos_idx = 1
pool.py (project: mlens, author: flennerhag)
def __call__(self, a):
        m = _get_backing_memmap(a)
        if m is not None:
            # a is already backed by a memmap file, let's reuse it directly
            return _reduce_memmap_backed(a, m)

        if (not a.dtype.hasobject
                and self._max_nbytes is not None
                and a.nbytes > self._max_nbytes):
            # check that the folder exists (lazily create the pool temp folder
            # if required)
            try:
                os.makedirs(self._temp_folder)
                os.chmod(self._temp_folder, FOLDER_PERMISSIONS)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise e

            # Find a unique, concurrent safe filename for writing the
            # content of this array only once.
            basename = "%d-%d-%s.pkl" % (
                os.getpid(), id(threading.current_thread()), hash(a))
            filename = os.path.join(self._temp_folder, basename)

            # In case the same array with the same content is passed several
            # times to the pool subprocess children, serialize it only once

            # XXX: implement an explicit reference counting scheme to make it
            # possible to delete temporary files as soon as the workers are
            # done processing this data.
            if not os.path.exists(filename):
                if self.verbose > 0:
                    print("Memmaping (shape=%r, dtype=%s) to new file %s" % (
                        a.shape, a.dtype, filename))
                for dumped_filename in dump(a, filename):
                    os.chmod(dumped_filename, FILE_PERMISSIONS)

                if self._prewarm:
                    # Warm up the data to avoid concurrent disk access in
                    # multiple children processes
                    load(filename, mmap_mode=self._mmap_mode).max()
            elif self.verbose > 1:
                print("Memmaping (shape=%s, dtype=%s) to old file %s" % (
                    a.shape, a.dtype, filename))

            # The worker process will use joblib.load to memmap the data
            return (load, (filename, self._mmap_mode))
        else:
            # do not convert a into memmap, let pickler do its usual copy with
            # the default system pickler
            if self.verbose > 1:
                print("Pickling array (shape=%r, dtype=%s)." % (
                    a.shape, a.dtype))
            return (loads, (dumps(a, protocol=HIGHEST_PROTOCOL),))


###############################################################################
# Enable custom pickling in Pool queues
LazyMultithread.py (project: LazyScripts, author: jameswenzel)
def multithread_failsafe(fn, args=[[]], pool_type=Pool,
                         processes=_cpus, maxtasksperchild=1, chunksize=1,
                         verbose=True):
    '''Asynchronous multithreading that does not break on individual errors.
    Instead, the error and traceback are printed and the failing input is
    disregarded.

    Unfortunately, due to context-management restrictions, (as far as I can
    tell) both generators are needed even though the only difference is the
    maxtasksperchild arg'''

    '''Generators that yield next completed task. While execution of individual
    tasks is asynchronous, iterating through the results is not'''

    def process_generator(pool_type):
        with pool_type(processes, maxtasksperchild=maxtasksperchild) as pool:
            result_objs = (pool.apply_async(fn, arg) for arg in args)
            for r in result_objs:
                try:
                    yield r.get()
                except GeneratorExit as g:
                    raise g
                except:
                    if verbose:
                        print('######BEGIN TRACEBACK######')
                        traceback.print_exc()
                        print('######END TRACEBACK######')
                        print()
                    continue

    def thread_generator(pool_type):
        with pool_type(processes) as pool:
            result_objs = (pool.apply_async(fn, arg) for arg in args)
            for r in result_objs:
                try:
                    yield r.get()
                except GeneratorExit as g:
                    raise g
                except:
                    if verbose:
                        print('######BEGIN TRACEBACK######')
                        traceback.print_exc()
                        print('######END TRACEBACK######')
                        print()
                    continue

    # ThreadPools do not take a maxtasksperchild argument,
    # so we need to conditionally construct a generator

    if issubclass(pool_type, ThreadPool):
        return thread_generator(pool_type)
    else:
        return process_generator(pool_type)
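
A hypothetical call, using a function that fails on some inputs to show the failsafe behavior:

def reciprocal(x):
    return 1 / x   # raises ZeroDivisionError for x == 0

# The x == 0 failure prints a traceback (verbose defaults to True) and is
# skipped; the surviving results are still yielded in submission order.
for result in multithread_failsafe(reciprocal, args=[(0,), (2,), (4,)],
                                   pool_type=ThreadPool):
    print(result)   # 0.5 then 0.25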

