async def find_synonym(name, what):
    """Return our canonical name for a synonym, or None when there is no match.

    The lookup compares ``name`` case-insensitively (both sides lower-cased)
    against ``Synonym.other_name`` and requires ``Synonym.category == what``.

    This must be a coroutine function: the body uses ``async with`` and
    ``await``, which are a SyntaxError inside a plain ``def``.

    :param name: the foreign name to look up (must support ``.lower()``)
    :param what: the synonym category to restrict the search to
    :return: ``our_name`` of the first matching row, or ``None``
    """
    synonym = model.Synonym.__table__
    query = select([synonym.c.our_name]).where(
        and_(
            func.lower(synonym.c.other_name) == name.lower(),
            synonym.c.category == what,
        )
    )
    async with engine.acquire() as conn:
        res = await conn.execute(query)
        row = await res.fetchone()
    # The row has been fetched, so the connection can be released before
    # the value is unpacked.
    return row[0] if row else None
# Source code examples for Python's and_()
async def get_conversion_id(source_id, user_id, format):
    """Return the id of a conversion of ``source_id`` made by ``user_id``.

    This must be a coroutine function: the body uses ``async with`` and
    ``await``, which are a SyntaxError inside a plain ``def``.

    :param source_id: value matched against ``Conversion.source_id``
    :param user_id: value matched against ``Conversion.created_by_id``
    :param format: format identifier, resolved through ``get_format_id``
    :return: ``Conversion.id`` of the first matching row, or ``None``
    """
    # Resolve the format before acquiring a connection, the same way
    # get_existing_conversion does.
    # NOTE(review): presumably get_format_id acquires its own connection;
    # if so this also avoids holding two pooled connections at once.
    format_id = await get_format_id(format)
    conversion = model.Conversion.__table__
    query = select([conversion.c.id]).where(
        and_(
            conversion.c.source_id == source_id,
            conversion.c.created_by_id == user_id,
            conversion.c.format_id == format_id,
        )
    )
    async with engine.acquire() as conn:
        res = await conn.execute(query)
        row = await res.fetchone()
    if row:
        return row[0]
    return None
async def get_existing_conversion(ebook_id, user_id, to_format):
    """Return the id of an existing conversion of an ebook, or None.

    Conversions are joined to their sources and filtered by the source's
    ``ebook_id``, the creating user and the target format. Rows are ordered
    by ``Source.quality`` descending with NULLs last, and the scalar of the
    first row is returned.

    This must be a coroutine function: the body uses ``async with`` and
    ``await``, which are a SyntaxError inside a plain ``def``.

    :param ebook_id: value matched against ``Source.ebook_id``
    :param user_id: value matched against ``Conversion.created_by_id``
    :param to_format: format identifier, resolved through ``get_format_id``
    :return: result of ``res.scalar()`` for the query described above
    """
    format_id = await get_format_id(to_format)
    source = model.Source.__table__
    conversion = model.Conversion.__table__
    query = (
        select([conversion.c.id])
        .select_from(conversion.join(source))
        .where(
            and_(
                source.c.ebook_id == ebook_id,
                conversion.c.created_by_id == user_id,
                conversion.c.format_id == format_id,
            )
        )
        .order_by(nullslast(desc(source.c.quality)))
    )
    async with engine.acquire() as conn:
        res = await conn.execute(query)
        return await res.scalar()
def _get_nonansi_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == sql_operators.eq:
if join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
elif join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(
join.onclause, {}, {'binary': visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, expression.Join):
visit_join(j)
elif isinstance(j, expression.FromGrouping):
visit_join(j.element)
for f in froms:
if isinstance(f, expression.Join):
visit_join(f)
if not clauses:
return None
else:
return sql.and_(*clauses)
def _get_nonansi_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == sql_operators.eq:
if join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
elif join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(
join.onclause, {}, {'binary': visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, expression.Join):
visit_join(j)
elif isinstance(j, expression.FromGrouping):
visit_join(j.element)
for f in froms:
if isinstance(f, expression.Join):
visit_join(f)
if not clauses:
return None
else:
return sql.and_(*clauses)
def in_grid(cls, grid):
    """Build an OR of eight corner-containment conditions between two boxes.

    The first four conditions test whether a corner of ``cls``'s bounding
    box (lon_min/lon_max x lat_min/lat_max) lies strictly inside ``grid``'s
    box; the last four test whether a corner of ``grid``'s box lies strictly
    inside ``cls``'s box.

    NOTE(review): boxes that overlap without either one containing a corner
    of the other, or whose edges coincide exactly, do not satisfy any of
    these conditions - confirm that this is intended.

    :param cls: object exposing lon_min, lon_max, lat_min, lat_max
    :param grid: object exposing lon_min, lon_max, lat_min, lat_max
    :return: the ``or_(...)`` expression
    """
    def cls_corner_in_grid(lon, lat):
        return and_(lon > grid.lon_min,
                    lon < grid.lon_max,
                    lat > grid.lat_min,
                    lat < grid.lat_max)

    def grid_corner_in_cls(lon, lat):
        return and_(cls.lon_min < lon,
                    cls.lon_max > lon,
                    cls.lat_min < lat,
                    cls.lat_max > lat)

    return or_(
        cls_corner_in_grid(cls.lon_min, cls.lat_min),
        cls_corner_in_grid(cls.lon_min, cls.lat_max),
        cls_corner_in_grid(cls.lon_max, cls.lat_min),
        cls_corner_in_grid(cls.lon_max, cls.lat_max),
        grid_corner_in_cls(grid.lon_min, grid.lat_min),
        grid_corner_in_cls(grid.lon_min, grid.lat_max),
        grid_corner_in_cls(grid.lon_max, grid.lat_min),
        grid_corner_in_cls(grid.lon_max, grid.lat_max),
    )
def in_grid(cls, grid):
    """Return an ``or_`` over eight ``and_`` corner tests between two boxes.

    Conditions 1-4: one of the four corners of ``cls``'s box is strictly
    inside ``grid``'s box. Conditions 5-8: one of the four corners of
    ``grid``'s box is strictly inside ``cls``'s box.

    NOTE(review): overlapping boxes where neither contains a corner of the
    other, and boxes with exactly equal edge coordinates, match none of the
    conditions - confirm that this is intended.

    :param cls: object exposing lon_min, lon_max, lat_min, lat_max
    :param grid: object exposing lon_min, lon_max, lat_min, lat_max
    :return: the ``or_(...)`` expression
    """
    conditions = []

    # Corners of cls's box tested against grid's bounds.
    for lon, lat in ((cls.lon_min, cls.lat_min),
                     (cls.lon_min, cls.lat_max),
                     (cls.lon_max, cls.lat_min),
                     (cls.lon_max, cls.lat_max)):
        conditions.append(and_(lon > grid.lon_min,
                               lon < grid.lon_max,
                               lat > grid.lat_min,
                               lat < grid.lat_max))

    # Corners of grid's box tested against cls's bounds.
    for lon, lat in ((grid.lon_min, grid.lat_min),
                     (grid.lon_min, grid.lat_max),
                     (grid.lon_max, grid.lat_min),
                     (grid.lon_max, grid.lat_max)):
        conditions.append(and_(cls.lon_min < lon,
                               cls.lon_max > lon,
                               cls.lat_min < lat,
                               cls.lat_max > lat))

    return or_(*conditions)
def point_inside(cls, point):
    """Build the condition that ``point`` lies within ``cls``'s bounds.

    Bounds are inclusive: ``lat_min <= point.lat <= lat_max`` and
    ``lon_min <= point.lon <= lon_max``.

    :param cls: object exposing lat_min, lat_max, lon_min, lon_max
    :param point: object exposing ``lat`` and ``lon``
    :return: the ``and_(...)`` expression
    """
    lat = point.lat
    lon = point.lon
    bounds = [
        cls.lat_min <= lat,
        cls.lat_max >= lat,
        cls.lon_min <= lon,
        cls.lon_max >= lon,
    ]
    return and_(*bounds)
def old_maps(self):
    """
    Returns 0 for false and an integer count of old shakemaps for true

    "Old" rows are those with the same shakemap_id as this object and a
    shakemap_version lower than this object's.
    """
    table = ShakeMap.__table__
    same_map = table.c.shakemap_id == self.shakemap_id
    older_version = table.c.shakemap_version < self.shakemap_version
    query = select([table.c.shakecast_id]).where(and_(same_map, older_version))
    # All matching rows are fetched and counted on the Python side.
    return len(list(engine.execute(query)))
def is_new(self):
    """
    Returns True when no stored shakemap has this object's shakemap_id and
    shakemap_version, False otherwise
    """
    table = ShakeMap.__table__
    same_map = table.c.shakemap_id == self.shakemap_id
    same_version = table.c.shakemap_version == self.shakemap_version
    query = select([table.c.shakecast_id]).where(and_(same_map, same_version))
    matches = list(engine.execute(query))
    return not matches
def _get_nonansi_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == sql_operators.eq:
if join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
elif join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(
join.onclause, {}, {'binary': visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, expression.Join):
visit_join(j)
elif isinstance(j, expression.FromGrouping):
visit_join(j.element)
for f in froms:
if isinstance(f, expression.Join):
visit_join(f)
if not clauses:
return None
else:
return sql.and_(*clauses)
def _get_nonansi_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == sql_operators.eq:
if join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
elif join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(
join.onclause, {}, {'binary': visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, expression.Join):
visit_join(j)
elif isinstance(j, expression.FromGrouping):
visit_join(j.element)
for f in froms:
if isinstance(f, expression.Join):
visit_join(f)
if not clauses:
return None
else:
return sql.and_(*clauses)
def _get_nonansi_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == sql_operators.eq:
if join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
elif join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(
join.onclause, {}, {'binary': visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, expression.Join):
visit_join(j)
elif isinstance(j, expression.FromGrouping):
visit_join(j.element)
for f in froms:
if isinstance(f, expression.Join):
visit_join(f)
if not clauses:
return None
else:
return sql.and_(*clauses)
def _get_nonansi_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == sql_operators.eq:
if join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
elif join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(
join.onclause, {}, {'binary': visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, expression.Join):
visit_join(j)
elif isinstance(j, expression.FromGrouping):
visit_join(j.element)
for f in froms:
if isinstance(f, expression.Join):
visit_join(f)
if not clauses:
return None
else:
return sql.and_(*clauses)
def toolbar_icon_clicked(self, widget, movie):
    """Delete unreferenced posters and, for sqlite databases, VACUUM the file.

    First, every row of ``posters_table`` whose ``md5sum`` is not used as
    ``poster_md5`` by any row of ``movies_table`` is deleted and the session
    is committed. Then, if the configured database type is ``'sqlite'``,
    the database file is compacted with ``PRAGMA page_size`` + ``VACUUM``:
    through ``pysqlite2`` when that works, otherwise through the
    application's own engine. Finally a "Finished" message is shown.

    :param widget: not used by this method
    :param movie: not used by this method
    """
    #
    # remove unused posters
    #
    session = self.db.Session()
    poster_in_use = exists(
        [movies_table.c.movie_id],
        and_(posters_table.c.md5sum == movies_table.c.poster_md5),
    ).correlate(posters_table)
    delete_posters = delete(posters_table).where(not_(poster_in_use))
    log.debug(delete_posters)
    session.execute(delete_posters)
    session.commit()
    #
    # compressing sqlite databases
    #
    if self.app.config.get('type', 'sqlite', section='database') == 'sqlite':
        databasefilename = "%s.db" % os.path.join(self.app.locations['home'], self.app.config.get('name', section='database'))
        pagesize = gutils.get_filesystem_pagesize(databasefilename)
        # works since sqlite 3.5.8
        # python 2.5 doesn't include 3.x but perhaps in future versions
        # another way is the installation of pysqlite2 with 2.5.6/2.6.0 or higher
        try:
            from pysqlite2 import dbapi2 as sqlite3
            con = sqlite3.connect(databasefilename)
            try:
                con.isolation_level = None
                cur = con.cursor()
                cur.execute('PRAGMA page_size=' + str(pagesize))
                cur.execute('VACUUM;')
            finally:
                con.close()
        except Exception:
            # Deliberate best-effort fallback (missing pysqlite2, or a
            # failure while using it). Catching Exception rather than using
            # a bare except keeps KeyboardInterrupt/SystemExit from being
            # swallowed and followed by another VACUUM.
            log.error('fallback to default driver')
            self.app.db.engine.execute('PRAGMA page_size=' + str(pagesize))
            self.app.db.engine.execute('VACUUM;')
    gutils.info(_("Finished"))
def _get_nonansi_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == sql_operators.eq:
if join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
elif join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(
join.onclause, {}, {'binary': visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, expression.Join):
visit_join(j)
elif isinstance(j, expression.FromGrouping):
visit_join(j.element)
for f in froms:
if isinstance(f, expression.Join):
visit_join(f)
if not clauses:
return None
else:
return sql.and_(*clauses)
def _get_nonansi_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == sql_operators.eq:
if join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
elif join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(
join.onclause, {}, {'binary': visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, expression.Join):
visit_join(j)
elif isinstance(j, expression.FromGrouping):
visit_join(j.element)
for f in froms:
if isinstance(f, expression.Join):
visit_join(f)
if not clauses:
return None
else:
return sql.and_(*clauses)
def _get_nonansi_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == sql_operators.eq:
if join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
elif join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(
join.onclause, {}, {'binary': visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, expression.Join):
visit_join(j)
elif isinstance(j, expression.FromGrouping):
visit_join(j.element)
for f in froms:
if isinstance(f, expression.Join):
visit_join(f)
if not clauses:
return None
else:
return sql.and_(*clauses)
def has_table(self, connection, table_name, schema=None):
    """Report whether ``dbc.tablesvx`` lists ``table_name`` in ``schema``.

    :param connection: object whose ``execute`` runs the statement with the
        ``schema`` and ``table_name`` bind parameters
    :param table_name: value bound to ``:table_name``
    :param schema: value bound to ``:schema``; ``self.default_schema_name``
        is used when this is ``None``
    :return: ``True`` if the query yields at least one row, else ``False``
    """
    target_schema = self.default_schema_name if schema is None else schema
    criteria = and_(text('DatabaseName=:schema'),
                    text('TableName=:table_name'))
    query = select([column('tablename')],
                   from_obj=[text('dbc.tablesvx')]).where(criteria)
    row = connection.execute(
        query, schema=target_schema, table_name=table_name).fetchone()
    return row is not None
def get_columns(self, connection, table_name, schema=None, **kw):
    """Return column info for ``table_name`` as built by ``_get_column_info``.

    When the major server version (the part of ``server_version_info``
    before the first dot) is below 16, columns are read from
    ``dbc.ColumnsV`` and the object is first checked against
    ``dbc.tablesV`` for ``tablekind='V'``; if that check reports exactly
    one row, each column row is replaced by the result of
    ``_get_column_help``. Otherwise columns are read from
    ``dbc.ColumnsQV``.

    :param connection: object whose ``execute`` runs the statements
    :param table_name: value bound to ``:table_name``
    :param schema: value bound to ``:schema``; ``self.default_schema_name``
        is used when this is ``None``
    :param kw: accepted but not used here
    :return: list of ``self._get_column_info(row)`` results
    """
    if schema is None:
        schema = self.default_schema_name
    bind_params = {'schema': schema, 'table_name': table_name}

    def name_filters():
        # Fresh text() clauses for each statement.
        return [text('DatabaseName=:schema'), text('TableName=:table_name')]

    is_view = False
    major_version = int(self.server_version_info.split('.')[0])
    if major_version < 16:
        column_view = 'dbc.ColumnsV'
        # Check if the object is a view
        view_filters = name_filters() + [text("tablekind='V'")]
        kind_query = select(
            [column('tablekind')],
            from_obj=[text('dbc.tablesV')]).where(and_(*view_filters))
        # NOTE(review): relies on rowcount being meaningful for a SELECT -
        # confirm against the driver in use.
        matched = connection.execute(kind_query, **bind_params).rowcount
        is_view = (matched == 1)
    else:
        column_view = 'dbc.ColumnsQV'

    wanted = (
        'columnname', 'columntype',
        'columnlength', 'chartype',
        'decimaltotaldigits', 'decimalfractionaldigits',
        'columnformat',
        'nullable', 'defaultvalue', 'idcoltype',
    )
    columns_query = select(
        [column(col_name) for col_name in wanted],
        from_obj=[text(column_view)]).where(and_(*name_filters()))
    rows = connection.execute(columns_query, **bind_params).fetchall()

    # If this is a view in pre-16 version, get types for individual columns
    if is_view:
        rows = [
            self._get_column_help(connection, schema, table_name, r['columnname'])
            for r in rows
        ]
    return [self._get_column_info(row) for row in rows]