def navigate(self, text, description=None):
"""A decorator to add a navigation menu entry for the actual view func.
This is a decorator like the "route()" from `flask.Flask` or
`flask.Blueprint`. It register a navigation menu entry for the
current view function.
The text argument is used as menu-text and the description sets an
optional anchor-title.
:param text: The menu entry text.
:param description: a anchor title.
"""
def decorator(f):
element = self._navigation_item(
bake_endpoint(self.blueprint, f),
text,
description)
self._elements.append(element)
return f
return decorator
python类blueprint()的实例源码
def register_on(self, app):
"""This registers the `BlueprintNavigation` for the Flask app.
This uses a `list` in the flask config object to register the
navigation. The key for the `list` is "blueprint_navigation".
The template iterates over this list and renders the navigation
elements. The order the elements are registered is the order the
elements will be shown.
:param app: A `flask.Flask` application instance.
"""
if self.blueprint.name not in app.blueprints:
raise Exception("Blueprint %s is not registred in Flask app" %
self.blueprint.name)
else:
assert app.blueprints[self.blueprint.name] is self.blueprint, \
"Blueprint resistred as %s in Flask app is not the one you " \
"register navigation for!"
if "blueprint_navigation" not in app.config:
app.config["blueprint_navigation"] = list()
app.config["blueprint_navigation"].append(self)
def __init__(self, rate_limiter):
super(LogApiCallCount, self).__init__()
if self.stats:
tags = ['version:'+str(Config.API_VERSION_MINOR),
'application:rest-api',
'api-key:'+rate_limiter._auth_key.id,
'method:'+request.environ['REQUEST_METHOD'],
'endpoint:'+request.url_rule.rule.split('<')[0].split(request.blueprint)[1]]
ip = get_remote_addr()
self.stats.track(get_remote_addr(),
'api_call_count',
properties=dict(tags=tags,
ip=ip,
ignore_time = True,
))
# self.stats.increment('api.call.count',
# tags=tags,)
def hashed_url_for_static_file(endpoint, values):
if 'static' == endpoint or endpoint.endswith('.static'):
filename = values.get('filename')
if filename:
if '.' in endpoint: # has higher priority
blueprint = endpoint.rsplit('.', 1)[0]
else:
blueprint = request.blueprint # can be None too
if blueprint:
static_folder = app.blueprints[blueprint].static_folder
else:
static_folder = app.static_folder
param_name = 'h'
while param_name in values:
param_name = '_' + param_name
values[param_name] = static_file_hash(os.path.join(static_folder, filename))
###########################################################################
# Routes ##################################################################
###########################################################################
def exempt(self, view):
"""Mark a view or blueprint to be excluded from CSRF protection.
::
@app.route('/some-view', methods=['POST'])
@csrf.exempt
def some_view():
...
::
bp = Blueprint(...)
csrf.exempt(bp)
"""
if isinstance(view, Blueprint):
self._exempt_blueprints.add(view.name)
return view
if isinstance(view, string_types):
view_location = view
else:
view_location = '%s.%s' % (view.__module__, view.__name__)
self._exempt_views.add(view_location)
return view
def exempt(self, view):
"""Mark a view or blueprint to be excluded from CSRF protection.
::
@app.route('/some-view', methods=['POST'])
@csrf.exempt
def some_view():
...
::
bp = Blueprint(...)
csrf.exempt(bp)
"""
if isinstance(view, Blueprint):
self._exempt_blueprints.add(view.name)
return view
if isinstance(view, string_types):
view_location = view
else:
view_location = '%s.%s' % (view.__module__, view.__name__)
self._exempt_views.add(view_location)
return view
def init_app(self, app):
"""
Initializes extension with app
:param app: Flask application instance
:type app: flask.Flask
"""
if app.debug: # don't mess with debug
return
@app.url_defaults
def hashed_url_for_static_file(endpoint, values):
if 'static' == endpoint or endpoint.endswith('.static'):
filename = values.get('filename')
if filename:
# has higher priority
blueprint = endpoint.rsplit('.', 1)[0] \
if '.' in endpoint else request.blueprint
# file from blueprint or project?
static_folder = app.blueprints[blueprint].static_folder \
if blueprint else app.static_folder
# avoids querystring key collision
param_name = 'h'
while param_name in values:
param_name = '_' + param_name
filepath = safe_join(static_folder, filename)
values[param_name] = self.get_file_hash(filepath)
def before_request():
g.log = dict()
g.log['out'] = list()
g.log['request'] = log_request(request)
g.endpoint = request.endpoint
if request.blueprint:
g.endpoint = g.endpoint[len(request.blueprint):].lstrip('.')
reseller_info = get_reseller_info()
g.reseller_name = reseller_info.name
g.company_name = 'N/A'
if not reseller_info.name:
allow_public_endpoints_only()
return
if not check_oauth_signature(request):
abort(401)
g.auth = reseller_info.auth
g.reseller = Reseller(reseller_info.name, reseller_info.id, None)
g.reseller.refresh()
if not g.reseller.token and not reseller_info.is_new:
abort(403)
def exempt(self, view):
"""Mark a view or blueprint to be excluded from CSRF protection.
::
@app.route('/some-view', methods=['POST'])
@csrf.exempt
def some_view():
...
::
bp = Blueprint(...)
csrf.exempt(bp)
"""
if isinstance(view, Blueprint):
self._exempt_blueprints.add(view.name)
return view
if isinstance(view, string_types):
view_location = view
else:
view_location = '%s.%s' % (view.__module__, view.__name__)
self._exempt_views.add(view_location)
return view
def before_request():
if current_user.is_authenticated:
current_user.ping()
if not current_user.confirmed \
and request.endpoint \
and request.blueprint != 'auth' \
and request.endpoint != 'static':
return redirect(url_for('auth.unconfirmed'))
def exempt(self, view):
"""Mark a view or blueprint to be excluded from CSRF protection.
::
@app.route('/some-view', methods=['POST'])
@csrf.exempt
def some_view():
...
::
bp = Blueprint(...)
csrf.exempt(bp)
"""
if isinstance(view, Blueprint):
self._exempt_blueprints.add(view.name)
return view
if isinstance(view, string_types):
view_location = view
else:
view_location = '%s.%s' % (view.__module__, view.__name__)
self._exempt_views.add(view_location)
return view
def _is_api(request):
''' Checks if the error comes from the api '''
return request.blueprint == 'api' or 'api' in request.url
def security_processer():
if not request.blueprint == 'security':
return
if request.endpoint == 'security.login':
form = CanellaLoginForm()
if form.validate_on_submit():
session['lang'] = form.lang.data
elif request.endpoint == 'security.logout' and 'lang' in session:
session.pop('lang')
def exempt(self, view):
"""Mark a view or blueprint to be excluded from CSRF protection.
::
@app.route('/some-view', methods=['POST'])
@csrf.exempt
def some_view():
...
::
bp = Blueprint(...)
csrf.exempt(bp)
"""
if isinstance(view, Blueprint):
self._exempt_blueprints.add(view.name)
return view
if isinstance(view, string_types):
view_location = view
else:
view_location = '%s.%s' % (view.__module__, view.__name__)
self._exempt_views.add(view_location)
return view
def exempt(self, view):
"""Mark a view or blueprint to be excluded from CSRF protection.
::
@app.route('/some-view', methods=['POST'])
@csrf.exempt
def some_view():
...
::
bp = Blueprint(...)
csrf.exempt(bp)
"""
if isinstance(view, Blueprint):
self._exempt_blueprints.add(view.name)
return view
if isinstance(view, string_types):
view_location = view
else:
view_location = '%s.%s' % (view.__module__, view.__name__)
self._exempt_views.add(view_location)
return view
def exempt(self, view):
"""Mark a view or blueprint to be excluded from CSRF protection.
::
@app.route('/some-view', methods=['POST'])
@csrf.exempt
def some_view():
...
::
bp = Blueprint(...)
csrf.exempt(bp)
"""
if isinstance(view, Blueprint):
self._exempt_blueprints.add(view.name)
return view
if isinstance(view, string_types):
view_location = view
else:
view_location = '%s.%s' % (view.__module__, view.__name__)
self._exempt_views.add(view_location)
return view
def schema(path=None):
"""Validate the request payload with a JSONSchema.
Decorator func that will be used to specify
the path to the schema that the route/endpoint
will be validated against.
:param path: path to the schema file
:type path: string
:returns: list of errors if there are any
:raises: InvalidAPIRequest if there are any errors
ex:
@schema('/path/to/schema.json')
@app.route('/app-route')
def app_route():
...
"""
def decorator(func):
@wraps(func)
def wrapped(*args, **kwargs):
_path = path.lstrip('/')
schema_path = os.path.join(SCHEMAS_PATH, request.blueprint, _path)
payload = get_request_payload(request.method)(request)
errors = validate_schema(payload, get_schema(schema_path))
if errors:
raise InvalidAPIRequest(message=errors)
return func(*args, **kwargs)
return wrapped
return decorator
def __init__(self, blueprint, text, description=None, blueprint_access=None):
"""Init the `BlueprintNavigation` instance.
:param blueprint: A `flask.Blueprint` instance.
:param text: The text for the top bar navigation.
:param description: An optional anchor title.
"""
self.blueprint = blueprint
self.text = text
self.description = description
self._elements = []
if blueprint_access is None:
blueprint_access = BlueprintAccess(blueprint)
self._access = blueprint_access
def is_allowed(self):
"""Checks if the user has general access to the blueprint.
This uses the `BlueprintAccess.has_general_access` to find out
if the user has access to this blueprint (and its navigation).
If there is no `BlueprintNavigation` given we assume everything
is granted.
"""
return self._access.has_general_access
def is_active(self):
"""Tells if the blueprint of this instance is active.
Active is a blueprint if a view func of it is shown.
:return: True if active, False else.
"""
return self.blueprint.name == request.blueprint
def exempt(self, view):
"""Mark a view or blueprint to be excluded from CSRF protection.
::
@app.route('/some-view', methods=['POST'])
@csrf.exempt
def some_view():
...
::
bp = Blueprint(...)
csrf.exempt(bp)
"""
if isinstance(view, Blueprint):
self._exempt_blueprints.add(view.name)
return view
if isinstance(view, string_types):
view_location = view
else:
view_location = '%s.%s' % (view.__module__, view.__name__)
self._exempt_views.add(view_location)
return view
def _show_toolbar(self):
"""Return a boolean to indicate if we need to show the toolbar."""
if request.blueprint == 'debugtoolbar':
return False
hosts = current_app.config['DEBUG_TB_HOSTS']
if hosts and request.remote_addr not in hosts:
return False
return True
def init_app(self, app):
app.extensions['csrf'] = self
app.config.setdefault('WTF_CSRF_ENABLED', True)
app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True)
app.config['WTF_CSRF_METHODS'] = set(app.config.get(
'WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH', 'DELETE']
))
app.config.setdefault('WTF_CSRF_FIELD_NAME', 'csrf_token')
app.config.setdefault(
'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token']
)
app.config.setdefault('WTF_CSRF_TIME_LIMIT', 3600)
app.config.setdefault('WTF_CSRF_SSL_STRICT', True)
app.jinja_env.globals['csrf_token'] = generate_csrf
app.context_processor(lambda: {'csrf_token': generate_csrf})
@app.before_request
def csrf_protect():
if not app.config['WTF_CSRF_ENABLED']:
return
if not app.config['WTF_CSRF_CHECK_DEFAULT']:
return
if request.method not in app.config['WTF_CSRF_METHODS']:
return
if not request.endpoint:
return
view = app.view_functions.get(request.endpoint)
if not view:
return
if request.blueprint in self._exempt_blueprints:
return
dest = '%s.%s' % (view.__module__, view.__name__)
if dest in self._exempt_views:
return
self.protect()
def init_app(self, app):
app.extensions['csrf'] = self
app.config.setdefault('WTF_CSRF_ENABLED', True)
app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True)
app.config['WTF_CSRF_METHODS'] = set(app.config.get(
'WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH', 'DELETE']
))
app.config.setdefault('WTF_CSRF_FIELD_NAME', 'csrf_token')
app.config.setdefault(
'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token']
)
app.config.setdefault('WTF_CSRF_TIME_LIMIT', 3600)
app.config.setdefault('WTF_CSRF_SSL_STRICT', True)
app.jinja_env.globals['csrf_token'] = generate_csrf
app.context_processor(lambda: {'csrf_token': generate_csrf})
@app.before_request
def csrf_protect():
if not app.config['WTF_CSRF_ENABLED']:
return
if not app.config['WTF_CSRF_CHECK_DEFAULT']:
return
if request.method not in app.config['WTF_CSRF_METHODS']:
return
if not request.endpoint:
return
view = app.view_functions.get(request.endpoint)
if not view:
return
if request.blueprint in self._exempt_blueprints:
return
dest = '%s.%s' % (view.__module__, view.__name__)
if dest in self._exempt_views:
return
self.protect()
def init_app(self, app):
self._app = app
app.jinja_env.globals['csrf_token'] = generate_csrf
app.config.setdefault(
'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token']
)
app.config.setdefault('WTF_CSRF_SSL_STRICT', True)
app.config.setdefault('WTF_CSRF_ENABLED', True)
app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True)
app.config.setdefault('WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH'])
# expose csrf_token as a helper in all templates
@app.context_processor
def csrf_token():
return dict(csrf_token=generate_csrf)
if not app.config['WTF_CSRF_ENABLED']:
return
if not app.config['WTF_CSRF_CHECK_DEFAULT']:
return
@app.before_request
def _csrf_protect():
# many things come from django.middleware.csrf
if request.method not in app.config['WTF_CSRF_METHODS']:
return
if self._exempt_views or self._exempt_blueprints:
if not request.endpoint:
return
view = app.view_functions.get(request.endpoint)
if not view:
return
dest = '%s.%s' % (view.__module__, view.__name__)
if dest in self._exempt_views:
return
if request.blueprint in self._exempt_blueprints:
return
self.protect()
def init_app(self, app):
app.extensions['csrf'] = self
app.config.setdefault('WTF_CSRF_ENABLED', True)
app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True)
app.config['WTF_CSRF_METHODS'] = set(app.config.get(
'WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH', 'DELETE']
))
app.config.setdefault('WTF_CSRF_FIELD_NAME', 'csrf_token')
app.config.setdefault(
'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token']
)
app.config.setdefault('WTF_CSRF_TIME_LIMIT', 3600)
app.config.setdefault('WTF_CSRF_SSL_STRICT', True)
app.jinja_env.globals['csrf_token'] = generate_csrf
app.context_processor(lambda: {'csrf_token': generate_csrf})
@app.before_request
def csrf_protect():
if not app.config['WTF_CSRF_ENABLED']:
return
if not app.config['WTF_CSRF_CHECK_DEFAULT']:
return
if request.method not in app.config['WTF_CSRF_METHODS']:
return
if not request.endpoint:
return
view = app.view_functions.get(request.endpoint)
if not view:
return
if request.blueprint in self._exempt_blueprints:
return
dest = '%s.%s' % (view.__module__, view.__name__)
if dest in self._exempt_views:
return
self.protect()
def init_app(self, app):
app.extensions['csrf'] = self
app.config.setdefault('WTF_CSRF_ENABLED', True)
app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True)
app.config['WTF_CSRF_METHODS'] = set(app.config.get(
'WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH', 'DELETE']
))
app.config.setdefault('WTF_CSRF_FIELD_NAME', 'csrf_token')
app.config.setdefault(
'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token']
)
app.config.setdefault('WTF_CSRF_TIME_LIMIT', 3600)
app.config.setdefault('WTF_CSRF_SSL_STRICT', True)
app.jinja_env.globals['csrf_token'] = generate_csrf
app.context_processor(lambda: {'csrf_token': generate_csrf})
@app.before_request
def csrf_protect():
if not app.config['WTF_CSRF_ENABLED']:
return
if not app.config['WTF_CSRF_CHECK_DEFAULT']:
return
if request.method not in app.config['WTF_CSRF_METHODS']:
return
if not request.endpoint:
return
view = app.view_functions.get(request.endpoint)
if not view:
return
if request.blueprint in self._exempt_blueprints:
return
dest = '%s.%s' % (view.__module__, view.__name__)
if dest in self._exempt_views:
return
self.protect()
def init_app(self, app):
self._app = app
app.jinja_env.globals['csrf_token'] = generate_csrf
app.config.setdefault(
'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token']
)
app.config.setdefault('WTF_CSRF_SSL_STRICT', True)
app.config.setdefault('WTF_CSRF_ENABLED', True)
app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True)
app.config.setdefault('WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH'])
# expose csrf_token as a helper in all templates
@app.context_processor
def csrf_token():
return dict(csrf_token=generate_csrf)
if not app.config['WTF_CSRF_ENABLED']:
return
if not app.config['WTF_CSRF_CHECK_DEFAULT']:
return
@app.before_request
def _csrf_protect():
# many things come from django.middleware.csrf
if request.method not in app.config['WTF_CSRF_METHODS']:
return
if self._exempt_views or self._exempt_blueprints:
if not request.endpoint:
return
view = app.view_functions.get(request.endpoint)
if not view:
return
dest = '%s.%s' % (view.__module__, view.__name__)
if dest in self._exempt_views:
return
if request.blueprint in self._exempt_blueprints:
return
self.protect()
def init_app(self, app):
self._app = app
app.jinja_env.globals['csrf_token'] = generate_csrf
app.config.setdefault(
'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token']
)
app.config.setdefault('WTF_CSRF_SSL_STRICT', True)
app.config.setdefault('WTF_CSRF_ENABLED', True)
app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True)
app.config.setdefault('WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH'])
# expose csrf_token as a helper in all templates
@app.context_processor
def csrf_token():
return dict(csrf_token=generate_csrf)
if not app.config['WTF_CSRF_ENABLED']:
return
if not app.config['WTF_CSRF_CHECK_DEFAULT']:
return
@app.before_request
def _csrf_protect():
# many things come from django.middleware.csrf
if request.method not in app.config['WTF_CSRF_METHODS']:
return
if self._exempt_views or self._exempt_blueprints:
if not request.endpoint:
return
view = app.view_functions.get(request.endpoint)
if not view:
return
dest = '%s.%s' % (view.__module__, view.__name__)
if dest in self._exempt_views:
return
if request.blueprint in self._exempt_blueprints:
return
self.protect()
def init_app(self, app):
self._app = app
app.jinja_env.globals['csrf_token'] = generate_csrf
app.config.setdefault(
'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token']
)
app.config.setdefault('WTF_CSRF_SSL_STRICT', True)
app.config.setdefault('WTF_CSRF_ENABLED', True)
app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True)
app.config.setdefault('WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH'])
# expose csrf_token as a helper in all templates
@app.context_processor
def csrf_token():
return dict(csrf_token=generate_csrf)
if not app.config['WTF_CSRF_ENABLED']:
return
if not app.config['WTF_CSRF_CHECK_DEFAULT']:
return
@app.before_request
def _csrf_protect():
# many things come from django.middleware.csrf
if request.method not in app.config['WTF_CSRF_METHODS']:
return
if self._exempt_views or self._exempt_blueprints:
if not request.endpoint:
return
view = app.view_functions.get(request.endpoint)
if not view:
return
dest = '%s.%s' % (view.__module__, view.__name__)
if dest in self._exempt_views:
return
if request.blueprint in self._exempt_blueprints:
return
self.protect()