def channel_top_list(self, count, user, server):
    """
    Build the query for the channels a user has been most active in.

    :param count: maximum number of entries in the top list
    :param user: the user whose ``messages`` are counted
    :param server: the server the messages must belong to
    :return: a SelectQuery over Message.channel, ordered by the ``count``
        annotation, highest first
    """
    models = self.models
    query = user.messages.select(models.Message.channel)
    # NOTE(review): the ON clause below references Channel, which is not
    # explicitly joined in this function -- presumably annotate(Channel)
    # supplies that join; confirm against the generated SQL.
    query = query.join(models.Server, on=(models.Server.id == models.Channel.server))
    query = query.switch(models.Message)
    # peewee builds the SQL from the overloaded ==, so "== False" must stay as is
    query = query.where(models.Message.is_command == False,
                        models.Server.id == server)
    query = query.annotate(models.Channel)
    query = query.order_by(SQL('count').desc())
    return query.limit(count)
# Example source code using the Python class SQL (peewee)
def user_top_list(self, count, server):
    """
    Build the query for the most active non-bot users on a server.

    :param count: maximum number of entries in the top list
    :param server: the server the messages must belong to
    :return: a SelectQuery over users, ordered by the ``count`` annotation,
        highest first
    """
    models = self.models
    query = models.User.select()
    # NOTE(review): this ON clause references Message, which is not explicitly
    # joined in this function -- presumably annotate(Message) supplies that
    # join; confirm against the generated SQL.
    query = query.join(models.Channel, on=(models.Channel.id == models.Message.channel))
    query = query.join(models.Server, on=(models.Server.id == models.Channel.server))
    query = query.switch(models.User)
    # peewee builds the SQL from the overloaded ==, so "== False" must stay as is
    query = query.where(models.Message.is_command == False,
                        models.User.is_bot == False,
                        models.Server.id == server)
    # the annotation is aliased to 'count', which the ordering below refers to
    query = query.annotate(models.Message)
    query = query.order_by(SQL('count').desc())
    return query.limit(count)
def column(self, coltype, name, **kwargs):
    """
    Add a column of an arbitrary type to the model.

    :param coltype: Column type key looked up in FIELD_TO_PEEWEE; a key that
        is not found falls back to ``peewee.CharField``.
    :param name: Name of column.
    :param kwargs: Arguments for the field class. String entries of the
        optional ``constraints`` list are wrapped in ``peewee.SQL``; other
        entries are passed through untouched.
    """
    raw_constraints = kwargs.pop('constraints', [])
    kwargs['constraints'] = [
        peewee.SQL(item) if isinstance(item, str) else item
        for item in raw_constraints
    ]
    field_cls = FIELD_TO_PEEWEE.get(coltype, peewee.CharField)
    new_field = field_cls(**kwargs)
    new_field.add_to_class(self.model, name)
def _execute(db, to_run, interactive=True, commit=True):
    """
    Run a batch of SQL statements inside a single ``db.atomic()`` block.

    :param db: database object providing ``atomic()`` and ``execute_sql()``
    :param to_run: iterable of ``(sql, params)`` pairs
    :param interactive: when true, echo each statement and print a status banner
    :param commit: when false, roll the transaction back after the statements ran
    :raises Exception: whatever the batch raised, re-raised after an error
        banner has been printed
    """
    if interactive:
        print()
    try:
        with db.atomic() as txn:
            for sql, params in to_run:
                if interactive or DEBUG:
                    print_sql(' %s; %s' % (sql, params or ''))
                # entries starting with '--' are echoed above but never executed
                if not sql.strip().startswith('--'):
                    db.execute_sql(sql, params)
            if interactive:
                print()
                if commit:
                    headline = colorama.Style.BRIGHT + 'SUCCESS!' + colorama.Style.RESET_ALL
                else:
                    headline = 'TEST PASSED - ROLLING BACK'
                separator = colorama.Style.DIM + '-'
                project_url = 'https://github.com/keredson/peewee-db-evolve' + colorama.Style.RESET_ALL
                print(headline, separator, project_url)
                print()
            if not commit:
                txn.rollback()
    except Exception as e:
        rule = '------------------------------------------'
        print()
        print(rule)
        print(colorama.Style.BRIGHT + colorama.Fore.RED + ' SQL EXCEPTION - ROLLING BACK ALL CHANGES' + colorama.Style.RESET_ALL)
        print(rule)
        print()
        raise e
def sort(self, collection, *sorting, **Kwargs):
    """Order ``collection`` by the given ``(name, desc)`` pairs.

    Each name is looked up among the model's fields; a name that is not found
    is used as a raw SQL fragment. The collection is returned unchanged when
    no sorting is given.
    """
    logger.debug('Sort collection: %r', sorting)
    ordering = []
    for name, desc in sorting:
        # NOTE(review): unknown names end up as raw SQL -- confirm that callers
        # validate sort names before they reach this point.
        column = self.meta.model._meta.fields.get(name) or SQL(name)
        ordering.append(column.desc() if desc else column)
    if not ordering:
        return collection
    return collection.order_by(*ordering)
def __ddl_column__(self, ctype):
    """Return ``self.enum_name`` wrapped as a raw SQL node; ``ctype`` is not used."""
    type_name = self.enum_name
    return SQL(type_name)
def test_constraint():
    """A constraint given to add_constraint shows up in the model's meta constraints."""
    creator = TableCreator('awesome')
    creator.column('char', 'fname')
    not_null = peewee.SQL('fname not null')
    creator.add_constraint(not_null)
    expected = [not_null]
    assert creator.model._meta.constraints == expected
def add_constraint(self, value):
    """
    Add a constraint to the model.

    :param value: Constraint as a SQL string, or an already-built constraint
        object (for example a ``peewee.SQL`` instance).
    :return: None
    """
    # Wrap only plain strings, the same way column() treats its constraints,
    # so an already-built constraint object is stored as-is rather than being
    # wrapped a second time.
    if isinstance(value, str):
        value = peewee.SQL(value)
    self.model._meta.constraints.append(value)
def execute_sql(self, sql, params=None):
    """
    Execute SQL on ``self.database``, passing ``require_commit=False``.

    :param sql: SQL string.
    :param params: Parameters for the given SQL (default None).
    :return: SQL cursor
    :rtype: cursor
    """
    cursor = self.database.execute_sql(sql, params=params, require_commit=False)
    return cursor
def _get_last_users():
    """Return a list of at most 10 users with a non-empty username.

    Each row carries the username and ``last_id`` (the highest joined
    LastUsers id for that user); rows are ordered by ``last_id``, highest first.
    """
    newest_entry = fn.MAX(LastUsers.id).alias("last_id")
    query = (
        User.select(User.username, newest_entry)
        .join(LastUsers)
        .where(User.username != "")
        .group_by(User.id, User.username)
        .order_by(SQL("last_id").desc())
        .limit(10)
    )
    return list(query)
def drop_foreign_key(db, migrator, table_name, fk_name):
    """Build the ALTER TABLE clause that drops ``fk_name`` from ``table_name``.

    MySQL gets the 'drop foreign key' form, every other database
    'DROP CONSTRAINT'. The clause is handed to the shared normalizer, whose
    result is returned.
    """
    if is_mysql(db):
        drop_stmt = 'drop foreign key'
    else:
        drop_stmt = 'DROP CONSTRAINT'
    parts = [pw.SQL('ALTER TABLE'), pw.Entity(table_name), pw.SQL(drop_stmt), pw.Entity(fk_name)]
    clause = pw.Clause(*parts)
    return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, clause)
def drop_default(db, migrator, table_name, column_name, field):
    """Build the ALTER TABLE ... ALTER COLUMN ... DROP DEFAULT clause for a column.

    ``field`` is accepted but not used here. The clause is handed to the
    shared normalizer, whose result is returned.
    """
    parts = [
        pw.SQL('ALTER TABLE'),
        pw.Entity(table_name),
        pw.SQL('ALTER COLUMN'),
        pw.Entity(column_name),
        pw.SQL('DROP DEFAULT'),
    ]
    clause = pw.Clause(*parts)
    return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, clause)
def set_default(db, migrator, table_name, column_name, field):
    """Build the ALTER TABLE ... ALTER COLUMN ... SET DEFAULT clause for a column.

    The default comes from ``field.default``; a callable default is called
    once and its result is used. The value is converted with
    ``field.db_value`` and passed as a query parameter.
    """
    value = field.default
    if callable(value):
        value = value()
    bound_value = pw.Param(field.db_value(value))
    parts = [
        pw.SQL('ALTER TABLE'),
        pw.Entity(table_name),
        pw.SQL('ALTER COLUMN'),
        pw.Entity(column_name),
        pw.SQL('SET DEFAULT'),
        bound_value,
    ]
    clause = pw.Clause(*parts)
    return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, clause)
def rename_column(db, migrator, ntn, ocn, ncn, field):
    """Build the statement that renames a column.

    On MySQL an ``ALTER TABLE ... CHANGE`` clause is assembled from the old
    column name and the field's compiled definition; on other databases the
    migrator's own ``rename_column`` is used. The result is handed to the
    shared normalizer.

    NOTE(review): ntn / ocn / ncn presumably stand for table name, old column
    name and new column name -- confirm against the callers.
    """
    compiler = db.compiler()
    if not is_mysql(db):
        statement = migrator.rename_column(ntn, ocn, ncn, generate=True)
    else:
        statement = pw.Clause(
            pw.SQL('ALTER TABLE'),
            pw.Entity(ntn),
            pw.SQL('CHANGE'),
            pw.Entity(ocn),
            compiler.field_definition(field),
        )
    return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, statement)
def change_column_type(db, migrator, table_name, column_name, field):
    """Build the statement that changes a column's type to match ``field``.

    Postgres gets ``ALTER TABLE ... ALTER <column> TYPE <type>``, MySQL gets
    ``ALTER TABLE ... MODIFY <column ddl>``. ``column_name`` is not used here;
    the column is taken from ``field``.

    :raises Exception: for any database that is neither Postgres nor MySQL
    """
    compiler = db.compiler()
    db_type = compiler.get_column_type(field.get_db_field())
    if is_postgres(db):
        parts = [
            pw.SQL('ALTER TABLE'),
            pw.Entity(table_name),
            pw.SQL('ALTER'),
            field.as_entity(),
            pw.SQL('TYPE'),
            field.__ddl_column__(db_type),
        ]
    elif is_mysql(db):
        parts = [pw.SQL('ALTER TABLE'), pw.Entity(table_name), pw.SQL('MODIFY')] + field.__ddl__(db_type)
    else:
        raise Exception('how do i change a column type for %s?' % db)
    clause = pw.Clause(*parts)
    return normalize_whatever_junk_peewee_migrations_gives_you(db, migrator, clause)
async def exists(self):
    """Return whether the query's scalar result is truthy.

    Works on a clone produced by ``paginate(1, 1)`` whose select list is
    replaced with the constant ``1``, then awaits the clone's ``scalar()``.
    Declared ``async`` because the body awaits; ``await`` is not valid inside
    a plain ``def``.

    :return: ``bool`` of the awaited scalar result
    """
    clone = self.paginate(1, 1)
    # the selected columns are irrelevant here, so select a constant instead
    clone._select = [SQL('1')]
    return bool(await clone.scalar())
def default_insert_clause(self, model_class):
    """Return the raw SQL ``DEFAULT VALUES`` clause; ``model_class`` is not used."""
    clause = SQL('DEFAULT VALUES')
    return clause
async def test_select_all(flushdb):
    """Selecting ``SQL('*')`` on Blog returns full rows with resolvable users.

    Declared ``async`` because the body uses ``await`` and an ``async for``
    comprehension, neither of which is valid inside a plain ``def``.
    """
    await create_users_blogs(2, 2)
    all_cols = SQL('*')
    query = Blog.select(all_cols)
    blogs = [blog async for blog in query.order_by(Blog.pk)]
    assert [b.title for b in blogs] == ['b-0-0', 'b-0-1', 'b-1-0', 'b-1-1']
    # the related user is awaited per blog
    assert [(await b.user).username for b in blogs] == ['u0', 'u0', 'u1', 'u1']
def exists(self):
    """Generator form of the existence check.

    Yields the ``scalar()`` call of a ``paginate(1, 1)`` clone whose select
    list is the constant ``1``, then hands ``bool`` of the value sent back to
    the caller via ``gen.Return``.
    """
    probe = self.paginate(1, 1)
    # the selected columns are irrelevant here, so select a constant instead
    probe._select = [SQL('1')]
    found = yield probe.scalar()
    raise gen.Return(bool(found))