def test_concurrent_ocsp_requests(tmpdir):
    """Call ``_validate_certs_using_ocsp`` for many hosts in parallel.

    Every host in the list is submitted six times to a thread pool that has
    one thread per submission; all calls share one cache file located in
    ``tmpdir``. The in-memory OCSP validation cache is emptied beforehand.

    The ``AsyncResult`` of every submission is kept and ``get()`` is called
    on it after the pool has drained, because ``apply_async`` otherwise
    silently discards exceptions raised in the worker and the test could
    never fail.
    """
    from multiprocessing.pool import ThreadPool

    cache_file_name = path.join(str(tmpdir), 'cache_file.txt')
    urls = [
        'sfc-dev1-regression.s3.amazonaws.com',
        'sfctest0.snowflakecomputing.com',
        'sfc-ds2-customer-stage.s3.amazonaws.com',
        'snowflake.okta.com',
        'sfcdev1.blob.core.windows.net',
    ]
    ocsp_pyopenssl.OCSP_VALIDATION_CACHE = {}  # reset the memory cache
    urls = urls * 6
    pool = ThreadPool(len(urls))
    pending = [
        pool.apply_async(_validate_certs_using_ocsp, [url, cache_file_name])
        for url in urls
    ]
    pool.close()
    pool.join()
    # Re-raise the first worker exception (if any) in the test thread.
    for result in pending:
        result.get()
python类ThreadPool()的实例源码
def test_concurrent_ocsp_requests(tmpdir):
    """Call ``_validate_certs_using_ocsp`` for many hosts in parallel.

    Every host in the list is submitted six times to a thread pool that has
    one thread per submission; all calls share one cache file located in
    ``tmpdir``. The in-memory OCSP validation cache is emptied beforehand.

    The ``AsyncResult`` of every submission is kept and ``get()`` is called
    on it after the pool has drained, because ``apply_async`` otherwise
    silently discards exceptions raised in the worker and the test could
    never fail.
    """
    from multiprocessing.pool import ThreadPool

    cache_file_name = path.join(str(tmpdir), 'cache_file.txt')
    urls = [
        'sfc-dev1-regression.s3.amazonaws.com',
        'sfctest0.snowflakecomputing.com',
        'sfc-ds2-customer-stage.s3.amazonaws.com',
        'snowflake.okta.com',
        'sfcdev1.blob.core.windows.net',
    ]
    ocsp_pyopenssl.OCSP_VALIDATION_CACHE = {}  # reset the memory cache
    urls = urls * 6
    pool = ThreadPool(len(urls))
    pending = [
        pool.apply_async(_validate_certs_using_ocsp, [url, cache_file_name])
        for url in urls
    ]
    pool.close()
    pool.join()
    # Re-raise the first worker exception (if any) in the test thread.
    for result in pending:
        result.get()
def download(self, batch):
    """Download every request in ``batch`` through a worker pool.

    The pool size is ``self.driver_pool_size`` when that is truthy, otherwise
    ``default_settings.DRIVER_POOL_SIZE``. Each request is handed to
    ``self.download_one``; once all work has finished the responses are
    collected in submission order, logged one by one and returned as a list.
    """
    pool_size = self.driver_pool_size or default_settings.DRIVER_POOL_SIZE
    workers = Pool(processes=pool_size)
    pending = [
        workers.apply_async(self.download_one, (request,))
        for request in batch
    ]
    # No more submissions; block until every download has completed.
    workers.close()
    workers.join()

    true_responses = []
    for handle in pending:
        response = handle.get()
        true_responses.append(response)
        logger.info(response)
    return true_responses
def spawn_docker_instances(self):
    """Queue ``self.amount_of_instances`` spawn commands on a thread pool.

    The pool has ``self.workers`` threads. After each command is queued the
    caller sleeps 20 seconds before queuing the next one; the method returns
    once all queued commands have finished.
    """
    # The shared secret has to exist up front, otherwise the
    # testcase cannot tell which instances are relevant
    ensure_shared_secret('cjdns')
    spawn_command = (
        "spawn --no-assimilate "
        "--server-type headless "
        "--compute-type docker"
    )
    workers = ThreadPool(self.workers)
    remaining = self.amount_of_instances
    for _ in range(remaining):
        workers.apply_async(
            run_raptiformica_command,
            (self.temp_cache_dir, spawn_command),
        )
        sleep(20)
    workers.close()
    workers.join()
def spawn_docker_instances(self):
    """Queue ``self.amount_of_instances`` spawn commands on a thread pool.

    The pool has ``self.workers`` threads. After each command is queued the
    caller sleeps 20 seconds before queuing the next one; the method returns
    once all queued commands have finished.
    """
    # The shared secret has to exist up front, otherwise the
    # testcase cannot tell which instances are relevant
    ensure_shared_secret('cjdns')
    spawn_command = (
        "spawn --no-assimilate "
        "--server-type headless "
        "--compute-type docker"
    )
    workers = ThreadPool(self.workers)
    remaining = self.amount_of_instances
    for _ in range(remaining):
        workers.apply_async(
            run_raptiformica_command,
            (self.temp_cache_dir, spawn_command),
        )
        sleep(20)
    workers.close()
    workers.join()
def join_consul_neighbours(mapping):
    """
    Run a consul join against every neighbour that is not yet known.

    The neighbour hosts are shuffled, filtered with
    not_already_known_consul_neighbour, split into groups of
    CONSUL_JOIN_BATCH_SIZE and each group is handed to
    try_run_consul_join on a thread pool. Returns after all groups
    have been processed.
    :param dict mapping: Key value mapping with the config data
    :return None:
    """
    candidates = get_neighbour_hosts(mapping)
    shuffle(candidates)
    unknown_hosts = [
        host for host in candidates
        if not_already_known_consul_neighbour(host)
    ]
    pool = ThreadPool()
    batches = group_n_elements(unknown_hosts, CONSUL_JOIN_BATCH_SIZE)
    for batch in batches:
        pool.apply_async(try_run_consul_join, args=(batch,))
    pool.close()
    pool.join()
def nmap_scan(to_scan, slow=False, threads=None, threads_per_cpu=1):
    """Scans the specified networks using `nmap`.

    The `to_scan` dictionary must be in the format:
        {<interface-name>: <iterable-of-cidr-strings>, ...}

    If the `slow` option is specified, will limit the maximum rate nmap
    uses to send out packets.

    If `threads` is not given it defaults to `cpu_count() * threads_per_cpu`.
    With exactly one thread the jobs are run one after another in the
    calling thread; otherwise they are run on a thread pool and results are
    yielded as they complete (unordered).
    """
    jobs = yield_nmap_parameters(to_scan, slow)
    if threads is None:
        threads = cpu_count() * threads_per_cpu
    if threads == 1:
        yield from (run_nmap(job) for job in jobs)
    else:
        # Only start a pool when more than one thread was requested;
        # previously the pool was created (and `jobs` iterated again) even
        # after the sequential branch had already processed everything.
        with ThreadPool(processes=threads) as pool:
            yield from pool.imap_unordered(run_nmap, jobs)
def ping_scan(to_scan: dict, threads=None, threads_per_cpu=4):
    """Ping-scan the given networks and yield each `run_ping` result.

    `to_scan` is expected in the form:
        {<interface_name>: <iterable-of-cidr-strings>, ...}

    `threads` selects how many threads scan concurrently and defaults to
    `cpu_count() * threads_per_cpu`. With `threads=1` no pool is created
    and the jobs are run one at a time by the caller (which is very slow).
    """
    jobs = yield_ping_parameters(to_scan)
    worker_count = threads
    if worker_count is None:
        worker_count = cpu_count() * threads_per_cpu
    if worker_count != 1:
        with ThreadPool(processes=worker_count) as pool:
            yield from pool.imap(run_ping, jobs)
        return
    # Single-threaded fallback: run the jobs inline, lazily.
    for job in jobs:
        yield run_ping(job)
def _ResolveTombstones(jobs, tombstones, tombstone_symbolizer):
    """Resolve a list of tombstones.

    Args:
      jobs: the number of jobs to use with multithread.
      tombstones: a list of tombstones.
      tombstone_symbolizer: object passed along to _ResolveTombstone with
        every tombstone; its UnzipAPKIfNecessary() is called once up front.

    Returns:
      A flat list made by concatenating the per-tombstone results of
      _ResolveTombstone, or an empty list when there are no tombstones.
    """
    if not tombstones:
        logging.warning('No tombstones to resolve.')
        return []
    tombstone_symbolizer.UnzipAPKIfNecessary()
    if len(tombstones) == 1:
        # A single tombstone does not justify starting a thread pool.
        data = [_ResolveTombstone([tombstones[0], tombstone_symbolizer])]
    else:
        pool = ThreadPool(jobs)
        try:
            data = pool.map(
                _ResolveTombstone,
                [[tombstone, tombstone_symbolizer]
                 for tombstone in tombstones])
        finally:
            # Always release the worker threads, also when a job raised.
            pool.close()
            pool.join()
    resolved_tombstones = []
    for tombstone in data:
        resolved_tombstones.extend(tombstone)
    return resolved_tombstones
def __init__(self, img_dir, img_names, pre_process_img_func,
             extract_feat_func, batch_size, num_threads,
             multi_thread_stacking=False):
    """
    Store the arguments on the instance and create the sample prefetcher.

    Args:
      img_dir: stored as `self.img_dir`.
      img_names: stored as `self.img_names`; its length is the number of
        samples given to the prefetcher.
      pre_process_img_func: stored as `self.pre_process_img_func`.
      extract_feat_func: External model for extracting features. It takes a
        batch of images and returns a batch of features.
      batch_size: forwarded to `utils.Prefetcher`.
      num_threads: forwarded to `utils.Prefetcher` as `num_threads`.
      multi_thread_stacking: bool, whether to use multi threads to speed up
        `np.stack()` or not. When the system is memory overburdened, using
        `np.stack()` to stack a batch of images takes ridiculously long time.
        E.g. it may take several seconds to stack a batch of 64 images.
        When true, a pool with 8 workers is created as `self.pool`.
    """
    self.img_dir, self.img_names = img_dir, img_names
    self.pre_process_img_func = pre_process_img_func
    self.extract_feat_func = extract_feat_func
    num_samples = len(img_names)
    # The attributes above are assigned before the prefetcher is built,
    # exactly as before, since it is handed `self.get_sample`.
    self.prefetcher = utils.Prefetcher(
        self.get_sample, num_samples, batch_size, num_threads=num_threads)
    self.epoch_done = True
    self.multi_thread_stacking = multi_thread_stacking
    if multi_thread_stacking:
        self.pool = Pool(processes=8)
def __init__(self, img_dir, img_names, pre_process_img_func,
             extract_feat_func, batch_size, num_threads,
             multi_thread_stacking=False):
    """
    Store the arguments on the instance and create the sample prefetcher.

    Args:
      img_dir: stored as `self.img_dir`.
      img_names: stored as `self.img_names`; its length is the number of
        samples given to the prefetcher.
      pre_process_img_func: stored as `self.pre_process_img_func`.
      extract_feat_func: External model for extracting features. It takes a
        batch of images and returns a batch of features.
      batch_size: forwarded to `utils.Prefetcher`.
      num_threads: forwarded to `utils.Prefetcher` as `num_threads`.
      multi_thread_stacking: bool, whether to use multi threads to speed up
        `np.stack()` or not. When the system is memory overburdened, using
        `np.stack()` to stack a batch of images takes ridiculously long time.
        E.g. it may take several seconds to stack a batch of 64 images.
        When true, a pool with 8 workers is created as `self.pool`.
    """
    self.img_dir, self.img_names = img_dir, img_names
    self.pre_process_img_func = pre_process_img_func
    self.extract_feat_func = extract_feat_func
    num_samples = len(img_names)
    # The attributes above are assigned before the prefetcher is built,
    # exactly as before, since it is handed `self.get_sample`.
    self.prefetcher = utils.Prefetcher(
        self.get_sample, num_samples, batch_size, num_threads=num_threads)
    self.epoch_done = True
    self.multi_thread_stacking = multi_thread_stacking
    if multi_thread_stacking:
        self.pool = Pool(processes=8)
def run_tp(n, body):
    """ThreadPool.map

    Apply ``body`` to every item of ``n`` with ``ThreadPool.map``. The pool
    lives in the module global ``reused_pool``; it is created with
    ``numthreads`` threads on the first call and reused afterwards.
    """
    from multiprocessing.pool import ThreadPool
    global reused_pool, numthreads
    pool_missing = 'reused_pool' not in globals()
    if pool_missing:
        log.debug("Creating ThreadPool(%s)" % numthreads)
        reused_pool = ThreadPool(int(numthreads))
    reused_pool.map(body, n)
def run_tpaa(n, body):
    """ThreadPool.apply_async

    Submit one ``tbb_job(i, body)`` per item ``i`` to the shared pool with
    ``apply_async`` and wait for all of them. ``n`` may be an iterable of
    items or an int, in which case the items are ``range(n)``.

    The pool lives in the module global ``reused_pool``; it is created with
    ``numthreads`` threads on the first call and reused afterwards.
    """
    from multiprocessing.pool import ThreadPool
    global reused_pool, numthreads
    if 'reused_pool' not in globals():
        log.debug("Creating ThreadPool(%s) for apply_async()" % numthreads)
        reused_pool = ThreadPool(int(numthreads))
    # The body used to call ``reused_pool.map(body, range(n))`` and then
    # iterate ``for i in n``: one of the two always raised TypeError, and the
    # map call duplicated the work submitted below. Normalise ``n`` once and
    # keep only the apply_async path this function is named after.
    items = range(n) if isinstance(n, int) else n
    wait_list = [reused_pool.apply_async(tbb_job(i, body)) for i in items]
    for pending in wait_list:
        pending.wait()
def Pool(processes=None, initializer=None, initargs=()):
    """Create a ``multiprocessing.pool.ThreadPool`` from the given arguments.

    ``processes``, ``initializer`` and ``initargs`` are forwarded unchanged
    to the ``ThreadPool`` constructor and the new pool is returned.
    """
    from multiprocessing.pool import ThreadPool as thread_pool_cls
    return thread_pool_cls(
        processes=processes, initializer=initializer, initargs=initargs)
def __init__(self, config):
    """Initialise the parent class with ``config`` and create ``self.pool``.

    The thread pool size is taken from ``config['kinesis_concurrency']``.
    """
    super(ThreadPoolClient, self).__init__(config)
    concurrency = config['kinesis_concurrency']
    self.pool = ThreadPool(processes=concurrency)
def run():
    """Send 2000 messages through a '/bin/cat' session from 50 threads.

    Each message is submitted with ``session.put`` on a thread pool, then
    ``get()`` is called on every returned request (again on the pool).
    Finally prints whether the collected results equal the submitted
    messages in order.
    """
    from multiprocessing.pool import ThreadPool

    session = SubprocessSession('/bin/cat', EchoWriter, EchoReader)

    def submit(index):
        return session.put('message %d' % index)

    def collect(request):
        return request.get()

    pool = ThreadPool(50)
    requests = pool.map(submit, xrange(2000))
    results = pool.map(collect, requests)
    expected = ['message %d' % j for j in xrange(2000)]
    print(results == expected)
def _get_thread_pool(self):
    """Return ``self._thread_pool``, creating it on first use.

    When the attribute is falsy a ``ThreadPool`` sized by ``os.cpu_count()``
    is created, stored on the instance and returned.
    """
    pool = self._thread_pool
    if not pool:
        # lazily initialized
        pool = ThreadPool(os.cpu_count())
        self._thread_pool = pool
    return pool
def Pool(processes=None, initializer=None, initargs=()):
    """Create a ``multiprocessing.pool.ThreadPool`` from the given arguments.

    ``processes``, ``initializer`` and ``initargs`` are forwarded unchanged
    to the ``ThreadPool`` constructor and the new pool is returned.
    """
    from multiprocessing.pool import ThreadPool as thread_pool_cls
    return thread_pool_cls(
        processes=processes, initializer=initializer, initargs=initargs)
def iter_packages(repo_path):
    """Yield the packages described by the PKGBUILD files under ``repo_path``.

    ``repo_path`` is either the path of a single file named ``PKGBUILD`` or a
    directory that is searched recursively (directories below one that
    contains a PKGBUILD are not descended into). Each PKGBUILD path is passed
    to ``SrcInfoPackage.for_pkgbuild`` on a thread pool and every package of
    every result is yielded, in completion order. Nothing is yielded when no
    PKGBUILD is found.

    The pool is closed and joined when the generator finishes, is closed
    early by its consumer, or fails, so worker threads are never leaked.
    """
    pkgbuild_paths = []
    if os.path.isfile(repo_path) and os.path.basename(repo_path) == "PKGBUILD":
        pkgbuild_paths.append(repo_path)
    else:
        print("Searching for PKGBUILD files in %s" % repo_path)
        for base, dirs, files in os.walk(repo_path):
            if "PKGBUILD" in files:
                # in case we find a PKGBUILD, don't go deeper
                del dirs[:]
                pkgbuild_paths.append(os.path.join(base, "PKGBUILD"))
        pkgbuild_paths.sort()

    if not pkgbuild_paths:
        print("No PKGBUILD files found here")
        return
    print("Found %d PKGBUILD files" % len(pkgbuild_paths))

    pool = ThreadPool(cpu_count() * 2)
    try:
        pool_iter = pool.imap_unordered(
            SrcInfoPackage.for_pkgbuild, pkgbuild_paths)
        print("Parsing PKGBUILD files...")
        with progress(len(pkgbuild_paths)) as update:
            for i, packages in enumerate(pool_iter):
                update(i + 1)
                for package in packages:
                    yield package
    finally:
        # Runs on normal exhaustion, on generator close and on errors.
        pool.close()
        pool.join()
def main(args):
    """Check the source URLs of the packages found under ``args.path``.

    Collects, per URL returned by ``source_get_url``, the set of PKGBUILD
    paths that reference it, runs ``_check_url`` over those items on a pool
    of 50 threads and prints every URL for which an error was reported,
    together with the error and the referencing PKGBUILD paths. Unless
    ``args.all`` is set, only packages whose name is among
    ``PacmanPackage.get_all_packages()`` are considered.
    """
    repo_path = os.path.abspath(args.path)
    known_names = {
        pkg.pkgname for pkg in PacmanPackage.get_all_packages()
    }

    sources = {}
    for package in iter_packages(repo_path):
        # packages that are not in the repo are skipped by default, since
        # they are many times broken in other ways.
        if args.all or package.pkgname in known_names:
            for source in package.sources:
                url = source_get_url(source)
                if url:
                    referrers = sources.setdefault(url, set())
                    referrers.add(package.pkgbuild_path)

    print("Checking URLs...")
    work_items = sources.items()
    pool = ThreadPool(50)
    pool_iter = pool.imap_unordered(_check_url, work_items)
    broken = []
    with progress(len(work_items)) as update:
        for done, checked in enumerate(pool_iter, 1):
            url, pkgbuilds, error = checked
            update(done)
            if error:
                broken.append((url, pkgbuilds, error))
    pool.close()
    pool.join()

    for url, pkgbuilds, error in broken:
        one_line_error = " ".join(error.splitlines())
        print("\n%s\n %s\n %s" % (
            url, one_line_error, ", ".join(pkgbuilds)))