def validate_obfuscated_name(self):
"""Make sure the obfuscated name is unique."""
if self.name != self.obfuscated_name:
is_unique = False
while not is_unique:
if not self.obfuscated_name:
random_num = base_random_number(5)
self.obfuscated_name = base_alphabet_encode(random_num, 5)
try:
get_reserved_by_name(self.obfuscated_name)
except pwe.DoesNotExist:
is_unique = True
else:
self.obfuscated_name = None
python类DoesNotExist()的实例源码
def save_identifier(identifier_row, **kwargs):
"""Save and identifier row.
Parameters
----------
identifier_row
Returns
-------
identifier_id : int
"""
try:
for name, value in kwargs.iteritems():
getattr(identifier_row, name) # Make sure column exists
setattr(identifier_row, name, value)
except AttributeError:
raise
with obfuscatedb.atomic():
try:
identifier_id = save_row(identifier_row, **kwargs)
except IntegrityError as e:
# Resave with different obfuscated_name if already exists
if 'unique' in e.message.lower() \
and not kwargs.get('obfuscated_name', None) \
and 'obfuscated_name' in e.message:
identifier_row.obfuscated_name = None
identifier_id = save_identifier(identifier_row)
else:
raise
except DoesNotExist:
raise
return identifier_id
def add_from_import(self, from_import_list):
"""Add imported modules from reserved modules to reserved.
Parameters
----------
from_import_list : list
"""
if not from_import_list or \
from_import_list[0] != 'from' or \
'import' not in from_import_list[:]:
return
reserved_list = set()
import_index = from_import_list[:].index('import')
package_name = ''
is_reserved = False
for reserve_name in from_import_list[1:import_index]:
# Start with first reserved directory in tree (if one exists)
if not is_reserved:
try:
get_reserved_by_name(reserve_name)
is_reserved = True
package_name = reserve_name
except DoesNotExist:
continue
if is_reserved:
if reserve_name[0].isalpha() or reserve_name[0] == '_':
reserved_list.add(reserve_name)
if is_reserved:
# Get imported items
for reserve_name in from_import_list[import_index+1:]:
if reserve_name[0].isalpha() or reserve_name[0] == '_':
reserved_list.add(reserve_name)
add_reserveds(package_name, reserved_list)
def add_kivy_import(self, kivy_import_list):
"""Add imported modules from reserved modules to reserved.
Parameters
----------
kivy_import_list : list
Kivy import statement.
"""
if not kivy_import_list or \
kivy_import_list[0].strip() != '#:' or \
kivy_import_list[1] != 'import':
return
reserved_list = set()
package_name = ''
is_reserved = False
for reserve_name in kivy_import_list[3:]:
# Start with first reserved directory in tree (if one exists)
if not is_reserved:
try:
get_reserved_by_name(reserve_name)
is_reserved = True
package_name = reserve_name
except DoesNotExist:
continue
if is_reserved:
if reserve_name[0].isalpha() or reserve_name[0] == '_':
reserved_list.add(reserve_name)
if is_reserved:
reserved_list.add(kivy_import_list[2])
add_reserveds(package_name, reserved_list)
def test_get_reserved():
# Get good reserved_id
reserved_row = get_reserved(1)
assert reserved_row.id == 1
assert reserved_row.name == u'reserved_one'
# Fail at getting bad reserved_id
with pytest.raises(DoesNotExist):
get_reserved(999)
def test_create_decision():
from logic.reserved import Reserved
from logic.reserved import get_reserved
time_before = datetime.datetime.now()
reserved_row = get_reserved(None)
time_after = datetime.datetime.now()
assert not reserved_row.id
assert reserved_row.name == Reserved.name.default
assert time_before <= reserved_row.created_timestamp <= time_after
# Fail at bad parameter
with pytest.raises(DoesNotExist):
get_reserved(999)
def test_get_identifier():
# Get good identifier_id
identifier_row = get_identifier(1)
assert identifier_row.id == 1
assert identifier_row.name == u'identifier_one'
# Fail at getting bad identifier_id
with pytest.raises(DoesNotExist):
get_identifier(999)
def test_get_identifier_by_name():
# Get good identifier_id
identifier_row = get_identifier_by_name('identifier_one')
assert identifier_row.id == 1
assert identifier_row.name == u'identifier_one'
# Fail at getting bad identifier_id
with pytest.raises(DoesNotExist):
get_identifier_by_name('junk')
def test_add_reserveds():
# Define values to be updated, and their new values
names_to_update = {
"variable": "v001",
"variable_row": "v002",
"scenario": "s001",
"decision": "d001",
"decision_row": "d002"
}
for ident_name, obfuscated_name in names_to_update.iteritems():
identifier_row = get_identifier(None)
save_identifier(
identifier_row,
name=ident_name,
obfuscated_name=obfuscated_name
)
bnf_parser = ObfuscatePythonBNF(get_obfuscated_name)
# Identifiers after leading reserved attributes should not be obfuscated
# They should be added to reserved and removed from identifiers
bnf_parser.attribs.parseString(
"decision.scenario(variable) = reserved_one.variable "
"+ view.reserved_one")
assert bnf_parser.statement.transformString(
"decision.scenario(variable) = reserved_one.variable") == \
"d001.s001(variable)=reserved_one.variable"
assert get_reserved_by_name('variable').primary_package == 'reserved_one'
assert get_identifier_by_name('variable').obfuscated_name == 'variable'
with pytest.raises(DoesNotExist):
assert get_identifier_by_name('view').obfuscated_name != 'view'
def test_add_reserveds():
# Define values to be updated, and their new values
names_to_update = {
"variable": "v001",
"variable_row": "v002",
"scenario": "s001",
"decision": "d001",
"decision_row": "d002"
}
for ident_name, obfuscated_name in names_to_update.iteritems():
identifier_row = get_identifier(None)
save_identifier(
identifier_row,
name=ident_name,
obfuscated_name=obfuscated_name
)
bnf_parser = ObfuscateKivyBNF(get_obfuscated_name)
# Identifiers after leading reserved attributes should not be obfuscated
# They should be added to reserved and removed from identifiers
bnf_parser.attribs.parseString(
"decision.scenario(variable) = reserved_one.variable "
"+ view.reserved_one")
assert bnf_parser.statement.transformString(
"decision.scenario(variable) = reserved_one.variable") == \
"d001.s001(variable)=reserved_one.variable"
assert get_reserved_by_name('variable').primary_package == 'reserved_one'
assert get_identifier_by_name('variable').obfuscated_name == 'variable'
with pytest.raises(DoesNotExist):
assert get_identifier_by_name('view').obfuscated_name != 'view'
def get_row_by_FK(model, FK_name, FK_id, is_required=False, **kwargs):
"""Get a unique row a FK id or return an empty row.
Parameters
----------
FK_id : int
FK_name : str
is_required : bool
model : BaseModel
Returns
-------
row
"""
if FK_id:
try:
row = model.get(getattr(model, FK_name) == FK_id)
except DoesNotExist:
if is_required:
raise
else:
row = model()
else:
row = model()
return row
def get_row_by_fields(model, **kwargs):
"""Get a unique row by arbitrary fields.
Parameters
----------
kwargs : dict of field names and values
model : BaseModel
Returns
-------
row
"""
field_name = []
field_value = []
try:
for name, value in kwargs.iteritems():
field_name.append(getattr(model, name)) # Make sure column exists
field_value.append(value)
except AssertionError as e:
raise
try:
if len(field_name) == 1:
row = model.get(field_name[0] == field_value[0])
elif len(field_name) == 2:
row = model.get(field_name[1] == field_value[1])
elif len(field_name) == 3:
row = model.get(field_name[2] == field_value[2])
elif len(field_name) == 4:
row = model.get(field_name[3] == field_value[3])
elif len(field_name) == 5:
row = model.get(field_name[4] == field_value[4])
else:
raise ValueError('Too many arguments for row lookup')
except DoesNotExist:
raise
else:
return row
def _get_existing_episode_from_database(episode):
"""Retrieves an existing episode from the database.
An episode is equal if it has the same show, season, episode
and quality. We store multiple show+season+episode Episodes if
the quality is different so later on we can decide which
we want to download and perhaps wait for a better quality episode."""
try:
return Episode.select() \
.where(Episode.show == episode.show) \
.where(Episode.season == episode.season) \
.where(Episode.episode == episode.episode) \
.where(Episode.quality == episode.quality) \
.get()
except DoesNotExist:
return None
def _is_episode_downloaded(episode):
"""Checks if this item has already been downloaded."""
try:
Episode.select() \
.where(Episode.show == episode.show) \
.where(Episode.season == episode.season) \
.where(Episode.episode == episode.episode) \
.where(Episode.is_downloaded == 1) \
.get()
return True
except DoesNotExist:
return False
def get(show_id):
"""Handles GET requests. Returns a single show."""
try:
show = Show.get(Show.id == show_id)
return show.to_dict()
except DoesNotExist:
abort(404)
def delete(show_id):
"""Handles DELETE requests. Deletes a show."""
try:
show = Show.get(Show.id == show_id)
show.delete_instance()
return '', 204
except DoesNotExist:
abort(404)
def startup_library_service():
wamp = autobahn_sync.AutobahnSync()
wamp.run()
db = pw.SqliteDatabase('books.db')
class Book(pw.Model):
title = pw.CharField()
author = pw.CharField()
class Meta:
database = db
try:
db.create_table(Book)
except pw.OperationalError:
pass
@wamp.register('com.library.get_book')
def get_book(id):
try:
b = Book.get(id=id)
except pw.DoesNotExist:
return {'_error': "Doesn't exist"}
return {'id': id, 'title': b.title, 'author': b.author}
@wamp.register('com.library.new_book')
def new_book(title, author):
book = Book(title=title, author=author)
book.save()
wamp.session.publish('com.library.book_created', book.id)
return {'id': book.id}
def get_object_or_404(query_or_model, *query):
if not isinstance(query_or_model, SelectQuery):
query_or_model = query_or_model.select()
try:
return query_or_model.where(*query).get()
except DoesNotExist:
abort(404)
def get_user_battery():
username = get_jwt_identity()
# username = request.args.get("username")
try:
battery, battery_record = battery_rent_service.get_user_battery(
username=username
)
# battery = model_to_dict(battery)
# battery_record = model_to_dict(battery_record)
battery = {
"serial_number": battery.serial_number,
"rent_date": battery_record.rent_date
}
return jsonify({
'response': {
"battery": battery,
# "battery_record": battery_record
}
}), 200
except DoesNotExist as e:
return jsonify({
'response': {
"battery": [],
}
}), 200
# return jsonify({
# 'response': {
# 'error': e.args,
# 'message': '?????'
# }
# }), 400
# 3. ????