async def get_player(self, login=None, pk=None, lock=True):
    """
    Get player by login or primary key.

    This is a coroutine: the body awaits the model lookups, so it has to be
    declared with ``async def`` (``await`` inside a plain ``def`` is a
    syntax error).

    :param login: Login.
    :param pk: Primary Key identifier.
    :param lock: When true, a failed lookup waits 4 seconds and retries once.
    :return: Player or exception if not found
    :rtype: pyplanet.apps.core.maniaplanet.models.Player
    :raise PlayerNotFound: If neither identifier is given or no player matches.
    """
    try:
        if login:
            return await Player.get_by_login(login)
        if pk:
            return await Player.get(pk=pk)
        raise PlayerNotFound('Player not found.')
    except DoesNotExist:
        if not lock:
            raise PlayerNotFound('Player not found.')
        # Give the record a moment to appear, then retry exactly once;
        # the retry passes lock=False so it cannot loop.
        await asyncio.sleep(4)
        return await self.get_player(login=login, pk=pk, lock=False)
# Example source code for the Python class DoesNotExist() (python类DoesNotExist()的实例源码)
def check_db_schema_version():
    """Compare the stored DB schema version with the code's and drop tables on mismatch.

    The stored version is read from the ``DB_SCHEMA_VERSION`` setting; if that
    lookup fails it stays ``None``. When it differs from ``SCHEMA_VERSION``,
    every model whose table currently exists is dropped. Errors raised while
    dropping are printed rather than propagated.
    """
    stored_version = None
    try:
        stored_version = Setting.get(Setting.name == 'DB_SCHEMA_VERSION').value
    except (peewee.OperationalError, peewee.DoesNotExist, peewee.ProgrammingError):
        printdbg("[info]: Can't get DB_SCHEMA_VERSION...")

    printdbg("[info]: SCHEMA_VERSION (code) = [%s]" % SCHEMA_VERSION)
    printdbg("[info]: DB_SCHEMA_VERSION = [%s]" % stored_version)

    if SCHEMA_VERSION == stored_version:
        return

    printdbg("[info]: Schema version mis-match. Syncing tables.")
    try:
        present_tables = db.get_tables()
        # Only models that actually have a table can be dropped.
        stale_models = []
        for model in db_models():
            if model._meta.db_table in present_tables:
                stale_models.append(model)
        if stale_models:
            printdbg("[info]: Dropping tables...")
            db.drop_tables(stale_models, safe=False, cascade=False)
    except (peewee.InternalError, peewee.OperationalError, peewee.ProgrammingError) as err:
        print("[error] Could not drop tables: %s" % err)
def check_db_schema_version():
    """Ensure the DB schema matches the code's version; drop existing tables if not.

    A failed read of the ``DB_SCHEMA_VERSION`` setting is reported through
    ``printdbg`` and treated as "no version stored". Failures while dropping
    tables are printed and swallowed.
    """
    version_in_db = None
    lookup_errors = (peewee.OperationalError, peewee.DoesNotExist, peewee.ProgrammingError)
    try:
        version_row = Setting.get(Setting.name == 'DB_SCHEMA_VERSION')
        version_in_db = version_row.value
    except lookup_errors:
        printdbg("[info]: Can't get DB_SCHEMA_VERSION...")

    printdbg("[info]: SCHEMA_VERSION (code) = [%s]" % SCHEMA_VERSION)
    printdbg("[info]: DB_SCHEMA_VERSION = [%s]" % version_in_db)

    if SCHEMA_VERSION != version_in_db:
        printdbg("[info]: Schema version mis-match. Syncing tables.")
        drop_errors = (peewee.InternalError, peewee.OperationalError, peewee.ProgrammingError)
        try:
            table_names = db.get_tables()
            # Restrict the drop to models whose table is really there.
            droppable = [
                model for model in db_models()
                if model._meta.db_table in table_names
            ]
            if droppable:
                printdbg("[info]: Dropping tables...")
                db.drop_tables(droppable, safe=False, cascade=False)
        except drop_errors as failure:
            print("[error] Could not drop tables: %s" % failure)
async def users(self, ctx: commands.Context, count=10):
    """
    Show users with the most messages
    """
    # NOTE(review): this looks like a bot command, where the docstring may be
    # shown as help text, so its wording is left unchanged — confirm.
    # Declared ``async`` because the body awaits the query and the reply.

    # Show at least 1 user and 20 at most
    count = min(20, max(1, count))
    try:
        server = self.orm.Server.get(discord_id=ctx.message.server.id)
    except DoesNotExist as e:
        # If there's no server in the db an exception will be raised
        self.config.logger.error(e)
        return

    top_users = await self.orm.query.user_top_list(count, server)
    embed = discord.Embed(color=discord.Color(self.config.constants.embed_color),
                          timestamp=datetime.datetime.now())
    for user in top_users:
        # the user might not have a name if s/he hasn't sent a message already
        # so in that case use the id instead
        name = user.name if user.name != '' else user.discord_id
        embed.add_field(name=name, value='Total messages: {}'.format(user.count), inline=False)
    await self.bot.say(content='Top active users:', embed=embed)
def get_obfuscated_by_lower(obfuscated_lower, **kwargs):
    """Look up an identifier by its lowercase obfuscated value.

    Parameters
    ----------
    obfuscated_lower : str
        Lowercase obfuscated value to search for.

    Returns
    -------
    name
        The ``name`` attribute of the matching Identifier row, or None when
        no row matches.
    """
    try:
        found_name = get_row_by_fields(
            Identifier, obfuscated_lower=obfuscated_lower).name
    except DoesNotExist:
        found_name = None
    return found_name
def get_obfuscated_name(identifier_name, **kwargs):
    """Return the obfuscated name stored for an identifier.

    Parameters
    ----------
    identifier_name : str
        Name of the identifier to look up.

    Returns
    -------
    obfuscated_name : str
        The stored obfuscated name; when the identifier is not found the
        given name is returned unchanged.
    """
    try:
        result = get_identifier_by_name(identifier_name).obfuscated_name
    except DoesNotExist:
        # Unknown identifiers are passed through as-is.
        result = identifier_name
    return result
def add_attribs_reserveds_list(attrib_list):
    """Reserve every attribute that comes after the first reserved name.

    The list is scanned in order. Names before the first reserved one are
    ignored; the first reserved name becomes the package name; every name
    after it is collected and handed to ``add_reserveds``.
    """
    # A single item does not change
    if len(attrib_list) <= 1:
        return

    found_reserved = False
    trailing_names = set()
    for name in attrib_list:
        if found_reserved:
            trailing_names.add(name)
            continue
        try:
            get_reserved_by_name(name)
        except DoesNotExist:
            continue
        # Don't add the already reserved name itself, only remember it.
        found_reserved = True
        package_name = name

    if trailing_names:
        add_reserveds(package_name, trailing_names)
def test_logic_reserved_delete():
    """Deleting a reserved row succeeds once and raises DoesNotExist afterwards."""
    from logic.reserved import delete_reserved, get_reserved, \
        save_reserved
    # presumably get_reserved(None) yields a new, unsaved row — verify in logic.reserved
    reserved_row = get_reserved(None)
    assert save_reserved(
        reserved_row,
        name=u'Reserved Two')
    # NOTE(review): relies on the saved row getting id 2, i.e. on the state
    # left in the database by earlier tests/fixtures — confirm.
    assert get_reserved(2).id == 2, 'Record not present or wrong id'
    # Delete an existing reserved
    row = get_reserved(2)
    assert delete_reserved(row) == 1
    # The lookup itself is expected to raise, so the comparison is never reached.
    with pytest.raises(DoesNotExist):
        assert get_reserved(2).id == 2, 'Record not found'
    # Fail to delete a non-existing reserved
    with pytest.raises(DoesNotExist):
        assert delete_reserved(row)
def test_logic_identifier_delete():
    """Deleting an identifier row succeeds once and raises DoesNotExist afterwards."""
    from logic.identifier import delete_identifier, get_identifier, \
        save_identifier
    # presumably get_identifier(None) yields a new, unsaved row — verify in logic.identifier
    identifier_row = get_identifier(None)
    assert save_identifier(
        identifier_row,
        name=u'Identifier Two')
    # NOTE(review): relies on the saved row getting id 2, i.e. on the state
    # left in the database by earlier tests/fixtures — confirm.
    assert get_identifier(2).id == 2, 'Record not present or wrong id'
    # Delete an existing identifier
    row = get_identifier(2)
    assert delete_identifier(row) == 1
    # The lookup itself is expected to raise, so the comparison is never reached.
    with pytest.raises(DoesNotExist):
        assert get_identifier(2).id == 2, 'Record not found'
    # Fail to delete a non-existing identifier
    with pytest.raises(DoesNotExist):
        assert delete_identifier(row)
def test_create_decision():
    """Check the defaults of a row from get_identifier(None) and the failure for an unknown id.

    NOTE(review): the name says "decision" but the body only exercises
    Identifier rows — confirm whether the test should be renamed.
    """
    from logic.identifier import Identifier
    from logic.identifier import get_identifier
    # Bracket the call with timestamps so created_timestamp can be range-checked.
    time_before = datetime.datetime.now()
    identifier_row = get_identifier(None)
    time_after = datetime.datetime.now()
    # The row must have no id yet and carry the model's field defaults.
    assert not identifier_row.id
    assert identifier_row.name == Identifier.name.default
    assert identifier_row.obfuscated_name == Identifier.obfuscated_name.default
    assert time_before <= identifier_row.created_timestamp <= time_after
    # Fail at bad parameter
    with pytest.raises(DoesNotExist):
        get_identifier(999)
def test_add_from_import():
    """Parsing ``from ... import`` lines reserves the imported names; plain ``import`` does not.

    NOTE(review): the steps build on each other and on 'reserved_one' already
    being reserved by a fixture — confirm before reordering anything.
    """
    bnf_parser = ObfuscatePythonBNF(get_obfuscated_name)
    # Reserve imported modules if first from module is reserved
    bnf_parser.from_import.parseString(
        " from some_module.reserved_one import is_reserved, also_is")
    assert get_reserved_by_name('is_reserved')
    assert get_reserved_by_name('also_is')
    # Reserve imported modules if first from module is reserved
    bnf_parser.from_import.parseString(
        " from reserved_one.some_module import is_reserved, also_reserved")
    # Both imported names must be recorded under the 'reserved_one' package.
    assert get_reserved_by_name('is_reserved').primary_package == \
        'reserved_one'
    assert get_reserved_by_name('also_reserved').primary_package == \
        'reserved_one'
    # Import (without a from) should take no action
    bnf_parser.from_import.parseString(
        " import reserved_one, not_reserved_1, also_not_1")
    with pytest.raises(DoesNotExist):
        get_reserved_by_name('not_reserved_1')
    with pytest.raises(DoesNotExist):
        get_reserved_by_name('also_not_1')
def test_add_except_error():
    """Parsing an ``except (...) as name`` line reserves the exception names but not the alias."""
    bnf_parser = ObfuscatePythonBNF(get_obfuscated_name)
    # Reserve exception names
    bnf_parser.except_error.parseString(
        " except (FloatingPointError, SomeOtherError) as exc_err")
    assert get_reserved_by_name('FloatingPointError')
    assert get_reserved_by_name('SomeOtherError')
    # The alias bound by "as" must not be reserved.
    with pytest.raises(DoesNotExist):
        assert get_reserved_by_name('exc_err')
    # Reserve exception names with double tab
    # NOTE(review): this input is identical to the one above; the extra
    # leading whitespace the comment refers to may have been lost — confirm.
    bnf_parser.except_error.parseString(
        " except (FloatingPointError, SomeOtherError) as exc_err")
    assert get_reserved_by_name('FloatingPointError')
    assert get_reserved_by_name('SomeOtherError')
    with pytest.raises(DoesNotExist):
        assert get_reserved_by_name('exc_err')
def test_add_kivy_import():
    """Parsing Kivy ``#: import`` directives reserves names; a line without the directive does not.

    NOTE(review): the steps build on each other and on 'reserved_one' already
    being reserved by a fixture — confirm before reordering anything.
    """
    bnf_parser = ObfuscateKivyBNF(get_obfuscated_name)
    # Reserve imported modules from first reserved module
    bnf_parser.kivy_import.parseString(
        "#: import is_reserved some_module.reserved_one")
    assert get_reserved_by_name('is_reserved').primary_package == \
        'reserved_one'
    # 'some_module' precedes the reserved package here and must stay unreserved.
    with pytest.raises(DoesNotExist):
        get_reserved_by_name('some_module')
    # Reserve imported modules if first from module is reserved
    bnf_parser.kivy_import.parseString(
        "#: import is_reserved reserved_one.some_module")
    assert get_reserved_by_name('is_reserved').primary_package == \
        'reserved_one'
    assert get_reserved_by_name('some_module').primary_package == \
        'reserved_one'
    # Import without directive should take no action
    bnf_parser.kivy_import.parseString(
        "import not_reserved_1 reserved_one")
    with pytest.raises(DoesNotExist):
        get_reserved_by_name('not_reserved_1')
def get_row(model, row_id, **kwargs):
    """Get a database row by its id, or a new unsaved row when no id is given.

    Parameters
    ----------
    model : BaseModel
        Model class to query or instantiate.
    row_id : int or None
        Id of the row to fetch. Any falsy value (None, 0) yields ``model()``.
    **kwargs
        Accepted for call compatibility; not used.

    Returns
    -------
    row

    Raises
    ------
    DoesNotExist
        Propagated from ``model.get`` when no row has the given id.
    """
    # The former try/except wrappers only re-raised what they caught, so
    # exceptions propagate exactly as before without them.
    if row_id:
        return model.get(model.id == row_id)
    return model()
def get_row_by_name(model, row_name, FK_name=None, FK_id=None, **kwargs):
    """Get a unique row by its name, or by name plus a foreign key.

    Parameters
    ----------
    model : BaseModel
        Model class to query.
    row_name : str
        Value matched against ``model.name``.
    FK_name : str
        Name of the foreign-key attribute on the model. When falsy, only the
        name is matched.
    FK_id : int
        Value the foreign-key attribute must equal; used only with FK_name.
    **kwargs
        Accepted for call compatibility; not used.

    Returns
    -------
    row

    Raises
    ------
    DoesNotExist
        Propagated from ``model.get`` when nothing matches.
    """
    # The former try/except only re-raised DoesNotExist, so dropping it
    # leaves error propagation unchanged.
    if FK_name:
        return model.get(model.name == row_name,
                         getattr(model, FK_name) == FK_id)
    return model.get(model.name == row_name)
def _process_download_command(data):
matches = re.search('download (\d{1,})', data)
if matches is not None:
episode_id = int(matches.group(1))
try:
episode = Episode.get(Episode.id == episode_id)
torrentclient = Transmission()
torrentclient.download_episode(episode)
episode.is_downloaded = True
episode.save()
return True
except DoesNotExist:
error = 'Tried to download non-existing episode with ID: %d'
logging.error(error % episode_id)
return False
except TorrentClientException as e:
logging.error('TorrentClientException occured: %s' % str(e))
return False
else:
logging.error('Incorrect download command received: %s' % data)
return False
def get_virtual_card():
    """Return the virtual card for the ``username`` query argument as JSON.

    Responds with status 200 and the card (serialized without recursing into
    related models) on success, or status 400 with an error payload when the
    lookup raises DoesNotExist.
    """
    card_no = request.args.get("username")
    try:
        card = virtual_card_service.get_virtual_card(card_no=card_no)
        payload = {'response': model_to_dict(card, recurse=False)}
        return jsonify(payload), 200
    except DoesNotExist as exc:
        failure = {
            'error': exc.args,
            'message': '????????'
        }
        return jsonify({'response': failure}), 400
# ????
def get_battery(serial_number):
    """
    get battery rent information

    Responds with status 200 and the serialized battery on success, or status
    400 with an error payload when the lookup raises DoesNotExist.

    :param serial_number: battery serial_number
    :return: information
    """
    try:
        battery = battery_rent_service.get_battery(serial_number)
        return jsonify({'response': model_to_dict(battery)}), 200
    except DoesNotExist as e:
        return jsonify({
            'response': {
                "error": '%s: %s' % (str(DoesNotExist), e.args),
                # str(e) gives the same text as '%s' % e.args for the usual
                # one-argument exception, but does not raise TypeError when
                # args is empty or has several items.
                "message": str(e)
            }
        }), 400
# 2. ?????? ????
def validate(self, name, data):
    """
    Run the parent validation, then replace each lookup value with its related row.

    Falsy entries in ``self.value`` are skipped. ``self.value`` is only
    replaced once every remaining entry has been resolved.

    :param name: The name of this field.
    :param data: Dictionary of data for all fields.
    :raises: ValidationError when a lookup fails with AttributeError,
        ValueError or peewee.DoesNotExist.
    """
    super().validate(name, data)
    if self.value is None:
        return
    try:
        resolved = []
        for item in self.value:
            if item:
                # self.query may be a query ("User.select()") or a model
                # ("User"); ".select().where()" works for both.
                match = self.query.select().where(self.lookup_field == item).get()
                resolved.append(match)
        self.value = resolved
    except (AttributeError, ValueError, peewee.DoesNotExist):
        raise ValidationError('related', field=self.lookup_field.name, values=self.value)
def add_image(dataset_pk, image):
    """Store the file at *image* as a logo for the dataset identified by *dataset_pk*.

    Exits the process with status 1 when the dataset cannot be found and with
    status 2 when no mime type can be inferred for the file. The size of the
    file in bytes is printed before the logo row is created in a transaction.
    """
    try:
        target_dataset = db.Dataset.get(db.Dataset.dataset == dataset_pk)
    except peewee.DoesNotExist:
        print("Can't find any dataset with id {}".format(dataset_pk))
        sys.exit(1)

    with open(image, 'rb') as handle:
        payload = handle.read()

    try:
        detected_type = infer_mimetype(image)
    except NoMimetypeException:
        print("Can't find mime type for <{}>".format(image))
        sys.exit(2)

    print(len(payload))
    logo_fields = {
        'dataset': target_dataset,
        'mimetype': detected_type,
        'data': payload,
    }
    with db.database.atomic():
        db.DatasetLogo.create(**logo_fields)
def get_current_user(self):
    """Return the user identified by the secure cookies, or None.

    The ``email`` cookie identifies the user. If no row with that email
    exists yet, an unsaved ``db.User`` built from the cookie values is
    returned instead. Without an email cookie the result is None.
    """
    def as_text(value):
        # The original code decodes the cookie values, so bytes are expected;
        # str and None are passed through unchanged.
        return value.decode('utf-8') if isinstance(value, bytes) else value

    email = self.get_secure_cookie('email')
    name = self.get_secure_cookie('user')

    # Fix ridiculous bug with quotation marks showing on the web.
    # The quote is compared in the value's own type (indexing bytes yields
    # ints, which never equal '"'), and the slice is taken from `name`; the
    # previous code sliced an undefined variable `user`.
    if name:
        quote = b'"' if isinstance(name, bytes) else '"'
        if name.startswith(quote) and name.endswith(quote):
            name = name[1:-1]

    if not email:
        return None

    try:
        return db.User.select().where(db.User.email == email).get()
    except peewee.DoesNotExist:
        # Not saved in the database yet.
        # NOTE(review): a missing name cookie now yields name=None instead of
        # an AttributeError — confirm db.User accepts that.
        return db.User(email=as_text(email), name=as_text(name))
async def get_map(self, uid=None):
    """
    Get map instance by uid.

    Declared as a coroutine because the lookup is awaited.

    :param uid: By uid (pk).
    :return: Map instance, as returned by ``Map.get_by_uid``.
    :raise MapNotFound: If the lookup raises DoesNotExist.
    """
    try:
        return await Map.get_by_uid(uid)
    except DoesNotExist:
        raise MapNotFound('Map not found.')
async def get_map_by_index(self, index):
    """
    Get map instance by index id (primary key).

    Declared as a coroutine because the lookup is awaited.

    :param index: Primary key index.
    :return: Map instance or raise exception.
    :raise MapNotFound: If the lookup raises DoesNotExist.
    """
    try:
        return await Map.get(id=index)
    except DoesNotExist:
        raise MapNotFound('Map not found.')
async def register(self, name, description='', app=None, min_level=1, namespace=None):
    """
    Register a new permission.

    Declared as a coroutine because the lookup and the saves are awaited.
    An existing permission is reused; its ``min_level`` is updated and saved
    when it differs from the requested one.

    :param name: Name of permission
    :param description: Description in english.
    :param app: App instance to retrieve the label.
    :param min_level: Minimum level required.
    :param namespace: Namespace, only for core usage!
    :return: Permission instance.
    :raise Exception: If no namespace is given and none can be taken from ``app``.
    """
    if not namespace and app:
        namespace = app.label
    if not namespace:
        raise Exception('Namespace is required. You should give your app instance with app=app instead!')

    try:
        perm = await self.get_perm(namespace=namespace, name=name)

        # TODO: Implement overrides on min_level here.
        if perm.min_level != min_level:
            perm.min_level = min_level
            await perm.save()
    except DoesNotExist:
        # First registration: create and persist the permission.
        perm = Permission(namespace=namespace, name=name, description=description, min_level=min_level)
        await perm.save()
    return perm
def get(self, name):
    """Return the value of the transient called *name*, or False.

    False is returned when no backing setting exists or when the transient
    has expired; in the expired case the setting row is deleted first.
    """
    key = "__transient_%s" % (name)
    try:
        setting_row = Setting.get(Setting.name == key)
        transient = Transient.from_setting(setting_row)
    except Setting.DoesNotExist:
        return False

    if not transient.is_expired():
        return transient.value

    # Expired: remove the stored row and report a miss.
    setting_row.delete_instance()
    return False
def delete(self, name):
    """Delete the setting backing the transient called *name*.

    Returns False when no such setting exists, otherwise whatever
    ``delete_instance()`` returns.
    """
    key = "__transient_%s" % (name)
    try:
        setting_row = Setting.get(Setting.name == key)
    except Setting.DoesNotExist:
        return False
    else:
        return setting_row.delete_instance()
# === /models ===
def get(self, name):
    """Fetch a transient by name.

    Gives back the transient's value, or False if the underlying setting is
    missing or the transient has expired (an expired setting row is deleted).
    """
    lookup_name = "__transient_%s" % (name)
    try:
        stored = Setting.get(Setting.name == lookup_name)
        entry = Transient.from_setting(stored)
    except Setting.DoesNotExist:
        return False

    expired = entry.is_expired()
    if expired:
        stored.delete_instance()
    return False if expired else entry.value
def delete(self, name):
    """Remove a transient by name.

    Gives back False when the underlying setting is missing; otherwise the
    result of deleting the setting row.
    """
    lookup_name = "__transient_%s" % (name)
    try:
        stored = Setting.get(Setting.name == lookup_name)
    except Setting.DoesNotExist:
        return False
    outcome = stored.delete_instance()
    return outcome
# === /models ===
async def total(self, ctx: commands.Context):
    """
    Show total amount of messages on server
    """
    # NOTE(review): this looks like a bot command, where the docstring may be
    # shown as help text, so its wording is left unchanged — confirm.
    # Declared ``async`` because the body awaits the query and the reply.
    try:
        server = self.orm.Server.get(discord_id=ctx.message.server.id)
    except DoesNotExist as e:
        # If there's no server in the db an exception will be raised
        self.config.logger.error(e)
        return

    total_messages = await self.orm.query.server_total_messages(server)
    await self.bot.say('Total messages: {}'.format(total_messages))
def add_identifiers(identifier_list=None, do_obfuscate=True):
    """Add identifier and obfuscated names to Identifiers table.

    Reserved names are skipped. Names that must stay readable are stored with
    their own name as the obfuscated name; all others are stored with an
    empty obfuscated name.

    Parameters
    ----------
    do_obfuscate : bool
        When False, every identifier keeps its own name, and rows that
        already exist are updated to match.
    identifier_list : list
        Identifier names to add. None (the default) is treated as empty.

    Raises
    ------
    IntegrityError
        Re-raised when saving fails for a reason other than a uniqueness
        violation.
    """
    # The default is None, which cannot be iterated directly.
    for identifier_name in identifier_list or ():
        # Skip identifiers in reserved
        try:
            get_reserved_by_name(identifier_name)
        except DoesNotExist:
            pass
        else:
            continue

        # Dunder names (this also covers '__init__.py') and test names are
        # never obfuscated.
        keep_name = (
            not do_obfuscate
            or identifier_name.startswith(('__', 'test_'))
            or identifier_name.endswith('_test.py')
        )
        obfuscated_name = identifier_name if keep_name else ''

        identifier_row = get_identifier(None)
        try:
            save_identifier(identifier_row,
                            name=identifier_name,
                            obfuscated_name=obfuscated_name)
        except IntegrityError as e:
            # str(e) rather than e.message: Python 3 exceptions have no
            # .message attribute.
            if 'unique' not in str(e).lower():
                raise
            # If should not be obfuscated, replace obfuscated, o/w pass
            if not do_obfuscate:
                identifier_row = get_identifier_by_name(identifier_name)
                if identifier_row.obfuscated_name != identifier_name:
                    save_identifier(identifier_row,
                                    obfuscated_name=identifier_name)