def _check_size(self):
    """Apply a size limit to the table of log messages.

    When ``self._table_size`` exceeds ``settings.DBLOG_HW``, the rows with
    the lowest ``id`` are appended to the ``db_log`` file under
    ``settings.LOG_PATH`` and then deleted, in batches of at most
    ``MAX_ROWS_PER_TRANSACTION`` rows, until ``self._table_size`` would be
    back down to ``settings.DBLOG_LW``.

    A failure while writing the overflow file or deleting rows is logged
    and stops the trimming; only rows that were actually written out and
    deleted are counted, so ``self._table_size`` stays in step with the
    table.

    Returns the number of rows written out and deleted (0 when the table
    is within the limit).
    """
    MAX_ROWS_PER_TRANSACTION = 10000
    removed_num_entries = 0
    overflow_filename = os.path.join(settings.LOG_PATH, "db_log")

    if self._table_size <= settings.DBLOG_HW:
        return removed_num_entries

    remove_num_entries = self._table_size - settings.DBLOG_LW

    with transaction.commit_on_success():
        while remove_num_entries > 0:
            trans_size = min(MAX_ROWS_PER_TRANSACTION, remove_num_entries)
            batch = LogMessage.objects.all().order_by('id')[0:trans_size]
            self.log.debug("writing %s batch of entries" % trans_size)

            written = 0
            last_id = None
            try:
                # The context manager closes the file on every path and
                # cannot hit an unbound name if open() itself fails.
                with open(overflow_filename, "a") as f:
                    for entry in batch:
                        f.write("%s\n" % str(entry))
                        # QuerySets do not support negative indexing, so
                        # remember the highest id while iterating instead
                        # of asking for batch[-1] afterwards.
                        last_id = entry.id
                        written += 1
                if last_id is not None:
                    LogMessage.objects.filter(id__lte=last_id).delete()
            except Exception as e:
                self.log.error("error opening/writing/closing the db_log: %s" % e)
                # Nothing from this batch is known to be removed; stop
                # rather than count rows that are still in the table.
                break

            if not written:
                # The table holds fewer rows than the cached size claims.
                break

            remove_num_entries -= written
            removed_num_entries += written

    self._table_size -= removed_num_entries
    self.log.info("Wrote %s DB log entries to %s" % (removed_num_entries, overflow_filename))

    return removed_num_entries
# Example source code using Python's commit_on_success()
def remove_host(self, fqdn):
    """Drop a host from the local registries and revoke its certificates.

    Calls ``remove_host(fqdn)`` on ``self.sessions``, ``self.queues`` and
    ``self.hosts`` in that order. Then, inside one transaction, removes
    every not-yet-revoked ``ClientCertificate`` of the host from
    ``self.valid_certs`` and flags those certificates as revoked.
    """
    log.info("remove_host: %s" % fqdn)

    for registry_name in ("sessions", "queues", "hosts"):
        getattr(self, registry_name).remove_host(fqdn)

    with transaction.commit_on_success():
        unrevoked = ClientCertificate.objects.filter(host__fqdn=fqdn, revoked=False)
        for certificate in unrevoked:
            serial = certificate.serial
            log.info("Revoking %s:%s" % (fqdn, serial))
            self.valid_certs.pop(serial, None)
        unrevoked.update(revoked=True)

    # TODO: ensure there are no GETs left in progress after this completes
    # TODO: drain plugin_rx_queue so that anything we will send to AMQP has been sent before this returns
def change(self, new_email, confirm=True):
    """
    Switch this record and its user over to ``new_email``.

    Both objects are saved inside a single transaction and this record is
    marked unverified. When ``confirm`` is true, ``send_confirmation()`` is
    called afterwards, still inside that transaction.
    """
    with transaction.commit_on_success():
        owner = self.user
        owner.email = new_email
        owner.save()

        self.email, self.verified = new_email, False
        self.save()

        if not confirm:
            return
        self.send_confirmation()
def complete_job(self, job_id, errored = False, cancelled = False):
    """Run the completion steps for the job identified by ``job_id``.

    While holding ``self._lock``, and inside one transaction:

    * calls ``job.on_success()`` when the job is neither errored nor
      cancelled; an exception from it is logged and the job is then
      treated as errored;
    * for every write lock returned by ``self._lock_cache.get_by_job(job)``,
      either purges the locked item from ``ObjectCache`` (its
      ``not_deleted`` attribute exists and is ``None``) or refreshes it
      there;
    * returns early if ``job.state`` is not ``'tasked'``; otherwise calls
      ``self._complete_job(job, errored, cancelled)``.

    A second transaction then drains the notification buffer and runs the
    next jobs. That step is skipped by the early return.

    NOTE(review): the nesting of the ``with`` blocks was reconstructed from
    a copy of this file whose indentation had been lost -- confirm it
    against the canonical source.

    :param job_id: key passed to ``self._job_collection.get``.
    :param errored: true if the job is already known to have failed.
    :param cancelled: true if the job was cancelled.
    """
    # TODO: document the rules here: jobs may only modify objects that they
    # have taken out a writelock on, and they may only modify instances obtained
    # via ObjectCache, or via their stateful_object attribute. Jobs may not
    # modify objects via .update() calls, all changes must be done on loaded instances.
    # They do not have to .save() their stateful_object, but they do have to .save()
    # any other objects that they modify (having obtained them from ObjectCache and
    # held a writelock on them)
    job = self._job_collection.get(job_id)
    with self._lock:
        with transaction.commit_on_success():
            if not errored and not cancelled:
                try:
                    job.on_success()
                except Exception:
                    # A failing on_success hook downgrades the job to errored
                    # instead of propagating out of complete_job.
                    log.error("Error in Job %s on_success:%s" % (job.id, traceback.format_exc()))
                    errored = True
            log.info("Job %d: complete_job: Updating cache" % job.pk)
            # Freshen cached information about anything that this job held a writelock on
            for lock in self._lock_cache.get_by_job(job):
                if lock.write:
                    # Diagnostic dump for items that carry a not_deleted attribute.
                    if hasattr(lock.locked_item, 'not_deleted'):
                        log.info("Job %d: locked_item %s %s %s %s" % (
                            job.id,
                            id(lock.locked_item),
                            lock.locked_item.__class__,
                            isinstance(lock.locked_item, DeletableStatefulObject),
                            lock.locked_item.not_deleted
                        ))
                    # not_deleted being None is treated as "gone": drop the item
                    # from the cache rather than refreshing it.
                    if hasattr(lock.locked_item, 'not_deleted') and lock.locked_item.not_deleted is None:
                        log.debug("Job %d: purging %s/%s" %
                                  (job.id, lock.locked_item.__class__, lock.locked_item.id))
                        # NOTE(review): the lambda closes over the loop variable
                        # `lock`; presumably purge() evaluates it immediately -- confirm.
                        ObjectCache.purge(lock.locked_item.__class__, lambda o: o.id == lock.locked_item.id)
                    else:
                        log.debug("Job %d: updating write-locked %s/%s" %
                                  (job.id, lock.locked_item.__class__, lock.locked_item.id))
                        # Ensure that any notifications prior to release of the writelock are not
                        # applied
                        if hasattr(lock.locked_item, 'state_modified_at'):
                            lock.locked_item.__class__.objects.filter(pk=lock.locked_item.pk).update(
                                state_modified_at=django.utils.timezone.now())
                        ObjectCache.update(lock.locked_item)
            if job.state != 'tasked':
                # This happens if a Job is cancelled while it's calling this
                log.info("Job %s has state %s in complete_job" % (job.id, job.state))
                return
            self._complete_job(job, errored, cancelled)
        # Separate transaction, entered only after the one above has exited.
        with transaction.commit_on_success():
            self._drain_notification_buffer()
            self._run_next()