diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 078565dc38d1dd..93e7c81146bb28 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -292,7 +292,6 @@ def _capture_task_logs(ti: TaskInstance) -> Generator[None, None, None]: """ modify = not settings.DONOT_MODIFY_HANDLERS - if modify: root_logger, task_logger = logging.getLogger(), logging.getLogger("airflow.task") diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index c565f601d720ad..a062f31d4f65ed 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -94,6 +94,7 @@ def setup_class(cls): def teardown_class(cls) -> None: clear_db_runs() + @pytest.mark.execution_timeout(120) def test_cli_list_tasks(self): for dag_id in self.dagbag.dags: args = self.parser.parse_args(["tasks", "list", dag_id]) @@ -498,8 +499,8 @@ def test_parentdag_downstream_clear(self): task_command.task_clear(args) -class TestLogsfromTaskRunCommand(unittest.TestCase): - def setUp(self) -> None: +class TestLogsfromTaskRunCommand: + def setup_method(self) -> None: self.dag_id = "test_logging_dag" self.task_id = "test_task" self.run_id = "test_run" @@ -531,7 +532,7 @@ def setUp(self) -> None: except OSError: pass - def tearDown(self) -> None: + def teardown_method(self) -> None: root = self.root_logger root.setLevel(self.root_level) root.handlers[:] = self.root_handlers @@ -598,9 +599,6 @@ def test_external_executor_id_present_for_process_run_task(self, mock_local_job) @unittest.skipIf(not hasattr(os, "fork"), "Forking not available") def test_logging_with_run_task(self): - # We are not using self.assertLogs as we want to verify what actually is stored in the Log file - # as that is what gets displayed - with conf_vars({("core", "dags_folder"): self.dag_path}): task_command.task_run(self.parser.parse_args(self.task_args)) @@ -644,12 +642,8 @@ def test_run_task_with_pool(self): session.delete(pool) session.commit() - # For this test memory spins out of control on Python 3.6. TODO(potiuk): FIXME") - @pytest.mark.quarantined @mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK", False) def test_logging_with_run_task_subprocess(self): - # We are not using self.assertLogs as we want to verify what actually is stored in the Log file - # as that is what gets displayed with conf_vars({("core", "dags_folder"): self.dag_path}): task_command.task_run(self.parser.parse_args(self.task_args)) @@ -665,10 +659,7 @@ def test_logging_with_run_task_subprocess(self): self.assert_log_line("Log from TI Logger", logs_list) self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True) - assert ( - f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', " - f"'{self.task_id}', '{self.execution_date_str}'," in logs - ) + assert f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', '{self.task_id}'," in logs assert ( f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, " f"task_id={self.task_id}, execution_date=20170101T000000" in logs @@ -696,7 +687,7 @@ def test_log_file_template_with_run_task(self): pass @mock.patch.object(task_command, "_run_task_by_selected_method") - def test_root_logger_restored(self, run_task_mock): + def test_root_logger_restored(self, run_task_mock, caplog): """Verify that the root logging context is restored""" logger = logging.getLogger("foo.bar") @@ -711,19 +702,18 @@ def task_inner(*args, **kwargs): ("logging", "logging_level"): "INFO", } - with conf_vars(config): - with self.assertLogs(level=logging.WARNING) as captured: + with caplog.at_level(level=logging.WARNING): + with conf_vars(config): logger.warning("not redirected") task_command.task_run(self.parser.parse_args(self.task_args)) - - assert captured.output == ["WARNING:foo.bar:not redirected"] + assert "not redirected" in caplog.text assert self.root_logger.level == logging.WARNING assert self.root_logger.handlers == self.root_handlers - @pytest.mark.quarantined @mock.patch.object(task_command, "_run_task_by_selected_method") - def test_disable_handler_modifying(self, run_task_mock): + @pytest.mark.parametrize("do_not_modify_handler", [True, False]) + def test_disable_handler_modifying(self, run_task_mock, caplog, do_not_modify_handler): """If [core] donot_modify_handlers is set to True, the root logger is untouched""" from airflow import settings @@ -738,16 +728,18 @@ def task_inner(*args, **kwargs): ("core", "dags_folder"): self.dag_path, ("logging", "logging_level"): "INFO", } - old_value = settings.DONOT_MODIFY_HANDLERS - settings.DONOT_MODIFY_HANDLERS = True - - with conf_vars(config): - with self.assertLogs(level=logging.WARNING) as captured: - task_command.task_run(self.parser.parse_args(self.task_args)) - - assert captured.output == ["WARNING:foo.bar:not redirected"] - - settings.DONOT_MODIFY_HANDLERS = old_value + with caplog.at_level(logging.WARNING, logger="foo.bar"): + with conf_vars(config): + old_value = settings.DONOT_MODIFY_HANDLERS + settings.DONOT_MODIFY_HANDLERS = do_not_modify_handler + try: + task_command.task_run(self.parser.parse_args(self.task_args)) + if do_not_modify_handler: + assert "not redirected" in caplog.text + else: + assert "not redirected" not in caplog.text + finally: + settings.DONOT_MODIFY_HANDLERS = old_value def test_context_with_run():