def get_region_by_id_or_name(self, region_id_or_name):
logger.debug("Get region by id or name: {}".format(region_id_or_name))
try:
session = self._engine_facade.get_session()
with session.begin():
record = session.query(Region)
record = record.filter(or_(Region.region_id == region_id_or_name,
Region.name == region_id_or_name))
if record.first():
return record.first().to_wsme()
return None
except Exception as exp:
logger.exception("DB error filtering by id/name")
raise
python类or_()的实例源码
def get_relationships(self, with_package=None, type=None, active=True,
direction='both'):
'''Returns relationships this package has.
Keeps stored type/ordering (not from pov of self).'''
assert direction in ('both', 'forward', 'reverse')
if with_package:
assert isinstance(with_package, Package)
from package_relationship import PackageRelationship
forward_filters = [PackageRelationship.subject==self]
reverse_filters = [PackageRelationship.object==self]
if with_package:
forward_filters.append(PackageRelationship.object==with_package)
reverse_filters.append(PackageRelationship.subject==with_package)
if active:
forward_filters.append(PackageRelationship.state==core.State.ACTIVE)
reverse_filters.append(PackageRelationship.state==core.State.ACTIVE)
if type:
forward_filters.append(PackageRelationship.type==type)
reverse_type = PackageRelationship.reverse_type(type)
reverse_filters.append(PackageRelationship.type==reverse_type)
q = meta.Session.query(PackageRelationship)
if direction == 'both':
q = q.filter(or_(
and_(*forward_filters),
and_(*reverse_filters),
))
elif direction == 'forward':
q = q.filter(and_(*forward_filters))
elif direction == 'reverse':
q = q.filter(and_(*reverse_filters))
return q.all()
def in_grid(cls, grid):
# check if a point is within the boundaries of the grid
return or_(and_(cls.lon_min > grid.lon_min,
cls.lon_min < grid.lon_max,
cls.lat_min > grid.lat_min,
cls.lat_min < grid.lat_max),
and_(cls.lon_min > grid.lon_min,
cls.lon_min < grid.lon_max,
cls.lat_max > grid.lat_min,
cls.lat_max < grid.lat_max),
and_(cls.lon_max > grid.lon_min,
cls.lon_max < grid.lon_max,
cls.lat_min > grid.lat_min,
cls.lat_min < grid.lat_max),
and_(cls.lon_max > grid.lon_min,
cls.lon_max < grid.lon_max,
cls.lat_max > grid.lat_min,
cls.lat_max < grid.lat_max),
and_(cls.lon_min < grid.lon_min,
cls.lon_max > grid.lon_min,
cls.lat_min < grid.lat_min,
cls.lat_max > grid.lat_min),
and_(cls.lon_min < grid.lon_min,
cls.lon_max > grid.lon_min,
cls.lat_min < grid.lat_max,
cls.lat_max > grid.lat_max),
and_(cls.lon_min < grid.lon_max,
cls.lon_max > grid.lon_max,
cls.lat_min < grid.lat_min,
cls.lat_max > grid.lat_min),
and_(cls.lon_min < grid.lon_max,
cls.lon_max > grid.lon_max,
cls.lat_min < grid.lat_max,
cls.lat_max > grid.lat_max))
def in_grid(cls, grid):
# check if a point is within the boundaries of the grid
return or_(and_(cls.lon_min > grid.lon_min,
cls.lon_min < grid.lon_max,
cls.lat_min > grid.lat_min,
cls.lat_min < grid.lat_max),
and_(cls.lon_min > grid.lon_min,
cls.lon_min < grid.lon_max,
cls.lat_max > grid.lat_min,
cls.lat_max < grid.lat_max),
and_(cls.lon_max > grid.lon_min,
cls.lon_max < grid.lon_max,
cls.lat_min > grid.lat_min,
cls.lat_min < grid.lat_max),
and_(cls.lon_max > grid.lon_min,
cls.lon_max < grid.lon_max,
cls.lat_max > grid.lat_min,
cls.lat_max < grid.lat_max),
and_(cls.lon_min < grid.lon_min,
cls.lon_max > grid.lon_min,
cls.lat_min < grid.lat_min,
cls.lat_max > grid.lat_min),
and_(cls.lon_min < grid.lon_min,
cls.lon_max > grid.lon_min,
cls.lat_min < grid.lat_max,
cls.lat_max > grid.lat_max),
and_(cls.lon_min < grid.lon_max,
cls.lon_max > grid.lon_max,
cls.lat_min < grid.lat_min,
cls.lat_max > grid.lat_min),
and_(cls.lon_min < grid.lon_max,
cls.lon_max > grid.lon_max,
cls.lat_min < grid.lat_max,
cls.lat_max > grid.lat_max))
def text_filter(query, value, table):
pairs = ((n, c) for n, c in table.c.items()
if isinstance(c.type, sa.sql.sqltypes.String))
sub_queries = []
for name, column in pairs:
do_compare = op("like", column)
sub_queries.append(do_compare(column, value))
query = query.where(or_(*sub_queries))
return query
# TODO: validate that value supplied in filter has same type as in table
# TODO: use functional style to create query
def get_user_filters(user):
clauses=[]
sfq = db.session.query(SampleFilter)
clauses.append(SampleFilter.user_id==user.user_id)
if not user.is_admin:
clauses.append(SampleFilter.is_public==True)
sfq.filter(or_(*clauses))
sfs = sfq.all()
data=[{'name':x.sample_filter_name,'set':x.sample_filter_tag, 'id':x.sample_filter_id, 'filters':json.loads(x.sample_filter_data)} for x in sfs]
return data
def _get_comparison(self, values, op):
columns = self.__clause_element__().clauses
if op in strict_op_map:
stricter_op = strict_op_map[op]
else:
stricter_op = op
return sql.or_(stricter_op(columns[0], values[0]),
sql.and_(columns[0] == values[0],
op(columns[1], values[1])))
def get_flavor_by_id_or_name(self, id_or_name):
try:
flavor = self.session.query(Flavor).filter(or_(Flavor.id == id_or_name, Flavor.name == id_or_name))
return flavor.first()
except Exception as exception:
message = "Failed to get_flavor_by_id_or_name: id or name: {0}".format(id_or_name)
LOG.log_exception(message, exception)
raise
def bookmark_list():
'''
Returns a list of bookmarks
'''
search_form = SearchForm(request.args)
# Create the base query
# After this query is created, we keep iterating over it until we reach the desired level of filtering
query = Bookmark.query.filter_by(user=current_user.id, deleted=False).order_by(Bookmark.added_on.desc())
# Get list of tags we'll be filtering by
tag_args = request.values.getlist('tags')
if len(tag_args) == 0:
tag_args = None
else:
for tag in tag_args:
# Check is any of the tags for the bookmark match up with tag
query = query.filter(Bookmark.tags.any(tag))
# This means that the search form has been used
if search_form.query.data is not None:
# Search query type can be either basic, full text, or url
# This is basic search, which searches in the bookmark titles and descriptions
if search_form.parameter.data == 'basic':
query = search(query, search_form.query.data, vector=Bookmark.search_vector) # XXX is this safe?
user_count = Bookmark.query.filter_by(user=current_user.id, deleted=False).count()
# Postgres full text search seems to fail when using non-ASCII characters
# When the failure happens, all the bookmarks are returned instead
# We check if this has happened, and if it has, we fall back to non-indexed search instead
if query.count() == user_count:
query = query.filter(or_(Bookmark.title.contains(search_form.query.data),
Bookmark.description.contains(search_form.query.data)))
elif search_form.parameter.data == 'ft':
# We will search over the entire contents of the page here
query = search(query, search_form.query.data, vector=Bookmark.fulltext_vector)
user_count = Bookmark.query.filter_by(user=current_user.id, deleted=False).count()
if query.count() == user_count:
query = query.filter(Bookmark.full_text.contains(search_form.query.data))
# URL search lets you filter by domains or other parts of the url
elif search_form.parameter.data == 'url':
query = query.filter(Bookmark.main_url.contains(search_form.query.data))
else:
pass
# Context view takes you to the page the bookmark with a specific id is present on
# Here the id is used to know which bookmark should be highlighted
try:
context_id = request.args['bid']
except KeyError:
context_id = 0
# Pagination, with defaulting to the first page
page = request.args.get('page', 1, type=int)
# Users are allowed to specify how many bookmarks they want per page
bookmarks_per_page = User.query.get(current_user.id).bookmarks_per_page
# Paginate the results of our query
pagination = query.paginate(page, per_page=bookmarks_per_page, error_out=False)
delete_form = DeleteForm()
return render_template("manager/bookmark_list.html",
pagination=pagination,
search_form=search_form,
delete_form=delete_form,
context_id=context_id,
tag_args=tag_args)