def review(title):
"""
This URL only exists for legacy reasons so try to find the article where
it is in the new scheme and return 301 to indicate moved.
"""
branch = request.args.get('branch', u'master')
article = models.search_for_article(title)
if article is not None:
return redirect(filters.url_for_article(article, branch=branch), 301)
return missing_article(request.base_url, title=title, branch=branch)
# Note this URL is directly linked to the filters.url_for_article filter.
# These must be changed together!
python类base_url()的实例源码
def headerize(func):
"""The decorator adds header links to response for paginator"""
@wraps(func)
def wrapper(*args, **kwargs):
data = func(*args, **kwargs)
resp = jsonify(data)
fmt = '<{base}?page={page}&per_page={per_page}>; rel="{rel}"'
links = [{'page': data['page'] - 1, 'rel':'prev'},
{'page': data['page'] + 1, 'rel':'next'},
{'page': 1, 'rel': 'first'},
{'page': data['num_pages'], 'rel':'last'}]
header = ', '.join(fmt.format(base=request.base_url,
page=i['page'],
per_page=data['per_page'],
rel=i['rel'])
for i in links)
resp.headers.extend({'Link': header})
return resp
return wrapper
def __init__(self, data):
self._structure = data
self._dispatcher = {
'get': requests.get,
'post': requests.post,
'put': requests.put,
'delete': requests.delete}
self._get_system_settings()
dgst = data.get('password-digest')
if (dgst is not None and isinstance(dgst, basestring) and
hasattr(hashlib, dgst.lower())):
m = operator.methodcaller(dgst.lower(),
self.billing_password)(hashlib)
self.billing_password = m.hexdigest()
_url = urlparse(request.base_url)
self.master_url = '{0}://{1}'.format(_url.scheme, _url.netloc)
def spark_application(app_id):
"""Mock of the Spark jobs REST resource."""
if 'last' in request.args:
return jsonify(redis.get(request.base_url))
d = st.fixed_dictionaries({
'jobId': st.integers(0),
'name': st.text(),
'submissionTime': st.text(),
'completionTime': st.text(),
'stageIds': st.lists(st.integers(0), average_size=3),
'status': st.sampled_from(['SUCCEEDED', 'RUNNING', 'FAILED']),
'numTasks': st.integers(0),
'numActiveTasks': st.integers(0),
'numCompletedTasks': st.integers(0),
'numSkippedTasks': st.integers(0),
'numFailedTasks': st.integers(0),
'numActiveStages': st.integers(0),
'numCompletedStages': st.integers(0),
'numSkippedStages': st.integers(0),
'numFailedStages': st.integers(0),
})
result = json.dumps(st.lists(d, average_size=3).example())
redis.set(request.base_url, result)
return jsonify(result)
def _cache_key(ui, url=None, locale=None, additional_key_data=None):
if url is None:
url = request.base_url
if locale is None:
locale = g.locale.language if g.locale else "en"
k = "ui:{}:{}:{}".format(ui, url, locale)
if callable(additional_key_data):
try:
ak = additional_key_data()
if ak:
# we have some additional key components, let's attach them
if not isinstance(ak, (list, tuple)):
ak = [ak]
k = "{}:{}".format(k, ":".join(ak))
except:
_logger.exception("Error while trying to retrieve additional cache key parts for ui {}".format(ui))
return k
def process_request():
"""
Process request.
- Set api_url
"""
base_url = request.base_url
referrer = request.headers.get('referer')
if referrer:
# we use referrer as base url
parts = urlparse(referrer)
base_url = urlunparse((parts.scheme, parts.netloc, '', '', '', ''))
elif APP_URL:
base_url = APP_URL
# Used in building full URIs
request.api_url = urljoin(base_url, API_PREFIX + '/')
request.user = flask_session.get('user')
request.realm = flask_session.get('realm', 'employees')
def verify_token(token):
""" Verify the supplied token and check user role is correct for the requested resource"""
if not token:
current_app.logger.debug(f'Token not supplied {request.base_url}')
return False
try:
decoded_token = base64.b64decode(token).decode('utf-8')
except UnicodeDecodeError:
current_app.logger.debug(f'Unable to decode token {request.base_url}')
return False # Can't decode token, so fail login
valid_token, user_id = AuthenticationService.is_valid_token(decoded_token, 604800)
if not valid_token:
current_app.logger.debug(f'Token not valid {request.base_url}')
return False
if tm.is_pm_only_resource:
if not UserService.is_user_a_project_manager(user_id):
current_app.logger.debug(f'User {user_id} is not a PM {request.base_url}')
return False
tm.authenticated_user_id = user_id # Set the user ID on the decorator as a convenience
return True # All tests passed token is good for the requested resource
def before_request():
request.start_time = datetime.now()
# @bp.after_request
# def after_request(resp):
# try:
# if '_' in request.endpoint:
# dbcon = influx_db.connection
# point = [{"measurement": config.APP_NAME,
# "tags": {"method": request.method, "status": resp.status_code, "endpoint":
# request.endpoint},
# "fields": {"base_url": request.base_url, "remote_address": request.remote_addr,
# 'response_time': (datetime.now() - request.start_time).microseconds}}]
# dbcon.write_points(point)
# except Exception as e:
# pass
# logger.debug('Write api statistics data to influxdb failed, error?' + e.message)
# return resp
def forbidden(message="Forbidden."):
log.debug("Response 403: %s", message)
log.info("Denied %s %s %s", request.remote_addr, request.method, request.base_url)
return Response(message + "\n", status=403, mimetype="text/plain")
def base_url(path = None):
return request.base_url + ("" if path is None else "/" + path)
def index():
return jsonify(name='Payments REST API Service',
version='1.0',
docs=request.base_url + 'apidocs/index.html',
site=request.base_url + 'payments'), status.HTTP_200_OK
######################################################################
# LIST ALL PAYMENTS
######################################################################
def partner(article_path):
"""
URL for articles from hackhands blog -- these articles are not
editable.
"""
try:
repo_path = '%s/%s' % (app.config['SECONDARY_REPO_OWNER'],
app.config['SECONDARY_REPO_NAME'])
except KeyError:
flash('No secondary guide configuration', category='error')
return redirect(url_for('index'))
if article_path is None:
articles = models.get_available_articles(status=PUBLISHED,
repo_path=repo_path)
return render_template('review.html', articles=articles)
article = models.read_article(article_path, repo_path=repo_path)
if article is None:
flash('Failed reading guide', category='error')
return redirect(url_for('index'))
# Use http as canonical protocol for url to avoid having two separate
# comment threads for an article. Disqus uses this variable to save
# comments.
canonical_url = request.base_url.replace('https://', 'http://')
form = forms.SignupForm()
return render_template('article.html',
article=article,
allow_edits=False,
canonical_url=canonical_url,
form=form,
disclaimer=True)
def _finish_span(self, response=None, exception=None):
""" Close and finish the active span if it exists. """
span = getattr(g, 'flask_datadog_span', None)
if span:
if span.sampled:
error = 0
code = response.status_code if response else None
method = request.method if request else None
# if we didn't get a response, but we did get an exception, set
# codes accordingly.
if not response and exception:
code = 500
# The 3 next lines might not be strictly required, since `set_traceback`
# also get the exception from the sys.exc_info (and fill the error meta).
# Since we aren't sure it always work/for insuring no BC break, keep
# these lines which get overridden anyway.
error = 1
span.set_tag(errors.ERROR_TYPE, type(exception))
span.set_tag(errors.ERROR_MSG, exception)
# The provided `exception` object doesn't have a stack trace attached,
# so attach the stack trace with `set_traceback`.
span.set_traceback()
# the endpoint that matched the request is None if an exception
# happened so we fallback to a common resource
resource = code if not request.endpoint else request.endpoint
span.resource = compat.to_unicode(resource).lower()
span.set_tag(http.URL, compat.to_unicode(request.base_url or ''))
span.set_tag(http.STATUS_CODE, code)
span.set_tag(http.METHOD, method)
span.error = error
span.finish()
# Clear our span just in case.
g.flask_datadog_span = None
# Request hook methods
def after_request(response):
""" called after every request """
# log the endpoint hit and any errors
delta = int((time.time() - g.start_time) * 1000)
start_utc = datetime.datetime.utcfromtimestamp(g.start_time)
username = request.authorization.username if request.authorization else None
err_msg = response.get_data(as_text=True) if response.status_code // 100 >= 4 else None
Logger.endpoint_hit(start_utc, delta, request.base_url, username, request.method,
response.status_code, err_msg)
return response
def create_link_string(page, last_page, per_page):
"""Returns a string representing the value of the ``Link`` header.
`page` is the number of the current page, `last_page` is the last page in
the pagination, and `per_page` is the number of results per page.
"""
linkstring = ''
if page < last_page:
next_page = page + 1
linkstring = LINKTEMPLATE.format(request.base_url, next_page,
per_page, 'next') + ', '
linkstring += LINKTEMPLATE.format(request.base_url, last_page,
per_page, 'last')
return linkstring
def get_current_url(self):
"""the current URL + next."""
return request.base_url + '?next=' + url_quote(self.get_next_url())
def metrics():
"""Mock of the YARN cluster metrics REST resource."""
if 'last' in request.args:
return jsonify(redis.get(request.base_url))
d = st.fixed_dictionaries({
'activeNodes': st.integers(0),
'allocatedMB': st.integers(0),
'allocatedVirtualCores': st.integers(0),
'appsCompleted': st.integers(0),
'appsFailed': st.integers(0),
'appsKilled': st.integers(0),
'appsPending': st.integers(0),
'appsRunning': st.integers(0),
'appsSubmitted': st.integers(0),
'availableMB': st.integers(0),
'availableVirtualCores': st.integers(0),
'containersAllocated': st.integers(0),
'containersPending': st.integers(0),
'containersReserved': st.integers(0),
'decommissionedNodes': st.integers(0),
'lostNodes': st.integers(0),
'rebootedNodes': st.integers(0),
'reservedMB': st.integers(0),
'reservedVirtualCores': st.integers(0),
'totalMB': st.integers(0),
'totalNodes': st.integers(0),
'totalVirtualCores': st.integers(0),
'unhealthyNodes': st.integers(0)
})
result = json.dumps({
'clusterMetrics': d.example()
})
redis.set(request.base_url, result)
return jsonify(result)
def get(self, uri=None):
urlbase = request.base_url
return dict([(urls[i+1].__name__.split(".")[-1].lower(),
urlbase + urls[i])
for i in range(len(urls))[::2]])
def home():
html = 'HERE CAN BE PHISHING PAGE FOR {}'.format(request.base_url)
return html
def index():
#wishlist_url = request.base_url + 'wishlists'
#return (jsonify(service='wishlists', version='0.1',
# url=wishlist_url), HTTP_200_OK)
return app.send_static_file('index.html')
def index():
#wishlist_url = request.base_url + 'wishlists'
#return (jsonify(service='wishlists', version='0.1',
# url=wishlist_url), HTTP_200_OK)
return app.send_static_file('index.html')
def _preemptive_unless(base_url=None, additional_unless=None):
if base_url is None:
base_url = request.url_root
disabled_for_root = not settings().getBoolean(["devel", "cache", "preemptive"]) \
or base_url in settings().get(["server", "preemptiveCache", "exceptions"]) \
or not (base_url.startswith("http://") or base_url.startswith("https://"))
recording_disabled = request.headers.get("X-Preemptive-Record", "yes") == "no"
if callable(additional_unless):
return recording_disabled or disabled_for_root or additional_unless()
else:
return recording_disabled or disabled_for_root
def _preemptive_data(key, path=None, base_url=None, data=None, additional_request_data=None):
if path is None:
path = request.path
if base_url is None:
base_url = request.url_root
d = dict(path=path,
base_url=base_url,
query_string="l10n={}".format(g.locale.language if g.locale else "en"))
if key != "_default":
d["plugin"] = key
# add data if we have any
if data is not None:
try:
if callable(data):
data = data()
if data:
if "query_string" in data:
data["query_string"] = "l10n={}&{}".format(g.locale.language, data["query_string"])
d.update(data)
except:
_logger.exception("Error collecting data for preemptive cache from plugin {}".format(key))
# add additional request data if we have any
if callable(additional_request_data):
try:
ard = additional_request_data()
if ard:
d.update(dict(
_additional_request_data=ard
))
except:
_logger.exception("Error retrieving additional data for preemptive cache from plugin {}".format(key))
return d
def next_url(limit=None, offset=None):
limit = limit or request.values.get('limit', 30, type=int)
offset = offset or request.values.get('offset', 0, type=int)
values = request.values.to_dict()
values['offset'] = limit + offset
return request.base_url + '?' + urllib.urlencode(values)
def _get_email_validated_url(is_valid: bool) -> str:
""" Helper function to generate redirect url for email verification """
base_url = current_app.config['APP_BASE_URL']
verification_params = {'is_valid': is_valid}
verification_url = '{0}/validate-email?{1}'.format(base_url, urllib.parse.urlencode(verification_params))
return verification_url
def get_authentication_failed_url():
""" Generates the auth-failed URL for the running app """
base_url = current_app.config['APP_BASE_URL']
auth_failed_url = f'{base_url}/auth-failed'
return auth_failed_url
def generate_authorized_url(username, session_token, redirect_to):
""" Generate URL that we'll redirect the user to once authenticated """
base_url = current_app.config['APP_BASE_URL']
redirect_query = ''
if redirect_to:
redirect_query = f'&redirect_to={urllib.parse.quote(redirect_to)}'
# Trailing & added as Angular a bit flaky with parsing querystring
authorized_url = f'{base_url}/authorized?username={urllib.parse.quote(username)}&session_token={session_token}&ng=0' \
f'{redirect_query}'
return authorized_url
def get_current_url(self):
"""the current URL + next."""
return request.base_url + '?next=' + url_quote(self.get_next_url())
def get_current_url(self):
"""the current URL + next."""
return request.base_url + '?next=' + url_quote(self.get_next_url())
def web_node_all():
nodes_info = BP.nodes
for node in nodes_info:
status = get_node_status(node.coordinate)
if status:
node.manifest = status['manifest']
node.status = status['status']
return render_template(
_ERS_element + '_all.tpl',
label=__doc__,
nodes=BP.nodes,
base_url=request.url)