def test_oc():
    """Exercise ``OC`` wrappers built from asc/desc/nullslast ordering clauses."""
    # Wrap each ordering clause directly.
    asc_a = OC(asc('a'))
    desc_a = OC(desc('a'))
    asc_b = OC(asc('b'))  # constructed only; no assertion inspects it
    desc_a_nulls_last = OC(nullslast(desc('a')))

    # A bare column name is equivalent to an ascending clause.
    assert str(asc_a) == str(OC('a'))

    # Direction flags, including after a double reversal.
    assert asc_a.is_ascending
    assert not desc_a.is_ascending
    assert not desc_a_nulls_last.reversed.reversed.is_ascending

    # All three wrappers over column 'a' share the same underlying element.
    assert (str(asc_a.element)
            == str(desc_a.element)
            == str(desc_a_nulls_last.element))

    # Reversing flips the direction; reversing twice is a no-op.
    assert str(asc_a) == str(desc_a.reversed)
    assert str(desc_a_nulls_last.reversed.reversed) == str(desc_a_nulls_last)

    # Naming and representation.
    assert asc_a.name == 'a'
    assert desc_a_nulls_last.name == 'a'
    assert desc_a_nulls_last.quoted_full_name == 'a'
    assert repr(desc_a_nulls_last) == '<OC: a DESC NULLS LAST>'
# Example source code for Python's nullslast()
def __new__(cls, column_name, mData=None, search_like=True,
            filter=str, searchable=True, filterarg='cell',
            nulls_order=None):
    """Create the column description, supplying defaults and validating input.

    ``mData`` defaults to None and ``filter`` defaults to ``str`` (cause:
    Object representation is not JSON serializable).

    Raises:
        ValueError: if ``nulls_order`` is truthy and is neither
            'nullsfirst' nor 'nullslast'.
    """
    allowed_nulls_orders = ('nullsfirst', 'nullslast')
    # A falsy nulls_order (None, '') means "not specified" and is accepted.
    if nulls_order and nulls_order not in allowed_nulls_orders:
        raise ValueError('`%s` is not an allowed value for nulls_order.'
                         % nulls_order)

    # Field order must match the positional order expected by the base class.
    field_values = (column_name, mData, search_like, filter, searchable,
                    filterarg, nulls_order)
    return super(ColumnDT, cls).__new__(cls, *field_values)
async def get_conversion_candidate(ebook_id, to_format):
    """Pick a source of an ebook to use for delivering ``to_format``.

    Declared ``async`` because the body awaits and uses ``async with`` /
    ``async for``, which are only valid inside a coroutine function.

    Lookup order:
      1. A source of the ebook that is already in the requested format,
         highest quality first (NULL quality last).
      2. Otherwise the first source of the ebook, in the same quality order,
         whose extension is listed in ``settings.CONVERTABLE_TYPES``.

    Args:
        ebook_id: value compared against ``Source.ebook_id``.
        to_format: target format, resolved to an id via ``get_format_id``.

    Returns:
        The first matching row as a tuple for case 1, ``(id, extension)`` for
        case 2, or ``(None, None)`` when no candidate exists.
    """
    to_format_id = await get_format_id(to_format)
    async with engine.acquire() as conn:
        source = model.Source.__table__
        # Named format_table so the builtin ``format`` is not shadowed.
        format_table = model.Format.__table__
        best_quality_first = nullslast(desc(source.c.quality))

        exact_format_query = (
            select([source.c.id, format_table.c.extension])
            .where(and_(source.c.ebook_id == ebook_id,
                        source.c.format_id == to_format_id,
                        source.c.format_id == format_table.c.id))
            .order_by(best_quality_first)
        )
        result = await conn.execute(exact_format_query)
        row = await result.first()
        if row:
            return row.as_tuple()

        # TODO: Consider optimal selection of the source
        # in previous version we first selected format (from available
        # convertable in ebook) and then one with best quality - so actually
        # the other way around
        any_format_query = (
            select([source.c.id, format_table.c.extension])
            .where(and_(source.c.format_id == format_table.c.id,
                        source.c.ebook_id == ebook_id))
            .order_by(best_quality_first)
        )
        async for row in conn.execute(any_format_query):
            if row.extension in settings.CONVERTABLE_TYPES:
                return row.id, row.extension
        return None, None
async def get_existing_conversion(ebook_id, user_id, to_format):
    """Look up an existing conversion of an ebook made by a given user.

    Declared ``async`` because the body awaits and uses ``async with``,
    which are only valid inside a coroutine function.

    Args:
        ebook_id: value compared against ``Source.ebook_id``.
        user_id: value compared against ``Conversion.created_by_id``.
        to_format: target format, resolved to an id via ``get_format_id``.

    Returns:
        The result of ``scalar()`` on the query: the id of the first matching
        conversion, ordered by source quality descending with NULLs last
        (presumably None when nothing matches -- confirm against the driver).
    """
    format_id = await get_format_id(to_format)
    async with engine.acquire() as conn:
        source = model.Source.__table__
        conversion = model.Conversion.__table__
        query = (
            select([conversion.c.id])
            .select_from(conversion.join(source))
            .where(and_(source.c.ebook_id == ebook_id,
                        conversion.c.created_by_id == user_id,
                        conversion.c.format_id == format_id))
            .order_by(nullslast(desc(source.c.quality)))
        )
        result = await conn.execute(query)
        return await result.scalar()
def _sort_by_key_value(cls, query, column, descending=False):
    """Return ``query`` ordered by ``column`` with explicit NULL placement.

    Ascending order puts NULLs last; descending order puts NULLs first.
    When the configured database URL starts with 'sqlite', the NULL
    placement wrapper is skipped and only the plain ordering is applied.
    """
    ordering = desc(column) if descending else column

    if settings.db.url.startswith('sqlite'):
        return query.order_by(ordering)

    place_nulls = nullsfirst if descending else nullslast
    return query.order_by(place_nulls(ordering))
def version_at_time_q(cls, base_id, timestamp, db=None):
    """Build the query for versions of ``base_id`` still live at ``timestamp``.

    A row qualifies when its ``tombstone_date`` is unset or later than
    ``timestamp``. Results are ordered by ``tombstone_date`` ascending with
    NULLs last. Uses ``cls.default_db`` when ``db`` is not given (or falsy).

    NOTE(review): an earlier commented-out draft used ``distinct(cls.base_id)``
    and an extra ordering on ``cls.base_id`` so the result could be used
    without calling ``first()``; it was not active code.
    """
    session = db or cls.default_db

    # ``== None`` is intentional: it builds a query expression through the
    # column's operator overloading, so ``is None`` would not work here.
    never_tombstoned = cls.tombstone_date == None  # noqa: E711
    tombstoned_later = cls.tombstone_date > timestamp

    live_versions = session.query(cls).filter(
        cls.base_id == base_id,
        never_tombstoned | tombstoned_later,
    )
    return live_versions.order_by(nullslast(asc(cls.tombstone_date)))