def build_menu_item(endpoint, name):
    """Render a ``<li class="nav-item">`` navigation link for ``endpoint``.

    The item carries the ``active`` CSS class when ``endpoint`` equals the
    endpoint of the current request.

    Args:
        endpoint: Endpoint name handed to ``url_for`` to build the link target.
        name: Link text; HTML-escaped on insertion.

    Returns:
        A ``Markup`` string containing the rendered list item.
    """
    is_active = "active" if request.endpoint == endpoint else ""
    # Markup.format() escapes its arguments, so a label or URL containing
    # HTML metacharacters cannot inject markup into the page. (Plain f-string
    # interpolation inside Markup() marks the whole result as safe unescaped.)
    return Markup(
        '<li class="nav-item {}">'
        '<a class="nav-link" href="{}">{}</a>'
        '</li>'
    ).format(is_active, url_for(endpoint), name)
python类endpoint()的实例源码
def build_menu_item_as_filter(endpoint):
    """Render a ``<li class="nav-item">`` link labelled with the endpoint name.

    The upper-cased endpoint name is used as the link text. The item carries
    the ``active`` CSS class when ``endpoint`` equals the endpoint of the
    current request.

    Args:
        endpoint: Endpoint name handed to ``url_for``; also the source of the
            link text.

    Returns:
        A ``Markup`` string containing the rendered list item.
    """
    is_active = "active" if request.endpoint == endpoint else ""
    # Markup.format() escapes its arguments; interpolating with an f-string
    # inside Markup() would mark the label and URL as safe without escaping.
    return Markup(
        '<li class="nav-item {}">'
        '<a class="nav-link" href="{}">{}</a>'
        '</li>'
    ).format(is_active, url_for(endpoint), endpoint.upper())
def url_for_other_page(page):
    """Build the URL of the current endpoint for a different page number.

    Starts from a copy of the current view arguments, layers the current
    query-string arguments on top, and finally sets ``page``.

    Args:
        page: Value to place in the ``page`` URL argument.

    Returns:
        The URL produced by ``url_for`` for the current endpoint.
    """
    url_params = request.view_args.copy()
    url_params.update(request.args)
    # Applied last so the requested page always wins over an existing value.
    url_params.update(page=page)
    return url_for(request.endpoint, **url_params)
def navbar():
    """Return template variables describing the navigation bar.

    Returns:
        A dict with ``navbar_pages`` (the module-level ``navbar_pages``) and
        ``active`` (the current request endpoint), or an empty dict when
        building it raises ``RuntimeError``.
    """
    try:
        context = {
            'navbar_pages': navbar_pages,
            'active': request.endpoint,
        }
    except RuntimeError:
        # Deliberate best-effort: fall back to no extra template variables.
        return {}
    return context
def sort_link(order):
    """Build the URL of the current endpoint with a different sort order.

    Args:
        order: Value to place in the ``sort`` URL argument.

    Returns:
        The URL produced by ``url_for`` for the current endpoint, using a
        copy of the current view arguments with ``sort`` set to ``order``.
    """
    url_params = request.view_args.copy()
    url_params.update(sort=order)
    return url_for(request.endpoint, **url_params)
def url_for_other_page(page):
    """Return the current endpoint's URL with ``page`` swapped in.

    The URL arguments are the current view arguments, overlaid with the
    current query-string arguments, overlaid with the given ``page``.

    Args:
        page: Value for the ``page`` URL argument.

    Returns:
        The URL built by ``url_for`` for the current endpoint.
    """
    merged = request.view_args.copy()
    for source in (request.args, {'page': page}):
        # Later sources override earlier ones, so ``page`` is applied last.
        merged.update(source)
    return url_for(request.endpoint, **merged)
def before_request():
    """Ping the authenticated user and gate unconfirmed accounts.

    For an authenticated user, ``current_user.ping()`` is called on every
    request. If that user is not confirmed and the request targets an
    endpoint outside ``auth.*`` and other than ``static``, the user is
    redirected to ``auth.unconfirmed``.

    Returns:
        A redirect response to ``auth.unconfirmed`` when the request must be
        intercepted, otherwise ``None`` so request handling continues.
    """
    if not current_user.is_authenticated:
        return None

    current_user.ping()

    # request.endpoint is None when no route matched the URL. Slicing None
    # raised TypeError and turned what should be a 404 into a 500, so such
    # requests are let through to the normal error handling instead.
    endpoint = request.endpoint
    if (
        not current_user.confirmed
        and endpoint is not None
        and not endpoint.startswith('auth.')
        and endpoint != 'static'
    ):
        return redirect(url_for('auth.unconfirmed'))
    return None
def ask_login():
    """Redirect anonymous visitors to the login view unless the view opts out.

    A view function opts out by carrying a truthy ``no_login`` attribute.

    Returns:
        ``None`` when the user is not anonymous or the target view is marked
        ``no_login``; otherwise a redirect to ``ares.aresLogin`` with the
        current endpoint passed as ``next``.
    """
    if not current_user.is_anonymous:
        return None

    # request.endpoint is None when no route matched the URL; indexing
    # view_functions with it raised KeyError (a 500 response). With .get()
    # an unmatched request is simply treated as requiring login.
    view = current_app.view_functions.get(request.endpoint)
    if getattr(view, 'no_login', False):
        return None

    return redirect(url_for('ares.aresLogin', next=request.endpoint))
def ask_login():
    """Send anonymous users to the login view, except for ``no_login`` views.

    Returns:
        ``None`` if the current user is not anonymous or the view function
        for the current endpoint has a truthy ``no_login`` attribute;
        otherwise a redirect to ``ares.aresLogin`` carrying the current
        endpoint in the ``next`` argument.
    """
    if not current_user.is_anonymous:
        return None

    endpoint = request.endpoint
    # Look the view up with .get(): the endpoint is None when routing found
    # no match, and a plain subscript would raise KeyError and answer 500.
    target_view = current_app.view_functions.get(endpoint)
    login_exempt = getattr(target_view, 'no_login', False)
    if login_exempt:
        return None

    return redirect(url_for('ares.aresLogin', next=endpoint))
def app_after_request(response):
    """Set a long ``Cache-Control`` max-age on responses of the static endpoint.

    Args:
        response: The outgoing response object.

    Returns:
        The same response; its ``cache_control.max_age`` is set to 15552000
        seconds only when the request endpoint is ``static``.
    """
    if request.endpoint == 'static':
        response.cache_control.max_age = 15552000
    return response
# jinja_env
def _finish_span(self, response=None, exception=None):
    """Close and finish the active span if it exists.

    The span is read from ``g.flask_datadog_span``. When it is sampled, it is
    annotated with the resource name, URL, status code, HTTP method and error
    flag before being finished. The attribute on ``g`` is reset to ``None``
    afterwards.

    Args:
        response: The response produced for the request, if any.
        exception: The exception raised while handling the request, if any.
    """
    # NOTE(review): the nesting below was reconstructed from a listing whose
    # indentation was lost -- confirm that finish() is meant to run for
    # unsampled spans as well.
    span = getattr(g, 'flask_datadog_span', None)
    if span:
        if span.sampled:
            error = 0
            code = response.status_code if response else None
            method = request.method if request else None
            # If we didn't get a response but we did get an exception, set
            # the codes accordingly.
            if not response and exception:
                code = 500
                # The next three lines might not be strictly required, since
                # `set_traceback` also gets the exception from sys.exc_info
                # (and fills the error meta). Since we aren't sure it always
                # works, and to avoid a backward-compatibility break, keep
                # these lines; their values get overridden anyway.
                error = 1
                span.set_tag(errors.ERROR_TYPE, type(exception))
                span.set_tag(errors.ERROR_MSG, exception)
                # The provided `exception` object doesn't have a stack trace
                # attached, so attach the stack trace with `set_traceback`.
                span.set_traceback()
            # The endpoint that matched the request is None if an exception
            # happened, so we fall back to a common resource (the status code).
            resource = code if not request.endpoint else request.endpoint
            span.resource = compat.to_unicode(resource).lower()
            span.set_tag(http.URL, compat.to_unicode(request.base_url or ''))
            span.set_tag(http.STATUS_CODE, code)
            span.set_tag(http.METHOD, method)
            span.error = error
        span.finish()
        # Clear our span just in case.
        g.flask_datadog_span = None
# Request hook methods
def before_request():
    """Redirect authenticated but unconfirmed users to the unconfirmed page.

    The redirect applies only to requests whose endpoint is outside
    ``auth.*`` and is not ``static``.

    Returns:
        A redirect response to ``auth.unconfirmed`` when the request must be
        intercepted, otherwise ``None`` so request handling continues.
    """
    # request.endpoint is None when no route matched the URL. Slicing None
    # raised TypeError (a 500 instead of the expected 404), so unmatched
    # requests are passed through untouched.
    endpoint = request.endpoint
    if (
        current_user.is_authenticated
        and not current_user.confirmed
        and endpoint is not None
        and not endpoint.startswith('auth.')
        and endpoint != 'static'
    ):
        return redirect(url_for('auth.unconfirmed'))
    return None
def test_index(app, bitmap, client):
    """The ``/bitmapist/`` URL resolves to the ``bitmapist.index`` endpoint."""
    request_context = app.test_request_context('/bitmapist/')
    with request_context:
        assert request.endpoint == 'bitmapist.index'
def test_cohort(app, bitmap, client):
    """The ``/bitmapist/cohort`` URL resolves to ``bitmapist.cohort``."""
    request_context = app.test_request_context('/bitmapist/cohort')
    with request_context:
        assert request.endpoint == 'bitmapist.cohort'
def allow_public_endpoints_only():
    """Abort with 401 unless ``g.endpoint`` is a public endpoint.

    The only public endpoint is the lower-cased name of ``HealthCheck``.
    """
    health_check_endpoint = HealthCheck.__name__.lower()
    if g.endpoint == health_check_endpoint:
        return
    abort(401)
def set_name_for_reseller(reseller_id):
    """Resolve the reseller name to use for the current request.

    Args:
        reseller_id: Reseller identifier; a falsy value yields ``None``.

    Returns:
        ``None`` for a falsy ``reseller_id``; a name from
        ``generate_reseller_name()`` when ``g.endpoint`` is the lower-cased
        ``ApplicationList`` class name; otherwise the result of
        ``get_reseller_name(reseller_id)``.
    """
    if not reseller_id:
        return None
    on_application_list = g.endpoint == ApplicationList.__name__.lower()
    return (
        generate_reseller_name()
        if on_application_list
        else get_reseller_name(reseller_id)
    )
def get_reseller_info():
    """Collect reseller details for the current request.

    Returns:
        A ``ResellerInfo`` whose ``id`` comes from the ``Aps-Instance-Id``
        request header, ``name`` from ``set_name_for_reseller``, ``is_new``
        from whether ``g.endpoint`` is the lower-cased ``ApplicationList``
        class name, and ``auth`` from ``get_oauth()``.
    """
    instance_id = request.headers.get('Aps-Instance-Id')
    on_application_list = g.endpoint == ApplicationList.__name__.lower()
    # Keyword arguments are evaluated left to right, so the name is resolved
    # before get_oauth() is called.
    return ResellerInfo(
        id=instance_id,
        name=set_name_for_reseller(instance_id),
        is_new=on_application_list,
        auth=get_oauth(),
    )
def before_request():
    """Populate ``g`` with logging, endpoint and reseller state for the request.

    Steps, in order:
      1. Initialise ``g.log`` with an empty ``out`` list and the logged request.
      2. Store the endpoint in ``g.endpoint``, stripped of the blueprint prefix.
      3. Resolve reseller info; without a reseller name only public endpoints
         are allowed and processing stops.
      4. Abort with 401 when the OAuth signature check fails.
      5. Load the reseller into ``g.reseller``; abort with 403 when it has no
         token and is not new.
    """
    g.log = {}
    g.log['out'] = []
    g.log['request'] = log_request(request)

    g.endpoint = request.endpoint
    blueprint_name = request.blueprint
    if blueprint_name:
        # Drop the "<blueprint>." prefix so only the bare endpoint name remains.
        g.endpoint = g.endpoint[len(blueprint_name):].lstrip('.')

    info = get_reseller_info()
    g.reseller_name = info.name
    g.company_name = 'N/A'

    if not info.name:
        allow_public_endpoints_only()
        return

    signature_ok = check_oauth_signature(request)
    if not signature_ok:
        abort(401)

    g.auth = info.auth
    g.reseller = Reseller(info.name, info.id, None)
    g.reseller.refresh()

    if not (g.reseller.token or info.is_new):
        abort(403)