def test_zero_timeout(self):
    future1 = self.executor.submit(time.sleep, 2)
    completed_futures = set()
    try:
        for future in futures.as_completed(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1],
                timeout=0):
            completed_futures.add(future)
    except futures.TimeoutError:
        pass

    self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE]),
                     completed_futures)
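A minimal, self-contained sketch of the timeout=0 behavior the test above exercises: futures that have already settled are yielded immediately, and as_completed raises TimeoutError for any still running (all names below are illustrative):

import time
from concurrent import futures

def demo_zero_timeout():
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        fast = executor.submit(lambda: 42)
        slow = executor.submit(time.sleep, 1)
        fast.result()  # block until `fast` has settled
        done = set()
        try:
            # timeout=0: yield already-finished futures, then raise for the rest
            for f in futures.as_completed([fast, slow], timeout=0):
                done.add(f)
        except futures.TimeoutError:
            pass  # expected: `slow` is still sleeping
        assert fast in done and slow not in done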
def get_elb_bucket_locations(self):
    elbs = self.manager.get_resource_manager('elb').resources()
    get_elb_attrs = functools.partial(
        _query_elb_attrs, self.manager.session_factory)
    with self.executor_factory(max_workers=2) as w:
        futures = []
        for elb_set in chunks(elbs, 100):
            futures.append(w.submit(get_elb_attrs, elb_set))
        for f in as_completed(futures):
            if f.exception():
                log.error("Error while scanning elb log targets: %s" % (
                    f.exception()))
                continue
            for tgt in f.result():
                yield tgt
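The shape above (batch the inputs, submit one future per batch, drain with as_completed, and flatten the results as a generator) is reusable; a standalone sketch, with a stand-in chunks helper since the c7n utility itself isn't shown here:

from concurrent.futures import ThreadPoolExecutor, as_completed

def chunks(items, size):
    # yield successive size-sized slices of items
    for i in range(0, len(items), size):
        yield items[i:i + size]

def fan_out(process_batch, items, batch_size=100, max_workers=2):
    with ThreadPoolExecutor(max_workers=max_workers) as w:
        futs = [w.submit(process_batch, batch)
                for batch in chunks(list(items), batch_size)]
        for f in as_completed(futs):
            if f.exception():
                continue  # log and skip a failed batch, as above
            for item in f.result():
                yield item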
def _process_with_futures(self, helper, buckets, max_workers=3):
    results = []
    with self.executor_factory(max_workers) as w:
        futures = {}
        for b in buckets:
            futures[w.submit(helper, b)] = b
        for f in as_completed(futures):
            if f.exception():
                b = futures[f]
                self.log.error(
                    "Error on bucket:%s region:%s policy:%s error: %s",
                    b['Name'], b.get('Location', 'unknown'),
                    self.manager.data.get('name'), f.exception())
                self.denied_buckets.add(b['Name'])
                continue
            result = f.result()
            if result:
                results.append(result)
    return results
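Keying the futures dict by future, with the submitted input as the value, is what lets the error path above name the offending bucket; a generic sketch of that future-to-input pattern (names are illustrative):

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_all(helper, items, max_workers=3):
    results, failed = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as w:
        future_to_item = {w.submit(helper, item): item for item in items}
        for f in as_completed(future_to_item):
            item = future_to_item[f]  # recover the input that produced f
            if f.exception():
                failed.append(item)  # record the failure, keep draining
                continue
            result = f.result()
            if result:
                results.append(result)
    return results, failed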
def process(self, buckets):
    with self.executor_factory(max_workers=3) as w:
        futures = {}
        results = []
        for b in buckets:
            futures[w.submit(self.process_bucket, b)] = b
        for future in as_completed(futures):
            if future.exception():
                bucket = futures[future]
                self.log.error('error modifying bucket lifecycle: %s\n%s',
                               bucket['Name'], future.exception())
                # skip the failed bucket; calling result() would re-raise
                continue
            results += filter(None, [future.result()])
    return results
def main(workers=None):
    if workers:
        workers = int(workers)
    t0 = time.time()
    with futures.ProcessPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers
        to_do = []
        for i in range(JOBS, 0, -1):
            size = SIZE + int(SIZE / JOBS * (i - JOBS / 2))
            job = executor.submit(arcfour_test, size, KEY)
            to_do.append(job)
        for future in futures.as_completed(to_do):
            res = future.result()
            print('{:.1f} KB'.format(res / 2 ** 10))
    print(STATUS.format(actual_workers, time.time() - t0))
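Note that _max_workers is a private executor attribute. A sketch of the same timing harness that avoids it by resolving the worker count up front (os.cpu_count() mirrors ProcessPoolExecutor's own default):

import os
import time
from concurrent import futures

def timed_run(fn, args_list, workers=None):
    workers = workers or os.cpu_count()
    t0 = time.time()
    # guard the caller with `if __name__ == '__main__':` when using process pools
    with futures.ProcessPoolExecutor(workers) as executor:
        jobs = [executor.submit(fn, *args) for args in args_list]
        # results arrive in completion order, not submission order
        results = [f.result() for f in futures.as_completed(jobs)]
    return results, workers, time.time() - t0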
def download_many(cc_list):
    cc_list = cc_list[:5]  # <1>
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # <2>
        to_do = []
        for cc in sorted(cc_list):  # <3>
            future = executor.submit(download_one, cc)  # <4>
            to_do.append(future)  # <5>
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))  # <6>

        results = []
        for future in futures.as_completed(to_do):  # <7>
            res = future.result()  # <8>
            msg = '{} result: {!r}'
            print(msg.format(future, res))  # <9>
            results.append(res)

    return len(results)
# END FLAGS_THREADPOOL_AS_COMPLETED
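as_completed yields futures in the order they finish, not the order they were submitted, which is why the printed results above can interleave arbitrarily; a quick demonstration (sleep durations are illustrative):

import time
from concurrent import futures

def completion_order():
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        fs = {executor.submit(time.sleep, d): d for d in (0.3, 0.1, 0.2)}
        # typically returns [0.1, 0.2, 0.3]: fastest future first
        return [fs[f] for f in futures.as_completed(fs)]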
def save_month(year_month, verbose):
    year, month = [int(s) for s in year_month.split('-')]
    total_size = 0
    img_count = 0
    dates = potd.list_days_of_month(year, month)
    with futures.ProcessPoolExecutor(max_workers=100) as executor:
        downloads = dict((executor.submit(potd.save_one, date, verbose), date)
                         for date in dates)
        for future in futures.as_completed(downloads):
            date = downloads[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (date,
                                                         future.exception()))
            else:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))
    return img_count, total_size
def save_month(year_month, verbose):
    year, month = [int(s) for s in year_month.split('-')]
    total_size = 0
    img_count = 0
    dates = potd.list_days_of_month(year, month)
    with futures.ThreadPoolExecutor(max_workers=100) as executor:
        downloads = dict((executor.submit(potd.save_one, date, verbose), date)
                         for date in dates)
        for future in futures.as_completed(downloads):
            date = downloads[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (date,
                                                         future.exception()))
            else:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))
    return img_count, total_size
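The two save_month variants above differ only in the executor class. For I/O-bound downloads like these, the thread pool is usually the better fit: threads are cheap and max_workers=100 is plausible for network waits, whereas a process pool pays interpreter-spawn and pickling overhead per task and is normally sized near os.cpu_count() for CPU-bound work.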
def test_execution_concurrency_no_scale(self):
    self._create_function(name='test_python_sleep.py')

    def _create_execution():
        resp, body = self.client.create_execution(self.function_id)
        return resp, body

    futs = []
    with futurist.ThreadPoolExecutor(max_workers=10) as executor:
        for _ in range(3):
            fut = executor.submit(_create_execution)
            futs.append(fut)
    for f in futures.as_completed(futs):
        # Wait until we get the response
        resp, body = f.result()
        self.assertEqual(201, resp.status)
        self.addCleanup(self.client.delete_resource, 'executions',
                        body['id'], ignore_notfound=True)
        self.assertEqual('success', body['status'])

    resp, body = self.admin_client.get_function_workers(self.function_id)
    self.assertEqual(200, resp.status)
    self.assertEqual(1, len(body['workers']))
def test_execution_concurrency_scale_up(self):
    self.await_runtime_available(self.runtime_id)
    self._create_function(name='test_python_sleep.py')

    def _create_execution():
        resp, body = self.client.create_execution(self.function_id)
        return resp, body

    futs = []
    with futurist.ThreadPoolExecutor(max_workers=10) as executor:
        for _ in range(6):
            fut = executor.submit(_create_execution)
            futs.append(fut)
    for f in futures.as_completed(futs):
        # Wait until we get the response
        resp, body = f.result()
        self.assertEqual(201, resp.status)
        self.addCleanup(self.client.delete_resource, 'executions',
                        body['id'], ignore_notfound=True)
        self.assertEqual('success', body['status'])

    resp, body = self.admin_client.get_function_workers(self.function_id)
    self.assertEqual(200, resp.status)
    self.assertEqual(2, len(body['workers']))
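As these two tests show, the futures returned by futurist.ThreadPoolExecutor are compatible with the stdlib concurrent.futures.as_completed, so the same draining idiom works across both libraries.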
def _check_executor(self, dt):
    start = time()
    try:
        for future in as_completed(self._futures[:], 0):
            self._futures.remove(future)
            try:
                result = future.result()
            except Exception:
                traceback.print_exc()
                # make an error tile?
                continue
            if result is None:
                continue
            callback, args = result
            callback(*args)
            # cap executor time per call, to prevent too much slowness;
            # seems to work quite well with big zoom-in/out
            if time() - start > self.cap_time:
                break
    except TimeoutError:
        pass
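Calling as_completed with a zero timeout inside a periodic callback drains only the futures that have already finished, so a UI frame is never blocked; a condensed sketch of the same drain-within-a-time-budget idea (handle and cap_time are illustrative):

from concurrent.futures import TimeoutError, as_completed
from time import time

def drain_ready(futures_list, handle, cap_time=0.1):
    start = time()
    try:
        # timeout=0: yield only futures that are already done
        for future in as_completed(futures_list[:], 0):
            futures_list.remove(future)
            try:
                handle(future.result())
            except Exception:
                continue  # a failed task; skip it
            if time() - start > cap_time:
                break  # respect the per-frame time budget
    except TimeoutError:
        pass  # the rest aren't done yet; drain again on the next tick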
def getloc():
    """Query the AMap (Gaode) Web Service place-search API for POI locations:
    http://lbs.amap.com/api/webservice/guide/api/search/#text
    """
    allloc = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'http://lbs.amap.com/api/webservice/guide/api/search/#text'
        param = {
            'key': '22d6f93f929728c10ed86258653ae14a',
            'keywords': u'??',
            'city': '027',
            'citylimit': 'true',
            'output': 'json',
            'page': '',
        }
        future_to_url = {
            executor.submit(load_url, url, merge_dicts(param, {'page': i}), 60): url
            for i in range(1, 46)}
        for future in futures.as_completed(future_to_url):
            if future.exception() is not None:
                print(future.exception())
            elif future.done():
                data = future.result()['pois']
                allloc.extend([x['location'] for x in data])
    with open('allloc1.pk', 'wb') as f:
        pickle.dump(allloc, f, True)
def mobai(loc):
    allmobai = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do'
        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Mobile/14E304 MicroMessenger/6.5.7 NetType/WIFI Language/zh_CN',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Referer': 'https://servicewechat.com/wx80f809371ae33eda/23/page-frame.html',
        }
        data = {
            'longitude': '',
            'latitude': '',
            'citycode': '027',
        }
        future_to_url = {
            executor.submit(load_url, url, merge_dicts(
                data, {'longitude': i.split(',')[0]},
                {'latitude': i.split(',')[1]}), 60, headers): url
            for i in loc}
        for future in futures.as_completed(future_to_url):
            if future.exception() is not None:
                print(future.exception())
            elif future.done():
                data = future.result()['object']
                allmobai.extend(data)
                # write each batch to MongoDB
                result = collection.insert_many(data)
def run(self):
    """Concurrently invoke `get_response` for all of instance's `requests`.
    """
    with futures.ThreadPoolExecutor(
            max_workers=min(self.max_workers, len(self.requests))
    ) as executor:
        to_do = []
        for i, request in enumerate(self.requests):
            future = executor.submit(self.get_response, request, i)
            to_do.append(future)
        for future in futures.as_completed(to_do):
            result = future.result()
            # `responses` and `pending_requests` are instance properties, which
            # means client code can inspect the instance to read responses as
            # they are completed
            if result.req.error is not None or result.err == 'skwarg':
                continue
            try:
                self.pending_requests.remove(result.req)
            except KeyError:
                print('{} was not in pending requests, this is weird...'.format(result.req))
            self.responses.append(result)
    self.is_done = True
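Sizing the pool with min(self.max_workers, len(self.requests)) avoids spawning threads that would never receive work when the request list is shorter than the configured maximum.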
def get_node_health_mt(nodes_dict, check_type="normal", n_threads=8, print_out=False):
    """Use multithreading to check each node's health.

    Arguments:
        nodes_dict {dict} -- [nodesIP(domainName)->(username, mem, CPU)]

    Keyword Arguments:
        check_type {str} -- [description] (default: {"normal"})
        n_threads {number} -- [description] (default: {8})
    """
    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        futures = {executor.submit(check_node_health, nodeinfo[0], node,
                                   check_type, print_out): node
                   for node, nodeinfo in nodes_dict.items()}
        for future in as_completed(futures):
            node = futures[future]
            nodeinfo = nodes_dict[node]
            result = future.result()
            nodes_dict[node] = (nodeinfo[0], result)
            # print("{} {}".format(node, nodes_dict[node]))
def process(self, resources):
    client = local_session(self.manager.session_factory).client('rds')
    # restore up to 10 in parallel, we have to wait on each.
    with self.executor_factory(
            max_workers=min(10, len(resources) or 1)) as w:
        futures = {}
        for r in resources:
            tags = {t['Key']: t['Value'] for t in r['Tags']}
            if not set(tags).issuperset(self.restore_keys):
                self.log.warning(
                    "snapshot:%s missing restore tags",
                    r['DBSnapshotIdentifier'])
                continue
            futures[w.submit(self.process_instance, client, r)] = r
        for f in as_completed(futures):
            r = futures[f]
            if f.exception():
                self.log.warning(
                    "Error restoring db:%s from:%s error:\n%s",
                    r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'],
                    f.exception())
                continue
def process(self, buckets, event=None):
    results = []
    with self.executor_factory(max_workers=2) as w:
        futures = {}
        for b in buckets:
            futures[w.submit(self.process_bucket, b)] = b
        for f in as_completed(futures):
            b = futures[f]
            if f.exception():
                self.log.error(
                    "Error processing bucket: %s error: %s",
                    b['Name'], f.exception())
                continue
            if f.result():
                results.append(b)
    return results
def process(self, resources, event=None):
    self.accounts = self.get_accounts()
    results = []
    with self.executor_factory(max_workers=3) as w:
        futures = []
        for resource_set in chunks(resources, 50):
            futures.append(w.submit(
                self.process_resource_set, resource_set))
        for f in as_completed(futures):
            if f.exception():
                self.log.error(
                    "Exception checking cross account access \n %s" % (
                        f.exception()))
                continue
            results.extend(f.result())
    return results
def process(self, snapshots):
    self.image_snapshots = set()
    # Be careful around image snapshots: we filter them out by default
    # to keep things safe, since we'd get an error if we tried to delete
    # a snapshot associated with an image.
    pre = len(snapshots)
    snapshots = list(filter(None, _filter_ami_snapshots(self, snapshots)))
    post = len(snapshots)
    log.info("Deleting %d snapshots, auto-filtered %d ami-snapshots",
             post, pre - post)
    with self.executor_factory(max_workers=2) as w:
        futures = []
        for snapshot_set in chunks(reversed(snapshots), size=50):
            futures.append(
                w.submit(self.process_snapshot_set, snapshot_set))
        for f in as_completed(futures):
            if f.exception():
                self.log.error(
                    "Exception deleting snapshot set \n %s" % (
                        f.exception()))
    return snapshots
def process(self, volumes):
    vol_count = len(volumes)
    volumes = [v for v in volumes if v['Attachments']]
    if len(volumes) != vol_count:
        self.log.warning(
            "ebs copy tags action implicitly filtered from %d to %d",
            vol_count, len(volumes))
    self.initialize(volumes)
    with self.executor_factory(max_workers=10) as w:
        futures = []
        for instance_set in chunks(sorted(
                self.instance_map.keys(), reverse=True), size=100):
            futures.append(
                w.submit(self.process_instance_set, instance_set))
        for f in as_completed(futures):
            if f.exception():
                self.log.error(
                    "Exception copying instance tags \n %s" % (
                        f.exception()))
def create_elb_active_attributes_tuples(self, elb_policy_tuples):
    """
    Create a list of tuples for all attributes that are marked
    as "true" in the load balancer's policies, e.g.
    (myelb, ['Protocol-SSLv1', 'Protocol-SSLv2'])
    """
    active_policy_attribute_tuples = []
    with self.executor_factory(max_workers=2) as w:
        futures = []
        for elb_policy_set in chunks(elb_policy_tuples, 50):
            futures.append(
                w.submit(self.process_elb_policy_set, elb_policy_set))
        for f in as_completed(futures):
            if f.exception():
                self.log.error(
                    "Exception processing elb policies \n %s" % (
                        f.exception()))
                continue
            for elb_policies in f.result():
                active_policy_attribute_tuples.append(elb_policies)
    return active_policy_attribute_tuples
def process(self, asgs):
    error = False
    key = self.data.get('key', DEFAULT_TAG)
    with self.executor_factory(max_workers=3) as w:
        futures = {}
        for asg_set in chunks(asgs, self.batch_size):
            futures[w.submit(self.process_asg_set, asg_set, key)] = asg_set
        for f in as_completed(futures):
            asg_set = futures[f]
            if f.exception():
                error = f.exception()
                self.log.exception(
                    "Exception untagging asg:%s tag:%s error:%s" % (
                        ", ".join([a['AutoScalingGroupName']
                                   for a in asg_set]),
                        self.data.get('key', DEFAULT_TAG),
                        f.exception()))
    if error:
        raise error
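Recording the last exception and re-raising it after the loop lets every batch run to completion while still failing the action overall; a minimal sketch of that raise-at-end pattern (names are illustrative):

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_batches(process_one, batches, max_workers=3):
    error = None
    with ThreadPoolExecutor(max_workers=max_workers) as w:
        futs = {w.submit(process_one, batch): batch for batch in batches}
        for f in as_completed(futs):
            if f.exception():
                error = f.exception()  # remember the failure, keep draining
    if error:
        raise error  # surface it once every batch has settled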