def upload_blocks(bucket, chunk_size, max_threads, lines):
    session = botocore.session.get_session()
    client = session.create_client('s3')
    start = time.perf_counter()
    futures = []
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit one put_object call per JSON block; collect the futures so we can wait on them
        for line in lines:
            raw_block, key = load_json_block(line)
            futures.append(executor.submit(client.put_object,
                                           Bucket=bucket,
                                           Key=key,
                                           Body=raw_block,
                                           ContentEncoding='UTF-8',
                                           ContentType='application/json'))
        end = time.perf_counter()
        done, pending = concurrent.futures.wait(futures)
        complete = time.perf_counter()
    rate = 1 / ((complete - start) / len(done))
    return len(done), int(rate)
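
upload_blocks relies on a load_json_block() helper that is not part of this excerpt. As a rough illustration only, a minimal version might parse each input line as JSON and derive an object key from it; the 'block_id' field and the 'blocks/' prefix below are assumptions, not the project's actual scheme.

import json

def load_json_block(line):
    # Hypothetical sketch of the missing helper: the real project may build
    # keys differently. 'block_id' and the 'blocks/' prefix are assumptions.
    block = json.loads(line)
    raw_block = line.encode('utf-8')
    key = 'blocks/{}.json'.format(block['block_id'])
    return raw_block, key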
def get(self, request):
    alerts = []
    try:
        url = urljoin(settings.PROMGEN['alertmanager']['url'], '/api/v1/alerts')
        response = util.get(url)
    except requests.exceptions.ConnectionError:
        logger.error('Error connecting to %s', url)
        return JsonResponse({})

    data = response.json().get('data', [])
    if data is None:
        # Return an empty alert-all if there are no active alerts from AM
        return JsonResponse({})

    for alert in data:
        alert.setdefault('annotations', {})
        # Humanize dates for frontend
        for key in ['startsAt', 'endsAt']:
            if key in alert:
                alert[key] = parser.parse(alert[key])
        # Convert any links to <a> for frontend
        for k, v in alert['annotations'].items():
            alert['annotations'][k] = defaultfilters.urlize(v)
        alerts.append(alert)

    return JsonResponse(alerts, safe=False)
def get(self, request, label):
    data = set()
    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        for host in models.Shard.objects.filter(proxy=True):
            futures.append(executor.submit(
                util.get,
                '{}/api/v1/label/{}/values'.format(host.url, label),
                headers=self.headers))
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
                # Need to try to decode the json BEFORE we raise_for_status
                # so that we can pass back the error message from Prometheus
                _json = result.json()
                result.raise_for_status()
                logger.debug('Appending data from %s', result.request.url)
                data.update(_json['data'])
            except:
                logger.exception('Error with response')
                _json['promgen_proxy_request'] = result.request.url
                return JsonResponse(_json, status=result.status_code)

    return JsonResponse({
        'status': 'success',
        'data': sorted(data)
    })
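
The two Promgen views above share the same fan-out shape: append one future per Prometheus shard, then drain them with as_completed. A self-contained sketch of that pattern, using plain requests and placeholder URLs instead of Promgen's util.get wrapper:

import concurrent.futures
import requests

def fan_out(urls, timeout=5):
    """Fetch several URLs in parallel and collect the bodies that succeed."""
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(requests.get, url, timeout=timeout) for url in urls]
        for future in concurrent.futures.as_completed(futures):
            try:
                response = future.result()
                response.raise_for_status()
                results.append(response.text)
            except requests.RequestException:
                # One failed host should not abort the whole fan-out
                continue
    return results

# Example (placeholder hosts):
# fan_out(['http://prom-1:9090/-/healthy', 'http://prom-2:9090/-/healthy'])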
list-checkins.py (project: serverless-southwest-check-in, author: DavidWittman)
def main(args):
    results = []
    state_machine_arn = args.state_machine_arn

    # TODO(dw): pagination for > 100 executions
    executions = SFN.list_executions(
        stateMachineArn=state_machine_arn,
        statusFilter='RUNNING'
    )

    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = []
        for e in executions['executions']:
            future = executor.submit(get_execution_details, e['executionArn'])
            futures.append(future)
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())

    print(json.dumps(results))
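
get_execution_details is defined elsewhere in list-checkins.py; this excerpt only shows the fan-out. Assuming SFN is a module-level boto3 Step Functions client, a minimal stand-in could pull each execution's input via describe_execution (the real helper may return richer data):

import json
import boto3

SFN = boto3.client('stepfunctions')  # assumption: SFN is a module-level boto3 client

def get_execution_details(execution_arn):
    # Minimal stand-in: return only the execution input, which is a JSON string
    # and therefore safe to pass through json.dumps() in main() above.
    execution = SFN.describe_execution(executionArn=execution_arn)
    return json.loads(execution['input'])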
def run_multiprocess(func, tasks, *args, **kwargs):
    results = []
    remains = list(enumerate(tasks))
    while remains:
        errors = []
        with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
            futures = [executor.submit(func, *tuple(chain([task], args)), **kwargs)
                       for _, task in remains]
            concurrent.futures.wait(futures)
            for future, t in zip(futures, remains):
                n, task = t
                try:
                    results.append((n, future.result()))
                except Exception as e:
                    errors.append((n, task))
        remains = errors
    return list(map(lambda x: x[1], sorted(results, key=lambda x: x[0])))
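
run_multiprocess re-submits any task whose future raised, so a task that always fails will loop forever; successful results are restored to input order by the enumerate/sort pair. A small usage sketch with a hypothetical square task:

# Assumes the module above already imports concurrent.futures, multiprocessing,
# and chain from itertools.

def square(x):
    return x * x

if __name__ == '__main__':
    # The guard matters: ProcessPoolExecutor re-imports this module in worker processes.
    print(run_multiprocess(square, [1, 2, 3, 4]))  # -> [1, 4, 9, 16]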
def _make_request(self, edited_title, current_datetime, diff, imdbID):
    from_date = (current_datetime - datetime.timedelta(days=7 - diff)).strftime('%Y-%m-%d')
    to_date = (current_datetime - datetime.timedelta(days=6 - diff)).strftime('%Y-%m-%d')
    tweets = []
    ## Make search request
    ## Ask not to receive tweets that contain links or that follow the RT pattern of retweets
    try:
        response = self.api.GetSearch(term='"' + edited_title + '" -filter:links -RT',
                                      since=from_date, until=to_date,
                                      lang='en', result_type='mixed')
    except Exception as e:
        print(e)
        # Without a response there is nothing to tag; return the empty list
        return tweets
    for tweet in response:
        ## Tag each tweet with the movie's imdbID
        tweet.imdbID = imdbID
        ## Only append tweets in English
        if tweet.lang == 'en' or tweet.user.lang == 'en':
            tweets.append(tweet)
    return tweets
# Make complicated title simple to improve search results
def extract(self, season, result):
    self.default_url = result

    with concurrent.futures.ThreadPoolExecutor(16) as executor:
        # First step, we extract links to all the episodes
        episodes, base_url, pages = self.page_worker(
            self.default_url
        )

        futures = []
        for page in pages:
            self.logger.debug('Processing page {}'.format(page))
            futures.append(executor.submit(
                self.page_worker, base_url + str(page)
            ))

        results = concurrent.futures.wait(futures)
        for completed in results.done:
            episodes += completed.result()

        # Second step, we get all the available sources.
        list(executor.map(self.episode_worker, episodes))
def _register(self):
    futures = []
    for message in self._register_requests():
        self._stream.wait_for_ready()
        future = self._stream.send(
            message_type=Message.TP_REGISTER_REQUEST,
            content=message.SerializeToString())
        futures.append(future)

    for future in futures:
        resp = TpRegisterResponse()
        try:
            resp.ParseFromString(future.result().content)
            LOGGER.info("register attempt: %s",
                        TpRegisterResponse.Status.Name(resp.status))
        except ValidatorConnectionError as vce:
            LOGGER.info("during waiting for response on registration: %s",
                        vce)
def do_load(args):
    auth_info = _get_auth_info(args.auth_user, args.auth_password)

    with open(args.filename, mode='rb') as fd:
        batches = batch_pb2.BatchList()
        batches.ParseFromString(fd.read())

    start = time.time()
    futures = []
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

    for batch_list in _split_batch_list(batches):
        fut = executor.submit(post_batches, args.url, auth_info, batch_list)
        futures.append(fut)

    # Wait until all futures are complete
    wait(futures)

    stop = time.time()
    print("batches: {} batch/sec: {}".format(
        str(len(batches.batches)),
        len(batches.batches) / (stop - start)))
def run(self):
    PluginHelpers.run_method_for_each('on_start', self.global_options, self.run_log)

    max_workers = 1 if self.global_options.get('tests_sequential', False) else None
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    futures = []

    for set_item in self.settings:
        if 'url' in set_item:
            url = set_item['url']
            futures.append(executor.submit(self.run_tests, url, set_item, self.global_options))
        elif 'wait' in set_item:
            now = time.time()
            executor.shutdown(True)  # wait for all current jobs to stop
            then = time.time()
            wait_more = set_item['wait'] - (then - now)
            if wait_more > 0:
                time.sleep(wait_more)
            executor = concurrent.futures.ThreadPoolExecutor()  # replace the executor

    executor.shutdown(True)  # wait for all current jobs to stop
    for future in futures:
        self.run_log.append(future.result())

    PluginHelpers.run_method_for_each('on_end', self.global_options, self.run_log)
def detectValidExtension(self, future):
    if not self.stopThreads:
        html = future.result()[0].text
        ext = future.ext[0]
        r = self.isASuccessfulUpload(html)
        if r:
            self.validExtensions.append(ext)
            if self.shouldLog:
                self.logger.info("\033[1m\033[42mExtension %s seems valid for this form.\033[m", ext)
                if r != True:
                    self.logger.info("\033[1;32mTrue regex matched the following information : %s\033[m", r)
        return r
    else:
        return None
#detects valid extensions for this upload form (sending legit files with legit mime types)
def tiles_urllib(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyProperties,
                 progress_bar: bool, n_parallel, timeout: float) -> List[HipsTile]:
    """Generator function to fetch HiPS tiles from a remote URL."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_parallel) as executor:
        futures = []
        for meta in tile_metas:
            url = hips_survey.tile_url(meta)
            future = executor.submit(fetch_tile_urllib, url, meta, timeout)
            futures.append(future)

        futures = concurrent.futures.as_completed(futures)
        if progress_bar:
            from tqdm import tqdm
            futures = tqdm(futures, total=len(tile_metas), desc='Fetching tiles')

        tiles = []
        for future in futures:
            tiles.append(future.result())

    return tiles
async def fetch_all_tiles_aiohttp(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyProperties,
                                  progress_bar: bool, n_parallel: int, timeout: float) -> List[HipsTile]:
    """Generator function to fetch HiPS tiles from a remote URL using aiohttp."""
    # Must be an async def: the body uses ``async with`` and ``await``.
    import aiohttp

    connector = aiohttp.TCPConnector(limit=n_parallel)
    async with aiohttp.ClientSession(connector=connector) as session:
        futures = []
        for meta in tile_metas:
            url = hips_survey.tile_url(meta)
            future = asyncio.ensure_future(fetch_tile_aiohttp(url, meta, session, timeout))
            futures.append(future)

        futures = asyncio.as_completed(futures)
        if progress_bar:
            from tqdm import tqdm
            futures = tqdm(futures, total=len(tile_metas), desc='Fetching tiles')

        tiles = []
        for future in futures:
            tiles.append(await future)

    return tiles
def chunkify(iterable, chunksize=10000):
    i = 0
    chunk = []
    for item in iterable:
        chunk.append(item)
        i += 1
        if i == chunksize:
            yield chunk
            i = 0
            chunk = []
    if len(chunk) > 0:
        yield chunk
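
A quick sanity check of chunkify: a 25-item iterable with chunksize=10 yields two full chunks and one remainder.

chunks = list(chunkify(range(25), chunksize=10))
print([len(c) for c in chunks])  # [10, 10, 5]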
def populate(filename, max_workers, max_threads, chunk_size, bucket=S3_BLOCKS_BUCKET):
    with open(filename, mode='r') as f:
        chunks = chunkify(f, chunksize=chunk_size)
        start = time.perf_counter()
        func = functools.partial(upload_blocks, bucket, chunk_size, max_threads)

        counter = 0
        samples = 20
        chunk_rates = deque(maxlen=samples)
        actual_rates = deque(maxlen=samples)
        overheads = deque(maxlen=samples)

        with Pool(processes=max_workers) as pool:
            results = pool.imap_unordered(func, chunks, chunksize=1)
            for count, rate in results:
                elapsed = int(time.perf_counter() - start)
                counter += count
                chunk_rates.append(rate)
                avg_chunk_rate = int(sum(chunk_rates) / samples)
                perfect = avg_chunk_rate * max_workers
                actual = int(counter / elapsed)
                actual_rates.append(actual)
                avg_actual_rate = int(sum(actual_rates) / samples)
                overhead = int(100 - ((actual / perfect) * 100))
                overheads.append(overhead)
                avg_overhead = int(sum(overheads) / samples)
                report_progress(counter, avg_chunk_rate, avg_actual_rate, avg_overhead)

        end = time.perf_counter()
        complete = time.perf_counter()
        print('master scheduling time:%s complete_time:%s b/s: %s' % (
            end - start, complete - start, 1 / ((complete - start) / chunk_size)
        ))
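
The overhead figure reported by populate compares the measured aggregate rate against a "perfect" rate of avg_chunk_rate * max_workers. Illustrative numbers (not measurements from the project):

avg_chunk_rate = 50     # blocks/s per worker process (example value)
max_workers = 8
perfect = avg_chunk_rate * max_workers      # 400 blocks/s with zero coordination cost
actual = 320                                # observed aggregate rate (example value)
overhead = int(100 - ((actual / perfect) * 100))
print(overhead)  # 20 (percent lost to scheduling/IPC overhead)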
def get_context_data(self, **kwargs):
    context = super(HostList, self).get_context_data(**kwargs)
    context['host_groups'] = collections.defaultdict(list)
    for host in context['object_list']:
        context['host_groups'][host.name].append(host)
    context['host_groups'] = dict(context['host_groups'])
    return context
def form_valid(self, form):
    project = get_object_or_404(models.Project, id=self.kwargs['pk'])

    futures = []
    context = {
        'target': self.request.POST['target'].strip('#'),
        'results': [],
        'errors': [],
    }
    headers = {
        'referer': project.get_absolute_url()
    }

    # Default /metrics path
    if not form.cleaned_data['path']:
        form.cleaned_data['path'] = 'metrics'

    if not project.farm:
        context['errors'].append({'url': headers['referer'], 'message': 'Missing Farm'})
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            for host in project.farm.host_set.all():
                futures.append(executor.submit(util.get, 'http://{}:{}/{}'.format(
                    host.name, form.cleaned_data['port'], form.cleaned_data['path']
                ), headers=headers))
            for future in concurrent.futures.as_completed(futures):
                try:
                    result = future.result()
                    context['results'].append(result)
                except:
                    result = future.exception()
                    logger.warning('Error with response')
                    context['errors'].append({'url': result.request.url, 'message': result})

    return JsonResponse({'#' + context['target']: render_to_string('promgen/ajax_exporter.html', context)})
def post(self, request):
    silences = collections.defaultdict(list)
    try:
        url = urljoin(settings.PROMGEN['alertmanager']['url'], '/api/v1/silences')
        response = util.get(url)
    except requests.exceptions.ConnectionError:
        logger.error('Error connecting to %s', url)
        return JsonResponse({})

    data = response.json().get('data', [])
    if data is None:
        # Return an empty silence-all if there are no active silences from AM
        return JsonResponse({})

    currentAt = datetime.datetime.now(datetime.timezone.utc)

    for silence in data:
        if 'comment' in silence:
            silence['comment'] = defaultfilters.urlize(silence['comment'])
        # Since there is no status field, compare endsAt with the current time
        if 'endsAt' in silence:
            silence['endsAt'] = parser.parse(silence['endsAt'])
            if silence['endsAt'] < currentAt:
                continue

        silences['silence-all'].append(silence)
        for matcher in silence.get('matchers'):
            if matcher.get('name') in ['service', 'project']:
                silences['silence-{}-{}'.format(matcher.get('name'), matcher.get('value'))].append(silence)

    context = {
        '#' + slugify(key): render_to_string(
            'promgen/ajax_silence.html',
            {'silences': silences[key], 'key': key},
            request
        ).strip()
        for key in silences
    }
    context['#silence-load'] = render_to_string(
        'promgen/ajax_silence_button.html',
        {'silences': silences['silence-all'], 'key': 'silence-all'}
    ).strip()

    return JsonResponse(context)
def get(self, request):
    data = []
    futures = []
    resultType = None

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        for host in models.Shard.objects.filter(proxy=True):
            futures.append(executor.submit(
                util.get,
                '{}/api/v1/query_range?{}'.format(host.url, request.META['QUERY_STRING']),
                headers=self.headers))
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
                # Need to try to decode the json BEFORE we raise_for_status
                # so that we can pass back the error message from Prometheus
                _json = result.json()
                result.raise_for_status()
                logger.debug('Appending data from %s', result.request.url)
                data += _json['data']['result']
                resultType = _json['data']['resultType']
            except:
                logger.exception('Error with response')
                _json['promgen_proxy_request'] = result.request.url
                return JsonResponse(_json, status=result.status_code)

    return JsonResponse({
        'status': 'success',
        'data': {
            'resultType': resultType,
            'result': data,
        }
    })
def run_synchronize(func, tasks, *args, **kwargs):
    results = []
    for task in tasks:
        results.append(func(task, *args, **kwargs))
    return results
def run_raw_multiprocess(func, tasks, *args, **kwargs):
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), maxtasksperchild=1)
    futures = []
    for task in tasks:
        futures.append(pool.apply_async(func, tuple(chain([task], args)), kwargs))
    pool.close()
    results = list(map(lambda x: x.get(), futures))
    return results
def append_results(results, function, *args, **kwargs):
    """
    Calls *function* with the given *args* and *kwargs* then appends the result
    to *results* (which must be a list). If we're not in the main process the
    given *function* will be called using `safe_call`.
    """
    if os.getpid() != PID:
        results.append(safe_call(function, *args, **kwargs))
    else:
        results.append(function(*args, **kwargs))
def callback_when_complete(futures, callback):
    """
    Calls *callback* after all *futures* (list) have completed running.
    """
    counter = count(1)
    io_loop = IOLoop.current()
    results = []
    def add_one(f):
        # next() works on both Python 2 and 3; counter.next() is Python 2 only
        c = next(counter)
        results.append(f.result())
        if c >= len(futures):
            return callback(results)
    for future in futures:
        io_loop.add_future(future, add_one)
def call_singleton(self, function, identifier, *args, **kwargs):
    """
    Executes *function* if no other function with the given *identifier*
    is already running. If a function is currently running with the given
    *identifier* the passed *function* will be called when the first
    function is complete.

    In other words, functions called via this method will be executed in
    sequence with each function being called after the first is complete.

    The function will be passed any given *args* and *kwargs* just like
    :meth:`AsyncRunner.call`.

    If 'callback' is passed as a keyword argument (*kwargs*) it will be
    called with the result when complete.
    """
    callback = kwargs.pop('callback', None)
    if identifier in ONE_CALLS:
        ONE_CALLS[identifier]['queue'].append(
            (function, args, kwargs, callback))
    else:
        from collections import deque
        future = self.executor.submit(safe_call, function, *args, **kwargs)
        ONE_CALLS[identifier] = {
            'future': future,
            'queue': deque()
        }
        if callback:
            done_callback(
                ONE_CALLS[identifier]['future'],
                lambda f: callback(f.result()))
        completed = partial(_call_complete, self, identifier)
        done_callback(ONE_CALLS[identifier]['future'], completed)
    #print 'ONE_CALLS',ONE_CALLS
    #print 'identifier',identifier
    return ONE_CALLS[identifier]['future']
def add_task(self, funcs):
    """
    Adds the given *funcs* to this schedule.
    """
    if not isinstance(funcs, list):
        funcs = [funcs]  # Make it a list
    self.funcs.append(funcs)
def search_movie(self, movie):
    """
    :param movie: a Movie object with valid fields
    :return tweets: a list of twitter.models.Status objects posted between one year before the
        movie was released and the current date, if *movie* is a valid Movie object;
        otherwise None
    """
    if movie.Title == '' or not isinstance(movie, Movie) or not isinstance(movie.Title, str):
        return None

    edited_title = self.__clean_title(movie.Title)
    imdbID = movie.imdbID
    current_datetime = timezone.now()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=7)
    futures = []
    tweets = []

    print('Searching Twitter for ' + edited_title)
    for diff in range(0, 6):
        futures.append(executor.submit(self._make_request, edited_title, current_datetime, diff, imdbID))
    for future in futures:
        tweets += future.result()

    if tweets == []:
        return None
    return tweets
# Make the request to Twitter (is run by individual threads)
def search(self, title):
    """
    :param title: a string holding the title of the movie
    :return movie: if at least one movie with a similar title is found, a Movie object
        created from the most relevant result returned by OMDb; otherwise None
    """
    ## Search for all movies with similar titles
    try:
        matching_movies = omdb.search_movie(title)
    except:
        raise ConnectionError

    ## Return the most relevant movie
    highestIMDB = 0
    if matching_movies:
        movie = matching_movies.pop(0)
        print("MOVIE: " + movie.title)
        try:
            movieObj = Movie.objects.get(imdbID=movie.imdb_id)
        except Movie.DoesNotExist:
            movieObj = None
        if not movieObj:
            response = omdb.request(i=movie.imdb_id, tomatoes=True, type='movie').json()
            movieObj = Movie().fillWithJsonObject(response)
            if not movieObj:
                return None
        self.known_omdb_titles.append(movieObj.Title)
        return movieObj
    else:
        return None
def search(self):
    html = requests.post(
        'http://animehaven.org/wp-admin/admin-ajax.php',
        data={
            'action': 'search_ajax',
            'keyword': self.media.metadata['name']
        }
    ).text

    html = html.replace('\\n', '')
    html = html.replace('\\t', '')
    html = html.replace('\\', '')

    soup = BeautifulSoup(html, 'html.parser')

    results = []
    for result in soup.find_all('div', {'class': 'sa_post'}):
        title_block = result.find('h6')
        link = title_block.find('a')  # The first one is the good one
        title, href = link.get('title'), link.get('href')
        self.logger.debug('Found block {} ({})'.format(title, href))

        versions_soup = self._get(href)
        versions = list(
            ('Sub' if 'sub' in x.text.lower() else 'Dub', x.get('href'))
            for x in versions_soup.find_all('a', {'class': 'ah_button'})
        )
        for version, url in versions:
            self.logger.debug('-> Found version {}'.format(url))
            results.append(('{} ({})'.format(title, version), url))

    return SearchResult.from_tuples(self.media, results)
def search(self):
    soup = self._get('http://www.icefilms.info/search.php', params={
        'q': self.media.metadata['name'],
        'x': 0,
        'y': 0
    })

    results = []
    for result in soup.select('.title a'):
        results.append((result.text, result.get('href')))

    return SearchResult.from_tuples(self.media, results)
def extract(self, result):
    soup = self._get(self._get_url() + result)
    sources_soup = self._get(
        self._get_url() + soup.select('iframe#videoframe')[0].get('src')
    )
    referer = self._get_url() + \
        soup.select('iframe#videoframe')[0].get('src')

    with concurrent.futures.ThreadPoolExecutor(16) as executor:
        futures = []
        for quality_div in sources_soup.select('.ripdiv'):
            self.logger.debug('=> ' + quality_div.find('b').text)

            # Raw strings avoid invalid escape-sequence warnings on \d
            t = re.search(r't=(\d+?)"', sources_soup.prettify()).group(1)
            results = re.search(
                r'var s=(\d+?),m=(\d+?);', sources_soup.prettify())
            s, m = results.group(1), results.group(2)
            sec = re.search(
                r'f.lastChild.value="(.+?)"', sources_soup.prettify()
            ).group(1)

            for source in quality_div.select('a[onclick]'):
                futures.append(executor.submit(
                    self._source_worker, referer, t, sec, source
                ))

        list(concurrent.futures.as_completed(futures))