diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 08baa10b98678..6f04cbf0375c6 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -207,7 +207,7 @@ def read(self, task_instance, try_number=None, metadata=None): logs = [ [('default_host', f'Error fetching the logs. Try number {try_number} is invalid.')], ] - return logs + return logs, [{'end_of_log': True}] else: try_numbers = [try_number] diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 76115a2f3a279..fad5f8b58714a 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -63,6 +63,54 @@ def test_default_task_logging_setup(self): handler = handlers[0] assert handler.name == FILE_TASK_HANDLER + def test_file_task_handler_when_ti_value_is_invalid(self): + def task_callable(ti, **kwargs): + ti.log.info("test") + + dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE) + dag.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING, execution_date=DEFAULT_DATE) + task = PythonOperator( + task_id='task_for_testing_file_log_handler', + dag=dag, + python_callable=task_callable, + ) + ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + + logger = ti.log + ti.log.disabled = False + + file_handler = next( + (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None + ) + assert file_handler is not None + + set_context(logger, ti) + assert file_handler.handler is not None + # We expect set_context generates a file locally. + log_filename = file_handler.handler.baseFilename + assert os.path.isfile(log_filename) + assert log_filename.endswith("1.log"), log_filename + + ti.run(ignore_ti_state=True) + + file_handler.flush() + file_handler.close() + + assert hasattr(file_handler, 'read') + # Return value of read must be a tuple of list and list. + # passing invalid `try_number` to read function + logs, metadatas = file_handler.read(ti, 0) + assert isinstance(logs, list) + assert isinstance(metadatas, list) + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert isinstance(metadatas[0], dict) + assert logs[0][0][0] == "default_host" + assert logs[0][0][1] == "Error fetching the logs. Try number 0 is invalid." + + # Remove the generated tmp log file. + os.remove(log_filename) + def test_file_task_handler(self): def task_callable(ti, **kwargs): ti.log.info("test")