def test_execute_task_instances(self):
    """Check that ``_execute_task_instances`` respects DAG concurrency across DAG runs.

    The DAG is created with ``concurrency=3``.  The first DAG run has both of
    its task instances set to RUNNING and the second run has both set to
    SCHEDULED.  After one call to ``_execute_task_instances`` the test expects
    exactly one of the scheduled instances to be QUEUED, the other to remain
    SCHEDULED, and the call to return 1.
    """
    dag_id = 'SchedulerJobTest.test_execute_task_instances'
    task_id_1 = 'dummy_task'
    task_id_2 = 'dummy_task_nonexistent_queue'
    # It is important that the number of tasks in the DAG is less than the
    # concurrency limit: previously scheduler._execute_task_instances checked
    # the number of running tasks only once, so with concurrency 3 it could
    # execute arbitrarily many tasks from the second run.
    dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3)
    task1 = DummyOperator(dag=dag, task_id=task_id_1)
    task2 = DummyOperator(dag=dag, task_id=task_id_2)
    dagbag = self._make_simple_dag_bag([dag])

    scheduler = SchedulerJob(**self.default_scheduler_args)
    session = settings.Session()
    try:
        # First DAG run: both task instances are marked as running.
        dr1 = scheduler.create_dag_run(dag)
        ti1 = TI(task1, dr1.execution_date)
        ti2 = TI(task2, dr1.execution_date)
        ti1.refresh_from_db()
        ti2.refresh_from_db()
        ti1.state = State.RUNNING
        ti2.state = State.RUNNING
        session.merge(ti1)
        session.merge(ti2)
        session.commit()

        self.assertEqual(State.RUNNING, dr1.state)
        self.assertEqual(
            2,
            DAG.get_num_task_instances(
                dag_id, dag.task_ids, states=[State.RUNNING], session=session
            ),
        )

        # Second DAG run: both task instances are waiting to be picked up.
        dr2 = scheduler.create_dag_run(dag)
        ti3 = TI(task1, dr2.execution_date)
        ti4 = TI(task2, dr2.execution_date)
        ti3.refresh_from_db()
        ti4.refresh_from_db()
        # Manually set to scheduled so the scheduler can pick them up.
        ti3.state = State.SCHEDULED
        ti4.state = State.SCHEDULED
        session.merge(ti3)
        session.merge(ti4)
        session.commit()

        self.assertEqual(State.RUNNING, dr2.state)

        res = scheduler._execute_task_instances(dagbag, [State.SCHEDULED])

        # Check that concurrency is respected: with 2 instances already
        # running and a limit of 3, only one more may be queued.
        ti1.refresh_from_db()
        ti2.refresh_from_db()
        ti3.refresh_from_db()
        ti4.refresh_from_db()
        self.assertEqual(
            3,
            DAG.get_num_task_instances(
                dag_id,
                dag.task_ids,
                states=[State.RUNNING, State.QUEUED],
                session=session,
            ),
        )
        self.assertEqual(State.RUNNING, ti1.state)
        self.assertEqual(State.RUNNING, ti2.state)
        # Which of the two scheduled instances gets queued is not fixed, so
        # compare the pair of states without regard to order.
        six.assertCountEqual(
            self, [State.QUEUED, State.SCHEDULED], [ti3.state, ti4.state]
        )
        self.assertEqual(1, res)
    finally:
        # Release the session even when an assertion above fails.
        session.close()
评论列表
文章目录