Skip to content

Commit

Permalink
Fix bug that log can't be shown when task runs failed (#16768)
Browse files Browse the repository at this point in the history
The log can't be shown normally when the task runs failed. Users can only get useless logs as follows. #13692

<pre>
*** Log file does not exist: /home/airflow/airflow/logs/dag_id/task_id/2021-06-28T00:00:00+08:00/28.log
*** Fetching from: http://:8793/log/dag_id/task_id/2021-06-28T00:00:00+08:00/28.log
*** Failed to fetch log file from worker. Unsupported URL protocol 
</pre>

The root cause is that scheduler will overwrite the hostname info into the task_instance table in DB by using blank str in the progress of `_execute_task_callbacks` when tasks into failed.  Webserver can't get the right host of the task from task_instance because the hostname info of  task_instance table is lost in the progress.

Co-authored-by: huozhanfeng <[email protected]>
  • Loading branch information
huozhanfeng and huozhanfeng authored Jul 22, 2021
1 parent d72b363 commit 34478c2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
12 changes: 4 additions & 8 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,15 +567,11 @@ def _execute_task_callbacks(self, dagbag: DagBag, request: TaskCallbackRequest):
dag = dagbag.dags[simple_ti.dag_id]
if simple_ti.task_id in dag.task_ids:
task = dag.get_task(simple_ti.task_id)
ti = TI(task, simple_ti.execution_date)
# Get properties needed for failure handling from SimpleTaskInstance.
ti.start_date = simple_ti.start_date
ti.end_date = simple_ti.end_date
ti.try_number = simple_ti.try_number
ti.state = simple_ti.state
ti.test_mode = self.UNIT_TEST_MODE
if request.is_failure_callback:
ti.handle_failure_with_callback(error=request.msg, test_mode=ti.test_mode)
ti = TI(task, simple_ti.execution_date)
# TODO: Use simple_ti to improve performance here in the future
ti.refresh_from_db()
ti.handle_failure_with_callback(error=request.msg, test_mode=self.UNIT_TEST_MODE)
self.log.info('Executed failure callback for %s in state %s', ti, ti.state)

@provide_session
Expand Down
23 changes: 23 additions & 0 deletions tests/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,29 @@ def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
test_mode=conf.getboolean('core', 'unit_test_mode'),
)

def test_failure_callbacks_should_not_drop_hostname(self):
dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag_file_processor.UNIT_TEST_MODE = False

with create_session() as session:
dag = dagbag.get_dag('example_branch_operator')
task = dag.get_task(task_id='run_this_first')

ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING)
ti.hostname = "test_hostname"
session.add(ti)

with create_session() as session:
requests = [
TaskCallbackRequest(
full_filepath="A", simple_task_instance=SimpleTaskInstance(ti), msg="Message"
)
]
dag_file_processor.execute_callbacks(dagbag, requests)
tis = session.query(TaskInstance)
assert tis[0].hostname == "test_hostname"

def test_process_file_should_failure_callback(self):
dag_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py'
Expand Down

0 comments on commit 34478c2

Please sign in to comment.