def bookmarklet_js():
    """Serve the bookmarklet template with a base URL pointing at the
    static browser-tools directory, derived from the current request URL."""
    static_base = request.url.replace(
        "browser-tools/bookmarklet.js",
        "static/browser-tools/"
    )
    # Flask's request.url may report http even when the server was asked
    # for https, so force https everywhere except local development.
    if "localhost:" not in static_base:
        static_base = static_base.replace("http://", "https://")
    body = render_template(
        "browser-tools/bookmarklet.js",
        base_url=static_base
    )
    response = make_response(body, 200)
    response.mimetype = "application/javascript"
    return response
# (scraped-page artifact; original heading: "python url() usage examples")
def showTimeMap(urir, format):
    """Generate a TimeMap for *urir* in the requested serialization.

    :param urir: original-resource URI (completed via getCompleteURI).
    :param format: 'link' or 'cdxj'; any other value returns HTTP 400.
    :return: Flask Response with the TimeMap and its Content-Type.
    """
    urir = getCompleteURI(urir)
    s = surt.surt(urir, path_strip_trailing_slash_unless_empty=False)
    indexPath = ipwbConfig.getIPWBReplayIndexPath()
    cdxjLinesWithURIR = getCDXJLinesWithURIR(urir, indexPath)
    tmContentType = ''
    if format == 'link':
        tm = generateLinkTimeMapFromCDXJLines(
            cdxjLinesWithURIR, s, request.url)
        tmContentType = 'application/link-format'
    elif format == 'cdxj':
        tm = generateCDXJTimeMapFromCDXJLines(
            cdxjLinesWithURIR, s, request.url)
        tmContentType = 'application/cdxj+ors'
    else:
        # BUG FIX: an unrecognized format previously fell through and
        # raised NameError on the unbound `tm`; fail explicitly instead.
        return Response('Unsupported TimeMap format', status=400)
    resp = Response(tm)
    resp.headers['Content-Type'] = tmContentType
    return resp
def update_content_in_local_cache(url, content, method='GET'):
    """Update the cached response body for *url* in the local cache.

    Intended for streamed responses whose body was not available when the
    entry was first cached.  (Original docstring/comments were mojibake;
    meaning reconstructed from the code -- verify.)
    """
    # Only GET responses are cached; skip entirely when caching is off or
    # the URL has no existing entry.
    if local_cache_enable and method == 'GET' and cache.is_cached(url):
        info_dict = cache.get_info(url)
        resp = cache.get_obj(url)
        resp.set_data(content)
        # The entry now carries its body, so clear the without_content
        # marker that was presumably set when only headers were cached
        # (e.g. for streamed responses).  NOTE(review): inferred from the
        # flag name; original comments were unreadable -- confirm.
        info_dict['without_content'] = False
        if verbose_level >= 4: dbgprint('LocalCache_UpdateCache', url, content[:30], len(content))
        # Re-store the object with its updated body and metadata.
        cache.put_obj(
            url,
            resp,
            obj_size=len(content),
            expires=get_expire_from_mime(parse.mime),
            last_modified=info_dict.get('last_modified'),
            info_dict=info_dict,
        )
def extract_url_path_and_query(full_url=None, no_query=False):
    """
    Reduce a full URL to its path, plus the query unless *no_query*.

    Convert http://foo.bar.com/aaa/p.html?x=y to /aaa/p.html?x=y
    :param no_query: drop the query string when True
    :type full_url: str
    :param full_url: full url; defaults to the current request's URL
    :return: str
    """
    if full_url is None:
        full_url = request.url
    parts = urlsplit(full_url)
    path = parts.path if parts.path else "/"
    if no_query or not parts.query:
        return path
    return path + '?' + parts.query
# ################# End Client Request Handler #################
# ################# Begin Middle Functions #################
def request_remote_site():
    """Send the rewritten request to the remote site (high level).

    Stores the response in parse.remote_response.  On a 4xx/5xx status,
    tries domain guessing to recover from a wrongly rewritten domain.
    (Original comments were mojibake; reconstructed from the code.)
    """
    parse.remote_response = send_request(
        parse.remote_url,
        method=request.method,
        headers=parse.client_header,
        data=parse.request_data_encoded,
    )
    if parse.remote_response.url != parse.remote_url:
        # requests may have followed a redirect; warn about the mismatch.
        # (Fixed log grammar: was "does no equals our rewrited url".)
        warnprint("requests's remote url", parse.remote_response.url,
                  'does not equal our rewritten url', parse.remote_url)
    if 400 <= parse.remote_response.status_code <= 599:
        # Upstream rejected the URL; the domain rewrite may have guessed
        # wrong -- attempt to find the correct domain.
        dbgprint("Domain guessing for", request.url)
        result = guess_correct_domain()
        if result is not None:
            parse.remote_response = result
def advisory_atom():
    """Atom feed of the 15 most recently published security advisories."""
    last_recent_entries = 15
    data = get_advisory_data()['published'][:last_recent_entries]
    feed = AtomFeed('Arch Linux Security - Recent advisories',
                    feed_url=request.url, url=request.url_root)
    for entry in data:
        advisory = entry['advisory']
        package = entry['package']
        title = '[{}] {}: {}'.format(advisory.id, package.pkgname, advisory.advisory_type)
        feed.add(title=title,
                 content=render_template('feed.html', content=advisory.content),
                 content_type='html',
                 summary=render_template('feed.html', content=advisory.impact),
                 # BUG FIX: was `summary_tpe`, a silently-ignored kwarg, so
                 # the summary was not marked as html.
                 summary_type='html',
                 author='Arch Linux Security Team',
                 url=TRACKER_ISSUE_URL.format(advisory.id),
                 published=advisory.created,
                 updated=advisory.created)
    return feed.get_response()
def predict():
    """Handle an uploaded image and return the predicted Pokemon as JSON.

    Returns a redirect back to the form when no file was supplied.
    (Removed a leftover `import ipdb; ipdb.set_trace()` debugger break
    that froze every request; dropped the unused `filename` local.)
    """
    if 'file' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['file']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)
    if file and allowed_file(file.filename):
        try:
            pokemon_name = predict_mlp(file).capitalize()
            pokemon_desc = pokemon_entries.get(pokemon_name)
            msg = ""
        except Exception as e:
            # Surface the prediction failure to the client instead of 500ing.
            pokemon_name = None
            pokemon_desc = None
            msg = str(e)
        return jsonify({'name': pokemon_name, 'description': pokemon_desc, "msg": msg})
def rss_feed():
    """Atom feed of White House briefing-room releases, newest first."""
    atom = AtomFeed('White House Briefing Room Releases', feed_url=request.url, url=request.url_root)
    for doc in WhiteHouse.query.order_by(WhiteHouse.document_date.desc()):
        atom.add(doc.title, doc.tweet,
                 content_type='text',
                 author="@presproject2017",
                 url=make_external(doc.full_url),
                 updated=doc.document_date,
                 published=doc.document_date)
    return atom.get_response()
def retrieveAlertsCountWithType():
    """Retrieve the number of alerts in a timeframe (GET parameter `time`
    as a decimal number of minutes or the string "day"), split by honeypot type."""
    # Serve from cache when a fresh entry exists for this URL.
    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached)
    # Otherwise query ElasticSearch; `time` is mandatory.
    if not request.args.get('time'):
        app.logger.error('No time GET-parameter supplied in retrieveAlertsCountWithType. Must be decimal number (in minutes) or string "day"')
        return app.config['DEFAULTRESPONSE']
    result = formatAlertsCountWithType(queryAlertsCountWithType(request.args.get('time'), checkCommunityIndex(request)))
    setCache(request.url, result, 13, "url")
    app.logger.debug('UNCACHED %s' % str(request.url))
    return jsonify(result)
def retrieveDatasetAlertsPerMonth():
    """Attacks per day over the last `days` days (GET parameter) as JSON.

    Defaults to the last month when no `days` parameter is supplied.
    """
    # Serve from cache when a fresh entry exists for this URL.
    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached)
    # Query ES; an absent/empty `days` means "use the default month span".
    days = request.args.get('days')
    span = days if days else None
    result = formatDatasetAlertsPerMonth(queryDatasetAlertsPerMonth(span, checkCommunityIndex(request)))
    setCache(request.url, result, 600, "url")
    return jsonify(result)
def retrieveDatasetAlertTypesPerMonth():
    """Attacks per day over the last `days` days, split by attack group, as JSON.

    Defaults to the last month when no `days` parameter is supplied.
    """
    # Serve from cache when a fresh entry exists for this URL.
    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached)
    # Query ES; an absent/empty `days` means "use the default month span".
    days = request.args.get('days')
    span = days if days else None
    result = formatDatasetAlertTypesPerMonth(queryDatasetAlertTypesPerMonth(span, checkCommunityIndex(request)))
    setCache(request.url, result, 3600, "url")
    return jsonify(result)
def retrieveAlertStats():
    """Combined statistics: AlertsLastMinute, AlertsLastHour, AlertsLast24Hours."""
    # Serve from cache when a fresh entry exists for this URL.
    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached)
    # Otherwise query ElasticSearch and cache the formatted result briefly.
    result = formatAlertStats(queryAlertStats(checkCommunityIndex(request)))
    setCache(request.url, result, 13, "url")
    app.logger.debug('UNCACHED %s' % str(request.url))
    return jsonify(result)
def get_inbox():
    """Serve the LDN inbox graph, content-negotiated on the Accept header.

    BUG FIX: used request.headers['Accept'], which raises KeyError when the
    header is absent; now uses .get(), mirroring get_notification(), and a
    missing Accept falls back to JSON-LD.
    """
    accept = request.headers.get('Accept')
    pyldnlog.debug("Requested inbox data of {} in {}".format(request.url, accept))
    if not accept or accept == '*/*' or 'text/html' in accept:
        resp = make_response(inbox_graph.serialize(format='application/ld+json'))
        resp.headers['Content-Type'] = 'application/ld+json'
    elif accept in ACCEPTED_TYPES:
        resp = make_response(inbox_graph.serialize(format=accept))
        resp.headers['Content-Type'] = accept
    else:
        return 'Requested format unavailable', 415
    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET, HEAD, OPTIONS, POST"
    resp.headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type", <http://www.w3.org/ns/ldp#RDFSource>; rel="type", <http://www.w3.org/ns/ldp#Container>; rel="type", <http://www.w3.org/ns/ldp#BasicContainer>; rel="type"'
    resp.headers['Accept-Post'] = 'application/ld+json, text/turtle'
    return resp
def get_notification(id):
    """Serve a single stored notification graph, content-negotiated."""
    pyldnlog.debug("Requested notification data of {}".format(request.url))
    pyldnlog.debug("Headers: {}".format(request.headers))
    graph_key = pyldnconf._inbox_url + id
    # Check if the named graph exists
    pyldnlog.debug("Dict key is {}".format(graph_key))
    if graph_key not in graphs:
        return 'Requested notification does not exist', 404
    accept = request.headers['Accept'] if 'Accept' in request.headers else None
    if accept is None or accept == '*/*' or 'text/html' in accept:
        resp = make_response(graphs[graph_key].serialize(format='application/ld+json'))
        resp.headers['Content-Type'] = 'application/ld+json'
    elif accept in ACCEPTED_TYPES:
        resp = make_response(graphs[graph_key].serialize(format=accept))
        resp.headers['Content-Type'] = accept
    else:
        return 'Requested format unavailable', 415
    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET"
    return resp
def register_extension(app):
    """Install request-correlation hooks on *app*.

    Every request gets a correlation id -- taken from the CORRELATION_ID
    header when supplied, freshly generated otherwise -- and the id is
    echoed back on the response.
    """
    @app.before_request
    def add_correlation_id(*args, **kw):
        # Reuse the caller-supplied correlation id when present.
        correlation_id = request.headers.get(CORRELATION_ID)
        log.debug("%s %s", request.method, request.url)
        if not correlation_id:
            correlation_id = str(uuid.uuid4())
        if request.method != "GET":
            """
            TODO: remove sensitive information such as username/password
            """
            # Non-GET requests are logged with their body for tracing.
            log.debug({
                "message": "Tracking request",
                "correlation_id": correlation_id,
                "method": request.method,
                "uri": request.url,
                "data": request.data,
            })
        # Stash on the request object so save_correlation_id can read it back.
        request.correlation_id = correlation_id
    @app.after_request
    def save_correlation_id(response):
        # Echo the id back unless a view already set the header itself.
        if CORRELATION_ID not in response.headers:
            response.headers[CORRELATION_ID] = getattr(request, "correlation_id", None)
        return response
def _provide_client_handler(self, section, name, kwargs_updator=None):
    """Build a decorator that registers an authenticated client handler.

    The wrapped view receives the authenticated client via the `client`
    kwarg (also exposed as g.ac_client).  Responds 401 when authentication
    fails and 204 when the view returns nothing.
    """
    def _wrapper(func):
        @wraps(func)
        def _handler(**kwargs):
            # Verify the request signature and resolve the client key.
            client_key = self.auth.authenticate(
                request.method,
                request.url,
                request.headers)
            client = self.client_class.load(client_key)
            if not client:
                abort(401)
            g.ac_client = client
            kwargs['client'] = client
            if kwargs_updator:
                # Let the caller derive extra kwargs from the current ones.
                kwargs.update(kwargs_updator(**kwargs))
            ret = func(**kwargs)
            if ret is not None:
                return ret
            # No explicit response from the view -> 204 No Content.
            return '', 204
        self._add_handler(section, name, _handler)
        return func
    return _wrapper
def error_mail(subject, data, r, via_web=True):
    """Mail an error report for a failed remote HTTP request.

    *r* is the response object, *data* is the request payload we sent.
    When called inside a request context, the site URL and user are
    prepended to the report.
    """
    body = '''
remote URL: {r.url}
status code: {r.status_code}
request data:
{data}
status code: {r.status_code}
content-type: {r.headers[content-type]}
reply:
{r.text}
'''.format(r=r, data=data)
    # Outside a request context there is no site URL or user to report.
    via_web = via_web and has_request_context()
    if via_web:
        body = 'site URL: {}\nuser: {}\n'.format(request.url, get_username()) + body
    send_mail(subject, body)
def announce_change(change):
    """Send a notification mail describing tags added for *change*."""
    place = change.place
    message = '''
user: {change.user.username}
name: {name}
page: {url}
items: {change.update_count}
comment: {change.comment}
https://www.openstreetmap.org/changeset/{change.id}
'''.format(change=change,
           name=place.display_name,
           url=place.candidates_url(_external=True))
    send_mail('tags added: {}'.format(place.name_for_changeset), message)
def open_changeset_error(place, changeset, r):
    """Mail an error report for a failed changeset-creation request.

    BUG FIX: the template referenced {change.user.username}, but format()
    was never passed a `change` object, so every call raised KeyError
    before any mail was sent.  No change object exists in this scope, so
    the user line is dropped.
    """
    template = '''
name: {name}
page: {url}
sent:
{sent}
reply:
{reply}
'''
    body = template.format(name=place.display_name,
                           url=place.candidates_url(_external=True),
                           sent=changeset,
                           reply=r.text)
    send_mail('error creating changeset:' + place.name, body)
def log_exception(self, exc_info):
    """Log an unhandled exception together with request diagnostics."""
    request_details = (
        request.path,
        request.method,
        request.remote_addr,
        request.user_agent.string,
        request.user_agent.platform,
        request.user_agent.browser,
        request.user_agent.version,
        dict(request.args),
        request.view_args,
        request.url,
    )
    self.logger.error("""
Path: %s
HTTP Method: %s
Client IP Address: %s
User Agent: %s
User Platform: %s
User Browser: %s
User Browser Version: %s
GET args: %s
view args: %s
URL: %s
""" % request_details, exc_info=exc_info)
def authenticated(fn):
    """Mark a route as requiring authentication."""
    @wraps(fn)
    def decorated_function(*args, **kwargs):
        if not session.get('is_authenticated'):
            return redirect(url_for('login', next=request.url))
        # Logout must stay reachable regardless of profile completeness.
        if request.path == '/logout':
            return fn(*args, **kwargs)
        profile_complete = (session.get('name') and session.get('email')
                            and session.get('institution'))
        if not profile_complete and request.path != '/profile':
            return redirect(url_for('profile', next=request.url))
        return fn(*args, **kwargs)
    return decorated_function
def login_required(func):
    """
    Decorator to require login and save URL for redirecting user after login
    """
    @wraps(func)
    def decorated_function(*args, **kwargs):
        """decorator args"""
        if is_logged_in():
            return func(*args, **kwargs)
        # Remember the page the user wanted so login can send them back.
        session['previously_requested_page'] = request.url
        return redirect(url_for('login'))
    return decorated_function
def set_featured_title():
    """Form POST to update featured title"""
    title = request.form['title']
    stack = request.form['stack']
    article = models.search_for_article(title, stacks=[stack], status=PUBLISHED)
    if article is None:
        # No matching published guide: report and bounce back to wherever
        # the user came from (or the index).
        flash('Cannot find published guide "%s" stack "%s"' % (title, stack),
              category='error')
        target = session.pop('previously_requested_page', None)
        if target is None:
            target = url_for('index')
        return redirect(target)
    models.set_featured_article(article)
    flash('Featured guide updated', category='info')
    return redirect(url_for('index'))
def get_social_redirect_url(article, share_domain):
    """
    Get social redirect url for po.st to enable all counts to follow us
    regardless of where we're hosted.
    """
    # Drop a single trailing slash to avoid '//' in the resulting url.
    domain = share_domain[:-1] if share_domain.endswith('/') else share_domain
    article_url = filters.url_for_article(article)
    # Use the full domain b/c it controls the po.st share counts; the
    # counts should stick to whatever domain we're running on.
    full_url = url_for_domain(article_url, domain=domain)
    return strip_subfolder(full_url)
def strip_subfolder(url):
    """
    Strip off the subfolder if it exists so we always use the exact same
    share url for saving counts.
    """
    subfolder = app.config.get('SUBFOLDER', None)
    if not subfolder:
        return url
    parts = urlparse.urlparse(url)
    if not parts.path.startswith(subfolder):
        return url
    # Remove only the leading occurrence of the subfolder from the path.
    stripped_path = parts.path.replace('%s' % (subfolder), '', 1)
    rebuilt = urlparse.ParseResult(parts.scheme, parts.netloc, stripped_path,
                                   parts.params, parts.query, parts.fragment)
    return rebuilt.geturl()
def require_login_frontend(only_if=True):
    """
    Same logic as the API require_login, but this decorator is intended for use for frontend interfaces.
    It returns a redirect to the login page, along with a post-login redirect_url as a GET parameter.
    :param only_if: Optionally specify a boolean condition that needs to be true for the frontend login to be required.
    This is semantically equivalent to "require login for this view endpoint only if <condition>,
    otherwise, no login is required"
    """
    def decorator(func):
        @wraps(func)
        def decorated_view(*args, **kwargs):
            needs_login = only_if and not current_user.is_authenticated
            if needs_login:
                # Carry the current URL so login can redirect back here.
                return redirect(UserLoginInterfaceURI.uri(redirect_url=quote(request.url, safe='')))
            return func(*args, **kwargs)
        return decorated_view
    return decorator
def curl():
    """CDN purge form: purge every submitted URL and flash the result."""
    form = MyForm.MyForm_input()
    if not form.submit.data:
        # Initial GET: just render the form.
        return render_template('cdn.html', form=form, Main_Infos=g.main_infos)
    urls = set(form.text.data.strip().splitlines())
    for url in urls:
        Purge = purge.Purged()
        # Skip blank lines and comment lines.
        if not url or url.startswith('#'):
            continue
        url = url.strip()
        if not url.startswith('http'):
            flash('url begin with http(s)://')
            return render_template('Message_static.html', Main_Infos=g.main_infos)
        url_rep = Purge.purge_cdn(url)
        flash(url + ' purge CDN ' + url_rep)
    return render_template('Message_static.html', Main_Infos=g.main_infos)
def chart_center_traffic():
    """Render per-minute top-URL traffic charts (client and server side).

    For each of the last 4 minutes, reads the top-5 URLs from the Redis
    sorted sets 'traffic.cli.url_%H:%M' / 'traffic.ser.url_%H:%M' and
    converts the stored byte counts to Mbit (score * 8 / 1024 / 1024).
    """
    try:
        Tra_cli_url_minute_datas = collections.OrderedDict()
        Tra_ser_url_minute_datas = collections.OrderedDict()
        for i in range(1,5):
            # Minute-granular key suffix, e.g. '13:05'.
            Tm = datetime.datetime.now() - datetime.timedelta(minutes=i)
            Tm = Tm.strftime('%H:%M')
            Tra_cli_url_minute_Key = 'traffic.cli.url_%s' % Tm
            Tra_ser_url_minute_Key = 'traffic.ser.url_%s' % Tm
            # zrevrange(..., 0, 4): top five URLs by score for that minute.
            Tra_cli_url_minute_datas[Tm] = [[str(url), int(RC.zscore(Tra_cli_url_minute_Key, url)) * 8 / 1024 / 1024] for url in RC.zrevrange(Tra_cli_url_minute_Key, 0,4)]
            Tra_ser_url_minute_datas[Tm] = [[str(url), int(RC.zscore(Tra_ser_url_minute_Key,url)) * 8 / 1024 / 1024] for url in RC.zrevrange(Tra_ser_url_minute_Key, 0,4) ]
        return render_template('chart_center_traffic.html',Main_Infos=g.main_infos,Tra_cli_url_minute_datas=Tra_cli_url_minute_datas,Tra_ser_url_minute_datas=Tra_ser_url_minute_datas)
    except Exception as e:
        logging.error(e)
        # Flash text is mojibake in the source; preserved byte-for-byte.
        flash('??????!')
        return render_template('Message_static.html', Main_Infos=g.main_infos)
def gateway_domain():
    """Render the gateway top-domains page.

    TOP_URL_DATA: today's connection counts, stored in Redis as a list of
    Python-literal strings (eval'd back).  values: per-minute top-10
    domains for the last 6 minutes from 'httpry_domain.%Y%m%d%H%M' sets.
    """
    try:
        # NOTE(review): eval() on Redis contents trusts the data source
        # completely; consider json or ast.literal_eval if it is not.
        DATA = [eval(v) for v in RC.lrange('top_url_%s'%time.strftime('%Y-%m-%d',time.localtime()), 0, -1)]
        TOP_URL_DATA = [{'data': DATA, 'name': 'conn'}]
        values = collections.OrderedDict()
        for k in range(1,7):
            td = datetime.datetime.now()-datetime.timedelta(minutes=k)
            tt = td.strftime("%H:%M")
            tm = td.strftime('%Y%m%d%H%M')
            # Table headers are mojibake in the source; preserved as-is.
            tables = ('????','????')
            httpry_Key = 'httpry_domain.%s' % tm
            # Top 10 domains (plus count) for that minute's sorted set.
            values[tt] = [[url,int(RC.zscore(httpry_Key, url))] for url in RC.zrevrange(httpry_Key, 0, 10)]
        return render_template('gateway_domain.html',Main_Infos=g.main_infos,tables = tables,values=values,TOP_URL_DATA=TOP_URL_DATA)
    except Exception as e:
        logging.error(e)
        # Flash text is mojibake in the source; preserved byte-for-byte.
        flash('??????!')
        return render_template('Message_static.html', Main_Infos=g.main_infos)
def backup_mysql_results():
    """Render the MySQL backup results stored in Redis.

    BUG FIX: the original used `raise flash(...)` -- flash() returns None,
    so this raised "TypeError: exceptions must derive from BaseException",
    whose text was then flashed to the user as well.  Now the message is
    flashed once and control falls through to the error page.
    """
    produce.Async_log(g.user, request.url)
    try:
        Infos = Redis.lrange('finish_backup', 0, -1) if Redis.exists('finish_backup') else None
        if Infos:
            # NOTE(review): eval() on cached entries trusts Redis contents;
            # prefer json if the data source is not fully trusted.
            Infos = [eval(info) for info in set(Infos)]
            tt = time.strftime('%Y-%m-%d', time.localtime())
            # Table headers are mojibake in the source; preserved as-is.
            tables = ('??','?????','MYSQL???','?????',' ??')
            return render_template('backup_mysql_results.html', Main_Infos=g.main_infos, Infos=Infos, tt=tt, tables=tables)
        # Missing key or empty list: flash the (mojibake) notice once.
        flash('????:?????????!')
    except Exception as e:
        if 'old' not in str(e):
            flash(str(e))
    return render_template('Message_static.html', Main_Infos=g.main_infos)