def test_unique_pairs(add_text, add_doc, add_citation):
    """
    A text -> syllabus document pair may be linked only once: citing the
    same pair a second time must raise IntegrityError.
    """
    link = dict(text=add_text(), document=add_doc())

    # The first citation for the pair is accepted.
    add_citation(**link)

    # Repeating the exact same pair must be rejected.
    with pytest.raises(IntegrityError):
        add_citation(**link)
# python类IntegrityError()的实例源码 (example source code using the Python class IntegrityError())
def test_unique_corpus_identifier(add_text):
    """
    Adding a second text with an already-used corpus + identifier
    combination must raise IntegrityError.
    """
    duplicate_key = {'corpus': 'jstor', 'identifier': '001'}

    add_text(**duplicate_key)
    with pytest.raises(IntegrityError):
        add_text(**duplicate_key)
def followLeader(leaderid):
    """Make the current user (``g.user``) a follower of ``leaderid``.

    Returns a JSON success body, or an ``errors`` body with status 405 when
    the user tries to follow themselves or when creating the relation
    raises IntegrityError.
    """
    follower_id = g.user['id']

    def rejected(detail):
        # Every refusal shares the same envelope and status code.
        return (jsonify({'errors': [{'detail': detail}]}), 405)

    if leaderid == follower_id:
        return rejected('You can not follow yourself silly')

    try:
        FollowerRelation.create(follower=follower_id, leader=leaderid)
    except IntegrityError:
        # The insert was refused, which is treated as "relation already exists".
        return rejected('You are already following this user')

    return jsonify({'success': 'Followed user succefully'})
def heart_it(contentid):
    """Endpoint: record a heart from the current user on ``contentid``.

    Returns a JSON success body with status 200, or a JSON error body with
    status 202 when creating the heart raises IntegrityError.
    """
    try:
        # Only the insert matters; the created Heart instance is discarded.
        Heart.create(imageId=contentid, userId=g.user['id'])
    except IntegrityError:
        payload = {'error': 'You sneaky bastard you already hearted this content.'}
        status = 202
    else:
        payload = {'success': 'Nice you hearted this content.'}
        status = 200
    return (jsonify(payload), status)
def insert_new_jobs(raw_data_files, jar, xml, queue, progress=True, **kwargs):
    """Submit one job per raw data file and report the files that failed.

    Each file is passed to ``insert_new_job`` together with ``jar``, ``xml``,
    ``queue`` and any extra keyword arguments. A ``peewee.IntegrityError`` is
    logged as "already submitted" and otherwise ignored; a ``ValueError`` is
    logged and the file is collected.

    :param raw_data_files: query-like object; must provide ``count()`` and
        yield items with ``night`` and ``run_id`` attributes.
    :param progress: show a tqdm progress bar when true.
    :return: list of files whose submission raised ``ValueError``.
    """
    failed_files = []
    files = tqdm(
        raw_data_files,
        total=raw_data_files.count(),
        disable=not progress,
    )
    for raw_file in files:
        try:
            insert_new_job(raw_file, jar=jar, xml=xml, queue=queue, **kwargs)
        except peewee.IntegrityError:
            log.warning('Job already submitted: {}_{:03d}'.format(raw_file.night, raw_file.run_id))
        except ValueError as error:
            message = 'Could not submit {}_{:03d}: {}'.format(
                raw_file.night, raw_file.run_id, error,
            )
            log.warning(message)
            failed_files.append(raw_file)
    return failed_files
def create(self, key, value, ttl=0):
    """Insert a ``Cacheable`` row for ``key`` holding the stripped ``value``.

    :return: the created row, or ``None`` when the insert raises
        IntegrityError.
    """
    try:
        return Cacheable.create(key=key, value=value.strip(), ttl=ttl)
    except IntegrityError:
        return None
def update(self, item, value, ttl=0):
    """Overwrite value, ttl and ``updated`` on the ``Cacheable`` row of ``item``.

    The row is matched on ``item.key``. An IntegrityError raised by the
    update is swallowed on purpose, so ``item`` is returned either way.
    """
    changes = dict(value=value.strip(), ttl=ttl, updated=now())
    try:
        Cacheable.update(**changes).where(Cacheable.key == item.key).execute()
    except IntegrityError:
        # Best effort: a refused update is silently ignored.
        pass
    return item
def create_single(self, model, kwargs):
    """Create one row of the model resolved by ``self.get_model(model)``.

    :param kwargs: dict of field values passed to ``create``.
    :return: the created row, or ``None`` when the insert raises
        IntegrityError.
    """
    model_cls = self.get_model(model)
    try:
        return model_cls.create(**kwargs)
    except IntegrityError:
        return None
def update_single(self, model, item, kwargs):
    """Update the row ``item`` of the model resolved by ``self.get_model``.

    The row is matched on ``item.id`` and its ``updated`` field is set to
    ``now()`` along with the supplied values.

    :param kwargs: dict of field values to write; it is NOT modified.
    :return: ``item`` on success, or ``None`` when the update raises
        IntegrityError.
    """
    model = self.get_model(model)
    # Build a fresh dict instead of writing 'updated' into the caller's
    # dict, which would leak the timestamp into later reuse of that dict.
    fields = dict(kwargs, updated=now())
    try:
        model.update(**fields).where(model.id == item.id).execute()
    except IntegrityError:
        return None
    return item
def create_or_get(cls, **kwargs):
    """Generator-based coroutine: create a row, or fetch the existing one.

    Yields ``cls.create(**kwargs)``; if that raises IntegrityError, yields a
    ``cls.get`` lookup filtered on every supplied field that is unique or a
    primary key. The result is delivered through ``gen.Return`` as
    ``(instance, created)``, with ``created`` being ``False`` on the lookup
    path.
    """
    try:
        instance = yield cls.create(**kwargs)
        created = True
    except IntegrityError:
        # TODO: multi-column unique constraints.
        fields = (
            (getattr(cls, field_name), value)
            for field_name, value in kwargs.items()
        )
        conditions = [
            field == value
            for field, value in fields
            if field.unique or field.primary_key
        ]
        instance = yield cls.get(*conditions)
        created = False
    raise gen.Return((instance, created))
def test_signature_functionality(folder_model):
    """Check signatures and uniqueness of folders built by ``folder_model``."""
    # A root-level folder called 'etc' has a 40-character signature once saved.
    root = folder_model(name='etc')
    root.save()
    assert root.signature
    assert len(root.signature) == 40

    # A second root-level 'etc' must be rejected.
    with pytest.raises(peewee.IntegrityError):
        folder_model(name='etc').save()

    # The same holds one level down: 'apt' inside 'etc'.
    child = folder_model(name='apt', parent_folder=root)
    child.save()
    assert child.signature
    assert len(child.signature) == 40

    # ...and a second 'apt' inside 'etc' must be rejected as well.
    with pytest.raises(peewee.IntegrityError):
        folder_model(name='apt', parent_folder=root).save()

    # Archiving a folder nulls out its signature.
    root.archive_instance()
    assert root.signature is None
async def post(self):
    """Handle the record form: create, update or delete a ``Record``.

    The action is chosen by which of the ``create`` / ``update`` / ``delete``
    form fields is present. Create and update additionally require the
    ``active`` field to be present and a non-blank ``name``; update and
    delete require ``uid``. Each database call runs inside
    ``settings.manager.atomic()``.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are a SyntaxError inside a plain ``def``.

    :return: the awaited result of ``self.ajax_page(action, page)`` on
        success, or of ``self.ajax_empty(reason)`` when the database raises
        ``peewee.IntegrityError`` or no action matches.
    """
    await self.request.post()
    form = self.request.POST

    create = form.get('create') is not None
    update = form.get('update') is not None
    delete = form.get('delete') is not None
    uid = form.get('uid')
    active = form.get('active') is not None
    name = form.get('name', '').strip()
    description = form.get('description')

    # A missing or malformed page number falls back to the first page.
    try:
        page = int(form.get('page', 1))
    except (ValueError, TypeError):
        page = 1

    if all((create, active, name)):
        try:
            async with settings.manager.atomic():
                await settings.manager.create(
                    Record,
                    active=active,
                    name=name,
                    description=description)
        except peewee.IntegrityError:
            return await self.ajax_empty('not_created')
        return await self.ajax_page('create', page)

    if all((update, uid, active, name)):
        try:
            async with settings.manager.atomic():
                await settings.manager.execute(
                    Record
                    .update(
                        active=active,
                        name=name,
                        description=description)
                    .where(Record.uid == uid))
        except peewee.IntegrityError:
            return await self.ajax_empty('not_updated')
        return await self.ajax_page('update', page)

    if all((delete, uid)):
        try:
            async with settings.manager.atomic():
                await settings.manager.execute(
                    Record
                    .delete()
                    .where(Record.uid == uid))
        except peewee.IntegrityError:
            return await self.ajax_empty('not_deleted')
        return await self.ajax_page('delete', page)

    return await self.ajax_empty('not_command')
async def post(self):
    """
    Handles POST request.
    Validates data and creates new item.
    Writes the serialized object to the response.

    Declared ``async`` because the body awaits ``validate``, ``_create`` and
    ``serialize``; ``await`` inside a plain ``def`` is a SyntaxError.

    HTTPError 405 is raised in case of not creatable model (there must be
    _create method implemented in model class).
    HTTPError 400 is raised in case of violated constraints, invalid
    parameters and other data and integrity errors.

    :raises: HTTPError 405, 400
    """
    data = await self.validate(self.request.body, self.post_schema_input)
    try:
        item = await self.model_cls._create(self.application, data)
    except AttributeError as e:
        # We can only create item if _create() model method implemented.
        # NOTE(review): an AttributeError raised inside _create() itself is
        # also reported as 405 - confirm that this is intended.
        raise HTTPError(
            405,
            body=self.get_response(
                errors=[
                    {
                        'code': '',
                        'message': 'Method not allowed',
                        'detail': str(e)
                    }
                ]
            )
        ) from e
    except (peewee.IntegrityError, peewee.DataError) as e:
        raise HTTPError(
            400,
            body=self.get_response(
                errors=[
                    {
                        'code': '',
                        'message': 'Invalid parameters',
                        'detail': str(e)
                    }
                ]
            )
        ) from e
    self.response(result=await self.serialize(item))
async def put(self, item_id):
    """
    Handles PUT request.
    Validates data and updates given item.
    Writes the serialized model to the response.

    Declared ``async`` because the body awaits ``get_item``, ``validate``,
    ``_update`` and ``serialize``; ``await`` inside a plain ``def`` is a
    SyntaxError.

    Raises 405 in case of not updatable model (there must be
    _update method implemented in model class).
    Raises 400 in case of violated constraints, invalid parameters and other
    data and integrity errors.

    :raises: HTTP 405, HTTP 400.
    """
    item = await self.get_item(item_id)
    data = await self.validate(
        self.request.body, self.put_schema_input, item_id=item_id)
    try:
        item = await item._update(self.application, data)
    except AttributeError as e:
        # We can only update item if model method _update is implemented.
        # NOTE(review): an AttributeError raised inside _update() itself is
        # also reported as 405 - confirm that this is intended.
        raise HTTPError(
            405,
            body=self.get_response(
                errors=[
                    {
                        'code': '',
                        'message': 'Method not allowed',
                        'detail': str(e)
                    }
                ]
            )
        ) from e
    except (peewee.IntegrityError, peewee.DataError) as e:
        raise HTTPError(
            400,
            body=self.get_response(
                errors=[
                    {
                        'code': '',
                        'message': 'Invalid parameters',
                        'detail': str(e)
                    }
                ]
            )
        ) from e
    self.response(result=await self.serialize(item))
def import_gobject_from_monacoCoind(self, monacoCoind, rec):
    """Mirror one governance object record from monacoCoind into the database.

    ``rec`` supplies the hash, collateral hash, vote counts and the hex
    payload. The payload is deserialised into an object type and a field
    dict. The governance object row is then fetched or created and its
    values are overwritten from ``rec``; the type-specific sub-object is
    handled the same way.

    If the sub-object cannot be stored (``peewee.OperationalError`` or
    ``peewee.IntegrityError``), a delete/yes vote is cast on the governance
    object unless one was already recorded.

    :return: ``(govobj, subobj)``; ``subobj`` is ``None`` when storing it
        failed.
    """
    import monacoCoinlib
    import inflection

    object_hex = rec['DataHex']
    object_hash = rec['Hash']

    gobj_dict = {
        'object_hash': object_hash,
        'object_fee_tx': rec['CollateralHash'],
        'absolute_yes_count': rec['AbsoluteYesCount'],
        'abstain_count': rec['AbstainCount'],
        'yes_count': rec['YesCount'],
        'no_count': rec['NoCount'],
    }

    # shim/monacoCoind conversion
    object_hex = monacoCoinlib.SHIM_deserialise_from_monacoCoind(object_hex)
    objects = monacoCoinlib.deserialise(object_hex)

    obj_type, dikt = objects[0:2:1]
    relation_name = inflection.pluralize(obj_type)
    subclass = self._meta.reverse_rel[relation_name].model_class

    # set object_type in govobj table
    gobj_dict['object_type'] = subclass.govobj_type

    # keep only the fields the subclass declares as serialisable
    subdikt = {}
    for key in subclass.serialisable_fields():
        if key in dikt:
            subdikt[key] = dikt[key]

    # get/create, then sync vote counts from monacoCoind, with every run
    govobj, created = self.get_or_create(object_hash=object_hash, defaults=gobj_dict)
    if created:
        printdbg("govobj created = %s" % created)
    updated_rows = govobj.update(**gobj_dict).where(self.id == govobj.id).execute()
    if updated_rows:
        printdbg("govobj updated = %d" % updated_rows)

    subdikt['governance_object'] = govobj

    # get/create, then sync payment amounts, etc. - monacoCoind is the master
    try:
        subobj, created = subclass.get_or_create(object_hash=object_hash, defaults=subdikt)
    except (peewee.OperationalError, peewee.IntegrityError) as e:
        # invalid object: vote to delete it, unless that vote already exists
        printdbg("Got invalid object from monacoCoind! %s" % e)
        already_voted = govobj.voted_on(signal=VoteSignals.delete, outcome=VoteOutcomes.yes)
        if not already_voted:
            govobj.vote(monacoCoind, VoteSignals.delete, VoteOutcomes.yes)
        return (govobj, None)

    if created:
        printdbg("subobj created = %s" % created)
    updated_rows = subobj.update(**subdikt).where(subclass.id == subobj.id).execute()
    if updated_rows:
        printdbg("subobj updated = %d" % updated_rows)

    # returns the governance object together with its sub-object
    return (govobj, subobj)
def get_page_context(self):
    """
    Build the template context for the current page of records.

    Reads the ``paginate`` and ``page`` request arguments, counts the active
    records to derive ``page_count``, and selects the records to show. With
    no ``paginate`` argument the active records are paginated by
    ``env.RECORDS_PER_PAGE``; otherwise records are joined to ``RecordPage``
    and filtered on the page number.

    :return: dict with ``records``, ``paginate``, ``page_count``,
        ``prev_page``, ``page`` and ``next_page``.
    """
    paginate = self.get_argument('paginate', None)

    # A non-numeric page argument falls back to the first page.
    try:
        page = int(self.get_argument('page', 1))
    except ValueError:
        page = 1

    try:
        count = (
            Record
            .select()
            .where(Record.active == True)
            .count()
        )
    except peewee.IntegrityError:
        count = 0

    per_page = env.RECORDS_PER_PAGE
    # Full pages plus one more when there is a remainder.
    page_count = int(count/per_page) + int(bool(count % per_page))
    prev_page, page, next_page = self.paging(page, page_count)

    try:
        if paginate is None:
            records = (
                Record
                .select()
                .where(Record.active == True)
                .order_by(Record.name.asc())
                .paginate(page, paginate_by=per_page)
            )
        else:
            records = (
                Record
                .select(Record, RecordPage)
                .where(RecordPage.page == page)
                .join(RecordPage, join_type=peewee.JOIN.INNER)
                .order_by(Record.name.asc())
            )
    except peewee.IntegrityError:
        records = []

    return {
        'records': records,
        'paginate': paginate,
        'page_count': page_count,
        'prev_page': prev_page,
        'page': page,
        'next_page': next_page,
    }
def save_reserved(reserved_row, **kwargs):
    """Save a Reserved row and bring the identifiers table in line with it.

    Every keyword argument is first applied to ``reserved_row`` as an
    attribute (the attribute must already exist). The row is saved with
    ``save_row``. The identifier name is then derived from
    ``reserved_row.name``: the first character is dropped for directory
    prefixes, the first character and the last three for file prefixes. Any
    other identifier that uses this name as its obfuscated name has that
    cleared, and the identifier with this name is created or updated so
    that its obfuscated name equals its name.

    :raises AttributeError: if a keyword does not name an existing attribute.
    :raises IntegrityError: propagated unchanged from ``save_row``.
    :return: the value returned by ``save_row``.
    """
    from logic.identifier import get_identifier_by_name, \
        get_identifier_by_obfuscated, save_identifier, get_identifier

    # .items() works on both Python 2 and 3; .iteritems() does not exist on
    # Python 3 dicts and would raise AttributeError on every call.
    for name, value in kwargs.items():
        getattr(reserved_row, name)  # Make sure column exists
        setattr(reserved_row, name, value)

    # NOTE(review): the original indentation was lost; the identifier updates
    # are assumed to belong inside the same atomic block as the save - confirm.
    with obfuscatedb.atomic():
        reserved_id = save_row(reserved_row, **kwargs)

        ####################
        # Update identifiers
        ####################
        prefix = reserved_row.name[0]
        if prefix in [reserved_prefixes.reserved_dir,
                      reserved_prefixes.non_obfuscated_dir]:
            identifier_name = reserved_row.name[1:]
        elif prefix in [reserved_prefixes.reserved_file,
                        reserved_prefixes.non_obfuscated_file]:
            identifier_name = reserved_row.name[1:-3]
        else:
            identifier_name = reserved_row.name

        # Reassign identifier obfuscated name if it exists for another name
        try:
            identifier_row = get_identifier_by_obfuscated(identifier_name)
        except DoesNotExist:
            pass
        else:
            if identifier_row.name != identifier_name:
                identifier_row.obfuscated_name = None
                save_identifier(identifier_row)

        # Unobfuscate name in identifiers
        try:
            identifier_row = get_identifier_by_name(identifier_name)
        except DoesNotExist:
            identifier_row = get_identifier(None)
            save_identifier(
                identifier_row,
                name=identifier_name,
                obfuscated_name=identifier_name)
        else:
            if identifier_row.obfuscated_name != identifier_name:
                save_identifier(
                    identifier_row,
                    name=identifier_name,
                    obfuscated_name=identifier_name)

        return reserved_id
def user_collection():
    """List, create, rename or delete collections of the session user.

    * GET renders ``profile/collection.html`` with the user's collections of
      status 1, each carrying its status-1 projects as ``projects``.
    * POST creates a collection from ``name``; IntegrityError becomes
      ``ParamsConflict``.
    * PATCH renames the collection ``collection_id``.
    * DELETE removes the collection's projects and then the collection,
      inside one database transaction.

    A missing ``name`` (POST/PATCH) or ``collection_id`` (DELETE) raises
    ``InvalidParams``.
    """
    uuid = session.get('uuid')
    collection_name = request.values.get('name')
    collection_id = request.values.get('collection_id')
    method = request.method

    if method == 'GET':
        collections = (
            Collection.select()
            .where(Collection.uuid == uuid,
                   Collection.status == 1)
            .order_by(Collection.create_time)
        )
        for fi_collection in collections:
            fi_collection.projects = (
                CollectionProject.select()
                .join(Collection)
                .where(Collection.id == fi_collection.id,
                       CollectionProject.status == 1)
            )
        return render_template('profile/collection.html',
                               collections=collections,
                               page_title=u'????')

    # Create a collection.
    if method == 'POST':
        if not collection_name:
            raise InvalidParams()
        try:
            Collection.create(name=collection_name, uuid=uuid)
        except IntegrityError:
            raise ParamsConflict()
        return jsonify(message=u'????? {} ??'
                       .format(collection_name))

    # Rename a collection.
    if method == 'PATCH':
        if not collection_name:
            raise InvalidParams()
        update_time = datetime.now()
        rename = Collection.update(name=collection_name, update_time=update_time)
        rename.where(Collection.id == collection_id).execute()
        return jsonify(message=u'????? {} ??'.format(collection_name))

    # Delete a collection together with its projects.
    if method == 'DELETE':
        if not collection_id:
            raise InvalidParams()
        collection_projects = (
            CollectionProject.select()
            .join(Collection)
            .where(Collection.id == collection_id,
                   Collection.status == 1)
        )
        # NOTE(review): original indentation was lost; both deletions are
        # assumed to run inside the transaction - confirm.
        with database.transaction():
            for fi_collection_project in collection_projects:
                CollectionProject.del_collection_project(fi_collection_project.id)
            Collection.del_collection(collection_id)
        return jsonify(message=u'???????')
def manage_category():
    """
    Manage categories: show, create, rename and delete.

    * GET returns one category as JSON when ``category_id`` is given,
      otherwise renders ``manage/category.html`` with all categories.
    * POST creates a category from ``category_name``.
    * PATCH renames the category ``category_id``.
    * DELETE removes the category unless a Content row is joined to it.

    An empty name raises ``InvalidParams``; an IntegrityError on create or
    rename raises ``ParamsConflict``.
    """
    method = request.method

    if method == 'GET':
        category_id = request.args.get('category_id')
        if category_id:
            category_object = (
                Category.select()
                .where(Category.id == category_id)
                .get()
            )
            return jsonify(payload=model_to_dict(category_object))
        category_objects = Category.select().order_by(Category.id)
        return render_template('manage/category.html',
                               categorys=category_objects,
                               page_title=u'????')

    # Create a category.
    if method == 'POST':
        category_name = request.form.get('category_name', '')
        if not category_name:
            raise InvalidParams(message=u'??????????????')
        try:
            Category.create(name=category_name)
        except IntegrityError:
            raise ParamsConflict(message=u'???????{} ???'
                                 .format(category_name))
        return jsonify(message=u'?????{}???'.format(category_name))

    # Rename a category.
    if method == 'PATCH':
        category_name = request.form.get('category_name')
        category_id = request.form.get('category_id')
        if not category_name:
            raise InvalidParams(message=u'???????name ????')
        try:
            rename = Category.update(name=category_name,
                                     update_time=datetime.now())
            rename.where(Category.id == category_id).execute()
        except IntegrityError:
            raise ParamsConflict(message=u'???????{} ???'
                                 .format(category_name))
        return jsonify(message=u'???? {} ????'.format(category_name))

    # Delete a category.
    if method == 'DELETE':
        category_id = request.form.get('category_id')
        category_name = request.form.get('category_name')
        try:
            content_query = (
                Content.select()
                .join(Category)
                .where(Category.id == category_id)
                .get()
            )
            project_url = content_query.project_url
        except DoesNotExist:
            # No content is joined to the category, so it can be removed.
            Category.delete().where(Category.id == category_id).execute()
            return jsonify(message=u'?????{}???'.format(category_name))
        # A content row is still joined to the category: refuse the delete.
        raise InvalidParams(message=u'???????{project_url}????'
                                    u'???????????????'
                            .format(project_url=project_url))
def manage_volume():
    """
    Manage volumes (Vol.): show, create, rename and delete.

    * GET returns one volume as JSON when ``volume_id`` is given, otherwise
      renders ``manage/volume.html`` with all volumes ordered by name.
    * POST creates a volume from ``volume_name``.
    * PATCH renames the volume ``volume_id``.
    * DELETE removes the volume unless a Content row is joined to it.

    An empty name raises ``InvalidParams``; an IntegrityError on create or
    rename raises ``ParamsConflict``.
    """
    method = request.method

    if method == 'GET':
        volume_id = request.args.get('volume_id')
        if volume_id:
            volume_object = (
                Volume.select()
                .where(Volume.id == volume_id)
                .get()
            )
            return jsonify(payload=model_to_dict(volume_object))
        volume_objects = Volume.select().order_by(Volume.name)
        return render_template('manage/volume.html',
                               volumes=volume_objects,
                               page_title=u'Vol.??')

    # Create a volume.
    if method == 'POST':
        volume_name = request.form.get('volume_name')
        if not volume_name:
            raise InvalidParams(message=u'???????name ????')
        try:
            Volume.create(name=volume_name)
        except IntegrityError:
            raise ParamsConflict(message=u'???????{} ???'
                                 .format(volume_name))
        return jsonify(message=u'?????{}???'.format(volume_name))

    # Rename a volume.
    if method == 'PATCH':
        volume_name = request.form.get('volume_name')
        volume_id = request.form.get('volume_id')
        if not volume_name:
            raise InvalidParams(message=u'?? Vol. ???name ????')
        try:
            rename = Volume.update(name=volume_name, update_time=datetime.now())
            rename.where(Volume.id == volume_id).execute()
        except IntegrityError:
            raise ParamsConflict(message=u'?? Vol. ???{} ???'
                                 .format(volume_name))
        return jsonify(message=u'?? Vol. {} ????'
                       .format(volume_name))

    # Delete a volume.
    if method == 'DELETE':
        volume_id = request.form.get('volume_id')
        volume_name = request.form.get('volume_name')
        try:
            content_query = (
                Content.select()
                .join(Volume)
                .where(Volume.id == volume_id)
                .get()
            )
            project_url = content_query.project_url
        except DoesNotExist:
            # No content is joined to the volume, so it can be removed.
            Volume.delete().where(Volume.id == volume_id).execute()
            return jsonify(message=u'?? Vol.?{}???'
                           .format(volume_name))
        # A content row is still joined to the volume: refuse the delete.
        raise InvalidParams(message=u'?? Vol. ???{project_url}????'
                                    u'??? Vol.??????????'
                            .format(project_url=project_url))