jobs.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:lightflow 作者: AustralianSynchrotron 项目源码 文件源码
def execute_dag(self, dag, workflow_id, data=None):
    """ Celery task that runs a single dag on a worker.

    This celery task starts, manages and monitors the individual tasks of a dag.
    A custom 'started' event is emitted before the dag runs; once ``dag.run``
    returns, either a 'succeeded' or an 'aborted' event is emitted, depending on
    whether the dag signal reports that the dag has been stopped.

    Args:
        self (Task): Reference to itself, the celery task object.
        dag (Dag): Reference to a Dag object that is being used to start, manage and
                   monitor tasks.
        workflow_id (string): The unique ID of the workflow run that started this dag.
        data (MultiTaskData): An optional MultiTaskData object that is being passed to
                              the first tasks in the dag. This allows the transfer of
                              data from dag to dag.
    """
    # Use the UTC clock for the duration measurement: naive local time jumps
    # at DST transitions, which would skew (or even negate) the duration.
    # It also keeps the duration consistent with the event timestamps below.
    start_time = datetime.utcnow()
    logger.info('Running DAG <{}>'.format(dag.name))

    # send custom celery event that the dag has been started
    self.send_event(JobEventName.Started,
                    job_type=JobType.Dag,
                    name=dag.name,
                    time=start_time,
                    workflow_id=workflow_id,
                    duration=None)

    # store job specific meta information with the job
    self.update_state(meta={'name': dag.name,
                            'type': JobType.Dag,
                            'workflow_id': workflow_id})

    # run the tasks in the DAG
    config = self.app.user_options['config']
    signal = DagSignal(Client(SignalConnection(**config.signal,
                                               auto_connect=True),
                              request_key=workflow_id), dag.name)
    dag.run(config=config,
            workflow_id=workflow_id,
            signal=signal,
            data=data)

    # send custom celery event that the dag has finished: 'aborted' if the
    # dag was stopped via its signal, 'succeeded' otherwise
    event_name = JobEventName.Succeeded if not signal.is_stopped else JobEventName.Aborted

    # capture the finish time once so the event's time and duration agree
    end_time = datetime.utcnow()
    self.send_event(event_name,
                    job_type=JobType.Dag,
                    name=dag.name,
                    time=end_time,
                    workflow_id=workflow_id,
                    duration=(end_time - start_time).total_seconds())

    logger.info('Finished DAG <{}>'.format(dag.name))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号