job_scheduler.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码
def complete_job(self, job_id, errored = False, cancelled = False):
        # TODO: document the rules here: jobs may only modify objects that they
        # have taken out a writelock on, and they may only modify instances obtained
        # via ObjectCache, or via their stateful_object attribute.  Jobs may not
        # modify objects via .update() calls, all changes must be done on loaded instances.
        # They do not have to .save() their stateful_object, but they do have to .save()
        # any other objects that they modify (having obtained their from ObjectCache and
        # held a writelock on them)

        job = self._job_collection.get(job_id)
        with self._lock:
            with transaction.commit_on_success():
                if not errored and not cancelled:
                    try:
                        job.on_success()
                    except Exception:
                        log.error("Error in Job %s on_success:%s" % (job.id, traceback.format_exc()))
                        errored = True

                log.info("Job %d: complete_job: Updating cache" % job.pk)
                # Freshen cached information about anything that this job held a writelock on
                for lock in self._lock_cache.get_by_job(job):
                    if lock.write:
                        if hasattr(lock.locked_item, 'not_deleted'):
                            log.info("Job %d: locked_item %s %s %s %s" % (
                                job.id,
                                id(lock.locked_item),
                                lock.locked_item.__class__,
                                isinstance(lock.locked_item, DeletableStatefulObject),
                                lock.locked_item.not_deleted
                            ))
                        if hasattr(lock.locked_item, 'not_deleted') and lock.locked_item.not_deleted is None:
                            log.debug("Job %d: purging %s/%s" %
                                      (job.id, lock.locked_item.__class__, lock.locked_item.id))
                            ObjectCache.purge(lock.locked_item.__class__, lambda o: o.id == lock.locked_item.id)
                        else:
                            log.debug("Job %d: updating write-locked %s/%s" %
                                      (job.id, lock.locked_item.__class__, lock.locked_item.id))

                            # Ensure that any notifications prior to release of the writelock are not
                            # applied
                            if hasattr(lock.locked_item, 'state_modified_at'):
                                lock.locked_item.__class__.objects.filter(pk=lock.locked_item.pk).update(
                                    state_modified_at=django.utils.timezone.now())

                            ObjectCache.update(lock.locked_item)

                if job.state != 'tasked':
                    # This happens if a Job is cancelled while it's calling this
                    log.info("Job %s has state %s in complete_job" % (job.id, job.state))
                    return

                self._complete_job(job, errored, cancelled)

            with transaction.commit_on_success():
                self._drain_notification_buffer()
                self._run_next()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号