def logout():
    """Expire every session cookie and redirect the user to the SSO logout URL."""
    sso_logout_url = SSO.get("SSO.URL") + "/sso/?nextUrl=" + request.url_root.strip("/")
    resp = make_response(redirect(sso_logout_url))
    # Setting expires=0 tells the browser to drop each cookie immediately.
    for cookie_name in ('logged_in', 'username', 'sessionId', 'time', 'Azone'):
        resp.set_cookie(key=cookie_name, value='', expires=0)
    return resp
# Python usage examples of request.url_root
def atom():
    """Serve an Atom feed of the 15 most recently created naxsi rules."""
    feed = AtomFeed(title='Recent rules', feed_url=request.url, url=request.url_root,
                    author='Spike', icon=url_for('static', filename='favicon.ico'))
    latest = NaxsiRules.query.order_by(NaxsiRules.sid.desc()).limit(15).all()
    if latest:
        for rule in latest:
            feed.add(rule.msg, str(rule),
                     updated=datetime.fromtimestamp(rule.timestamp),
                     id=rule.sid)
    return feed.get_response()
def authorize():
    """Begin the OAuth2 flow by redirecting the user to the authorization page."""
    oauth = OAuth2Session(client_id, redirect_uri=__get_redirect_uri__(request.url_root))
    user_authorization_url, state = oauth.authorization_url(authorization_url)
    # Persist the CSRF state token so the callback handler can validate it.
    session['oauth_state'] = state
    return redirect(user_authorization_url)
def callback():
    """Finish the OAuth2 flow: exchange the authorization response for a token."""
    oauth = OAuth2Session(
        client_id,
        redirect_uri=__get_redirect_uri__(request.url_root),
        state=session['oauth_state'],
    )
    token = oauth.fetch_token(
        token_url,
        client_secret=client_secret,
        authorization_response=request.url,
    )
    __set_session_token__(token)
    return redirect(url_for('.token'))
def make_blog_feed(order=None, limit=15):
    """Build an AtomFeed of recent blog posts.

    order: SQLAlchemy ordering clause; defaults to newest publish date first.
    limit: maximum number of posts included in the feed.
    Returns the populated AtomFeed (not a response).
    """
    feed = AtomFeed(
        title="{} - Blog Feed".format(settings.title),
        subtitle=settings.tagline,
        feed_url=request.url,
        url=request.url_root,
        author="Musharraf Omer",
        icon=None,
        logo=None,
        rights="Copyright 2000-2016 - Mushy.ltd",
    )
    ordering = order or Post.publish_date.desc()
    for post in Post.query.order_by(ordering).limit(limit).all():
        post.url = url_for('canella-blog.post', slug=post.slug)
        feed.add(
            title=post.title,
            url=post.url,
            content=make_summary(post.body),
            content_type='html',
            summary=post.meta_description,
            # Fall back to the creation time when a post was never updated.
            updated=post.updated or post.created,
            author="Musharraf Omer",
            published=post.publish_date,
            categories=[{'term': tag.slug, 'label': tag.title} for tag in post.tags]
        )
    return feed
def delhistory():
    """Delete the user-agent history file.

    Only honours requests whose Referer header points back at this app,
    as a lightweight CSRF guard. Returns "ok" on success or a CSRF notice.
    """
    if not str(request.referrer).startswith(request.url_root):
        #app.logger.info("referer:", str(request.referrer))
        return "CSRF ATTEMPT!"
    try:
        os.unlink('ua-history.txt')
    except FileNotFoundError:
        # The history file may already be gone (e.g. a repeated request);
        # treat that as success instead of raising a 500.
        pass
    return "ok"
def _preemptive_unless(base_url=None, additional_unless=None):
    """Decide whether preemptive-cache recording should be skipped for base_url.

    base_url: root URL being recorded (defaults to the current request's url_root).
    additional_unless: optional callable providing an extra veto condition.
    Returns True when recording should NOT happen.
    """
    if base_url is None:
        base_url = request.url_root
    # Short-circuit order preserved: the settings lookups only run as needed.
    disabled_for_root = (
        not settings().getBoolean(["devel", "cache", "preemptive"])
        or base_url in settings().get(["server", "preemptiveCache", "exceptions"])
        or not base_url.startswith(("http://", "https://"))
    )
    recording_disabled = request.headers.get("X-Preemptive-Record", "yes") == "no"
    if callable(additional_unless):
        return recording_disabled or disabled_for_root or additional_unless()
    return recording_disabled or disabled_for_root
def _preemptive_data(key, path=None, base_url=None, data=None, additional_request_data=None):
    """Assemble the metadata dict describing a request for the preemptive cache.

    key: plugin identifier; "_default" means the core app (no "plugin" entry).
    path: request path (defaults to the current request's path).
    base_url: root URL (defaults to the current request's url_root).
    data: dict — or callable returning one — merged into the result; a
        "query_string" entry gets the l10n parameter prepended.
    additional_request_data: callable whose result is stored under
        "_additional_request_data".
    Returns the assembled dict.
    """
    if path is None:
        path = request.path
    if base_url is None:
        base_url = request.url_root
    d = dict(path=path,
             base_url=base_url,
             query_string="l10n={}".format(g.locale.language if g.locale else "en"))
    if key != "_default":
        d["plugin"] = key
    # add data if we have any
    if data is not None:
        try:
            if callable(data):
                data = data()
            if data:
                if "query_string" in data:
                    data["query_string"] = "l10n={}&{}".format(g.locale.language, data["query_string"])
                d.update(data)
        # Narrowed from a bare except: plugin failures are logged and ignored,
        # but SystemExit/KeyboardInterrupt must still propagate.
        except Exception:
            _logger.exception("Error collecting data for preemptive cache from plugin {}".format(key))
    # add additional request data if we have any
    if callable(additional_request_data):
        try:
            ard = additional_request_data()
            if ard:
                d.update(dict(
                    _additional_request_data=ard
                ))
        except Exception:
            _logger.exception("Error retrieving additional data for preemptive cache from plugin {}".format(key))
    return d
def dict(self):
    """Return a JSON-serialisable representation of this record,
    including an absolute self link under /api/rsvps/."""
    record_id = str(self._id)
    self_link = "{}api/rsvps/{}".format(request.url_root, record_id)
    return {
        "_id": record_id,
        "name": self.name,
        "email": self.email,
        "links": {"self": self_link},
    }
def set_webhook():
    """
    Sets the BicingBot webhook in its Telegram Bot
    :return: HTTP_RESPONSE with 200 OK status and a status message.
    """
    response = 'Webhook configured'
    if request.url_root.startswith('https'):
        # request.url_root already ends with a trailing slash (the other
        # url_root users in this file rely on that), so append the path
        # without one to avoid a double "//" in the webhook URL.
        bot_response = get_bot().setWebhook('{}bicingbot'.format(request.url_root))
        logger.debug(bot_response)
    else:
        response = 'Bad webhook: https url must be provided for webhook'
        # logger.warn is a deprecated alias for logger.warning
        logger.warning(response)
    return response
def ImageUrlToFile(image_url):
    """images are stored as ad-213213.png in the db. We get them from the website as
    /static/upload-debug/ad-213213.png so we trim them.
    Returns True, filepath"""
    failure = (False, '')
    prefix = request.url_root + "static/%s/" % app.config[Constants.KEY_UPLOAD_DIR]
    if not image_url.startswith(prefix):
        return failure
    segments = image_url.split('/')
    # The URL must be exactly prefix + bare filename (no extra path parts).
    if not segments or image_url != prefix + segments[-1]:
        return failure
    return True, segments[-1]
def FileToImageUrl(image_file):
    """Return the absolute public URL for an uploaded image file name."""
    upload_dir = app.config[Constants.KEY_UPLOAD_DIR]
    return "{}static/{}/{}".format(request.url_root, upload_dir, image_file)
def feeds_blogs():
    """Global feed generator for latest blogposts across all projects"""
    @current_app.cache.cached(60*5)
    def render_page():
        feed = AtomFeed('Blender Cloud - Latest updates',
                        feed_url=request.url, url=request.url_root)
        api = system_util.pillar_api()
        # Fetch the 15 newest published blog posts, author embedded.
        latest_posts = Node.all({
            'where': {'node_type': 'post', 'properties.status': 'published'},
            'embedded': {'user': 1},
            'sort': '-_created',
            'max_results': '15'
        }, api=api)
        for post in latest_posts._items:
            post_url = url_for_node(node=post)
            # Truncate the body to 500 chars and link to the full post.
            excerpt = '<p>{0}... <a href="{1}">Read more</a></p>'.format(
                post.properties.content[:500], post_url)
            feed.add(post.name, str(excerpt),
                     content_type='html',
                     author=post.user.fullname,
                     url=post_url,
                     updated=post._updated if post._updated else post._created,
                     published=post._created)
        return feed.get_response()
    return render_page()
def get_next_url(self):
    """Returns the URL where we want to redirect to. This will
    always return a valid URL.
    """
    # Try each candidate in priority order; the first safe one wins.
    for candidate in (request.values.get('next'), request.referrer):
        safe = self.check_safe_root(candidate)
        if safe:
            return safe
    if self.fallback_endpoint:
        safe = self.check_safe_root(url_for(self.fallback_endpoint))
        if safe:
            return safe
    return request.url_root
def check_safe_root(self, url):
    """Return *url* if it is safe to redirect to, otherwise None.

    A URL is considered safe when it is relative, points inside this
    app, or starts with one of the configured safe roots. When no safe
    roots are configured, every URL is accepted.
    """
    if url is None:
        return None
    if self.safe_roots is None:
        return url
    # A URL inside the same app is deemed to always be safe
    if url.startswith((request.url_root, '/')):
        return url
    if any(url.startswith(safe_root) for safe_root in self.safe_roots):
        return url
    return None
def insert_bucket(project, conn):
    """ Creates a new bucket.
    Args:
        project: A string specifying a project ID.
        conn: An S3Connection instance.
    Returns:
        A JSON string representing a bucket.
    """
    bucket_info = request.get_json()
    bucket_name = bucket_info['name']
    # TODO: Do the following lookup and create under a lock.
    if conn.lookup(bucket_name) is not None:
        return error('Sorry, that name is not available. '
                     'Please try a different one.', HTTP_CONFLICT)
    index_bucket(bucket_name, project)
    conn.create_bucket(bucket_name)
    # The HEAD bucket request does not return creation_date. This is an
    # inefficient way of retrieving it.
    matches = (b for b in conn.get_all_buckets() if b.name == bucket_name)
    try:
        bucket = next(matches)
    except StopIteration:
        return error('Unable to find bucket after creating it.')
    bucket_url = url_for('get_bucket', bucket_name=bucket.name)
    payload = {
        'kind': 'storage#bucket',
        'id': bucket.name,
        # url_root carries a trailing slash; drop it before appending the path.
        'selfLink': request.url_root[:-1] + bucket_url,
        'name': bucket.name,
        'timeCreated': bucket.creation_date,
        'updated': bucket.creation_date
    }
    return Response(json.dumps(payload), mimetype='application/json')
def get_bucket(bucket_name, conn):
    """ Returns metadata for the specified bucket.
    Args:
        bucket_name: A string specifying a bucket name.
        conn: An S3Connection instance.
    Returns:
        A JSON string representing a bucket.
    """
    projection = request.args.get('projection') or 'noAcl'
    if projection != 'noAcl':
        return error('projection: {} not supported.'.format(projection),
                     HTTP_NOT_IMPLEMENTED)
    # Linear scan for the bucket; stop at the first name match.
    bucket = None
    for candidate in conn.get_all_buckets():
        if candidate.name == bucket_name:
            bucket = candidate
            break
    if bucket is None:
        return error('Not Found', HTTP_NOT_FOUND)
    bucket_url = url_for('get_bucket', bucket_name=bucket.name)
    payload = {
        'kind': 'storage#bucket',
        'id': bucket.name,
        # url_root carries a trailing slash; drop it before appending the path.
        'selfLink': request.url_root[:-1] + bucket_url,
        'name': bucket.name,
        'timeCreated': bucket.creation_date,
        'updated': bucket.creation_date
    }
    return Response(json.dumps(payload), mimetype='application/json')
def get(self, provider):
    """Start the OAuth authorization flow for *provider*.

    Raises ProviderInvalid for anything other than 'github'.
    """
    if provider != 'github':
        raise ProviderInvalid(provider)
    callback_url = request.url_root + 'api/v1/auth/callback/github'
    return github.authorize(callback=callback_url)
def _url_for(self, path):
    """Build an absolute URL for *path* under this API's prefix."""
    return "".join((request.url_root, self.api_prefix, path))
def index_discovery():
    """Serve the appr meta-tag discovery page for this registry.

    Returns an HTML page whose appr-package meta tag tells clients where
    to pull packages from.
    """
    # request.url_root ends with a trailing slash; strip it so the
    # template's "{host}/appr/..." does not emit a double "//".
    host = request.url_root.rstrip('/')
    domain = request.headers['Host']
    return """<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="appr-package" content="{domain}/{{name}} {host}/appr/api/v1/packages/{{name}}/pull">
</head>
<body>
</body>
</html>""".format(domain=domain, host=host)