Example source code for Python's ProcessPoolExecutor() class
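The snippets below are collected from assorted open-source projects. As a baseline, here is a minimal sketch of the standard-library API they all build on (the square function is illustrative):

from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':  # guard required where workers are spawned
    with ProcessPoolExecutor(max_workers=4) as executor:
        future = executor.submit(square, 3)          # schedule one call
        print(future.result())                       # 9
        print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]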

test_futures.py (project: deb-python-concurrent.futures, author: openstack)
def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        del executor
        gc.collect()

        queue_management_thread.join()
        for p in processes:
            p.join()
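Note that _queue_management_thread and _processes are private attributes of the executor; the test uses them to verify that deleting the executor triggers an implicit shutdown, but they are not a stable API and vary across interpreter versions.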
primes.py (project: deb-python-concurrent.futures, author: openstack)
def with_process_pool_executor():
    with ProcessPoolExecutor(10) as executor:
        return list(executor.map(is_prime, PRIMES))
_extractor.py (project: DataProperty, author: thombashi)
def __to_dp_matrix_mt(self, value_matrix):
        from concurrent import futures

        col_data_mapping = {}
        try:
            with futures.ProcessPoolExecutor(self.max_workers) as executor:
                # convert each column in a separate worker process
                future_list = [
                    executor.submit(
                        _to_dp_list_helper, self, col_idx,
                        data_list, self.__get_col_type_hint(col_idx),
                        self.strip_str_value
                    )
                    for col_idx, data_list
                    in enumerate(zip(*value_matrix))
                ]

                for future in futures.as_completed(future_list):
                    col_idx, value_dp_list = future.result()
                    col_data_mapping[col_idx] = value_dp_list
        finally:
            logger.debug("shutdown ProcessPoolExecutor")
            # the with block already shut the pool down; this is a harmless no-op
            executor.shutdown()

        return list(zip(*[
            col_data_mapping[col_idx] for col_idx in sorted(col_data_mapping)
        ]))
processPoolExe.py (project: Learning-Concurrency-in-Python, author: PacktPublishing)
def main():
  print("Starting ProcessPoolExecutor")
  with ProcessPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, 2)  # (2) was just the int 2; the parentheses did not make a tuple
    future = executor.submit(task, 3)
    future = executor.submit(task, 4)

  print("All tasks complete")
processPool.py (project: Learning-Concurrency-in-Python, author: PacktPublishing)
def main():
  executor = ProcessPoolExecutor(max_workers=3)
  task1 = executor.submit(task)
  task2 = executor.submit(task)
Indexer.py (project: structured-query-engine, author: apsdehal)
def future_flush(self):
        with ProcessPoolExecutor() as executor:
            # Pass the callable itself, not its result: the original called
            # self.flush_to_file() eagerly and submitted its return value.
            executor.submit(self.flush_to_file)
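In general, submit takes the callable and its arguments separately; writing the call inline runs it in the submitting process and hands submit the return value instead. A minimal sketch of the difference (hypothetical work function):

from concurrent.futures import ProcessPoolExecutor

def work(x):
    return x * 2

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(work, 21)  # correct: callable plus args
        # executor.submit(work(21))         # wrong: runs work() here and
        #                                   # submits the int 42 as a callable
        print(future.result())              # prints 42

Note also that with a process pool the submitted callable (here a bound method, so self as well) must be picklable.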
prepro.py (project: sru, author: taolei87)
def flatten_json(file, proc_func):
    '''A multi-processing wrapper for loading SQuAD data file.'''
    with open(file) as f:
        data = json.load(f)['data']
    with ProcessPoolExecutor(max_workers=args.threads) as executor:
        rows = executor.map(proc_func, data)
    rows = sum(rows, [])
    return rows
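executor.map yields one list of rows per article, and sum(rows, []) concatenates those lists into one. For large datasets, itertools.chain.from_iterable does the same flattening without the repeated list copying; a self-contained sketch:

from itertools import chain

lists = [[1, 2], [3], [4, 5]]
assert sum(lists, []) == list(chain.from_iterable(lists))  # [1, 2, 3, 4, 5]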
test_utils.py (project: peony-twitter, author: odrling)
def executor():
    return ProcessPoolExecutor()
sha_futures.py (project: notebooks, author: fluentpython)
def main(workers=None):
    if workers:
        workers = int(workers)
    t0 = time.time()

    with futures.ProcessPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers
        to_do = (executor.submit(sha, SIZE) for i in range(JOBS))
        for future in futures.as_completed(to_do):
            res = future.result()
            print(res)

    print(STATUS.format(actual_workers, time.time() - t0))
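When workers is falsy, ProcessPoolExecutor(None) sizes the pool to os.cpu_count(); the snippet reads the private _max_workers attribute only to report that computed size, which is convenient in a demo but not a stable API.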
flags_processpool.py (project: notebooks, author: fluentpython)
def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:  # <1>
        res = executor.map(download_one, sorted(cc_list))

    return len(list(res))
futures.py (project: squishy, author: tmehlinger)
def process_messages(self, messages):
        future_to_message = {}
        processed = []

        self.logger.debug('processing %d messages', len(messages))
        for message in messages:
            # ThreadPoolExecutor/ProcessPoolExecutor raise a RuntimeError
            # if we try to submit while the pool is shutting down.
            # If that happens, immediately stop trying to submit new
            # tasks; they will get requeued after the interval configured
            # on the queue's policy.
            try:
                future = self.pool.submit(self.func, message)
            except RuntimeError:
                self.logger.exception('cannot submit jobs to pool')
                raise
            else:
                future_to_message[future] = message

        for future in futures.as_completed(future_to_message,
                                           timeout=self.timeout):
            message = future_to_message.pop(future)
            try:
                future.result()
            except Exception:
                self.logger.exception('exception processing message %s',
                                      message.message_id)
            else:
                processed.append(message)

        return processed
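One subtlety: if timeout expires before every future completes, as_completed raises concurrent.futures.TimeoutError out of the loop, so the remaining messages are neither logged nor marked processed; they are simply left for requeueing. A self-contained sketch of that behavior, with an illustrative slow function:

import time
from concurrent import futures

def slow(seconds):
    time.sleep(seconds)
    return seconds

if __name__ == '__main__':
    with futures.ProcessPoolExecutor() as pool:
        pending = {pool.submit(slow, t): t for t in (0.1, 5)}
        try:
            for future in futures.as_completed(pending, timeout=1):
                print('done:', future.result())
        except futures.TimeoutError:
            unfinished = [t for f, t in pending.items() if not f.done()]
            print('timed out; still running:', unfinished)  # [5]
    # note the with block still waits for the 5-second job on exit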
futures.py (project: squishy, author: tmehlinger)
def __init__(self, func, pool_size=4, timeout=None):
        super(ProcessPoolWorker, self).__init__(func, pool_size=pool_size,
                                                timeout=timeout)
        self.pool = futures.ProcessPoolExecutor(max_workers=pool_size)
        self.logger = get_logger(__name__)
spider_multiprocess.py (project: SmallReptileTraining, author: yanbober)
def __init__(self):
        self.crawl = CrawlProcess()
        self.output = OutPutProcess()
        self.crawl_pool = ProcessPoolExecutor(max_workers=8)
        self.crawl_deep = 100   # maximum crawl depth
        self.crawl_cur_count = 0
gap_from_csv.py (project: Y8M, author: mpekalski)
def combine_preds_2_csv(preds1, preds2, fn_out='/tmp/combo.csv', wgts=None):
    t1 = datetime.now()
    print('start combination at ', t1)
    executor = futures.ProcessPoolExecutor(max_workers=6)
    add_pred_series_wgts = functools.partial(add_pred_series, wgts=wgts)

    preds1 = sorted(preds1, key=lambda x: x[0])
    preds2 = sorted(preds2, key=lambda x: x[0])
    t2 = datetime.now()
    print('sorted preds2 at ', t2)

    # map submits the work immediately but yields results lazily;
    # they are consumed by the write loop below
    lines = executor.map(add_pred_series_wgts, zip(preds1, preds2))

    t2 = datetime.now()
    print('finished adding lines at ', t2)
    #print('Lines processed: {}'.format(len(lines)))

    cnt = 0
    with open(fn_out, 'w') as fout:
        fout.write('VideoId,LabelConfidencePairs\n')
        for line in lines:
            fout.write(line+'\n')
            cnt += 1

    print('{} prediction lines were written to {}'.format(cnt, fn_out))
    t3 = datetime.now()
    print('finished combination at', t3)
    print('Total run time: {}'.format(t3 - t1))
    return None
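The executor above is never shut down explicitly; the function relies on interpreter exit to join the workers. A minimal sketch of the same pattern under a with block (combine is an illustrative stand-in for add_pred_series_wgts), which guarantees the pool is cleaned up before any post-processing runs:

from concurrent import futures

def combine(pair):  # illustrative worker
    a, b = pair
    return '%s,%s' % (a, b)

if __name__ == '__main__':
    pairs = list(zip([1, 2], [3, 4]))
    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        for line in executor.map(combine, pairs):
            print(line)
    # workers are joined here, before the timing summary would run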
convert_features.py (project: Y8M, author: mpekalski)
def main(unused_argv):

  print("tensorflow version: %s" % tf.__version__)

  all_frame_files = gfile.Glob(FLAGS.input_data_pattern)
  f_fullpath = all_frame_files[FLAGS.file_from : FLAGS.file_to]
  f_fns = [x.split('/')[-1] for x in f_fullpath]

  exist_files = gfile.Glob(FLAGS.output_path + "C*tfrecord")
  exist_fn = [x.split('/')[-1].replace('CAtr', 'Atr')  for x in exist_files]

  yet_2_split = [x for x,y in zip(f_fullpath, f_fns) if y not in exist_fn]

  vf = [FLAGS.output_path + 'C' + x.split('/')[-1] for x in yet_2_split]

  mylog('number of files suggested: %d'%len(f_fullpath))
  mylog('number of files yet to process: %d'%len(yet_2_split))

  if FLAGS.parallel:
    from concurrent import futures
    executor = futures.ProcessPoolExecutor(max_workers=2)
    # map submits every task immediately; the results iterator is discarded,
    # so worker exceptions pass silently here
    executor.map(process_one_file, zip(yet_2_split, vf))
  else: 
    for filenames in zip(yet_2_split, vf):
        #mylog('processing: {}'.format(filenames))
        process_one_file(filenames)

  mylog("done")
bench.py (project: Safe-RL-Benchmark, author: befelix)
def _benchmark_par(self):
        n_jobs = config.n_jobs
        with ProcessPoolExecutor(max_workers=n_jobs) as ex:
            fs = [ex.submit(_dispatch_wrap, run) for run in self.runs]
            self.runs = [f.result() for f in fs]
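Collecting the futures in a list and calling f.result() in order preserves the original ordering of self.runs; any exception raised inside a worker is re-raised by result() here.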
findrobots_par.py (project: python-cookbook-3rd, author: tuanavu)
def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir+"/*.log.gz")
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots
test_concurrent_futures.py (project: zippy, author: securesystemslab)
def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes:
            p.join()
test_concurrent_futures.py (project: zippy, author: securesystemslab)
def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        del executor
        test.support.gc_collect()

        queue_management_thread.join()
        for p in processes:
            p.join()
au_dlink_ftp.py (project: DLink_Harvester, author: MikimotoH)
def main():
    os.makedirs(localstor, exist_ok=True)
    with ProcessPoolExecutor() as executor:
        with ftputil.FTPHost('files.dlink.com.au', 'anonymous', '') as host:
            with open('au_dlink_ftp_filelist.csv', 'w') as fout:
                cw = csv.writer(fout)
                cw.writerow(["ftpurl", "fsize", "fdate", "file_sha1", "file_md5"])

            models = host.listdir('/Products/')
            for model in models:
                if not host.path.isdir('/Products/%(model)s'%locals()):
                    continue
                if not re.match(r'[A-Z]+', model, re.I):
                    continue
                revs = host.listdir('/Products/%(model)s/'%locals())
                for rev in revs:
                    if not re.match(r'REV_\w+', rev, re.I):
                        continue
                    try:
                        fwitems = host.listdir('/Products/%(model)s/%(rev)s/Firmware/'%locals())
                    except Exception:
                        # this revision has no Firmware directory
                        continue
                    try:
                        for fwitem in fwitems:
                            print('visiting /Products/%(model)s/%(rev)s/Firmware/%(fwitem)s/'%locals())
                            try:
                                fw_files = host.path.listdir('/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s/'%locals())
                                for fw_file in fw_files:
                                    host.keep_alive()
                                    executor.submit(download, '/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s/%(fw_file)s'%locals())
                            except Exception:
                                # not a directory; may be a single firmware file
                                if host.path.isfile('/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s'%locals()):
                                    executor.submit(download,'/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s'%locals())
                    except Exception as ex:
                        print(ex)

