def get_or_create(db, model, create_method: str='',
                  create_method_kwargs=None, **kwargs):
    """Return the ``model`` row matching ``kwargs``, creating it if absent.

    The lookup is ``db.query(model).filter_by(**kwargs).one()``. When that
    raises ``NoResultFound`` a new instance is built, added and flushed. If
    the flush raises ``IntegrityError`` the session is rolled back and the
    lookup is repeated with the same criteria.

    :param db: session-like object providing ``query``, ``add``, ``flush``
        and ``rollback``.
    :param model: class that is queried and instantiated.
    :param create_method: name of an attribute of ``model`` to call instead
        of ``model`` itself when creating; ``model`` is used if the
        attribute does not exist.
    :param create_method_kwargs: extra keyword arguments used only when
        creating the instance, never for the lookup.
    :param kwargs: lookup criteria; also passed to the constructor.
    :returns: the existing or the newly created instance.
    """
    try:
        return db.query(model).filter_by(**kwargs).one()
    except NoResultFound:
        pass

    # Keep the constructor arguments in their own dict: updating ``kwargs``
    # in place would leak creation-only values into the fallback lookup.
    create_kwargs = dict(kwargs)
    create_kwargs.update(create_method_kwargs or {})
    factory = getattr(model, create_method, model)
    created = factory(**create_kwargs)

    try:
        db.add(created)
        db.flush()
    except IntegrityError:
        # The flush violated a constraint (presumably another writer inserted
        # the row first): undo the failed insert and fetch the stored row
        # using the original lookup criteria only.
        db.rollback()
        return db.query(model).filter_by(**kwargs).one()
    return created
# Example source code for the Python class IntegrityError()
def insert_element(self, element):
    """Store a feed item in the database.

    A ``DBFeedItem`` is added to ``db.session`` and committed. A ``Feeditem``
    is converted to a ``DBFeedItem`` and inserted through a recursive call.
    Any other object is reported with a warning and not stored.

    :param element: the item to insert.
    """
    s = db.session
    if isinstance(element, DBFeedItem):
        try:
            s.add(element)
            s.commit()
        except exc.IntegrityError:
            # Integrity errors are expected: the same element may regularly be
            # offered for insertion again. Such elements are not added; the
            # transaction is rolled back and nothing is logged.
            s.rollback()
        except Exception as e:
            self._logger.error(str(e))
            s.rollback()
    elif isinstance(element, Feeditem):
        self.insert_element(DBFeedItem(element.content, element.type, element.source, element.time))
    else:
        # str() is required here: concatenating a type object to a str raises
        # TypeError, which would replace the warning with a crash.
        self._logger.warning('An element could not be inserted into the database, because it is non of the accepted types. (' + str(type(element)) + ')')
def generate_fake(count=100, **kwargs):
    """Create ``count`` users filled with fake data, committing one at a time.

    Each user gets a fake first name, last name, email and password, is
    marked as confirmed and receives a randomly chosen role out of
    ``Role.query.all()``. Extra keyword arguments are forwarded to ``User``.
    A commit that raises ``IntegrityError`` is rolled back and generation
    carries on with the next user.
    """
    from random import choice, seed
    from faker import Faker
    from sqlalchemy.exc import IntegrityError

    faker = Faker()
    available_roles = Role.query.all()
    seed()

    for _ in range(count):
        profile = {
            'first_name': faker.first_name(),
            'last_name': faker.last_name(),
            'email': faker.email(),
            'password': faker.password(),
            'confirmed': True,
            'role': choice(available_roles),
        }
        db.session.add(User(**profile, **kwargs))
        try:
            db.session.commit()
        except IntegrityError:
            db.session.rollback()
def grant_access_to_existing_repos(user):
    """Add ``RepositoryAccess`` rows linking ``user`` to matching repositories.

    Candidate repositories are GitHub repositories whose owner name is one of
    the owners the provider returns for ``user`` and for which ``user`` has no
    ``RepositoryAccess`` row yet. A row is added for every candidate that
    passes ``provider.has_access``.

    :param user: the user receiving access; ``user.id`` is stored on each row.
    """
    provider = GitHubRepositoryProvider(cache=True)
    # Names of the owners the provider reports for this user.
    owner_list = [o['name'] for o in provider.get_owners(user)]
    if owner_list:
        # GitHub repositories of those owners, excluding the ones that already
        # have a RepositoryAccess row for this user.
        matching_repos = Repository.query.unrestricted_unsafe().filter(
            Repository.provider == RepositoryProvider.github,
            Repository.owner_name.in_(owner_list),
            ~Repository.id.in_(db.session.query(
                RepositoryAccess.repository_id,
            ).filter(
                RepositoryAccess.user_id == user.id,
            ))
        )
        for repo in matching_repos:
            # NOTE(review): access is checked for auth.get_current_user(), not
            # for the ``user`` argument -- confirm these are meant to be the same.
            if provider.has_access(auth.get_current_user(), repo):
                try:
                    # Nested transaction so a failed insert does not discard
                    # rows added for earlier repositories.
                    with db.session.begin_nested():
                        db.session.add(RepositoryAccess(
                            repository_id=repo.id,
                            user_id=user.id,
                        ))
                        db.session.flush()
                except IntegrityError:
                    # Presumably the row was created concurrently; skip it.
                    pass
        # NOTE(review): the original indentation was lost; the commit is
        # assumed to run once after the loop -- confirm against the real file.
        db.session.commit()
def post(self, build: Build):
    """
    Create a new job.

    The request is parsed with ``job_schema`` (partial); schema errors are
    returned with status 403. The job is attached to ``build`` and its
    repository, and ``date_started`` is set to the current time when the job
    is not queued and has no start date. An ``IntegrityError`` on commit is
    rolled back and answered with status 422. On success the build stats
    aggregation task is scheduled and the serialized job is returned.
    """
    parsed = self.schema_from_request(job_schema, partial=True)
    if parsed.errors:
        return self.respond(parsed.errors, 403)

    job = Job(build=build, repository_id=build.repository_id, **parsed.data)
    already_running = job.status != Status.queued
    if already_running and not job.date_started:
        job.date_started = timezone.now()

    db.session.add(job)
    try:
        db.session.commit()
    except IntegrityError:
        db.session.rollback()
        return self.respond(status=422)

    aggregate_build_stats_for_job.delay(job_id=job.id)
    return self.respond_with_schema(job_schema, job)
def process(self, fp):
    """Store the coverage results obtained from ``fp``.

    Every result returned by ``self.get_coverage(fp)`` is added to the
    session inside a nested transaction. When that raises ``IntegrityError``
    the result is passed through ``self.merge_coverage`` under a redis lock
    and the merged result is added instead.

    :param fp: input handed to ``self.get_coverage``.
    :returns: the object returned by ``self.get_coverage(fp)``.
    """
    results = self.get_coverage(fp)
    for result in results:
        try:
            with db.session.begin_nested():
                db.session.add(result)
        except IntegrityError:
            # The lock key is built from the build id and the SHA-1 of the
            # file name, so it is specific to one file of one build.
            lock_key = 'coverage:{build_id}:{file_hash}'.format(
                build_id=result.build_id.hex,
                file_hash=sha1(result.filename.encode(
                    'utf-8')).hexdigest(),
            )
            with redis.lock(lock_key):
                result = self.merge_coverage(result)
                db.session.add(result)
        # NOTE(review): the original indentation was lost; the flush is
        # assumed to run once per result -- confirm against the real file.
        db.session.flush()
    return results
def generate_fake(count=100):
    """Create ``count`` users with generated data, committing one at a time.

    Email, user name, password, name, location, about-me text and membership
    date all come from ``forgery_py``; every user is marked as confirmed.
    A commit that raises ``IntegrityError`` is rolled back and generation
    continues with the next user.

    :param count: number of users to attempt to create.
    """
    from sqlalchemy.exc import IntegrityError
    from random import seed
    import forgery_py

    seed()
    for _ in range(count):
        u = User(email=forgery_py.internet.email_address(),
                 username=forgery_py.internet.user_name(),
                 password=forgery_py.lorem_ipsum.word(),
                 confirmed=True,
                 name=forgery_py.name.full_name(),
                 location=forgery_py.address.city(),
                 # The module is ``lorem_ipsum``; the former ``lorem_lpsum``
                 # spelling raised AttributeError on the first iteration.
                 about_me=forgery_py.lorem_ipsum.sentence(),
                 member_since=forgery_py.date.date(True))
        db.session.add(u)
        try:
            db.session.commit()
        except IntegrityError:
            db.session.rollback()
# Generate fake users (the original comment text was lost to an encoding error).
def generate_fake(count=100):
    """Populate the database with ``count`` generated users.

    All field values come from ``forgery_py`` and every user is marked as
    confirmed. Users are committed individually; a commit that raises
    ``IntegrityError`` is rolled back and the loop moves on.
    """
    import forgery_py
    from random import seed
    from sqlalchemy.exc import IntegrityError

    seed()
    remaining = count
    while remaining > 0:
        remaining -= 1
        fields = dict(
            email=forgery_py.internet.email_address(),
            username=forgery_py.internet.user_name(True),
            password=forgery_py.lorem_ipsum.word(),
            confirmed=True,
            name=forgery_py.name.full_name(),
            location=forgery_py.address.city(),
            about_me=forgery_py.lorem_ipsum.sentence(),
            member_since=forgery_py.date.date(True),
        )
        db.session.add(User(**fields))
        try:
            db.session.commit()
        except IntegrityError:
            db.session.rollback()
def add_account(account, type, email, session=None):
    """ Create an active account and its account counters.

    :param account: the name of the new account.
    :param type: the type of the new account.
    :param email: the email address associated with the account.
    :param session: the database session in use.
    :raises Duplicate: if saving the account raises an IntegrityError.
    """
    account_row = models.Account(
        account=account,
        account_type=type,
        email=email,
        status=AccountStatus.ACTIVE,
    )
    try:
        account_row.save(session=session)
    except IntegrityError:
        raise exception.Duplicate('Account ID \'%s\' already exists!' % account)
    else:
        # Only reached when the account was saved: create its counters.
        rucio.core.account_counter.create_counters_for_new_account(account=account, session=session)
def add_scope(scope, account, session=None):
    """ Add an open scope for the given active account.

    :param scope: the name for the new scope.
    :param account: the account to add the scope to.
    :param session: the database session in use.
    :raises AccountNotFound: if no active account with that name exists.
    :raises Duplicate: if the database reports that the scope already exists.
    :raises RucioException: on any other error while saving the scope.
    """
    result = session.query(models.Account).filter_by(account=account, status=AccountStatus.ACTIVE).first()
    if result is None:
        raise AccountNotFound('Account ID \'%s\' does not exist' % account)

    new_scope = models.Scope(scope=scope, account=account, status=ScopeStatus.OPEN)
    try:
        new_scope.save(session=session)
    except IntegrityError as error:
        # One pattern per error-message wording that indicates a duplicate key.
        duplicate_patterns = (
            '.*IntegrityError.*ORA-00001: unique constraint.*SCOPES_PK.*violated.*',
            '.*IntegrityError.*1062, "Duplicate entry.*for key.*',
            '.*IntegrityError.*UNIQUE constraint failed: scopes.scope.*',
            '.*IntegrityError.*duplicate key value violates unique constraint.*',
            '.*sqlite3.IntegrityError.*is not unique.*',
        )
        if any(match(pattern, error.args[0]) for pattern in duplicate_patterns):
            raise Duplicate('Scope \'%s\' already exists!' % scope)
        # Any other integrity violation means the scope was NOT stored; report
        # it instead of returning as if the insert had succeeded.
        raise RucioException(error.args)
    except Exception:
        raise RucioException(str(format_exc()))
def list_subscription_rule_states(name=None, account=None, session=None):
    """Yield the number of rules per state for subscriptions.

    Subscriptions are joined to their replication rules and grouped by
    account, subscription name and rule state.

    :param name: if truthy, only subscriptions with this name are counted.
    :param account: if truthy, only subscriptions of this account are counted.
    :param session: the database session in use.
    :returns: generator of rows (account, name, state, count).
    """
    sub = aliased(models.Subscription)
    rule_alias = aliased(models.ReplicationRule)

    counted = session.query(sub.account, sub.name, rule_alias.state, func.count())
    query = counted.join(rule_alias, sub.id == rule_alias.subscription_id)

    try:
        # Apply each optional filter only when a value was supplied.
        for wanted, column in ((name, sub.name), (account, sub.account)):
            if wanted:
                query = query.filter(column == wanted)
    except IntegrityError as error:
        print(error)
        raise

    grouped = query.group_by(sub.account, sub.name, rule_alias.state)
    for row in grouped:
        yield row
def add_key(key, key_type, value_type=None, value_regexp=None, session=None):
    """
    Register a new allowed key.

    :param key: the name for the new key.
    :param key_type: the type of the key: all(container, dataset, file), collection(dataset or container), file, derived(compute from file for collection).
    :param value_type: the type of the value, if defined.
    :param value_regexp: the regular expression that values should match, if defined.
    :param session: the database session in use.
    :raises UnsupportedValueType: if ``value_type`` is given and is not the string form of an authorized value type.
    :raises Duplicate: if saving fails with the "column key is not unique" integrity error.
    """
    # Validate the value type only when one was supplied.
    if value_type:
        supported = [str(t) for t in AUTHORIZED_VALUE_TYPES]
        if value_type not in supported:
            raise UnsupportedValueType('The type \'%(value_type)s\' is not supported for values!' % {'value_type': value_type})

    did_key = models.DIDKey(
        key=key,
        value_type=value_type and str(value_type),
        value_regexp=value_regexp,
        key_type=key_type,
    )
    try:
        did_key.save(session=session)
    except IntegrityError as error:
        is_duplicate = error.args[0] == "(IntegrityError) column key is not unique"
        if not is_duplicate:
            # Any other integrity violation is passed on unchanged.
            raise
        raise Duplicate('key \'%(key)s\' already exists!' % {'key': key})
def add_rse_attribute(rse, key, value, session=None):
    """ Attach a key/value attribute to an RSE.

    :param rse: the rse name.
    :param key: the key name.
    :param value: the value name.
    :param session: the database session in use.
    :raises Duplicate: if merging or saving the attribute raises an IntegrityError.
    :returns: True if successful.
    """
    rse_id = get_rse_id(rse, session=session)
    try:
        association = session.merge(
            models.RSEAttrAssociation(rse_id=rse_id, key=key, value=value)
        )
        association.save(session=session)
    except IntegrityError:
        details = {'key': key, 'value': value, 'rse': rse}
        raise exception.Duplicate("RSE attribute '%(key)s-%(value)s\' for RSE '%(rse)s' already exists!" % details)
    return True
def delete_rse_transfer_limits(rse, activity=None, rse_id=None, session=None):
    """
    Delete RSE transfer limits.

    :param rse: The RSE name; only used to look up the id when ``rse_id`` is not given.
    :param activity: The activity; when given, only limits of this activity are deleted.
    :param rse_id: The RSE id.
    :param session: The database session in use.
    :returns: The value returned by the query's ``delete()`` call.
    :raises RucioException: if an IntegrityError occurs.
    """
    try:
        if not rse_id:
            rse_id = get_rse_id(rse=rse, session=session)
        query = session.query(models.RSETransferLimit).filter_by(rse_id=rse_id)
        if activity:
            query = query.filter_by(activity=activity)
        return query.delete()
    except IntegrityError as error:
        # "except X as name" is valid on Python 2.6+ and Python 3, whereas the
        # former "except X, name" form is a syntax error on Python 3.
        raise exception.RucioException(error.args)
def add_naming_convention(scope, regexp, convention_type, session=None):
    """
    Store a naming convention for a scope.

    :param scope: the name for the scope.
    :param regexp: the regular expression to validate the name.
    :param convention_type: the did_type on which the regexp should apply.
    :param session: the database session in use.
    :raises RucioException: if ``regexp`` does not compile, or on an unexpected error while saving.
    :raises Duplicate: if saving raises an IntegrityError.
    """
    # Reject patterns that do not compile before anything is built or stored.
    try:
        compile(regexp)
    except error:
        raise RucioException('Invalid regular expression %s!' % regexp)

    fields = {
        'scope': scope,
        'regexp': regexp,
        'convention_type': convention_type,
    }
    convention = models.NamingConvention(**fields)
    try:
        convention.save(session=session)
    except IntegrityError:
        raise Duplicate('Naming convention already exists!')
    except:
        raise RucioException(str(format_exc()))
def touch_transfer(external_host, transfer_id, session=None):
    """
    Update the timestamp of the submitted requests of a transfer.

    Only requests in state SUBMITTED whose ``updated_at`` is more than 30
    seconds old are updated; if none match, nothing is changed.

    :param external_host: Name of the external host (not used by the query).
    :param transfer_id: External transfer job id as a string.
    :param session: Database session to use.
    :raises RucioException: if an IntegrityError occurs.
    """
    record_counter('core.request.touch_transfer')

    # Sample the clock once so the 30-second cutoff and the new timestamp are
    # derived from the same instant.
    now = datetime.datetime.utcnow()
    try:
        # don't touch it if it's already touched in 30 seconds
        query = session.query(models.Request).with_hint(models.Request, "INDEX(REQUESTS REQUESTS_EXTERNALID_UQ)", 'oracle')
        query = query.filter_by(external_id=transfer_id)
        query = query.filter(models.Request.state == RequestState.SUBMITTED)
        query = query.filter(models.Request.updated_at < now - datetime.timedelta(seconds=30))
        query.update({'updated_at': now}, synchronize_session=False)
    except IntegrityError as error:
        # "except X as name" is valid on Python 2.6+ and Python 3, whereas the
        # former "except X, name" form is a syntax error on Python 3.
        raise RucioException(error.args)
def __set_transfer_state(external_host, transfer_id, new_state, session=None):
    """
    Update the state of all requests belonging to a transfer.

    :param external_host: Selected external host as string in format protocol://fqdn:port
    :param transfer_id: External transfer job id as a string.
    :param new_state: New state as string.
    :param session: Database session to use.
    :raises RucioException: if an IntegrityError occurs.
    :raises UnsupportedOperation: if no request was updated, e.g. because the
        transfer_id does not exist.
    """
    record_counter('core.request.set_transfer_state')

    try:
        rowcount = session.query(models.Request).filter_by(external_id=transfer_id).update({'state': new_state, 'updated_at': datetime.datetime.utcnow()}, synchronize_session=False)
    except IntegrityError as error:
        # "except X as name" is valid on Python 2.6+ and Python 3, whereas the
        # former "except X, name" form is a syntax error on Python 3.
        raise RucioException(error.args)

    # A zero row count means nothing matched the transfer id.
    if not rowcount:
        raise UnsupportedOperation("Transfer %s on %s state %s cannot be updated." % (transfer_id, external_host, new_state))