def get_task_instances(
self, session, start_date=None, end_date=None, state=None):
TI = TaskInstance
if not start_date:
start_date = (timezone.utcnow() - timedelta(30)).date()
start_date = datetime.combine(start_date, datetime.min.time())
end_date = end_date or timezone.utcnow()
tis = session.query(TI).filter(
TI.dag_id == self.dag_id,
TI.execution_date >= start_date,
TI.execution_date <= end_date,
TI.task_id.in_([t.task_id for t in self.tasks]),
)
if state:
tis = tis.filter(TI.state == state)
tis = tis.order_by(TI.execution_date).all()
return tis
评论列表
文章目录