def execute_dag(self, dag, workflow_id, data=None):
    """ Celery task that runs a single dag on a worker.

    This celery task starts, manages and monitors the individual tasks of a dag.
    A custom ``Started`` event is sent before the dag runs. After ``dag.run``
    returns, a ``Succeeded`` event is sent, or an ``Aborted`` event if the dag's
    signal reports that it has been stopped. The final event carries the run
    duration in seconds.

    If ``dag.run`` raises, the exception propagates to the caller and no final
    event is sent from this function.

    Args:
        self (Task): Reference to itself, the celery task object.
        dag (Dag): Reference to a Dag object that is being used to start, manage and
                   monitor tasks.
        workflow_id (string): The unique ID of the workflow run that started this dag.
        data (MultiTaskData): An optional MultiTaskData object that is being passed to
                              the first tasks in the dag. This allows the transfer of
                              data from dag to dag.
    """
    # Take the start time in UTC, like the event timestamps below. A naive
    # local timestamp would skew the duration if the local UTC offset changes
    # (e.g. a DST transition) while the dag is running.
    start_time = datetime.utcnow()
    logger.info('Running DAG <{}>'.format(dag.name))

    # send custom celery event that the dag has been started
    self.send_event(JobEventName.Started,
                    job_type=JobType.Dag,
                    name=dag.name,
                    time=datetime.utcnow(),
                    workflow_id=workflow_id,
                    duration=None)

    # store job specific meta information with the job
    self.update_state(meta={'name': dag.name,
                            'type': JobType.Dag,
                            'workflow_id': workflow_id})

    # run the tasks in the DAG
    config = self.app.user_options['config']
    signal = DagSignal(Client(SignalConnection(**config.signal,
                                               auto_connect=True),
                              request_key=workflow_id), dag.name)
    dag.run(config=config,
            workflow_id=workflow_id,
            signal=signal,
            data=data)

    # send custom celery event that the dag has finished: aborted if the
    # signal reports that the dag was stopped, succeeded otherwise
    event_name = JobEventName.Aborted if signal.is_stopped else JobEventName.Succeeded
    self.send_event(event_name,
                    job_type=JobType.Dag,
                    name=dag.name,
                    time=datetime.utcnow(),
                    workflow_id=workflow_id,
                    duration=(datetime.utcnow() - start_time).total_seconds())

    logger.info('Finished DAG <{}>'.format(dag.name))
评论列表
文章目录