From 34478c26d7de1328797e03bbf96d8261796fccbb Mon Sep 17 00:00:00 2001 From: Zhanfeng Huo Date: Fri, 23 Jul 2021 06:13:40 +0800 Subject: [PATCH] Fix bug that log can't be shown when task runs failed (#16768) The log can't be shown normally when the task runs failed. Users can only get useless logs as follows. #13692
*** Log file does not exist: /home/airflow/airflow/logs/dag_id/task_id/2021-06-28T00:00:00+08:00/28.log
*** Fetching from: http://:8793/log/dag_id/task_id/2021-06-28T00:00:00+08:00/28.log
*** Failed to fetch log file from worker. Unsupported URL protocol
The root cause is that scheduler will overwrite the hostname info into the task_instance table in DB by using blank str in the progress of `_execute_task_callbacks` when tasks into failed. Webserver can't get the right host of the task from task_instance because the hostname info of task_instance table is lost in the progress. Co-authored-by: huozhanfeng --- airflow/dag_processing/processor.py | 12 ++++-------- tests/dag_processing/test_processor.py | 23 +++++++++++++++++++++++ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 4a24c1adceec5..ee78498ce9d3c 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -567,15 +567,11 @@ def _execute_task_callbacks(self, dagbag: DagBag, request: TaskCallbackRequest): dag = dagbag.dags[simple_ti.dag_id] if simple_ti.task_id in dag.task_ids: task = dag.get_task(simple_ti.task_id) - ti = TI(task, simple_ti.execution_date) - # Get properties needed for failure handling from SimpleTaskInstance. - ti.start_date = simple_ti.start_date - ti.end_date = simple_ti.end_date - ti.try_number = simple_ti.try_number - ti.state = simple_ti.state - ti.test_mode = self.UNIT_TEST_MODE if request.is_failure_callback: - ti.handle_failure_with_callback(error=request.msg, test_mode=ti.test_mode) + ti = TI(task, simple_ti.execution_date) + # TODO: Use simple_ti to improve performance here in the future + ti.refresh_from_db() + ti.handle_failure_with_callback(error=request.msg, test_mode=self.UNIT_TEST_MODE) self.log.info('Executed failure callback for %s in state %s', ti, ti.state) @provide_session diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 9425bbbb1823b..90c2baea85327 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -640,6 +640,29 @@ def test_execute_on_failure_callbacks(self, mock_ti_handle_failure): test_mode=conf.getboolean('core', 'unit_test_mode'), ) + def test_failure_callbacks_should_not_drop_hostname(self): + dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) + dag_file_processor.UNIT_TEST_MODE = False + + with create_session() as session: + dag = dagbag.get_dag('example_branch_operator') + task = dag.get_task(task_id='run_this_first') + + ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING) + ti.hostname = "test_hostname" + session.add(ti) + + with create_session() as session: + requests = [ + TaskCallbackRequest( + full_filepath="A", simple_task_instance=SimpleTaskInstance(ti), msg="Message" + ) + ] + dag_file_processor.execute_callbacks(dagbag, requests) + tis = session.query(TaskInstance) + assert tis[0].hostname == "test_hostname" + def test_process_file_should_failure_callback(self): dag_file = os.path.join( os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py'