def complete_job(self, job_id, errored = False, cancelled = False):
# TODO: document the rules here: jobs may only modify objects that they
# have taken out a writelock on, and they may only modify instances obtained
# via ObjectCache, or via their stateful_object attribute. Jobs may not
# modify objects via .update() calls, all changes must be done on loaded instances.
# They do not have to .save() their stateful_object, but they do have to .save()
# any other objects that they modify (having obtained their from ObjectCache and
# held a writelock on them)
job = self._job_collection.get(job_id)
with self._lock:
with transaction.commit_on_success():
if not errored and not cancelled:
try:
job.on_success()
except Exception:
log.error("Error in Job %s on_success:%s" % (job.id, traceback.format_exc()))
errored = True
log.info("Job %d: complete_job: Updating cache" % job.pk)
# Freshen cached information about anything that this job held a writelock on
for lock in self._lock_cache.get_by_job(job):
if lock.write:
if hasattr(lock.locked_item, 'not_deleted'):
log.info("Job %d: locked_item %s %s %s %s" % (
job.id,
id(lock.locked_item),
lock.locked_item.__class__,
isinstance(lock.locked_item, DeletableStatefulObject),
lock.locked_item.not_deleted
))
if hasattr(lock.locked_item, 'not_deleted') and lock.locked_item.not_deleted is None:
log.debug("Job %d: purging %s/%s" %
(job.id, lock.locked_item.__class__, lock.locked_item.id))
ObjectCache.purge(lock.locked_item.__class__, lambda o: o.id == lock.locked_item.id)
else:
log.debug("Job %d: updating write-locked %s/%s" %
(job.id, lock.locked_item.__class__, lock.locked_item.id))
# Ensure that any notifications prior to release of the writelock are not
# applied
if hasattr(lock.locked_item, 'state_modified_at'):
lock.locked_item.__class__.objects.filter(pk=lock.locked_item.pk).update(
state_modified_at=django.utils.timezone.now())
ObjectCache.update(lock.locked_item)
if job.state != 'tasked':
# This happens if a Job is cancelled while it's calling this
log.info("Job %s has state %s in complete_job" % (job.id, job.state))
return
self._complete_job(job, errored, cancelled)
with transaction.commit_on_success():
self._drain_notification_buffer()
self._run_next()
job_scheduler.py 文件源码
python
阅读 27
收藏 0
点赞 0
评论 0
评论列表
文章目录