def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
python类get()的实例源码
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None
def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None
def logging_levels():
"""
Context manager to conditionally set logging levels.
Supports setting per-request debug logging using the `X-Request-Debug` header.
"""
enabled = strtobool(request.headers.get("x-request-debug", "false"))
level = None
try:
if enabled:
level = getLogger().getEffectiveLevel()
getLogger().setLevel(DEBUG)
yield
finally:
if enabled:
getLogger().setLevel(level)
def post_process_request_body(self, dct):
if g.get("hide_body") or not self.request_body:
return
for name, new_name in g.get("show_request_fields", {}).items():
try:
value = self.request_body.pop(name)
self.request_body[new_name] = value
except KeyError:
pass
for field in g.get("hide_request_fields", []):
try:
del self.request_body[field]
except KeyError:
pass
dct.update(
request_body=self.request_body,
)
def post_process_response_body(self, dct):
if g.get("hide_body") or not self.response_body:
return
for name, new_name in g.get("show_response_fields", {}).items():
try:
value = self.response_body.pop(name)
self.response_body[new_name] = value
except KeyError:
pass
for field in g.get("hide_response_fields", []):
try:
del self.response_body[field]
except KeyError:
pass
dct.update(
response_body=self.response_body,
)
def name_for_changeset(self):
address = self.address
n = self.name
if not address:
return self.name
if isinstance(address, list):
d = {a['type']: a['name'] for a in address}
elif isinstance(address, dict):
d = address
if d.get('country_code') == 'us':
state = d.get('state')
if state and n != state:
return n + ', ' + state
country = d.get('country')
if country and self.name != country:
return '{} ({})'.format(self.name, country)
return self.name
def close_changeset(osm_type, osm_id):
Place.get_or_abort(osm_type, osm_id)
osm_backend, auth = get_backend_and_auth()
changeset_id = request.form['changeset_id']
update_count = request.form['update_count']
if really_save:
osm_backend.request(osm_api_base + '/changeset/{}/close'.format(changeset_id),
method='PUT',
auth=auth,
headers=user_agent_headers())
change = Changeset.query.get(changeset_id)
change.update_count = update_count
database.session.commit()
# mail.announce_change(change)
return Response('done', mimetype='text/plain')
def search_results():
q = request.args.get('q') or ''
if not q:
return render_template('results_page.html', results=[], q=q)
m = re_qid.match(q.strip())
if m:
return redirect(url_for('item_page', wikidata_id=m.group(1)[1:]))
try:
results = nominatim.lookup(q)
except nominatim.SearchError:
message = 'nominatim API search error'
return render_template('error_page.html', message=message)
update_search_results(results)
for hit in results:
add_hit_place_detail(hit)
return render_template('results_page.html', results=results, q=q)
def criteria_page():
entity_types = matcher.load_entity_types()
taginfo = get_taginfo(entity_types)
for t in entity_types:
t.setdefault('name', t['cats'][0].replace(' by country', ''))
for tag in t['tags']:
if '=' not in tag:
continue
image = taginfo.get(tag, {}).get('image')
if image:
t['image'] = image
break
entity_types.sort(key=lambda t: t['name'].lower())
cat_counts = {cat.name: cat.page_count for cat in Category.query}
return render_template('criteria.html',
entity_types=entity_types,
cat_counts=cat_counts,
taginfo=taginfo)
def saved_places():
abort(404)
if 'filter' in request.args:
arg_filter = request.args['filter'].strip().replace(' ', '_')
if arg_filter:
return redirect(url_for('saved_with_filter', name_filter=arg_filter))
else:
return redirect(url_for('saved_places'))
sort = request.args.get('sort') or 'name'
name_filter = g.get('filter') or None
if name_filter:
place_tbody = render_template('place_tbody.html',
existing=get_existing(sort, name_filter))
else:
place_tbody = get_place_tbody(sort)
return render_template('saved.html', place_tbody=place_tbody, sort_link=sort_link)
def tracer_decorator(span_name, pay_load=None):
def _decorator(func):
@wraps(func)
def create_span(*args, **kwargs):
span = g.get("tracer_span")
if not span:
new_span = opentracing.tracer.start_span(func.__name__)
else:
new_span = opentracing.tracer.start_span(func.__name__, child_of=span)
new_span.log_event(span_name, payload=pay_load)
g.tracer_span = new_span
result = func(*args, **kwargs)
if span:
g.tracer_span = span
new_span.finish()
return result
return create_span
return _decorator
def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None
def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None
def make_error_page(app, name, code, sentry=None, data=None, exception=None):
''' creates the error page dictionary for web errors '''
shortname = name.lower().replace(' ', '_')
error = {}
error['title'] = 'Marvin | {0}'.format(name)
error['page'] = request.url
error['event_id'] = g.get('sentry_event_id', None)
error['data'] = data
error['name'] = name
error['code'] = code
error['message'] = exception.description if exception and hasattr(exception, 'description') else None
if app.config['USE_SENTRY'] and sentry:
error['public_dsn'] = sentry.client.get_public_dsn('https')
app.logger.error('{0} Exception {1}'.format(name, error))
return render_template('errors/{0}.html'.format(shortname), **error), code
# ----------------
# Error Handling
# ----------------
def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None
def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None
def _execute_for_all_tables(self, app, bind, operation, skip_tables=False):
app = self.get_app(app)
if bind == '__all__':
binds = [None] + list(app.config.get('SQLALCHEMY_BINDS') or ())
elif isinstance(bind, string_types) or bind is None:
binds = [bind]
else:
binds = bind
for bind in binds:
extra = {}
if not skip_tables:
tables = self.get_tables_for_bind(bind)
extra['tables'] = tables
op = getattr(self.Model.metadata, operation)
op(bind=self.get_engine(app, bind), **extra)
def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None
def after_request(param):
response_param = {'charset': param.charset,
'content_length': param.content_length,
'content_type': param.content_type,
'content_encoding': param.content_encoding,
'mimetype': param.mimetype,
'response': g.result if hasattr(g, 'result') else None,
'status': param.status,
'status_code': param.status_code}
g.response_time = datetime.now()
time_consuming = str(g.response_time - g.request_time)
log_info = {'api_method': g.get('api_method'), 'api_version': g.get('api_version'),
'request_param': g.get('request_param'), 'request_form': g.get('request_form'),
'querystring': g.get('request_param', {}).get('query_string'), 'request_json': g.get('request_json'),
'response_param': response_param, 'request_raw_data': g.request_raw_data,
'request_time': g.get('request_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
'response_time': g.get('response_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
'time_consuming': time_consuming}
if param.status_code >= 400:
from app.tasks import send_mail_async
# send_mail_async.delay(current_app.config['ADMIN_EMAIL'], 'Web Api Request Error', 'api_error', **log_info)
current_app.logger.error(log_info)
else:
current_app.logger.debug(log_info)
return param
def new_revision(self, *fields):
"""Save a new revision of the document"""
# Ensure this document is a draft
if not self._id:
assert g.get('draft'), \
'Only draft documents can be assigned new revisions'
else:
with self.draft_context():
assert self.count(Q._id == self._id) == 1, \
'Only draft documents can be assigned new revisions'
# Set the revision
if len(fields) > 0:
fields.append('revision')
self.revision = datetime.now()
# Update the document
self.upsert(*fields)
def get_collection(cls):
"""Return a reference to the database collection for the class"""
# By default the collection returned will be the published collection,
# however if the `draft` flag has been set against the global context
# (e.g `g`) then the collection returned will contain draft documents.
if g.get('draft'):
return getattr(
cls.get_db(),
'{collection}_draft'.format(collection=cls._collection)
)
return getattr(cls.get_db(), cls._collection)
# Contexts
def _get_config(
value, config_name, default=None,
required=True, message='CSRF is not configured.'
):
"""Find config value based on provided value, Flask config, and default
value.
:param value: already provided config value
:param config_name: Flask ``config`` key
:param default: default value if not provided or configured
:param required: whether the value must not be ``None``
:param message: error message if required config is not found
:raises KeyError: if required config is not found
"""
if value is None:
value = current_app.config.get(config_name, default)
if required and value is None:
raise KeyError(message)
return value
def _get_csrf_token(self):
# find the ``csrf_token`` field in the subitted form
# if the form had a prefix, the name will be
# ``{prefix}-csrf_token``
field_name = current_app.config['WTF_CSRF_FIELD_NAME']
for key in request.form:
if key.endswith(field_name):
csrf_token = request.form[key]
if csrf_token:
return csrf_token
for header_name in current_app.config['WTF_CSRF_HEADERS']:
csrf_token = request.headers.get(header_name)
if csrf_token:
return csrf_token
return None