def nested_commit_on_success(func):
    """Like commit_on_success, but doesn't commit existing transactions.

    This decorator is used to run a function within the scope of a
    database transaction, committing the transaction on success and
    rolling it back if an exception occurs.

    Unlike the standard transaction.commit_on_success decorator, this
    version first checks whether a transaction is already active. If so
    then it doesn't perform any commits or rollbacks, leaving that up to
    whoever is managing the active transaction.
    """
    from django.db import transaction
    commit_on_success = transaction.commit_on_success(func)

    def _nested_commit_on_success(*args, **kwds):
        if transaction.is_managed():
            return func(*args, **kwds)
        else:
            return commit_on_success(*args, **kwds)
    return transaction.wraps(func)(_nested_commit_on_success)
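# Hedged usage sketch (not part of the original source): how the decorator above
# would typically be applied on pre-1.6 Django, where transaction.is_managed()
# still exists.  `Widget` and `create_widget` are hypothetical names.
@nested_commit_on_success
def create_widget(name):
    return Widget.objects.create(name = name)

# Called on its own, create_widget() commits on success and rolls back on error;
# called from inside an outer commit_on_success()/managed block, it leaves the
# commit or rollback to whoever manages that outer transaction.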
def _delete_nid_resource(self, scannable_id, deleted_resource_id):
    from chroma_core.lib.storage_plugin.api.resources import LNETInterface, NetworkInterface as SrcNetworkInterface

    resource = StorageResourceRecord.objects.get(pk = deleted_resource_id).to_resource()

    # Must be run in a transaction to avoid leaving invalid things in the DB on failure.
    assert transaction.is_managed()

    # It is a shame to check this twice, but the scannable resource might not always be a host
    # (according to test_subscriber); we presume that only a host can have a NetworkInterface
    # or an LNetInterface.
    if isinstance(resource, SrcNetworkInterface) or isinstance(resource, LNETInterface):
        scannable_resource = ResourceQuery().get_resource(scannable_id)
        host = ManagedHost.objects.get(pk = scannable_resource.host_id)

        if isinstance(resource, SrcNetworkInterface):
            log.error("Deleting NetworkInterface %s from %s" % (resource.name, host.fqdn))
            NetworkInterface.objects.filter(host = host,
                                            name = resource.name).delete()
        elif isinstance(resource, LNETInterface):
            log.error("Deleting Nid %s from %s" % (resource.name, host.fqdn))
            # Presumes Nid name == interface name; that is asserted when the Nid is added.
            network_interface = NetworkInterface.objects.get(host = host,
                                                             name = resource.name)
            Nid.objects.filter(network_interface = network_interface).delete()
def _resource_persist_update_attributes(self, scannable_id, local_record_id, attrs):
    # Must be run in a transaction to avoid leaving invalid things in the DB on failure.
    assert transaction.is_managed()

    session = self._sessions[scannable_id]
    global_record_id = session.local_id_to_global_id[local_record_id]
    record = StorageResourceRecord.objects.get(pk = global_record_id)

    # Sometimes we are given a reference to a BaseStorageResource, in which case we need
    # to store its global id rather than the object itself. This loop does the translation.
    cleaned_id_attrs = {}
    for key, val in attrs.items():
        if isinstance(val, BaseStorageResource):
            cleaned_id_attrs[key] = session.local_id_to_global_id[val._handle]
        else:
            cleaned_id_attrs[key] = val

    record.update_attributes(cleaned_id_attrs)
def set_rollback():
    if hasattr(transaction, 'set_rollback'):
        if connection.settings_dict.get('ATOMIC_REQUESTS', False):
            # If running in >=1.6 then mark a rollback as required,
            # and allow it to be handled by Django.
            if connection.in_atomic_block:
                transaction.set_rollback(True)
    elif transaction.is_managed():
        # Otherwise handle it explicitly if in managed mode.
        if transaction.is_dirty():
            transaction.rollback()
        transaction.leave_transaction_management()
    else:
        # transaction not managed
        pass
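# Hedged usage sketch (assumption, not from the original source): set_rollback()
# is the kind of helper an API exception handler calls so that a failed request
# never commits partial writes.  handle_api_error and build_error_response are
# hypothetical names.
def handle_api_error(request, exc):
    set_rollback()                        # make sure nothing from this request is committed
    return build_error_response(exc)      # hypothetical: turn the exception into an HTTP response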
enable_long_polling.py (project: intel-manager-for-lustre, author: intel-hpdd)
def database_changed(sender, **kwargs):
    table_name = sender._meta.db_table

    if table_name.startswith('chroma_core'):          # We are only interested in our tables, not the django ones.
        using = kwargs.pop('using', DEFAULT_DB_ALIAS)

        if transaction.is_managed(using) is False:     # Not a managed transaction, so the change has already occurred.
            log.debug('Propagating tablechange for %s' % table_name)
            _propagate_table_change([table_name])
        else:                                          # This is a transaction; until it commits the change has not happened.
            with operation_lock:
                if using not in _pending_table_changes:
                    log.debug('New transaction change %s using %s' % (table_name, using))

                    original_commit_fn = transaction.connections[using].commit
                    original_rollback_fn = transaction.connections[using].rollback

                    transaction.connections[using].commit = lambda: _transaction_commit_rollback(using,
                                                                                                 True,
                                                                                                 original_commit_fn,
                                                                                                 original_rollback_fn)
                    transaction.connections[using].rollback = lambda: _transaction_commit_rollback(using,
                                                                                                   False,
                                                                                                   original_commit_fn,
                                                                                                   original_rollback_fn)

                log.debug('Adding pending change %s using %s' % (table_name, using))
                _pending_table_changes[using].add(table_name)
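# Hedged sketch (assumption, not shown in the snippet above): a handler like
# database_changed is normally attached to Django's model signals so that it
# fires on every save and delete.  The exact wiring used by enable_long_polling.py
# is not reproduced here, so treat this as illustrative only.
from django.db.models.signals import post_save, post_delete

post_save.connect(database_changed)
post_delete.connect(database_changed)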
def _persist_created_hosts(self, session, scannable_id, new_resources):
    # Must be run in a transaction to avoid leaving invalid things in the DB on failure.
    assert transaction.is_managed()

    log.debug("_persist_created_hosts")

    record_pks = []
    from chroma_core.lib.storage_plugin.api.resources import VirtualMachine
    for resource in new_resources:
        if isinstance(resource, VirtualMachine):
            assert(not resource._handle_global)
            record_pks.append(session.local_id_to_global_id[resource._handle])

    for vm_record_pk in record_pks:
        record = StorageResourceRecord.objects.get(pk = vm_record_pk)
        resource = record.to_resource()

        if not resource.host_id:
            try:
                host = ManagedHost.objects.get(address = resource.address)
                log.info("Associated existing host with VirtualMachine resource: %s" % resource.address)
                record.update_attribute('host_id', host.pk)
            except ManagedHost.DoesNotExist:
                log.info("Creating host for new VirtualMachine resource: %s" % resource.address)
                host, command = JobSchedulerClient.create_host_ssh(resource.address)
                record.update_attribute('host_id', host.pk)
def _resource_modify_parent(self, record_pk, parent_pk, remove):
    # Must be run in a transaction to avoid leaving invalid things in the DB on failure.
    assert transaction.is_managed()

    record = StorageResourceRecord.objects.get(pk = record_pk)
    if remove:
        record.parents.remove(parent_pk)
    else:
        record.parents.add(parent_pk)
def session_add_resources(self, scannable_id, resources):
    """NB this is plural because new resources may be interdependent
    and if so they must be added in a blob so that we can hook up the
    parent relationships"""
    # Must be run in a transaction to avoid leaving invalid things in the DB on failure.
    assert transaction.is_managed()

    with self._instance_lock:
        session = self._sessions[scannable_id]
        self._persist_new_resources(session, resources)
        self._persist_lun_updates(scannable_id)
        self._persist_nid_updates(scannable_id, None, None)
        self._persist_created_hosts(session, scannable_id, resources)
def session_remove_local_resources(self, scannable_id, resources):
    # Must be run in a transaction to avoid leaving invalid things in the DB on failure.
    assert transaction.is_managed()

    with self._instance_lock:
        session = self._sessions[scannable_id]
        for local_resource in resources:
            try:
                resource_global_id = session.local_id_to_global_id[local_resource._handle]
                self._delete_nid_resource(scannable_id, resource_global_id)
                self._delete_resource(StorageResourceRecord.objects.get(pk = resource_global_id))
            except KeyError:
                pass
        self._persist_lun_updates(scannable_id)
def session_remove_global_resources(self, scannable_id, resources):
    # Must be run in a transaction to avoid leaving invalid things in the DB on failure.
    assert transaction.is_managed()

    with self._instance_lock:
        session = self._sessions[scannable_id]
        resources = session._plugin_instance._index._local_id_to_resource.values()
        self._cull_lost_resources(session, resources)
        self._persist_lun_updates(scannable_id)
def add_jobs(self, jobs, command):
    """Add a job, and any others which are required in order to reach its prerequisite state"""
    # Important: the Job must not be committed until all
    # its dependencies and locks are in.
    assert transaction.is_managed()

    for job in jobs:
        for dependency in self._dep_cache.get(job).all():
            if not dependency.satisfied():
                log.info("add_jobs: setting required dependency %s %s" % (dependency.stateful_object, dependency.preferred_state))
                self._set_state(dependency.get_stateful_object(), dependency.preferred_state, command)

        log.info("add_jobs: done checking dependencies")

        locks = self._create_locks(job)
        job.locks_json = json.dumps([l.to_dict() for l in locks])
        self._create_dependencies(job, locks)
        with transaction.commit_on_success():
            job.save()

        log.info("add_jobs: created Job %s (%s)" % (job.pk, job.description()))

        for l in locks:
            self._lock_cache.add(l)

        command.jobs.add(job)

    self._job_collection.add_command(command, jobs)
def process_response(self, request, response):
    if transaction.is_managed():
        if transaction.is_dirty():
            successful = not isinstance(response, http.HttpApplicationError)
            if successful:
                transaction.commit()
            else:
                transaction.rollback()
        transaction.leave_transaction_management()
    return response
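# Hedged sketch (assumption): a process_response hook like the one above belongs to a
# middleware class that is enabled in settings; the module path below is hypothetical.
MIDDLEWARE_CLASSES = (
    'django.middleware.common.CommonMiddleware',
    'myapp.middleware.TransactionMiddleware',    # hypothetical path to the class defining process_response above
)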
def _make_deletable(metaclass, dct):
    def mark_deleted(self):
        # If this is not within a managed transaction we must use commit_on_success to ensure that the object is
        # only marked deleted if the updates to alerts also succeed
        if transaction.is_managed():
            self._mark_deleted()
        else:
            with transaction.commit_on_success():
                self._mark_deleted()

    def _mark_deleted(self):
        """Mark a record as deleted, returns nothing.

        Looks up the model instance by pk, sets the not_deleted attribute
        to None and saves the model instance.

        Additionally marks any AlertStates referring to this item as inactive.

        This is provided as a class method which takes an ID rather than as an
        instance method, in order to use ._base_manager rather than .objects -- this
        allows us to find the object even if it was already deleted, making this
        operation idempotent rather than throwing a DoesNotExist on the second try.
        """
        # Not implemented as an instance method because
        # we will need to use _base_manager to ensure
        # we can get at the object
        from django.db.models import signals
        signals.pre_delete.send(sender = self.__class__, instance = self)

        if self.not_deleted:
            self.not_deleted = None
            self.save()

        signals.post_delete.send(sender = self.__class__, instance = self)

        from chroma_core.lib.job import job_log
        from chroma_core.models.alert import AlertState
        updated = AlertState.filter_by_item_id(self.__class__, self.id).update(active = None)
        job_log.info("Lowered %d alerts while deleting %s %s" % (updated, self.__class__, self.id))

    def delete(self):
        raise NotImplementedError("Must use .mark_deleted on Deletable objects")

    dct['objects'] = DeletableManager()
    dct['delete'] = delete
    dct['mark_deleted'] = mark_deleted
    dct['_mark_deleted'] = _mark_deleted

    # Conditional to only create the 'deleted' attribute on the immediate
    # user of the metaclass, not again on subclasses.
    if issubclass(dct.get('__metaclass__', type), metaclass):
        # Please forgive me. Logically this would be a field called 'deleted' which would
        # be True or False. Instead, it is a field called 'not_deleted' which can be
        # True or None. The reason is: unique_together constraints.
        dct['not_deleted'] = models.NullBooleanField(default = True)

    if 'Meta' in dct:
        if hasattr(dct['Meta'], 'unique_together'):
            if not 'not_deleted' in dct['Meta'].unique_together:
                dct['Meta'].unique_together = dct['Meta'].unique_together + ('not_deleted',)
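# Hedged usage sketch (assumption, not from the original source): what a model built on
# this pattern might look like.  DeletableMetaclass is the assumed metaclass that calls
# _make_deletable on the class dict; Widget and its fields are hypothetical.
class Widget(models.Model):
    __metaclass__ = DeletableMetaclass
    name = models.CharField(max_length = 64)

    class Meta:
        unique_together = ('name',)      # _make_deletable extends this to ('name', 'not_deleted')

w = Widget.objects.create(name = 'w1')
w.mark_deleted()                         # soft delete: not_deleted flips from True to None
Widget.objects.create(name = 'w1')       # allowed again, since a NULL not_deleted never collides in the unique index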
def _cull_lost_resources(self, session, reported_resources):
    # Must be run in a transaction to avoid leaving invalid things in the DB on failure.
    assert transaction.is_managed()

    reported_scoped_resources = []
    reported_global_resources = []
    for r in reported_resources:
        if isinstance(r._meta.identifier, BaseScopedId):
            reported_scoped_resources.append(session.local_id_to_global_id[r._handle])
        else:
            reported_global_resources.append(session.local_id_to_global_id[r._handle])

    # This generator re-runs the query on every loop iteration in order
    # to handle situations where resources returned by the query are
    # deleted as dependents of prior resources (HYD-3659).
    def iterate_lost_resources(query):
        loops_remaining = len(query())
        while loops_remaining:
            loops_remaining -= 1
            rs = query()
            if len(rs):
                yield rs[0]
            else:
                raise StopIteration()

            # If the list of lost items grew, don't continue looping.
            # Just bail out and the next scan will get them.
            if loops_remaining <= 0:
                raise StopIteration()

    # Look for scoped resources which were at some point reported by
    # this scannable_id, but are missing this time around.
    lost_scoped_resources = lambda: StorageResourceRecord.objects.filter(
        ~Q(pk__in = reported_scoped_resources),
        storage_id_scope = session.scannable_id)

    for r in iterate_lost_resources(lost_scoped_resources):
        self._delete_resource(r)

    # Look for globalid resources which were at some point reported by
    # this scannable_id, but are missing this time around.
    lost_global_resources = lambda: StorageResourceRecord.objects.filter(
        ~Q(pk__in = reported_global_resources),
        reported_by = session.scannable_id)

    for reportee in iterate_lost_resources(lost_global_resources):
        reportee.reported_by.remove(session.scannable_id)
        if not reportee.reported_by.count():
            self._delete_resource(reportee)