Python ThreadPool() usage examples
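
The snippets below are collected from various open-source projects. For orientation, here is a minimal sketch of the two patterns that recur throughout: map() for a fixed batch of work, and apply_async() followed by close()/join() for asynchronous submission. The work function is an illustrative placeholder, not taken from any project below.

from multiprocessing.pool import ThreadPool

def work(item):
    # illustrative placeholder task
    return item * 2

pool = ThreadPool(4)                # pool of 4 worker threads
doubled = pool.map(work, range(8))  # blocks until every result is ready

results = [pool.apply_async(work, (i,)) for i in range(8)]  # returns immediately
pool.close()  # no further tasks may be submitted
pool.join()   # wait for all queued tasks to finish
values = [r.get() for r in results]  # get() re-raises worker exceptions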

test_ocsp.py (project: snowflake-connector-python, author: snowflakedb)
def test_concurrent_ocsp_requests(tmpdir):
    from multiprocessing.pool import ThreadPool

    cache_file_name = path.join(str(tmpdir), 'cache_file.txt')
    urls = [
        'sfc-dev1-regression.s3.amazonaws.com',
        'sfctest0.snowflakecomputing.com',
        'sfc-ds2-customer-stage.s3.amazonaws.com',
        'snowflake.okta.com',
        'sfcdev1.blob.core.windows.net',
    ]
    ocsp_pyopenssl.OCSP_VALIDATION_CACHE = {}  # reset the memory cache
    urls = urls + urls + urls + urls + urls + urls  # six copies of the list, so 30 concurrent requests
    pool = ThreadPool(len(urls))
    for url in urls:
        pool.apply_async(_validate_certs_using_ocsp, [url, cache_file_name])
    pool.close()
    pool.join()
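
A caveat worth noting: the test discards the AsyncResult objects, so an exception raised inside _validate_certs_using_ocsp would be silently swallowed. A hedged variant that surfaces worker failures (same names as above, but not part of the original test):

results = [pool.apply_async(_validate_certs_using_ocsp, [url, cache_file_name])
           for url in urls]
pool.close()
pool.join()
for result in results:
    result.get()  # re-raises any exception from the worker thread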
selenium_downloader.py (project: Sasila, author: DarkSand)
def download(self, batch):
        if self.driver_pool_size:
            pool = Pool(processes=self.driver_pool_size)
        else:
            pool = Pool(processes=default_settings.DRIVER_POOL_SIZE)

        results = []

        for request in batch:
            results.append(pool.apply_async(self.download_one, (request,)))
        pool.close()
        pool.join()

        true_responses = []
        for result in results:
            true_response = result.get()
            true_responses.append(true_response)
            logger.info(true_response)

        return true_responses
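
Unlike the OCSP test above, this downloader keeps every AsyncResult and calls get(), so a worker exception propagates to the caller instead of being dropped; the trade-off is that one failed request aborts collection of the remaining responses.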
test_tree_cluster.py (project: raptiformica, author: vdloo)
def spawn_docker_instances(self):
        # Must create shared secret beforehand otherwise the
        # testcase does not know which instances are relevant
        ensure_shared_secret('cjdns')

        spawn_command = "spawn --no-assimilate " \
                        "--server-type headless " \
                        "--compute-type docker"
        pool = ThreadPool(self.workers)
        for _ in range(self.amount_of_instances):
            pool.apply_async(
                run_raptiformica_command,
                args=(self.temp_cache_dir, spawn_command)
            )
            sleep(20)  # stagger the spawns rather than submitting them all at once
        pool.close()
        pool.join()
test_linked_cluster.py (project: raptiformica, author: vdloo)
def spawn_docker_instances(self):
        # Must create shared secret beforehand otherwise the
        # testcase does not know which instances are relevant
        ensure_shared_secret('cjdns')

        spawn_command = "spawn --no-assimilate " \
                        "--server-type headless " \
                        "--compute-type docker"
        pool = ThreadPool(self.workers)
        for _ in range(self.amount_of_instances):
            pool.apply_async(
                run_raptiformica_command,
                args=(self.temp_cache_dir, spawn_command)
            )
            sleep(20)  # stagger the spawns rather than submitting them all at once
        pool.close()
        pool.join()
mesh.py (project: raptiformica, author: vdloo)
def join_consul_neighbours(mapping):
    """
    Consul join all known neighbours. Will join as many instances at
    the same time as threads in the threadpool.
    :param dict mapping: Key value mapping with the config data
    :return None:
    """
    ipv6_addresses = get_neighbour_hosts(mapping)
    shuffle(ipv6_addresses)
    new_ipv6_addresses = list(
        filter(not_already_known_consul_neighbour, ipv6_addresses)
    )
    pool = ThreadPool()
    groups = group_n_elements(
        new_ipv6_addresses, CONSUL_JOIN_BATCH_SIZE
    )
    for ipv6_addresses in groups:
        pool.apply_async(try_run_consul_join, args=(ipv6_addresses,))
    pool.close()
    pool.join()
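
The group_n_elements helper is not shown in this snippet; a plausible implementation (an assumption, not raptiformica's actual code) simply chunks the list into batches of CONSUL_JOIN_BATCH_SIZE:

def group_n_elements(elements, n):
    # split a list into consecutive chunks of at most n items (hypothetical helper)
    return [elements[i:i + n] for i in range(0, len(elements), n)]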
scan_network.py (project: maas, author: maas)
def nmap_scan(to_scan, slow=False, threads=None, threads_per_cpu=1):
    """Scans the specified networks using `nmap`.

    The `to_scan` dictionary must be in the format:

        {<interface-name>: <iterable-of-cidr-strings>, ...}

    If the `slow` option is specified, will limit the maximum rate nmap
    uses to send out packets.
    """
    jobs = yield_nmap_parameters(to_scan, slow)
    if threads is None:
        threads = cpu_count() * threads_per_cpu
    if threads == 1:
        yield from (run_nmap(job) for job in jobs)
    else:
        with ThreadPool(processes=threads) as pool:
            yield from pool.imap_unordered(run_nmap, jobs)
scan_network.py (project: maas, author: maas)
def ping_scan(to_scan: dict, threads=None, threads_per_cpu=4):
    """Scans the specified networks using `ping`.

    The `to_scan` dictionary must be in the format:

        {<interface_name>: <iterable-of-cidr-strings>, ...}

    If the `threads` argument is supplied, the specified number of threads
    will be used for concurrent scanning. If threads=1 is specified, scanning
    will use a single process (and be very slow).
    """
    jobs = yield_ping_parameters(to_scan)
    if threads is None:
        threads = cpu_count() * threads_per_cpu
    if threads == 1:
        yield from (run_ping(job) for job in jobs)
    else:
        with ThreadPool(processes=threads) as pool:
            yield from pool.imap(run_ping, jobs)
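
Both nmap_scan and ping_scan are generators, so no scanning happens until they are iterated. A minimal hedged usage sketch (the to_scan mapping is illustrative):

to_scan = {'eth0': ['192.168.0.0/24']}  # illustrative interface/CIDR mapping
for result in ping_scan(to_scan, threads=8):
    print(result)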
tombstones.py (project: nodenative, author: nodenative)
def _ResolveTombstones(jobs, tombstones, tombstone_symbolizer):
  """Resolve a list of tombstones.

  Args:
    jobs: the number of jobs to use with multithreading.
    tombstones: a list of tombstones.
    tombstone_symbolizer: the symbolizer used to resolve the tombstones.
  """
  if not tombstones:
    logging.warning('No tombstones to resolve.')
    return []
  tombstone_symbolizer.UnzipAPKIfNecessary()
  if len(tombstones) == 1:
    data = [_ResolveTombstone([tombstones[0], tombstone_symbolizer])]
  else:
    pool = ThreadPool(jobs)
    data = pool.map(
        _ResolveTombstone,
        [[tombstone, tombstone_symbolizer] for tombstone in tombstones])
  resolved_tombstones = []
  for tombstone in data:
    resolved_tombstones.extend(tombstone)
  return resolved_tombstones
MTestIdLoss.py (project: Person-Re-ID, author: zsjbook)
def __init__(self, img_dir, img_names, pre_process_img_func,
               extract_feat_func, batch_size, num_threads,
               multi_thread_stacking=False):
    """
    Args:
      extract_feat_func: External model for extracting features. It takes a 
        batch of images and returns a batch of features.
      multi_thread_stacking: bool, whether to use multi threads to speed up
        `np.stack()` or not. When the system is memory overburdened, using 
        `np.stack()` to stack a batch of images takes ridiculously long time.
        E.g. it may take several seconds to stack a batch of 64 images.
    """
    self.img_dir = img_dir
    self.img_names = img_names
    self.pre_process_img_func = pre_process_img_func
    self.extract_feat_func = extract_feat_func
    self.prefetcher = utils.Prefetcher(
      self.get_sample, len(img_names), batch_size, num_threads=num_threads)
    self.epoch_done = True
    self.multi_thread_stacking = multi_thread_stacking
    if multi_thread_stacking:
      self.pool = Pool(processes=8)
bench_stats.py (project: composability_bench, author: IntelPython)
def run_tp(n, body):
    """ThreadPool.map"""
    from multiprocessing.pool import ThreadPool
    global reused_pool, numthreads
    if 'reused_pool' not in globals():
        log.debug("Creating ThreadPool(%s)" % numthreads)
        reused_pool = ThreadPool(int(numthreads))
    reused_pool.map(body, n)
bench_stats.py (project: composability_bench, author: IntelPython)
def run_tpaa(n, body):
    """ThreadPool.apply_async"""
    from multiprocessing.pool import ThreadPool
    global reused_pool, numthreads
    if 'reused_pool' not in globals():
        log.debug("Creating ThreadPool(%s) for apply_async()" % numthreads)
        reused_pool = ThreadPool(int(numthreads))
    wait_list = []
    for i in n:
        b = tbb_job(i, body)
        a = reused_pool.apply_async(b)
        wait_list.append(a)
    for a in wait_list:
        a.wait()
__init__.py (project: kinect-2-libras, author: inessadl)
def Pool(processes=None, initializer=None, initargs=()):
    from multiprocessing.pool import ThreadPool
    return ThreadPool(processes, initializer, initargs)
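
This factory mirrors the standard library's multiprocessing.dummy module, where Pool is simply ThreadPool exposed under the process-pool API; a quick sketch of the equivalent stdlib call:

from multiprocessing.dummy import Pool  # thread-backed, same interface as multiprocessing.Pool

with Pool(4) as pool:
    print(pool.map(len, ['a', 'bb', 'ccc']))  # [1, 2, 3]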
client.py (project: kinesis_producer, author: ludia)
def __init__(self, config):
        super(ThreadPoolClient, self).__init__(config)
        self.pool = ThreadPool(processes=config['kinesis_concurrency'])
recipe-578488.py (project: code, author: ActiveState)
def run():
    # Note: this recipe is Python 2 (xrange and the print statement).
    from multiprocessing.pool import ThreadPool

    session = SubprocessSession('/bin/cat', EchoWriter, EchoReader)

    pool = ThreadPool(50)
    requests = pool.map(lambda j: session.put('message %d' % j), xrange(2000))
    results = pool.map(lambda r: r.get(), requests)

    print results == ['message %d' % j for j in xrange(2000)]
remote_service.py (project: gemstone, author: vladcalin)
def _get_thread_pool(self):
        # lazily initialized
        if not self._thread_pool:
            self._thread_pool = ThreadPool(os.cpu_count())
        return self._thread_pool
__init__.py (project: hostapd-mana, author: adde88)
def Pool(processes=None, initializer=None, initargs=()):
    from multiprocessing.pool import ThreadPool
    return ThreadPool(processes, initializer, initargs)
srcinfo.py (project: msys2-helpers, author: lazka)
def iter_packages(repo_path):

    pkgbuild_paths = []
    if os.path.isfile(repo_path) and os.path.basename(repo_path) == "PKGBUILD":
        pkgbuild_paths.append(repo_path)
    else:
        print("Searching for PKGBUILD files in %s" % repo_path)
        for base, dirs, files in os.walk(repo_path):
            for f in files:
                if f == "PKGBUILD":
                    # in case we find a PKGBUILD, don't go deeper
                    del dirs[:]
                    path = os.path.join(base, f)
                    pkgbuild_paths.append(path)
        pkgbuild_paths.sort()

    if not pkgbuild_paths:
        print("No PKGBUILD files found here")
        return
    else:
        print("Found %d PKGBUILD files" % len(pkgbuild_paths))

    pool = ThreadPool(cpu_count() * 2)
    pool_iter = pool.imap_unordered(SrcInfoPackage.for_pkgbuild, pkgbuild_paths)
    print("Parsing PKGBUILD files...")
    with progress(len(pkgbuild_paths)) as update:
        for i, packages in enumerate(pool_iter):
            update(i + 1)
            for package in packages:
                yield package
    pool.close()
url_check.py (project: msys2-helpers, author: lazka)
def main(args):
    sources = {}
    repo_path = os.path.abspath(args.path)

    repo_packages = PacmanPackage.get_all_packages()
    repo_package_names = set(p.pkgname for p in repo_packages)

    for package in iter_packages(repo_path):
        # Only check packages that are in the repo; anything else is often
        # broken in other ways anyway.
        if not args.all and package.pkgname not in repo_package_names:
            continue
        for source in package.sources:
            url = source_get_url(source)
            if url:
                sources.setdefault(url, set()).add(package.pkgbuild_path)

    print("Checking URLs...")
    work_items = sources.items()
    pool = ThreadPool(50)
    pool_iter = pool.imap_unordered(_check_url, work_items)
    broken = []
    with progress(len(work_items)) as update:
        for i, (url, pkgbuilds, error) in enumerate(pool_iter):
            update(i + 1)
            if error:
                broken.append((url, pkgbuilds, error))
    pool.close()
    pool.join()

    for url, pkgbuilds, error in broken:
        print("\n%s\n   %s\n   %s" % (
            url, " ".join(error.splitlines()), ", ".join(pkgbuilds)))

