Example source code for Python's as_completed()
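
All of the snippets below use concurrent.futures.as_completed(), which takes an iterable of futures and yields each future as it finishes, regardless of submission order (a dict whose keys are futures also works, which is the pattern most of the examples rely on). As a minimal sketch of that shared pattern, where the fetch function and URLs are made up purely for illustration:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(url):
    # placeholder work item; any callable submitted to the executor would do
    return len(url)

urls = ['https://example.com/a', 'https://example.com/b']

with ThreadPoolExecutor(max_workers=4) as executor:
    # map each future back to its input so failures can be reported usefully
    futures = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        if future.exception():
            print('failed:', url, future.exception())
        else:
            print('done:', url, future.result())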

asg.py (project: cloud-custodian, author: capitalone)
def tag(self, asgs, key, value):
        error = None
        with self.executor_factory(max_workers=3) as w:
            futures = {}
            for asg_set in chunks(asgs, self.batch_size):
                futures[w.submit(
                    self.process_asg_set, asg_set, key, value)] = asg_set
            for f in as_completed(futures):
                asg_set = futures[f]
                if f.exception():
                    error = f.exception()
                    self.log.exception(
                        "Exception tagging tag:%s error:%s asg:%s" % (
                            self.data.get('key', DEFAULT_TAG),
                            f.exception(),
                            ", ".join([a['AutoScalingGroupName']
                                       for a in asg_set])))
        if error:
            raise error
iam.py (project: cloud-custodian, author: capitalone)
def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('iam')
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for user_set in chunks(
                    [r for r in resources if 'c7n:Groups' not in r], size=50):
                futures.append(
                    w.submit(self.get_user_groups, client, user_set))
            # draining the iterator just waits for every lookup to finish;
            # get_user_groups annotates each user with 'c7n:Groups' as it runs
            for f in as_completed(futures):
                pass

        matched = []
        for r in resources:
            for p in r['c7n:Groups']:
                if self.match(p) and r not in matched:
                    matched.append(r)
        return matched
elasticache.py (project: cloud-custodian, author: capitalone)
def process(self, clusters):
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for cluster in clusters:
                if not _cluster_eligible_for_snapshot(cluster):
                    continue
                futures.append(w.submit(
                    self.process_cluster_snapshot,
                    cluster))

            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating cache cluster snapshot \n %s",
                        f.exception())
        return clusters
tags.py (project: cloud-custodian, author: capitalone)
def _common_tag_processer(executor_factory, batch_size, concurrency,
                          process_resource_set, id_key, resources, tags,
                          log):

    with executor_factory(max_workers=concurrency) as w:
        futures = {}
        for resource_set in utils.chunks(resources, size=batch_size):
            # map each future to the chunk it was submitted with so errors
            # can report the right resources
            futures[w.submit(process_resource_set, resource_set, tags)] = resource_set

        for f in as_completed(futures):
            if f.exception():
                log.error(
                    "Exception with tags: %s on resources: %s \n %s" % (
                        tags,
                        ", ".join([r[id_key] for r in futures[f]]),
                        f.exception()))
tags.py (project: cloud-custodian, author: capitalone)
def process(self, resources):
        count = len(resources)
        resources = self.filter_resources(resources)
        self.log.info(
            "Filtered from %s resources to %s" % (count, len(resources)))
        self.id_key = self.manager.get_model().id
        resource_set = self.create_set(resources)
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for r in resource_set:
                futures.append(
                    w.submit(self.process_rename, r, resource_set[r]))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception renaming tag set \n %s" % (
                            f.exception()))
        return resources
exporter.py (project: cloud-custodian, author: capitalone)
def run(config, start, end, accounts, debug=False):
    """run export across accounts and log groups specified in config."""
    config = validate.callback(config)
    destination = config.get('destination')
    start = start and parse(start) or start
    end = end and parse(end) or datetime.now()
    # fall back to a single-threaded executor when debugging
    executor = debug and MainThreadExecutor or ThreadPoolExecutor
    with executor(max_workers=32) as w:
        futures = {}
        for account in config.get('accounts', ()):
            if accounts and account['name'] not in accounts:
                continue
            futures[
                w.submit(process_account, account, start, end, destination)] = account
        for f in as_completed(futures):
            account = futures[f]
            if f.exception():
                log.error("Error on account %s err: %s",
                          account['name'], f.exception())
                continue
            log.info("Completed %s", account['name'])
downloader.py (project: mobileinsight-mobile, author: mobile-insight)
def _check_executor(self, dt):
        start = time()
        try:
            for future in as_completed(self._futures[:], 0):
                self._futures.remove(future)
                try:
                    result = future.result()
                except Exception:
                    traceback.print_exc()
                    # make an error tile?
                    continue
                if result is None:
                    continue
                callback, args = result
                callback(*args)

                # cap the executor in time, in order to prevent too much
                # slowness.
                # seems to work quite well with big zoom-in/out
                if time() - start > self.cap_time:
                    break
        except TimeoutError:
            pass
youtube_bb.py (project: youtube-bb, author: mbuckler)
def sched_downloads(d_set,dl_dir,num_threads,vids):
  d_set_dir = dl_dir+'/'+d_set+'/'

  # Make the directory for this dataset
  check_call(' '.join(['mkdir', '-p', d_set_dir]), shell=True)

  # Tell the user when downloads were started
  print(d_set + ': downloads started at ' +
        datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

  # Download and cut in parallel processes, one video per task
  with futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
    fs = [executor.submit(dl_and_cut,vid) for vid in vids]
    for i, f in enumerate(futures.as_completed(fs)):
      # Write progress to error so that it can be seen
      sys.stderr.write( \
        "Downloaded video: {} / {} \r".format(i + 1, len(vids)))

  print( d_set+': All videos downloaded' )
slinkie.py (project: slinkie, author: segfaultsourcery)
def parallelize(self, fn, number_of_threads=None):
        """
        Parallelize a function call. Number of threads defaults to your cpu count + 1.
        """

        number_of_threads = number_of_threads or (cpu_count() + 1)

        def _inner():
            with ThreadPoolExecutor(number_of_threads) as tpe:
                tasks = [tpe.submit(fn, item) for item in self._items]
                for future in as_completed(tasks):
                    try:
                        yield future.result()
                    except Exception as exception:
                        yield exception

        return Slinkie(_inner())
upload_arm_templates.py (project: azure-cli, author: Azure)
def upload_template_files(*args):

    print('\n== UPLOAD ARM TEMPLATES ==')

    parser = argparse.ArgumentParser(description='Upload ARM Templates')
    parser.add_argument('--name', metavar='NAME', required=True, help='Name of the thing being uploaded (in CamelCase)')
    parser.add_argument('--src', metavar='PATH', required=True, help='Path to the directory containing ARM templates to upload. Subdirectories will automatically be crawled.')
    parser.add_argument('--api-version', metavar='VERSION', required=True, help='API version for the templates being uploaded in yyyy-MM-dd format. (ex: 2016-07-01)')
    args = parser.parse_args(args)

    name = args.name
    api_version = args.api_version
    src = args.src

    _upload_templates(name, api_version, src)

    from concurrent.futures import ThreadPoolExecutor, as_completed
    # run the upload commands assembled elsewhere in this module (the 'uploads' list) in parallel
    with ThreadPoolExecutor(max_workers=40) as executor:
        tasks = [executor.submit(lambda cmd: os.system(cmd), u) for u in uploads]
        for t in as_completed(tasks):
            t.result() # don't use the result but expose exceptions from the threads
__init__.py (project: azure-cli, author: Azure)
def cli_build(args):
    assert check_output(['docker', 'ps']), "Docker required."
    build_types = args.build_types
    git_url = args.git_clone_url
    git_branch = args.git_clone_branch
    cli_version = args.cli_version
    artifact_dir = tempfile.mkdtemp(prefix='cli-build-{}-'.format(datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')), dir=os.getcwd())
    if len(build_types) == 1 and build_types[0] == '*':
        build_types = BUILD_TYPES
    print_heading('Building for {} from branch {} of {} '
                  'and version number will be {}\n'
                  'Build artifacts will be in {}'.format(', '.join(build_types), git_branch, git_url, cli_version, artifact_dir))
    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor(max_workers=len(build_types)) as executor:
        tasks = {executor.submit(build_dispatch, bt, git_url, git_branch, cli_version, artifact_dir, arg_ns=args) for bt in build_types}
        for t in as_completed(tasks):
            t.result()
    print('Done.')
chromeboy.py (project: falsy, author: pingf)
def run(self, data, max=4):
        results = []
        with futures.ThreadPoolExecutor(max_workers=max) as executor:
            future_to_url = {}
            for i, payload in enumerate(data):
                payload['chrome_id'] = i
                future_to_url[executor.submit(self.run1, payload)] = payload
                # future_to_url[executor.submit(self.run1_core, payload, browser, begin_time)] = payload
            for future in futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (url, exc))
                else:
                    data['chrome_id'] = url['chrome_id']
                    results.append(data)

        sorted_results = sorted(results, key=lambda tup: tup['chrome_id'])
        return sorted_results
converters.py (project: fmrif_tools, author: nih-fmrif)
def map_to_bids(self, bids_map, bids_dir, dicom_dir, biopac_dir, nthreads, overwrite):

        # Parse bids_map csv table, and create execution list for BIDS generation
        mapping = pd.read_csv(bids_map, header=0, index_col=None)
        mapping.replace(np.nan, '', regex=True, inplace=True)

        with ThreadPoolExecutor(max_workers=nthreads) as executor:

            futures = []

            for _, row in mapping.iterrows():
                futures.append(executor.submit(self._process_map_row, row, bids_dir, dicom_dir, self.conversion_tool,
                                               biopac_dir, overwrite))

            success = True

            for future in as_completed(futures):

                if not future.result():
                    success = False
                    break

            if not success:
                self.log.error("There were errors converting the provided datasets to BIDS format. See log for more" 
                               " information.")
ultrachronic.py (project: ultrachronic, author: yoavram)
def repeat(f, reps, cpus, **kwargs):
    if reps == 1:
        f(**kwargs)
        return
    fname = f.__name__
    print("Starting {} {} times with:".format(fname, reps))
    print(kwargs)
    if cpus == 1:
        for _ in range(reps):
            try:
                f(**kwargs)
            except Exception as e:
                warnings.warn(str(e))
    else:
        from multiprocessing import cpu_count
        from concurrent.futures import ProcessPoolExecutor, as_completed
        if cpus < 1:
            cpus = cpu_count()
        with ProcessPoolExecutor(cpus) as executor:
            futures = [executor.submit(f, **kwargs) for _ in range(reps)]
        # leaving the with-block waits for every run to finish; exceptions
        # are then reported as warnings below
        for fut in as_completed(futures):
            if fut.exception():
                warnings.warn(str(fut.exception()))
    print("Finished")
test_concurrent_futures.py (project: ouroboros, author: pybee)
def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)
concurrency.py (project: easypy, author: weka-io)
def concurrent_find(func, params, **kw):
    timeout = kw.pop("concurrent_timeout", None)
    # 'async' here is easypy's helper (not the Python keyword); it yields a
    # futures collection that supports as_completed() and kill()
    with async(func, list(params), **kw) as futures:
        future = None
        try:
            for future in futures.as_completed(timeout=timeout):
                if not future.exception() and future.result():
                    futures.kill()
                    return future.result()
            else:
                if future:
                    return future.result()
        except FutureTimeoutError as exc:
            if not timeout:
                # a timeout error without a timeout set is unexpected; re-raise
                raise
            futures.kill()
            _logger.warning("Concurrent future timed out (%s)", exc)
test_concurrent_futures.py (project: kbe_server, author: xiaohaoppy)
def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)
warned_stock.py (project: yQuant, author: yoonbae81)
def main(keys):
    t0 = time.time()

    executor = futures.ThreadPoolExecutor(max_workers=len(keys))

    to_do = []
    for key in keys:
        config = copy.deepcopy(krx.load_config(key))
        future = executor.submit(download, key, config)
        to_do.append(future)

    done = 0
    total_records = 0
    for future in futures.as_completed(to_do):
        done += 1
        key, records = future.result()
        total_records += records
        print(f'[{done:,d}/{len(keys):,d}] {key}: fetched {records:>8,} records', file=sys.stderr)

    elapsed = time.time() - t0
    print(f'{total_records:,} Records fetched in {elapsed:.2f}s', file=sys.stderr)
value_stock.py (project: yQuant, author: yoonbae81)
def main(keys):
    t0 = time.time()

    ARGS = dict(zip(['command', 'date'], sys.argv))
    date = ARGS.get('date', f'{datetime.date.today():%Y%m%d}')

    executor = futures.ThreadPoolExecutor(max_workers=len(keys))

    to_do = []
    for key in keys:
        config = copy.deepcopy(krx.load_config(key))
        config['contents']['data']['schdate'] = date
        future = executor.submit(download, key, config)
        to_do.append(future)

    done = 0
    total_records = 0
    for future in futures.as_completed(to_do):
        done += 1
        key, records = future.result()
        total_records += records
        print(f'[{done:,d}/{len(keys):,d}] {key}: fetched {records:>8,} records', file=sys.stderr)

    elapsed = time.time() - t0
    print(f'{total_records:,} Records fetched in {elapsed:.2f}s', file=sys.stderr)
collector.py (project: llama, author: dropbox)
def collect(self, count, dst_port=util.DEFAULT_DST_PORT,
                timeout=util.DEFAULT_TIMEOUT):
        """Collects latency against a set of hosts.

        Args:
            count: (int) number of datagrams to send each host
            dst_port: (int) destination port to probe on each host
            timeout: (float) seconds to wait for probes to return
        """
        jobs = []
        with futures.ThreadPoolExecutor(max_workers=50) as executor:
            for host in self.metrics.keys():
                logging.info('Assigning target host: %s', host)
                jobs.append(executor.submit(self.method, host,
                                            count=count,
                                            port=dst_port,
                                            timeout=timeout,
                                           ))
        for job in futures.as_completed(jobs):
            loss, rtt, host = job.result()
            self.metrics[host].loss = loss
            self.metrics[host].rtt = rtt
            logging.info('Summary {:16}:{:>3}% loss, {:>4} ms rtt'.format(
                host, loss, rtt))
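
Several of the examples above (downloader.py and the two test_zero_timeout tests) also use the timeout argument of as_completed(): if the timeout elapses before every future has finished, the iterator raises concurrent.futures.TimeoutError instead of blocking indefinitely. A small sketch of that polling style, assuming a made-up slow_task helper:

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed

def slow_task(seconds):
    # stand-in for real work, only here to illustrate the timeout behaviour
    time.sleep(seconds)
    return seconds

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(slow_task, s) for s in (0.1, 5)]
    try:
        for future in as_completed(futures, timeout=1):
            print('finished:', future.result())
    except TimeoutError:
        # the remaining futures are still running; handle or cancel them here
        print('some tasks did not finish within the timeout')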

