def search_reserveds(search_text, **kwargs):
"""Search the Reserved table."""
is_first = True
where_clause = None
if search_text:
search_values = re.sub('[~`%&-+=]', '', search_text).split()
for s in search_values:
s_ = u'%'+s+u'%'
if is_first:
where_clause = (Reserved.name ** s_)
is_first = False
else:
where_clause = where_clause & \
(Reserved.name ** s_)
if is_first:
where_clause = (~(Reserved.id >> None))
select_clause = pwe.SelectQuery(Reserved, Reserved.id, Reserved.name)
order_by_clause = Reserved.name
reserved_list = search_rows(select_clause, where_clause, order_by_clause)
return reserved_list
python类SelectQuery()的实例源码
def add(self, value, clear_existing=False):
if clear_existing:
self.clear()
fd = self._field_descriptor
if isinstance(value, SelectQuery):
query = value.select(
SQL(str(self._instance.get_id())),
fd.rel_model._meta.primary_key)
fd.through_model.insert_from(
fields=[fd.src_fk, fd.dest_fk],
query=query).execute()
else:
if not isinstance(value, (list, tuple)):
value = [value]
if not value:
return
inserts = [{
fd.src_fk.name: self._instance.get_id(),
fd.dest_fk.name: rel_id}
for rel_id in self._id_list(value)]
fd.through_model.insert_many(inserts).execute()
def remove(self, value):
fd = self._field_descriptor
if isinstance(value, SelectQuery):
subquery = value.select(value.model_class._meta.primary_key)
return (fd.through_model
.delete()
.where(
(fd.dest_fk << subquery) &
(fd.src_fk == self._instance.get_id()))
.execute())
else:
if not isinstance(value, (list, tuple)):
value = [value]
if not value:
return
return (fd.through_model
.delete()
.where(
(fd.dest_fk << self._id_list(value)) &
(fd.src_fk == self._instance.get_id()))
.execute())
def __set__(self, instance, value):
mtv = instance._meta.db_table
miv = instance._get_pk_value()
if (isinstance(value, SelectQuery) and
value.model_class == self.model_class):
UpdateQuery(self.model_class, {
self.model_type_field: mtv,
self.model_id_field: miv,
}).where(value._where).execute()
elif all(map(lambda i: isinstance(i, self.model_class), value)):
for obj in value:
setattr(obj, self.model_type_field.name, mtv)
setattr(obj, self.model_id_field.name, miv)
obj.save()
else:
raise ValueError('ReverseGFK field unable to handle "%s"' % value)
def dataClean():
if EdulistModel.table_exists() == False:
EdulistModel.create_table()
schools = SelectQuery(EdulistModel).select().where(EdulistModel.postcode == '')
print(schools.count())
for school in schools:
# category = school.category
print(school.postcode)
# category = category.replace(' ', '')
# category = category.replace('\t', '')
# category = category.replace('\n', '')
# category = category.replace('\r', '?')
# print(category)
# school.category = category
# school.save()
def add(self, value, clear_existing=False):
if clear_existing:
self.clear()
fd = self._field_descriptor
if isinstance(value, SelectQuery):
query = value.select(
SQL(str(self._instance.get_id())),
fd.rel_model._meta.primary_key)
fd.through_model.insert_from(
fields=[fd.src_fk, fd.dest_fk],
query=query).execute()
else:
if not isinstance(value, (list, tuple)):
value = [value]
if not value:
return
inserts = [{
fd.src_fk.name: self._instance.get_id(),
fd.dest_fk.name: rel_id}
for rel_id in self._id_list(value)]
fd.through_model.insert_many(inserts).execute()
def remove(self, value):
fd = self._field_descriptor
if isinstance(value, SelectQuery):
subquery = value.select(value.model_class._meta.primary_key)
return (fd.through_model
.delete()
.where(
(fd.dest_fk << subquery) &
(fd.src_fk == self._instance.get_id()))
.execute())
else:
if not isinstance(value, (list, tuple)):
value = [value]
if not value:
return
return (fd.through_model
.delete()
.where(
(fd.dest_fk << self._id_list(value)) &
(fd.src_fk == self._instance.get_id()))
.execute())
def __set__(self, instance, value):
mtv = instance._meta.db_table
miv = instance._get_pk_value()
if (isinstance(value, SelectQuery) and
value.model_class == self.model_class):
UpdateQuery(self.model_class, {
self.model_type_field: mtv,
self.model_id_field: miv,
}).where(value._where).execute()
elif all(map(lambda i: isinstance(i, self.model_class), value)):
for obj in value:
setattr(obj, self.model_type_field.name, mtv)
setattr(obj, self.model_id_field.name, miv)
obj.save()
else:
raise ValueError('ReverseGFK field unable to handle "%s"' % value)
def add(self, value, clear_existing=False):
if clear_existing:
self.clear()
fd = self._field_descriptor
if isinstance(value, SelectQuery):
query = value.select(
SQL(str(self._instance.get_id())),
fd.rel_model._meta.primary_key)
fd.through_model.insert_from(
fields=[fd.src_fk, fd.dest_fk],
query=query).execute()
else:
if not isinstance(value, (list, tuple)):
value = [value]
if not value:
return
inserts = [{
fd.src_fk.name: self._instance.get_id(),
fd.dest_fk.name: rel_id}
for rel_id in self._id_list(value)]
fd.through_model.insert_many(inserts).execute()
def remove(self, value):
fd = self._field_descriptor
if isinstance(value, SelectQuery):
subquery = value.select(value.model_class._meta.primary_key)
return (fd.through_model
.delete()
.where(
(fd.dest_fk << subquery) &
(fd.src_fk == self._instance.get_id()))
.execute())
else:
if not isinstance(value, (list, tuple)):
value = [value]
if not value:
return
return (fd.through_model
.delete()
.where(
(fd.dest_fk << self._id_list(value)) &
(fd.src_fk == self._instance.get_id()))
.execute())
def __set__(self, instance, value):
mtv = instance._meta.db_table
miv = instance._get_pk_value()
if (isinstance(value, SelectQuery) and
value.model_class == self.model_class):
UpdateQuery(self.model_class, {
self.model_type_field: mtv,
self.model_id_field: miv,
}).where(value._where).execute()
elif all(map(lambda i: isinstance(i, self.model_class), value)):
for obj in value:
setattr(obj, self.model_type_field.name, mtv)
setattr(obj, self.model_id_field.name, miv)
obj.save()
else:
raise ValueError('ReverseGFK field unable to handle "%s"' % value)
def add(self, value, clear_existing=False):
if clear_existing:
await self.clear()
fd = self._field_descriptor
if isinstance(value, SelectQuery):
query = value.select(
SQL(str(self._instance.get_id())),
fd.rel_model._meta.primary_key)
await fd.through_model.insert_from(
fields=[fd.src_fk, fd.dest_fk],
query=query).execute()
else:
if not isinstance(value, (list, tuple)):
value = [value]
if not value:
return
inserts = [{
fd.src_fk.name: self._instance.get_id(),
fd.dest_fk.name: rel_id}
for rel_id in self._id_list(value)]
await fd.through_model.insert_many(inserts).execute()
def remove(self, value):
fd = self._field_descriptor
if isinstance(value, SelectQuery):
subquery = value.select(value.model_class._meta.primary_key)
return await (fd.through_model
.delete()
.where(
(fd.dest_fk << subquery) &
(fd.src_fk == self._instance.get_id()))
.execute())
else:
if not isinstance(value, (list, tuple)):
value = [value]
if not value:
return
return await (fd.through_model
.delete()
.where(
(fd.dest_fk << self._id_list(value)) &
(fd.src_fk == self._instance.get_id()))
.execute())
def get_page_context(self):
"""
Return current page context
"""
try:
page = int(self.get_argument('page', 1))
except ValueError:
page = 1
try:
count = peewee.SelectQuery(Record).count()
except peewee.IntegrityError:
count = 0
page_count = int(count/env.ADMIN_ITEMS_PER_PAGE) + \
int(bool(count % env.ADMIN_ITEMS_PER_PAGE))
prev_page, page, next_page = self.paging(page, page_count)
try:
records = Record\
.select()\
.order_by(
Record.active.desc(),
Record.uts.desc())\
.paginate(page, paginate_by=env.ADMIN_ITEMS_PER_PAGE)
except peewee.IntegrityError:
records = []
return dict(records=records,
count=count,
page_count=page_count,
prev_page=prev_page,
page=page,
next_page=next_page)
def jobs(project, page):
""" For when no job id was given. """
g.selected_tab = "jobs"
# Check if project argument is correct
project_query = Project.select().where(Project.slug == project).first()
if project_query is None:
flash("Invalid project.")
return redirect(url_for("projects"))
session["project"] = project_query
page = int(page)
if page <= 0:
flash("invalid page number")
return redirect(project + "/jobs/1")
count = peewee.SelectQuery(Job).count()
query = (Job
.select()
.where(Job.project == project_query)
.order_by(-Job.started.is_null(), -Job.started)
.paginate(page, JOBSPERPAGE))
entries = []
for job in query:
span = job.ended
if job.started is not None:
if job.ended is not None:
span = job.ended - job.started
entries.append(dict(id=job.id, status=job.get_status(), name=job.name,
start=job.started, end=job.ended, span=span))
more = count > (page * JOBSPERPAGE)
less = page > 1
pagedata = dict(id=page, more=more, less=less)
return render_template("/jobs.html", entries=entries, page=pagedata)
def search_identifiers(search_text, **kwargs):
"""Fetch identifier rows.
Parameters
----------
search_text : str or None
Returns
-------
identifier_list
"""
is_first = True
where_clause = None
if search_text:
search_values = re.sub('[~`%&-+=]', '', search_text).split()
for s in search_values:
s_ = u'%'+s+u'%'
if is_first:
where_clause = (Identifier.name ** s_)
is_first = False
else:
where_clause = where_clause & \
(Identifier.name ** s_)
if is_first:
where_clause = (~(Identifier.id >> None))
select_clause = pwe.SelectQuery(Identifier, Identifier.id, Identifier.name)
order_by_clause = Identifier.name
identifier_list = search_rows(select_clause, where_clause, order_by_clause)
return identifier_list
def __init__(self, query_or_model, paginate_by, page_var='page',
check_bounds=False):
self.paginate_by = paginate_by
self.page_var = page_var
self.check_bounds = check_bounds
if isinstance(query_or_model, SelectQuery):
self.query = query_or_model
self.model = self.query.model_class
else:
self.model = query_or_model
self.query = self.model.select()
def get_object_or_404(query_or_model, *query):
if not isinstance(query_or_model, SelectQuery):
query_or_model = query_or_model.select()
try:
return query_or_model.where(*query).get()
except DoesNotExist:
abort(404)
def __init__(self, query_or_model, paginate_by, page_var='page',
check_bounds=False):
self.paginate_by = paginate_by
self.page_var = page_var
self.check_bounds = check_bounds
if isinstance(query_or_model, SelectQuery):
self.query = query_or_model
self.model = self.query.model_class
else:
self.model = query_or_model
self.query = self.model.select()
def get_object_or_404(query_or_model, *query):
if not isinstance(query_or_model, SelectQuery):
query_or_model = query_or_model.select()
try:
return query_or_model.where(*query).get()
except DoesNotExist:
abort(404)
def get_item(cls, id=None, order_by='id', order_type='desc', to_dict=True):
if id is not None:
data = get_object_or_404(cls, cls.id == id)
else:
order_field = getattr(cls, order_by)
if order_type.lower() == 'asc':
data = cls.select().order_by(-order_field)
else:
data = cls.select().order_by(+order_field)
if to_dict:
if isinstance(data, peewee.SelectQuery):
data = [model2dict(i) for i in data]
else:
data = model2dict(data)
return data
def fixData():
# p = Pool(100)
schools = SelectQuery(EdulistModel).select().where(EdulistModel.postcode == '')
print(schools.count())
# for school in schools:
# p.apply_async(updatePostCode, args=(school,))
#
# p.close()
# p.join()
# dataClean()
def __init__(self, query_or_model, paginate_by, page_var='page',
check_bounds=False):
self.paginate_by = paginate_by
self.page_var = page_var
self.check_bounds = check_bounds
if isinstance(query_or_model, SelectQuery):
self.query = query_or_model
self.model = self.query.model_class
else:
self.model = query_or_model
self.query = self.model.select()
def get_object_or_404(query_or_model, *query):
if not isinstance(query_or_model, SelectQuery):
query_or_model = query_or_model.select()
try:
return query_or_model.where(*query).get()
except DoesNotExist:
abort(404)
def get_object_or_404(query_or_model, *query):
if not isinstance(query_or_model, SelectQuery):
query_or_model = query_or_model.select()
try:
return query_or_model.where(*query).get()
except DoesNotExist:
abort(404)
def test_search_fields_must_be_provided(post_model):
"""Ensure that search fields need to be set."""
# Nothing is defined to search on, so error out
with pytest.raises(AttributeError):
post_model.search('hello')
# If fields are provided, return a query
assert isinstance(post_model.search('hello', fields=('body',)),
peewee.SelectQuery)
def base_query(cls):
"""Method that should return the basic query that all queries will use.
The default base query that just returns a blank ``SELECT`` statement.
It should be overridden in child classes when ``JOIN`` or ``WHERE``
will be needed for all queries.
Returns:
:py:class:`peewee.SelectQuery`:
An unexecuted query with no logic applied.
"""
return cls.select()
def get_by_id(cls, record_id, execute=True):
"""Return a single instance of the model queried by ID.
Args:
record_id (int): Integer representation of the ID to query on.
Keyword Args:
execute (bool, optional):
Should this method execute the query or return a query object
for further manipulation?
Returns:
cls | :py:class:`peewee.SelectQuery`:
If ``execute`` is ``True``, the query is executed, otherwise
a query is returned.
Raises:
:py:class:`peewee.DoesNotExist`:
Raised if a record with that ID doesn't exist.
"""
query = cls.base_query().where(cls.id == record_id)
if execute:
return query.get()
return query