Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch served logs also when task attempt is up for retry and no remote logs available #39496

Merged
merged 3 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,11 @@ def _read(
executor_messages: list[str] = []
executor_logs: list[str] = []
served_logs: list[str] = []
is_in_running_or_deferred = ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
is_in_running_or_deferred = ti.state in (
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
)
is_up_for_retry = ti.state == TaskInstanceState.UP_FOR_RETRY
with suppress(NotImplementedError):
remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
messages_list.extend(remote_messages)
Expand All @@ -377,7 +381,7 @@ def _read(
worker_log_full_path = Path(self.local_base, worker_log_rel_path)
local_messages, local_logs = self._read_from_local(worker_log_full_path)
messages_list.extend(local_messages)
if is_in_running_or_deferred and not executor_messages and not remote_logs:
if (is_in_running_or_deferred or is_up_for_retry) and not executor_messages and not remote_logs:
# While the task instance is still running, deferred, or up for retry, and we have neither executor nor remote logs,
# look for served logs. This covers users who have set up neither remote logging nor a shared drive for logs.
served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
Expand Down
24 changes: 17 additions & 7 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,28 +313,35 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc
else:
mock_k8s_get_task_log.assert_not_called()

def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance):
@pytest.mark.parametrize(
"state", [TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED, TaskInstanceState.UP_FOR_RETRY]
)
def test__read_for_celery_executor_fallbacks_to_worker(self, state, create_task_instance):
"""Test for executors which do not have `get_task_log` method, it fallbacks to reading
log from worker if and only if remote logs aren't found"""
executor_name = "CeleryExecutor"

# Reading logs from worker should occur when the task is either running, deferred, or up for retry.
ti = create_task_instance(
dag_id="dag_for_testing_celery_executor_log_read",
dag_id=f"dag_for_testing_celery_executor_log_read_{state}",
task_id="task_for_testing_celery_executor_log_read",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
ti.state = TaskInstanceState.RUNNING
ti.try_number = 2
ti.state = state
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")

fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
actual = fth._read(ti=ti, try_number=2)
fth._read_from_logs_server.assert_called_once()
assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": False, "log_pos": 16})
# If we are in the up for retry state, the log has ended.
expected_end_of_log = state in (TaskInstanceState.UP_FOR_RETRY)
assert actual == (
"*** this message\nthis\nlog\ncontent",
{"end_of_log": expected_end_of_log, "log_pos": 16},
)

# Previous try_number should return served logs when remote logs aren't implemented
fth._read_from_logs_server = mock.Mock()
Expand All @@ -353,7 +360,10 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc
actual = fth._read(ti=ti, try_number=1)
fth._read_remote_logs.assert_called_once()
fth._read_from_logs_server.assert_not_called()
assert actual == ("*** remote logs\nremote\nlog\ncontent", {"end_of_log": True, "log_pos": 18})
assert actual == (
"*** remote logs\nremote\nlog\ncontent",
{"end_of_log": True, "log_pos": 18},
)

@pytest.mark.parametrize(
"remote_logs, local_logs, served_logs_checked",
Expand Down