Skip to content

Commit

Permalink
Convert test_task_command to Pytest and unquarantine tests in it
Browse files Browse the repository at this point in the history
  • Loading branch information
potiuk committed Dec 9, 2022
1 parent 5fad8a4 commit 7cab43f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 32 deletions.
1 change: 0 additions & 1 deletion airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ def _capture_task_logs(ti: TaskInstance) -> Generator[None, None, None]:
"""
modify = not settings.DONOT_MODIFY_HANDLERS

if modify:
root_logger, task_logger = logging.getLogger(), logging.getLogger("airflow.task")

Expand Down
54 changes: 23 additions & 31 deletions tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def setup_class(cls):
def teardown_class(cls) -> None:
clear_db_runs()

@pytest.mark.execution_timeout(120)
def test_cli_list_tasks(self):
for dag_id in self.dagbag.dags:
args = self.parser.parse_args(["tasks", "list", dag_id])
Expand Down Expand Up @@ -498,8 +499,8 @@ def test_parentdag_downstream_clear(self):
task_command.task_clear(args)


class TestLogsfromTaskRunCommand(unittest.TestCase):
def setUp(self) -> None:
class TestLogsfromTaskRunCommand:
def setup_method(self) -> None:
self.dag_id = "test_logging_dag"
self.task_id = "test_task"
self.run_id = "test_run"
Expand Down Expand Up @@ -531,7 +532,7 @@ def setUp(self) -> None:
except OSError:
pass

def tearDown(self) -> None:
def teardown_method(self) -> None:
root = self.root_logger
root.setLevel(self.root_level)
root.handlers[:] = self.root_handlers
Expand Down Expand Up @@ -598,9 +599,6 @@ def test_external_executor_id_present_for_process_run_task(self, mock_local_job)

@unittest.skipIf(not hasattr(os, "fork"), "Forking not available")
def test_logging_with_run_task(self):
# We are not using self.assertLogs as we want to verify what actually is stored in the Log file
# as that is what gets displayed

with conf_vars({("core", "dags_folder"): self.dag_path}):
task_command.task_run(self.parser.parse_args(self.task_args))

Expand Down Expand Up @@ -644,12 +642,8 @@ def test_run_task_with_pool(self):
session.delete(pool)
session.commit()

# For this test memory spins out of control on Python 3.6. TODO(potiuk): FIXME")
@pytest.mark.quarantined
@mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK", False)
def test_logging_with_run_task_subprocess(self):
# We are not using self.assertLogs as we want to verify what actually is stored in the Log file
# as that is what gets displayed
with conf_vars({("core", "dags_folder"): self.dag_path}):
task_command.task_run(self.parser.parse_args(self.task_args))

Expand All @@ -665,10 +659,7 @@ def test_logging_with_run_task_subprocess(self):
self.assert_log_line("Log from TI Logger", logs_list)
self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True)

assert (
f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', "
f"'{self.task_id}', '{self.execution_date_str}'," in logs
)
assert f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', '{self.task_id}'," in logs
assert (
f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, "
f"task_id={self.task_id}, execution_date=20170101T000000" in logs
Expand Down Expand Up @@ -696,7 +687,7 @@ def test_log_file_template_with_run_task(self):
pass

@mock.patch.object(task_command, "_run_task_by_selected_method")
def test_root_logger_restored(self, run_task_mock):
def test_root_logger_restored(self, run_task_mock, caplog):
"""Verify that the root logging context is restored"""

logger = logging.getLogger("foo.bar")
Expand All @@ -711,19 +702,18 @@ def task_inner(*args, **kwargs):
("logging", "logging_level"): "INFO",
}

with conf_vars(config):
with self.assertLogs(level=logging.WARNING) as captured:
with caplog.at_level(level=logging.WARNING):
with conf_vars(config):
logger.warning("not redirected")
task_command.task_run(self.parser.parse_args(self.task_args))

assert captured.output == ["WARNING:foo.bar:not redirected"]
assert "not redirected" in caplog.text
assert self.root_logger.level == logging.WARNING

assert self.root_logger.handlers == self.root_handlers

@pytest.mark.quarantined
@mock.patch.object(task_command, "_run_task_by_selected_method")
def test_disable_handler_modifying(self, run_task_mock):
@pytest.mark.parametrize("do_not_modify_handler", [True, False])
def test_disable_handler_modifying(self, run_task_mock, caplog, do_not_modify_handler):
"""If [core] donot_modify_handlers is set to True, the root logger is untouched"""
from airflow import settings

Expand All @@ -738,16 +728,18 @@ def task_inner(*args, **kwargs):
("core", "dags_folder"): self.dag_path,
("logging", "logging_level"): "INFO",
}
old_value = settings.DONOT_MODIFY_HANDLERS
settings.DONOT_MODIFY_HANDLERS = True

with conf_vars(config):
with self.assertLogs(level=logging.WARNING) as captured:
task_command.task_run(self.parser.parse_args(self.task_args))

assert captured.output == ["WARNING:foo.bar:not redirected"]

settings.DONOT_MODIFY_HANDLERS = old_value
with caplog.at_level(logging.WARNING, logger="foo.bar"):
with conf_vars(config):
old_value = settings.DONOT_MODIFY_HANDLERS
settings.DONOT_MODIFY_HANDLERS = do_not_modify_handler
try:
task_command.task_run(self.parser.parse_args(self.task_args))
if do_not_modify_handler:
assert "not redirected" in caplog.text
else:
assert "not redirected" not in caplog.text
finally:
settings.DONOT_MODIFY_HANDLERS = old_value


def test_context_with_run():
Expand Down

0 comments on commit 7cab43f

Please sign in to comment.