def _execute_for_all_tables(self, app, bind, operation, skip_tables=False):
    """Call a metadata operation once for every selected bind.

    :param app: application reference, resolved through ``self.get_app``.
    :param bind: ``'__all__'`` selects the default bind (``None``) followed
                 by every key of the ``SQLALCHEMY_BINDS`` config entry; a
                 string or ``None`` selects that single bind; anything else
                 is iterated as a collection of binds.
    :param operation: name of the attribute looked up on
                      ``self.Model.metadata`` and called for each bind.
    :param skip_tables: when true, no ``tables`` keyword is passed to the
                        operation.
    """
    app = self.get_app(app)

    if bind == '__all__':
        # The default bind (None) always comes first.
        selected = [None]
        selected.extend(app.config.get('SQLALCHEMY_BINDS') or ())
    elif isinstance(bind, string_types) or bind is None:
        selected = [bind]
    else:
        selected = bind

    for bind_key in selected:
        if skip_tables:
            extra = {}
        else:
            extra = {'tables': self.get_tables_for_bind(bind_key)}
        run = getattr(self.Model.metadata, operation)
        run(bind=self.get_engine(app, bind_key), **extra)
# Example source code using Python's get() method
def generate_csrf(secret_key=None, token_key=None):
    """Generate a CSRF token.

    The signed token is stored on ``g`` under the field name, so repeated
    calls that find it there return the same value.

    During testing, it might be useful to access the signed token in
    ``g.csrf_token`` and the raw token in ``session['csrf_token']``.

    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param token_key: Key where token is stored in session for comparison.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.
    """
    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )

    # Fast path: a signed token is already stored on ``g``.
    if field_name in g:
        return g.get(field_name)

    # The raw token lives in the session; create it on first use.
    if field_name not in session:
        session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()

    serializer = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')
    setattr(g, field_name, serializer.dumps(session[field_name]))
    return g.get(field_name)
def validate_csrf_token(self, form, field):
    """Validate the CSRF token carried by ``field``.

    Nothing is checked when ``g.csrf_valid`` is already truthy. Otherwise
    ``validate_csrf`` is called with the field data and the secret, time
    limit and field name from ``self.meta``. A ``ValidationError`` is
    logged at info level and re-raised.
    """
    already_checked = g.get('csrf_valid', False)
    if not already_checked:
        try:
            validate_csrf(
                field.data,
                self.meta.csrf_secret,
                self.meta.csrf_time_limit,
                self.meta.csrf_field_name
            )
        except ValidationError as error:
            logger.info(error.args[0])
            raise
def validate_csrf_token(self, form, field):
    """Check the submitted CSRF token unless it was validated earlier.

    When ``g.csrf_valid`` is truthy the check is skipped. Otherwise the
    field data is passed to ``validate_csrf`` together with the settings
    read from ``self.meta``; a ``ValidationError`` has its first argument
    logged at info level before being re-raised.
    """
    if g.get('csrf_valid', False):
        # already validated by CSRFProtect
        return

    meta = self.meta
    try:
        validate_csrf(
            field.data,
            meta.csrf_secret,
            meta.csrf_time_limit,
            meta.csrf_field_name
        )
    except ValidationError as exc:
        logger.info(exc.args[0])
        raise
def should_skip_logging(func):
    """
    Should we skip logging for this handler?

    Logging is skipped when the ``x-request-nolog`` request header parses
    as true via ``strtobool``, or when ``func`` carries a truthy attribute
    named by ``SKIP_LOGGING``.
    """
    header_value = request.headers.get("x-request-nolog", "false")
    opted_out = strtobool(header_value)
    if opted_out:
        return opted_out
    return getattr(func, SKIP_LOGGING, False)
def get_address_key(self, key):
    """Look up one component of ``self.address``.

    ``self.address`` may be a dict, in which case ``key`` is looked up
    directly, or an iterable of ``{'type': ..., 'name': ...}`` entries, in
    which case the name of the first entry whose type equals ``key`` is
    returned. Returns ``None`` when nothing matches or the address is empty.
    """
    address = self.address
    if isinstance(address, dict):
        return address.get(key)

    matching_names = (
        entry['name'] for entry in (address or []) if entry['type'] == key
    )
    return next(matching_names, None)
def update_from_nominatim(self, hit):
    """Copy fields from a Nominatim result dict onto this object.

    Each copied attribute is set to ``hit.get(<name>)``, so fields absent
    from ``hit`` become ``None``. ``hit['address']`` is required and is
    stored as a list of ``{'name': ..., 'type': ...}`` dicts.
    """
    copied_fields = (
        'display_name', 'place_rank', 'category', 'type', 'icon',
        'extratags', 'namedetails',
    )
    for field in copied_fields:
        setattr(self, field, hit.get(field))

    address_lines = []
    for kind, label in hit['address'].items():
        address_lines.append({'name': label, 'type': kind})
    self.address = address_lines
def name_for_change_comment(self):
    """Return the place name in the form used for change comments.

    When the address has ``country_code`` equal to ``'us'`` and a state
    that differs from the name, the result is ``'<name>, <state>'``.
    Otherwise the name is returned, prefixed with ``'the '`` when it
    contains ``' of '``.

    ``self.address`` may be a dict, or a list/tuple of
    ``{'type': ..., 'name': ...}`` entries. Any other value (including
    ``None``) is treated as having no address information rather than
    leaving the lookup table undefined.
    """
    name = self.name
    raw_address = self.address

    if isinstance(raw_address, dict):
        address = raw_address
    elif isinstance(raw_address, (list, tuple)):
        address = {line['type']: line['name'] for line in raw_address}
    else:
        # Unknown or missing address shape: nothing to look up.
        address = {}

    if address.get('country_code') == 'us':
        state = address.get('state')
        if state and name != state:
            return name + ', ' + state

    return 'the ' + name if ' of ' in name else name
def get_or_add_place(cls, hit):
    """Return the place matching a Nominatim hit, creating it if needed.

    The lookup is first by ``osm_type``/``osm_id``. A match whose
    ``place_id`` differs from the hit is refreshed from the hit. With no
    match, the place is looked up by ``place_id`` and refreshed if found;
    otherwise a new one is built with ``cls.from_nominatim`` and added to
    the session. The session is committed in every case.
    """
    place = cls.query.filter_by(
        osm_type=hit['osm_type'],
        osm_id=hit['osm_id'],
    ).one_or_none()

    if place:
        if place.place_id != hit['place_id']:
            place.update_from_nominatim(hit)
    else:
        place = Place.query.get(hit['place_id'])
        if place:
            place.update_from_nominatim(hit)
        else:
            place = cls.from_nominatim(hit)
            session.add(place)

    session.commit()
    return place
def candidates_url(self, **kwargs):
    """Build the URL of the candidates page for this place.

    When ``g`` holds a truthy ``filter``, it is passed as ``name_filter``
    and the ``candidates_with_filter`` endpoint is used; otherwise the
    plain ``candidates`` endpoint is used. Extra keyword arguments are
    forwarded to ``url_for``.
    """
    endpoint = 'candidates'
    if g.get('filter'):
        kwargs['name_filter'] = g.filter
        endpoint = 'candidates_with_filter'

    return url_for(
        endpoint,
        osm_type=self.osm_type,
        osm_id=self.osm_id,
        **kwargs
    )
def filter_urls():
    """Return ``{'url_for_saved': <url>}`` for the saved-places page.

    With a truthy ``g.filter`` the ``saved_with_filter`` endpoint is used,
    with spaces in the filter replaced by underscores; otherwise the
    ``saved_places`` endpoint is used. If building the URL raises
    ``RuntimeError``, an empty dict is returned instead.
    """
    name_filter = g.get('filter')
    try:
        if not name_filter:
            url = url_for('saved_places')
        else:
            url = url_for(
                'saved_with_filter',
                name_filter=name_filter.replace(' ', '_'),
            )
    except RuntimeError:
        # NOTE(review): presumably raised when no URL can be built in the
        # current context; deliberately ignored ("maybe we don't care").
        return {}
    return {'url_for_saved': url}
def logout():
    """Log the user out and redirect to ``next`` or the index page.

    The ``next`` query parameter is user-controlled, so it is only honoured
    when it resolves to an http(s) URL on this site's own host. Anything
    else falls back to the index page, which keeps this view from being
    used as an open redirect.
    """
    from urllib.parse import urljoin, urlparse

    next_url = request.args.get('next')
    if next_url:
        site = urlparse(request.host_url)
        target = urlparse(urljoin(request.host_url, next_url))
        is_local = (
            target.scheme in ('http', 'https')
            and target.netloc == site.netloc
            # Some browsers treat '\' like '/', so '/\host' could leave the site.
            and '\\' not in next_url
        )
        if not is_local:
            next_url = None

    if not next_url:
        next_url = url_for('index')

    logout_user()
    flash('you are logged out')
    return redirect(next_url)
def export_osm(osm_type, osm_id, name):
    """Return an OSM XML document that adds wikidata tags to matched objects.

    Items of the place are filtered down to those with a ``candidate``
    match; a 404 is raised when there are none. The Overpass XML for those
    candidates is read from a cache file when it exists, otherwise it is
    fetched via ``overpass.items_as_xml`` and written to the cache. Each
    ``way``/``node``/``relation`` element has its ``uid``, ``user``,
    ``timestamp`` and ``changeset`` attributes removed, and elements that
    correspond to a matched item get ``action="modify"`` plus a
    ``wikidata`` tag.

    ``name`` is accepted but not used by this function.
    """
    place = Place.get_or_abort(osm_type, osm_id)
    items = place.items_with_candidates()

    items = list(matcher.filter_candidates_more(items, bad=get_bad(items)))

    if not any('candidate' in match for _, match in items):
        abort(404)

    items = [(item, match['candidate'])
             for item, match in items if 'candidate' in match]

    # Map (osm_type, osm_id) of each candidate back to its item.
    lookup = {(osm.osm_type, osm.osm_id): item for item, osm in items}

    filename = cache_filename('{}_{}_overpass_export.xml'.format(osm_type, osm_id))
    if os.path.exists(filename):
        # Context manager so the cached file handle is always closed.
        with open(filename, 'rb') as cached:
            overpass_xml = cached.read()
    else:
        overpass_xml = overpass.items_as_xml(items)
        with open(filename, 'wb') as out:
            out.write(overpass_xml)
    root = etree.fromstring(overpass_xml)

    for e in root:
        if e.tag not in {'way', 'node', 'relation'}:
            continue
        for attr in ('uid', 'user', 'timestamp', 'changeset'):
            # Tolerate elements that lack some of these attributes.
            e.attrib.pop(attr, None)
        pair = (e.tag, int(e.attrib['id']))
        item = lookup.get(pair)
        if not item:
            continue

        e.attrib['action'] = 'modify'
        tag = etree.Element('tag', k='wikidata', v=item.qid)
        e.append(tag)

    xml = etree.tostring(root, pretty_print=True)
    return Response(xml, mimetype='text/xml')
def add_tags(osm_type, osm_id):
    """Render the page listing wikidata tags about to be added.

    The ``include`` form values have their first character dropped and are
    used as item ids. The matching items are filtered to those with a
    ``candidate`` match, and one row dict per (item, candidate) pair is
    passed to the ``add_tags.html`` template.
    """
    place = Place.get_or_abort(osm_type, osm_id)

    include = request.form.getlist('include')
    item_ids = [value[1:] for value in include]
    items = Item.query.filter(Item.item_id.in_(item_ids)).all()

    table = []
    for item, match in matcher.filter_candidates_more(items, bad=get_bad(items)):
        if 'candidate' in match:
            table.append((item, match['candidate']))

    items = []
    for i, c in table:
        items.append({
            'row_id': '{:s}-{:s}-{:d}'.format(i.qid, c.osm_type, c.osm_id),
            'qid': i.qid,
            'osm_type': c.osm_type,
            'osm_id': c.osm_id,
            'description': '{} {}: adding wikidata={}'.format(c.osm_type, c.osm_id, i.qid),
            'post_tag_url': url_for('.post_tag',
                                    item_id=i.item_id,
                                    osm_id=c.osm_id,
                                    osm_type=c.osm_type),
        })

    # NOTE: the leading ``False and`` keeps this branch switched off.
    if False and request.form.get('confirm') == 'yes':
        update_count = do_add_tags(place, table)
        flash('{:,d} wikidata tags added to OpenStreetMap'.format(update_count))
        return redirect(place.candidates_url())

    return render_template(
        'add_tags.html',
        place=place,
        osm_id=osm_id,
        items=items,
        table=table,
    )
def index():
    """Serve the index page, or redirect for ``q`` / ``filter`` arguments.

    A truthy ``q`` redirects to ``search_results``. A ``filter`` argument
    redirects to ``saved_with_filter`` (stripped, spaces replaced by
    underscores) or, when it is empty after stripping, to ``saved_places``.
    Otherwise ``index.html`` is rendered with the place cards.
    """
    query = request.args.get('q')
    if query:
        return redirect(url_for('search_results', q=query))

    if 'filter' not in request.args:
        return render_template('index.html', place_cards=get_place_cards())

    arg_filter = request.args['filter'].strip().replace(' ', '_')
    if arg_filter:
        target = url_for('saved_with_filter', name_filter=arg_filter)
    else:
        target = url_for('saved_places')
    return redirect(target)
def tag_list():
    """Render ``tag_list.html`` with the tag list for the ``sort`` argument."""
    sort_order = request.args.get('sort')
    return render_template('tag_list.html', q=get_tag_list(sort_order))
def item_page(wikidata_id):
    """Build the page for one item, or an error page on a query failure.

    The item is loaded with ``Item.query.get`` and handed to
    ``build_item_page``. If that raises ``wikidata.QueryError``, the
    ``error_page.html`` template is rendered instead.
    """
    item = Item.query.get(wikidata_id)
    try:
        page = build_item_page(wikidata_id, item)
    except wikidata.QueryError:
        return render_template(
            'error_page.html',
            message="query.wikidata.org isn't working",
        )
    return page
def delete_place(place_id):
    """Clean up a place and redirect to the endpoint named by ``next``.

    Responds with 404 when no place has the given id, instead of failing
    with an ``AttributeError`` on ``None``. After ``place.clean_up()`` a
    flash message is queued and the client is redirected to the endpoint
    named by the ``next`` query argument (default ``'space'``).
    """
    place = Place.query.get(place_id)
    if place is None:
        # Unknown id: report "not found" rather than crashing below.
        abort(404)

    place.clean_up()
    flash('{} deleted'.format(place.display_name))

    to_next = request.args.get('next', 'space')
    return redirect(url_for(to_next))
def current_span():
    """Return the ``tracer_span`` value stored on ``g``, or ``None``."""
    span = g.get("tracer_span", None)
    return span
def form_is_valid():
    """Return ``True`` unless ``g.hasfailures`` is set to a truthy value."""
    has_failures = g.get('hasfailures', False)
    if has_failures:
        return False
    return True