def tag(self, asgs, key, value):
    """Apply a tag to auto scaling groups in concurrent batches.

    ``asgs`` is split into batches of ``self.batch_size`` and every batch is
    handed to ``self.process_asg_set(batch, key, value)`` on a pool of three
    workers.  Each failed batch is logged; once all batches have finished,
    the exception of the last failure observed is re-raised so the caller
    learns that tagging did not fully succeed.

    Args:
        asgs: groups to tag; each entry must provide 'AutoScalingGroupName',
            which is used when reporting failures.
        key: tag key forwarded to ``process_asg_set``.
        value: tag value forwarded to ``process_asg_set``.

    Raises:
        Exception: whatever a failed ``process_asg_set`` call raised.
    """
    error = None
    with self.executor_factory(max_workers=3) as w:
        futures = {}
        for asg_set in chunks(asgs, self.batch_size):
            futures[w.submit(
                self.process_asg_set, asg_set, key, value)] = asg_set
        for f in as_completed(futures):
            asg_set = futures[f]
            exc = f.exception()
            if exc:
                # Remember the failure; previously ``error`` was never
                # assigned, so the final ``raise`` could not trigger.
                error = exc
                # ``log.exception`` is only meaningful inside an ``except``
                # block; here there is no active exception, so use ``error``.
                self.log.error(
                    "Exception tagging tag:%s error:%s asg:%s" % (
                        self.data.get('key', DEFAULT_TAG),
                        exc,
                        ", ".join([a['AutoScalingGroupName']
                                   for a in asg_set])))
    if error:
        raise error
# Example source code for Python's as_completed()
def process(self, resources, event=None):
    """Return the resources that have at least one matching group.

    Resources lacking a 'c7n:Groups' entry are first passed, 50 at a time,
    to ``self.get_user_groups`` on a two-worker pool.  Afterwards every
    resource whose 'c7n:Groups' list contains an item accepted by
    ``self.match`` is collected once, in input order.

    Args:
        resources: list of resource dicts.
        event: unused by this method.

    Returns:
        list of matching resources without duplicates.
    """
    client = local_session(self.manager.session_factory).client('iam')
    with self.executor_factory(max_workers=2) as pool:
        lacking_groups = [r for r in resources if 'c7n:Groups' not in r]
        jobs = [
            pool.submit(self.get_user_groups, client, batch)
            for batch in chunks(lacking_groups, size=50)]
        # Drain the iterator; results and exceptions are not inspected here.
        for _ in as_completed(jobs):
            pass

    matched = []
    for user in resources:
        for group in user['c7n:Groups']:
            if not self.match(group):
                continue
            if user in matched:
                continue
            matched.append(user)
    return matched
def process(self, clusters):
    """Snapshot every eligible cluster on a three-worker pool.

    Clusters rejected by ``_cluster_eligible_for_snapshot`` are skipped.
    Failures of ``self.process_cluster_snapshot`` are logged, not raised.

    Args:
        clusters: iterable of cluster descriptions.

    Returns:
        The ``clusters`` argument, unchanged.
    """
    with self.executor_factory(max_workers=3) as pool:
        pending = [
            pool.submit(self.process_cluster_snapshot, candidate)
            for candidate in clusters
            if _cluster_eligible_for_snapshot(candidate)]
        for finished in as_completed(pending):
            failure = finished.exception()
            if failure:
                self.log.error(
                    "Exception creating cache cluster snapshot \n %s",
                    failure)
    return clusters
def _common_tag_processer(executor_factory, batch_size, concurrency,
                          process_resource_set, id_key, resources, tags,
                          log):
    """Run ``process_resource_set`` over ``resources`` in concurrent batches.

    Args:
        executor_factory: callable accepting ``max_workers`` and returning
            an executor usable as a context manager.
        batch_size: number of resources handed to each call.
        concurrency: worker count passed to ``executor_factory``.
        process_resource_set: callable invoked as
            ``process_resource_set(resource_set, tags)``.
        id_key: key used to read each resource's identifier for error logs.
        resources: resources to process.
        tags: tags forwarded to ``process_resource_set``.
        log: logger that receives one error record per failed batch.
    """
    with executor_factory(max_workers=concurrency) as w:
        # Map each future to the batch it was submitted with, so a failure
        # is reported against the right resources.  The loop variable alone
        # would always name the last batch submitted.
        futures = {}
        for resource_set in utils.chunks(resources, size=batch_size):
            futures[
                w.submit(process_resource_set, resource_set, tags)
            ] = resource_set

        for f in as_completed(futures):
            exc = f.exception()
            if exc:
                log.error(
                    "Exception with tags: %s on resources: %s \n %s" % (
                        tags,
                        ", ".join([r[id_key] for r in futures[f]]),
                        exc))
def process(self, resources):
    """Rename tags on the filtered resources using a three-worker pool.

    The resources are narrowed with ``self.filter_resources``, grouped by
    ``self.create_set``, and each group is passed together with its key to
    ``self.process_rename``.  Failures are logged rather than raised.  As a
    side effect ``self.id_key`` is set from the manager's model.

    Args:
        resources: candidate resources.

    Returns:
        The filtered resources.
    """
    original_count = len(resources)
    resources = self.filter_resources(resources)
    self.log.info(
        "Filtered from %s resources to %s" % (original_count, len(resources)))
    self.id_key = self.manager.get_model().id
    resource_set = self.create_set(resources)

    with self.executor_factory(max_workers=3) as pool:
        pending = [
            pool.submit(self.process_rename, set_key, resource_set[set_key])
            for set_key in resource_set]
        for finished in as_completed(pending):
            failure = finished.exception()
            if failure:
                self.log.error(
                    "Exception renaming tag set \n %s" % (failure))
    return resources
def run(config, start, end, accounts):
    """run export across accounts and log groups specified in config.

    Args:
        config: raw configuration; it is passed through
            ``validate.callback`` before use.
        start: start of the export window; parsed with ``parse`` when set.
        end: end of the export window; parsed when set, otherwise the
            current time is used.
        accounts: optional collection of account names; when non-empty only
            the named accounts are processed.
    """
    config = validate.callback(config)
    destination = config.get('destination')
    if start:
        start = parse(start) or start
    end = (parse(end) if end else None) or datetime.now()

    # ``debug`` comes from the enclosing module scope.
    executor = MainThreadExecutor if debug else ThreadPoolExecutor
    with executor(max_workers=32) as pool:
        pending = {}
        for account in config.get('accounts', ()):
            if not accounts or account['name'] in accounts:
                job = pool.submit(
                    process_account, account, start, end, destination)
                pending[job] = account

        for finished in as_completed(pending):
            account = pending[finished]
            failure = finished.exception()
            if failure:
                log.error("Error on account %s err: %s",
                          account['name'], failure)
            log.info("Completed %s", account['name'])
def _check_executor(self, dt):
start = time()
try:
for future in as_completed(self._futures[:], 0):
self._futures.remove(future)
try:
result = future.result()
except Exception:
traceback.print_exc()
# make an error tile?
continue
if result is None:
continue
callback, args = result
callback(*args)
# capped executor in time, in order to prevent too much
# slowiness.
# seems to works quite great with big zoom-in/out
if time() - start > self.cap_time:
break
except TimeoutError:
pass
def sched_downloads(d_set,dl_dir,num_threads,vids):
    """Download and cut the videos of one dataset in parallel processes.

    Args:
        d_set: dataset name; also the name of the sub-directory created
            under ``dl_dir``.
        dl_dir: base download directory.
        num_threads: number of worker processes.
        vids: items handed one by one to ``dl_and_cut``.
    """
    # Local import keeps this block self-contained.
    import os

    d_set_dir = dl_dir+'/'+d_set+'/'

    # Make the directory for this dataset.  os.makedirs avoids building a
    # shell command from the path, which broke on names containing spaces
    # or shell metacharacters.
    os.makedirs(d_set_dir, exist_ok=True)

    # Tell the user when downloads were started (the timestamp used to be
    # computed and then thrown away).
    sys.stderr.write("{}: downloads started at {}\n".format(
        d_set, datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

    # Download and cut in parallel worker processes.
    with futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
        fs = [executor.submit(dl_and_cut,vid) for vid in vids]
        # Count from 1 so the last progress line reads "N / N".
        for i, f in enumerate(futures.as_completed(fs), 1):
            # Write progress to stderr so that it can be seen
            sys.stderr.write(
                "Downloaded video: {} / {} \r".format(i, len(vids)))

    print( d_set+': All videos downloaded' )
def parallelize(self, fn, number_of_threads=None):
    """
    Apply ``fn`` to every item concurrently and wrap the outcomes lazily.

    Outcomes are produced in completion order.  When a call raises, the
    exception object itself is produced in place of a result.  The number of
    threads defaults to your cpu count + 1.
    """
    worker_count = number_of_threads or (cpu_count() + 1)

    def _outcomes():
        with ThreadPoolExecutor(worker_count) as pool:
            pending = [pool.submit(fn, item) for item in self._items]
            for finished in as_completed(pending):
                try:
                    outcome = finished.result()
                except Exception as failure:
                    outcome = failure
                yield outcome

    return Slinkie(_outcomes())
def upload_template_files(*args):
    """Parse the command-line style ``args`` and upload ARM templates.

    ``--name``, ``--src`` and ``--api-version`` are all required.  After
    ``_upload_templates`` has run, every command in ``uploads`` is executed
    with ``os.system`` on a pool of 40 threads.
    """
    print('\n== UPLOAD ARM TEMPLATES ==')

    parser = argparse.ArgumentParser(description='Upload ARM Templates')
    parser.add_argument(
        '--name', metavar='NAME', required=True,
        help='Name of the thing being uploaded (in CamelCase)')
    parser.add_argument(
        '--src', metavar='PATH', required=True,
        help='Path to the directory containing ARM templates to upload. Subdirectories will automatically be crawled.')
    parser.add_argument(
        '--api-version', metavar='VERSION', required=True,
        help='API version for the templates being uploaded in yyyy-MM-dd format. (ex: 2016-07-01)')
    options = parser.parse_args(args)

    _upload_templates(options.name, options.api_version, options.src)

    from concurrent.futures import ThreadPoolExecutor, as_completed
    # NOTE(review): ``uploads`` is not defined in this block; presumably it
    # is filled in by _upload_templates -- confirm.
    with ThreadPoolExecutor(max_workers=40) as pool:
        pending = [pool.submit(os.system, command) for command in uploads]
        for finished in as_completed(pending):
            # The result is not used; this only exposes thread exceptions.
            finished.result()
def cli_build(args):
    """Build every requested build type concurrently.

    A fresh artifact directory is created in the current working directory.
    A single ``'*'`` build type selects all of ``BUILD_TYPES``.  Each build
    type is passed to ``build_dispatch`` on its own thread, and any
    exception raised by a build propagates to the caller.

    Args:
        args: parsed arguments providing ``build_types``, ``git_clone_url``,
            ``git_clone_branch`` and ``cli_version``.
    """
    assert check_output(['docker', 'ps']), "Docker required."

    build_types = args.build_types
    git_url = args.git_clone_url
    git_branch = args.git_clone_branch
    cli_version = args.cli_version

    timestamp = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    artifact_dir = tempfile.mkdtemp(
        prefix='cli-build-{}-'.format(timestamp), dir=os.getcwd())

    if len(build_types) == 1 and build_types[0] == '*':
        build_types = BUILD_TYPES

    heading = ('Building for {} from branch {} of {} '
               'and version number will be {}\n'
               'Build artifacts will be in {}')
    print_heading(heading.format(
        ', '.join(build_types), git_branch, git_url, cli_version,
        artifact_dir))

    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor(max_workers=len(build_types)) as pool:
        pending = {
            pool.submit(build_dispatch, build_type, git_url, git_branch,
                        cli_version, artifact_dir, arg_ns=args)
            for build_type in build_types
        }
        for finished in as_completed(pending):
            # Re-raise anything a build thread raised.
            finished.result()
    print('Done.')
def run(self, data, max=4):
    """Run ``self.run1`` on every payload concurrently.

    Each payload receives a 'chrome_id' equal to its position in ``data``
    before it is submitted.  A payload whose run raises is reported with
    ``print`` and left out of the results.

    Args:
        data: iterable of payload dicts (modified in place).
        max: number of worker threads.

    Returns:
        list of result dicts, each carrying its payload's 'chrome_id',
        ordered by that id.
    """
    collected = []
    with futures.ThreadPoolExecutor(max_workers=max) as executor:
        pending = {}
        for position, payload in enumerate(data):
            payload['chrome_id'] = position
            pending[executor.submit(self.run1, payload)] = payload

        for finished in futures.as_completed(pending):
            payload = pending[finished]
            try:
                outcome = finished.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (payload, exc))
                continue
            outcome['chrome_id'] = payload['chrome_id']
            collected.append(outcome)

    collected.sort(key=lambda entry: entry['chrome_id'])
    return collected
def map_to_bids(self, bids_map, bids_dir, dicom_dir, biopac_dir, nthreads, overwrite):
    """Process every row of the ``bids_map`` CSV table on a thread pool.

    Missing values in the table are replaced with empty strings.  Each row
    is handed to ``self._process_map_row``; a single error is logged as soon
    as any row reports a falsy result.

    Args:
        bids_map: path of the CSV table (first line is the header).
        bids_dir: forwarded to ``_process_map_row``.
        dicom_dir: forwarded to ``_process_map_row``.
        biopac_dir: forwarded to ``_process_map_row``.
        nthreads: number of worker threads.
        overwrite: forwarded to ``_process_map_row``.
    """
    # Parse the bids_map csv table that drives BIDS generation.
    mapping = pd.read_csv(bids_map, header=0, index_col=None)
    mapping.replace(np.nan, '', regex=True, inplace=True)

    with ThreadPoolExecutor(max_workers=nthreads) as executor:
        pending = [
            executor.submit(self._process_map_row, row, bids_dir, dicom_dir,
                            self.conversion_tool, biopac_dir, overwrite)
            for _, row in mapping.iterrows()
        ]

        # all() stops at the first falsy result, like an early break.
        converted = all(job.result() for job in as_completed(pending))

        if not converted:
            self.log.error("There were errors converting the provided datasets to BIDS format. See log for more"
                           " information.")
def repeat(f, reps, cpus, **kwargs):
    """Call ``f(**kwargs)`` ``reps`` times, serially or in worker processes.

    With ``reps == 1`` the function is called once, silently, and any
    exception propagates.  Otherwise a banner is printed and exceptions
    raised by individual runs are turned into warnings.

    Args:
        f: callable to run.
        reps: number of repetitions.
        cpus: 1 runs serially in this process; any other value uses a
            process pool, where values below 1 mean "use every CPU".
        **kwargs: keyword arguments passed to every call of ``f``.
    """
    if reps == 1:
        f(**kwargs)
        return

    print("Starting {} {} times with:".format(f.__name__, reps))
    print(kwargs)

    if cpus != 1:
        from multiprocessing import cpu_count
        from concurrent.futures import ProcessPoolExecutor, as_completed

        if cpus < 1:
            cpus = cpu_count()

        with ProcessPoolExecutor(cpus) as pool:
            pending = [pool.submit(f, **kwargs) for _ in range(reps)]
            for finished in as_completed(pending):
                failure = finished.exception()
                if failure:
                    warnings.warn(str(failure))
    else:
        for _ in range(reps):
            try:
                f(**kwargs)
            except Exception as failure:
                warnings.warn(str(failure))

    print("Finished")
def test_zero_timeout(self):
    """as_completed(timeout=0) yields the finished futures before timing out.

    One submitted future is still sleeping; only the three pre-completed
    futures are expected to have been yielded.
    """
    sleeping = self.executor.submit(time.sleep, 2)
    already_done = [CANCELLED_AND_NOTIFIED_FUTURE,
                    EXCEPTION_FUTURE,
                    SUCCESSFUL_FUTURE]

    yielded = set()
    try:
        for finished in futures.as_completed(already_done + [sleeping],
                                             timeout=0):
            yielded.add(finished)
    except futures.TimeoutError:
        pass

    self.assertEqual(set(already_done), yielded)
def concurrent_find(func, params, **kw):
    """Return the first truthy result produced for ``params``.

    ``func`` and ``list(params)`` are handed to ``async`` together with the
    remaining keyword arguments.  The optional ``concurrent_timeout``
    keyword is removed from ``kw`` and used as the ``as_completed`` timeout.

    Returns the result of the first completed future that has no exception
    and a truthy result.  If no such future exists, the result of the last
    completed future is returned (presumably re-raising its exception if it
    failed -- confirm against the futures type).  On a timeout with a timeout
    configured, a warning is logged and ``None`` is returned implicitly.

    NOTE(review): ``async`` is a reserved keyword since Python 3.7, so this
    block only parses on older interpreters.
    """
    timeout = kw.pop("concurrent_timeout", None)
    with async(func, list(params), **kw) as futures:
        # Tracks the most recently completed future for the for/else below.
        future = None
        try:
            for future in futures.as_completed(timeout=timeout):
                if not future.exception() and future.result():
                    # A usable result was found; the rest is not needed.
                    futures.kill()
                    return future.result()
            else:
                # Loop finished without a successful, truthy result: fall
                # back to the last completed future, if there was one.
                if future:
                    return future.result()
        except FutureTimeoutError as exc:
            if not timeout:
                # ?? (a timeout error although no timeout was requested;
                # re-raised unchanged)
                raise
            futures.kill()
            _logger.warning("Concurrent future timed out (%s)", exc)
def test_zero_timeout(self):
    """With timeout=0 only already-finished futures come out of as_completed.

    The sleeping future cannot finish in time, so the resulting TimeoutError
    is swallowed and the three pre-completed futures must be the only ones
    collected.
    """
    pending_future = self.executor.submit(time.sleep, 2)
    finished_futures = [CANCELLED_AND_NOTIFIED_FUTURE,
                        EXCEPTION_FUTURE,
                        SUCCESSFUL_FUTURE]

    collected = set()
    try:
        candidates = finished_futures + [pending_future]
        for done in futures.as_completed(candidates, timeout=0):
            collected.add(done)
    except futures.TimeoutError:
        pass

    self.assertEqual(set(finished_futures), collected)
def main(keys):
    """Download the data set of every key concurrently and report progress.

    One worker thread is used per key.  Progress lines and the final summary
    are written to stderr.

    Args:
        keys: sequence of configuration keys; each is loaded with
            ``krx.load_config`` and passed to ``download`` together with a
            deep copy of its configuration.
    """
    t0 = time.time()
    total_records = 0

    # ThreadPoolExecutor rejects max_workers=0, so keep at least one worker
    # for an empty ``keys``.  The ``with`` block makes sure the pool is shut
    # down even when a download raises.
    with futures.ThreadPoolExecutor(max_workers=max(len(keys), 1)) as executor:
        to_do = []
        for key in keys:
            config = copy.deepcopy(krx.load_config(key))
            to_do.append(executor.submit(download, key, config))

        for done, future in enumerate(futures.as_completed(to_do), 1):
            key, records = future.result()
            total_records += records
            print(f'[{done:,d}/{len(keys):,d}] {key} records fetched {records:>8,} records', file=sys.stderr)

    elapsed = time.time() - t0
    print(f'{total_records:,} Records fetched in {elapsed:.2f}s', file=sys.stderr)
def main(keys):
    """Download the data set of every key for one date, concurrently.

    The date is taken from the first command-line argument after the command
    name (``sys.argv[1]``) and defaults to today formatted as ``YYYYMMDD``.
    One worker thread is used per key.  Progress lines and the final summary
    are written to stderr.

    Args:
        keys: sequence of configuration keys; each is loaded with
            ``krx.load_config``, stamped with the date and passed to
            ``download``.
    """
    t0 = time.time()
    ARGS = dict(zip(['command', 'date'], sys.argv))
    date = ARGS.get('date', f'{datetime.date.today():%Y%m%d}')
    total_records = 0

    # ThreadPoolExecutor rejects max_workers=0, so keep at least one worker
    # for an empty ``keys``.  The ``with`` block makes sure the pool is shut
    # down even when a download raises.
    with futures.ThreadPoolExecutor(max_workers=max(len(keys), 1)) as executor:
        to_do = []
        for key in keys:
            config = copy.deepcopy(krx.load_config(key))
            config['contents']['data']['schdate'] = date
            to_do.append(executor.submit(download, key, config))

        for done, future in enumerate(futures.as_completed(to_do), 1):
            key, records = future.result()
            total_records += records
            print(f'[{done:,d}/{len(keys):,d}] {key} records fetched {records:>8,} records', file=sys.stderr)

    elapsed = time.time() - t0
    print(f'{total_records:,} Records fetched in {elapsed:.2f}s', file=sys.stderr)
def collect(self, count, dst_port=util.DEFAULT_DST_PORT,
            timeout=util.DEFAULT_TIMEOUT):
    """Collects latency against a set of hosts.

    Every host in ``self.metrics`` is probed with ``self.method`` on a
    thread pool; the loss and rtt reported for a host are stored on its
    metrics entry and logged.

    Args:
        count: (int) number of datagrams to send each host
        dst_port: destination port, passed to the probe method as ``port``
        timeout: (float) seconds to wait for probes to return
    """
    probe_options = dict(count=count, port=dst_port, timeout=timeout)

    jobs = []
    with futures.ThreadPoolExecutor(max_workers=50) as executor:
        for target in self.metrics:
            logging.info('Assigning target host: %s', target)
            jobs.append(
                executor.submit(self.method, target, **probe_options))

    for finished in futures.as_completed(jobs):
        loss, rtt, host = finished.result()
        entry = self.metrics[host]
        entry.loss = loss
        entry.rtt = rtt
        logging.info('Summary {:16}:{:>3}% loss, {:>4} ms rtt'.format(
            host, loss, rtt))