def advisory_atom():
last_recent_entries = 15
data = get_advisory_data()['published'][:last_recent_entries]
feed = AtomFeed('Arch Linux Security - Recent advisories',
feed_url=request.url, url=request.url_root)
for entry in data:
advisory = entry['advisory']
package = entry['package']
title = '[{}] {}: {}'.format(advisory.id, package.pkgname, advisory.advisory_type)
feed.add(title=title,
content=render_template('feed.html', content=advisory.content),
content_type='html',
summary=render_template('feed.html', content=advisory.impact),
summary_type='html',
author='Arch Linux Security Team',
url=TRACKER_ISSUE_URL.format(advisory.id),
published=advisory.created,
updated=advisory.created)
return feed.get_response()
def rss_feed():
feed = AtomFeed('White House Briefing Room Releases', feed_url=request.url, url=request.url_root)
documents = WhiteHouse.query.order_by(WhiteHouse.document_date.desc())
for document in documents:
feed.add(document.title, document.tweet,
content_type='text',
author="@presproject2017",
url=make_external(document.full_url),
updated=document.document_date,
published=document.document_date)
return feed.get_response()
def run_tensorboard(run_id, tflog_id):
"""Launch TensorBoard for a given run ID and log ID of that run."""
data = current_app.config["data"]
# optimistically suppose the run exists...
run = data.get_run(run_id)
base_dir = Path(run["experiment"]["base_dir"])
log_dir = Path(run["info"]["tensorflow"]["logdirs"][tflog_id])
# TODO ugly!!!
if log_dir.is_absolute():
path_to_log_dir = log_dir
else:
path_to_log_dir = base_dir.joinpath(log_dir)
port = int(tensorboard.run_tensorboard(str(path_to_log_dir)))
url_root = request.url_root
url_parts = re.search("://([^:/]+)", url_root)
redirect_to_address = url_parts.group(1)
return redirect("http://%s:%d" % (redirect_to_address, port))
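# A hedged aside, not part of the original module: the regex above works, but the host part
# of request.url_root can also be extracted with the standard library. Minimal sketch,
# assuming Python 3's urllib.parse:
from urllib.parse import urlsplit

def host_from_url_root(url_root):
    """Return the hostname portion of a value like 'http://example.com:5000/'."""
    return urlsplit(url_root).hostname  # -> 'example.com'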
def get(self):
LOG.debug("API CALL: %s GET" % str(self.__class__.__name__))
resp = dict()
resp['versions'] = dict()
versions = [{
"status": "CURRENT",
"id": "v2",
"links": [
{
"href": request.url_root + '/v2',
"rel": "self"
}
]
}]
resp['versions'] = versions
return Response(json.dumps(resp), status=200, mimetype='application/json')
def get(self):
"""
Lists API versions.
:return: Returns a json with API versions.
:rtype: :class:`flask.response`
"""
LOG.debug("API CALL: Neutron - List API Versions")
resp = dict()
resp['versions'] = dict()
versions = [{
"status": "CURRENT",
"id": "v2.0",
"links": [
{
"href": request.url_root + '/v2.0',
"rel": "self"
}
]
}]
resp['versions'] = versions
return Response(json.dumps(resp), status=200, mimetype='application/json')
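# For comparison, a minimal sketch (assumed handler name, not from the source) of the same
# version listing built with flask.jsonify; urljoin also avoids the double slash that
# request.url_root + '/v2.0' produces, since url_root already ends with '/'.
from urllib.parse import urljoin
from flask import jsonify, request

def list_api_versions():
    versions = [{
        "status": "CURRENT",
        "id": "v2.0",
        "links": [{"href": urljoin(request.url_root, 'v2.0'), "rel": "self"}],
    }]
    return jsonify({'versions': versions})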
def get_feed():
from mhn.common.clio import Clio
from mhn.auth import current_user
authfeed = mhn.config['FEED_AUTH_REQUIRED']
if authfeed and not current_user.is_authenticated():
abort(404)
feed = AtomFeed('MHN HpFeeds Report', feed_url=request.url,
url=request.url_root)
sessions = Clio().session.get(options={'limit': 1000})
for s in sessions:
feedtext = u'Sensor "{identifier}" '
feedtext += 'reported a connection from {source_ip}:{source_port} to sensorip:{destination_port}.'
feedtext = feedtext.format(**s.to_dict())
feed.add('Feed', feedtext, content_type='text',
published=s.timestamp, updated=s.timestamp,
url=makeurl(url_for('api.get_session', session_id=str(s._id))))
return feed
def post(year, month, day, post_name):
rel_url = request.path[len('/post/'):]
fixed_rel_url = storage.fix_post_relative_url(rel_url)
if rel_url != fixed_rel_url:
return redirect(request.url_root + 'post/' + fixed_rel_url) # it's not the correct relative url, so redirect
post_ = storage.get_post(rel_url, include_draft=False)
if post_ is None:
abort(404)
post_d = post_.to_dict()
del post_d['raw_content']
post_d['content'] = get_parser(post_.format).parse_whole(post_.raw_content)
post_d['content'], post_d['toc'], post_d['toc_html'] = parse_toc(post_d['content'])
post_d['url'] = make_abs_url(post_.unique_key)
post_ = post_d
return custom_render_template(post_['layout'] + '.html', entry=post_)
def streaming_video(url_root):
'''Video streaming generator function'''
try:
while True:
if remote_control_cozmo:
image = get_annotated_image()
img_io = io.BytesIO()
image.save(img_io, 'PNG')
img_io.seek(0)
yield (b'--frame\r\n'
b'Content-Type: image/png\r\n\r\n' + img_io.getvalue() + b'\r\n')
else:
asyncio.sleep(.1)
except cozmo.exceptions.SDKShutdown:
# Tell the main flask thread to shutdown
requests.post(url_root + 'shutdown')
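# For context, a minimal sketch of how a generator like streaming_video is typically wrapped
# in a multipart response; flask_helpers.stream_video presumably does something similar, but
# that is an assumption, not taken from the source.
from flask import Response

def make_stream_response(frame_generator):
    # multipart/x-mixed-replace lets the browser replace each PNG frame in place;
    # the boundary must match the b'--frame' marker yielded by the generator.
    return Response(frame_generator, mimetype='multipart/x-mixed-replace; boundary=frame')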
def atom():
""" of the news page.
"""
resp = render_template('news/atom.xml', news=latest_news(current_session))
response = make_response(resp)
response.headers['Content-Type'] = 'application/atom+xml; charset=utf-8; filename=news-ATOM'
return response
# The commented-out AtomFeed version below produces output which crashes a feed validator.
# from werkzeug.contrib.atom import AtomFeed
# news=latest_news(current_session)
# feed = AtomFeed('pygame news', feed_url=request.url, url=request.url_root)
# for new in news:
# feed.add(new.title, new.description_html,
# content_type='html',
# author='pygame',
# url='https://www.pygame.org/news.html',
# updated=new.datetimeon,
# published=new.datetimeon)
# return feed.get_response()
def get(self, hash):
path = 'static' + os.sep + 'client.zip'
try:
    os.remove(path)
except OSError:
    pass
zip = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
for root, dirs, files in os.walk(CLIENT_FOLDER):
for f in files:
zip.write(os.path.join(root, f))
zip.close()
client = open(path, 'rb').read()
if hash == hashlib.md5(client).hexdigest():
return {"err": "invalid request"}, 400
else:
return {"url": request.url_root + path}, 200
def server_netloc():
"""
Figure out the name of the server end of the request, punting if it's
the local host or not available.
"""
return urlparse.urlparse(request.url_root).netloc
#
# URLs
#
def internal_url(path):
return request.url_root + path
def root_url(path = None):
return request.url_root + ("" if path is None else path)
def make_external(url):
return urljoin(request.url_root, url)
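# Behavior of the urljoin-based helper above, shown with hypothetical URLs: relative paths
# are resolved against the request root, while already-absolute URLs pass through unchanged.
from urllib.parse import urljoin

assert urljoin('https://example.org/', 'briefing-room/page-1') == 'https://example.org/briefing-room/page-1'
assert urljoin('https://example.org/', 'https://other.example/x') == 'https://other.example/x'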
def href_for(self, operation, qs=None, **kwargs):
"""
Construct a full href for an operation against a resource.
:param qs: the query string dictionary, if any
:param kwargs: additional arguments for path expansion
"""
url = urljoin(request.url_root, self.url_for(operation, **kwargs))
qs_character = "?" if url.find("?") == -1 else "&"
return "{}{}".format(
url,
"{}{}".format(qs_character, urlencode(qs)) if qs else "",
)
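# Standalone sketch of the query-string joining rule used by href_for (illustrative names and
# values only): '?' is used for the first parameter, '&' if the URL already has a query string.
from urllib.parse import urlencode

def append_qs(url, qs=None):
    qs_character = '?' if url.find('?') == -1 else '&'
    return '{}{}'.format(url, '{}{}'.format(qs_character, urlencode(qs)) if qs else '')

assert append_qs('http://localhost/widget/42', {'page': 2}) == 'http://localhost/widget/42?page=2'
assert append_qs('http://localhost/widget/42?page=2', {'sort': 'asc'}) == 'http://localhost/widget/42?page=2&sort=asc'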
def feed():
"""Return an atom feed for the blog."""
feed = AtomFeed(
'%s: Recent Posts' % app.config.get('SITENAME', 'akamatsu'),
feed_url=request.url,
url=request.url_root
)
posts = (
Post.query
.filter_by(is_published=True, ghost='')
.order_by(Post.timestamp.desc())
.limit(15)
)
for post in posts:
# unicode conversion is needed for the content
feed.add(
post.title,
markdown.render(post.content).unescape(),
content_type='html',
author=post.author.username,
url=url_for('blog.show', slug=post.slug, _external=True),
updated=post.timestamp
)
return feed.get_response()
def get_sign_in_view(target):
signin_url = request.url_root + target
oauth_service = OAuth2Service(
name="google",
client_id=current_app.config["GOOGLE_LOGIN_CLIENT_ID"],
client_secret=current_app.config["GOOGLE_LOGIN_CLIENT_SECRET"],
authorize_url=google_params.get("authorization_endpoint"),
base_url=google_params.get("userinfo_endpoint"),
access_token_url=google_params.get("token_endpoint"))
if "code" in request.args:
oauth_session = oauth_service.get_auth_session(
data={"code": request.args["code"],
"grant_type": "authorization_code",
"redirect_uri": signin_url},
decoder=json.loads)
user_data = oauth_session.get("").json()
user = load_user(user_data["email"])
if user:
flask_login.login_user(user)
return redirect(url_for("index"))
else:
error_message = "Not an authorized user ({})".format(user_data["email"])
return render_template("/sign_in.html", error_message=error_message)
elif "authorize" in request.args:
return redirect(oauth_service.get_authorize_url(
scope="email",
response_type="code",
prompt="select_account",
redirect_uri=signin_url))
else:
return render_template("/sign_in.html")
def get_next_url(self):
"""Returns the URL where we want to redirect to. This will
always return a valid URL.
"""
return (
self.check_safe_root(request.values.get('next')) or
self.check_safe_root(request.referrer) or
(self.fallback_endpoint and
self.check_safe_root(url_for(self.fallback_endpoint))) or
request.url_root
)
def check_safe_root(self, url):
if url is None:
return None
if self.safe_roots is None:
return url
if url.startswith(request.url_root) or url.startswith('/'):
# A URL inside the same app is deemed to always be safe
return url
for safe_root in self.safe_roots:
if url.startswith(safe_root):
return url
return None
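# A hedged caveat, not from the source: a scheme-relative URL such as '//evil.example/x' also
# starts with '/', so a stricter variant of the same-app check could parse the URL first.
# Minimal sketch under that assumption:
from urllib.parse import urlparse

def is_app_local(url):
    parts = urlparse(url)
    return not parts.scheme and not parts.netloc and url.startswith('/')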
def csrf_protect():
if request.method not in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
referer = request.headers.get('Referer')
if referer is None or different_origin(referer, request.url_root):
raise Forbidden(description="Referer check failed.")
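# different_origin is not shown in this snippet; a plausible sketch (an assumption, not the
# project's actual helper) compares the scheme and network location of the two URLs:
from urllib.parse import urlsplit

def different_origin(url_a, url_b):
    a, b = urlsplit(url_a), urlsplit(url_b)
    return (a.scheme, a.netloc) != (b.scheme, b.netloc)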
def required(self, price, **kwargs):
"""API route decorator to request payment for a resource.
This function stores the resource price in a closure. It will verify
the validity of a payment, and allow access to the resource if the
payment is successfully accepted.
"""
def decorator(fn):
"""Validates payment and returns the original API route."""
@wraps(fn)
def _fn(*fn_args, **fn_kwargs):
# Calculate resource cost
nonlocal price
_price = price(request, *fn_args, **fn_kwargs) if callable(price) else price
# Need better way to pass server url to payment methods (FIXME)
if 'server_url' not in kwargs:
url = urlparse(request.url_root)
kwargs.update({'server_url': url.scheme + '://' + url.netloc})
# Continue to the API view if payment is valid or price is 0
if _price == 0:
return fn(*fn_args, **fn_kwargs)
try:
contains_payment = self.contains_payment(_price, request.headers, **kwargs)
except BadRequest as e:
return Response(e.description, BAD_REQUEST)
if contains_payment:
return fn(*fn_args, **fn_kwargs)
else:
# Get headers for initial 402 response
payment_headers = {}
for method in self.allowed_methods:
payment_headers.update(method.get_402_headers(_price, **kwargs))
# Accessing the .files attribute of a request
# drains the input stream.
request.files
raise PaymentRequiredException(payment_headers)
return _fn
return decorator
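# Hypothetical usage sketch: 'app' and 'payment' (the object exposing .required) are
# assumptions, as is the flat price; the source does not show how the decorator is wired up.
# @app.route('/forecast')
# @payment.required(1000)
# def forecast():
#     return 'sunny'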
def siteURL(config, request):
u = current_app.config.get('SITE_URL')
return u if u is not None else request.url_root[0:-1]
def page_not_found(error):
    siteURL = current_app.config.get('SITE_URL')
    return render_template_string(generate_template(current_app.config,'error.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], path=request.path, entry=None, error="I'm sorry. I can't find that page.")
def send_doc(path):
siteURL = current_app.config.get('SITE_URL')
location = current_app.config.get('DOCS')
if location is None:
abort(404)
if location[0:4]=='http':
url = location + path
req = requests.get(url, stream=True, headers={'Connection': 'close'})
if req.headers['Content-Type'][0:9]=='text/html':
return render_template_string(generate_template(current_app.config,'content.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], html=req.text, entry=None)
else:
return Response(stream_with_context(req.iter_content()), headers = dict(req.headers))
else:
dir = os.path.abspath(location)
if path.endswith('.html'):
glob = StringIO()
try:
with open(os.path.join(dir,path), mode='r', encoding='utf-8') as doc:
peeked = doc.readline()
if peeked.startswith('<!DOCTYPE'):
return send_from_directory(dir, path)
glob.write(peeked)
for line in doc:
glob.write(line)
return render_template_string(generate_template(current_app.config,'content.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], html=glob.getvalue(), entry=None)
except FileNotFoundError:
abort(404)
return send_from_directory(dir, path)
def ipxe_boot(node):
response = config_renderer.ipxe.render(node, request.url_root)
return Response(response, mimetype='text/plain')
def report(node):
if not node.maintenance_mode:
try:
node.active_config_version = int(request.args.get('version'))
except (ValueError, TypeError):
return abort(400)
if request.content_type != 'application/json':
return abort(400)
provision = models.Provision()
provision.node = node
provision.config_version = node.active_config_version
provision.ignition_config = request.data
if node.target_config_version == node.active_config_version:
provision.ipxe_config = config_renderer.ipxe.render(node, request.url_root)
models.db.session.add(provision)
models.db.session.add(node)
node.disks.update({
models.Disk.wipe_next_boot: False
})
if node.cluster.are_etcd_nodes_configured:
node.cluster.assert_etcd_cluster_exists = True
models.db.session.add(node.cluster)
models.db.session.commit()
return Response('ok', mimetype='application/json')
def get_content(self):
packages = [P(self.node, request.url_root) for P in self.get_package_classes()]
files = list(itertools.chain.from_iterable(p.get_files() for p in packages))
units = list(itertools.chain.from_iterable(p.get_units() for p in packages))
networkd_units = list(itertools.chain.from_iterable(p.get_networkd_units() for p in packages))
ssh_keys = self.get_ssh_keys()
return {
'ignition': {
'version': '2.0.0',
'config': {},
},
'storage': self.get_storage_config(files),
'networkd': {
'units': networkd_units
},
'passwd': {
'users': [{
'name': 'root',
'sshAuthorizedKeys': ssh_keys,
}, {
'name': 'core',
'sshAuthorizedKeys': ssh_keys,
}],
},
'systemd': {
'units': units
},
}
def target_ipxe_config_view(self):
node = self.get_one(request.args.get('id'))
response = config_renderer.ipxe.render(node, request.url_root)
return Response(response, mimetype='text/plain')
def handle_cozmoImage():
if is_microsoft_browser(request):
return serve_single_image()
return flask_helpers.stream_video(streaming_video, request.url_root)
def login():
if g.auth:
return redirect(url_for("index"))
else:
query = {"sso": True,
"sso_r": SpliceURL.Modify(request.url_root, "/sso/").geturl,
"sso_p": SSO["SSO.PROJECT"],
"sso_t": md5("%s:%s" %(SSO["SSO.PROJECT"], SpliceURL.Modify(request.url_root, "/sso/").geturl))
}
SSOLoginURL = SpliceURL.Modify(url=SSO["SSO.URL"], path="/login/", query=query).geturl
logger.info("User request login to SSO: %s" %SSOLoginURL)
return redirect(SSOLoginURL)