def log(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
dttm = pendulum.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
ti = session.query(models.TaskInstance).filter(
models.TaskInstance.dag_id == dag_id,
models.TaskInstance.task_id == task_id,
models.TaskInstance.execution_date == dttm).first()
if ti is None:
logs = ["*** Task instance did not exist in the DB\n"]
else:
logger = logging.getLogger('airflow.task')
task_log_reader = conf.get('core', 'task_log_reader')
handler = next((handler for handler in logger.handlers
if handler.name == task_log_reader), None)
try:
ti.task = dag.get_task(ti.task_id)
logs = handler.read(ti)
except AttributeError as e:
logs = ["Task log handler {} does not support read logs.\n{}\n" \
.format(task_log_reader, str(e))]
for i, log in enumerate(logs):
if PY2 and not isinstance(log, unicode):
logs[i] = log.decode('utf-8')
return self.render(
'airflow/ti_log.html',
logs=logs, dag=dag, title="Log by attempts", task_id=task_id,
execution_date=execution_date, form=form)
评论列表
文章目录