Skip to content

Commit

Permalink
Fetch served logs also when task attempt is up for retry and no remot…
Browse files Browse the repository at this point in the history
…e logs available (apache#39496)
  • Loading branch information
kahlstrm authored and RNHTTR committed Jun 1, 2024
1 parent 5f273ef commit b101a8f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
8 changes: 6 additions & 2 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,11 @@ def _read(
executor_messages: list[str] = []
executor_logs: list[str] = []
served_logs: list[str] = []
is_in_running_or_deferred = ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
is_in_running_or_deferred = ti.state in (
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
)
is_up_for_retry = ti.state == TaskInstanceState.UP_FOR_RETRY
with suppress(NotImplementedError):
remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
messages_list.extend(remote_messages)
Expand All @@ -377,7 +381,7 @@ def _read(
worker_log_full_path = Path(self.local_base, worker_log_rel_path)
local_messages, local_logs = self._read_from_local(worker_log_full_path)
messages_list.extend(local_messages)
if is_in_running_or_deferred and not executor_messages and not remote_logs:
if (is_in_running_or_deferred or is_up_for_retry) and not executor_messages and not remote_logs:
# While task instance is still running and we don't have either executor nor remote logs, look for served logs
# This is for cases when users have not setup remote logging nor shared drive for logs
served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
Expand Down
24 changes: 17 additions & 7 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,28 +313,35 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc
else:
mock_k8s_get_task_log.assert_not_called()

def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance):
@pytest.mark.parametrize(
"state", [TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED, TaskInstanceState.UP_FOR_RETRY]
)
def test__read_for_celery_executor_fallbacks_to_worker(self, state, create_task_instance):
"""Test for executors which do not have `get_task_log` method, it fallbacks to reading
log from worker if and only if remote logs aren't found"""
executor_name = "CeleryExecutor"

# Reading logs from worker should occur when the task is either running, deferred, or up for retry.
ti = create_task_instance(
dag_id="dag_for_testing_celery_executor_log_read",
dag_id=f"dag_for_testing_celery_executor_log_read_{state}",
task_id="task_for_testing_celery_executor_log_read",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
ti.state = TaskInstanceState.RUNNING
ti.try_number = 2
ti.state = state
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")

fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
actual = fth._read(ti=ti, try_number=2)
fth._read_from_logs_server.assert_called_once()
assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": False, "log_pos": 16})
# If we are in the up for retry state, the log has ended.
expected_end_of_log = state in (TaskInstanceState.UP_FOR_RETRY)
assert actual == (
"*** this message\nthis\nlog\ncontent",
{"end_of_log": expected_end_of_log, "log_pos": 16},
)

# Previous try_number should return served logs when remote logs aren't implemented
fth._read_from_logs_server = mock.Mock()
Expand All @@ -353,7 +360,10 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc
actual = fth._read(ti=ti, try_number=1)
fth._read_remote_logs.assert_called_once()
fth._read_from_logs_server.assert_not_called()
assert actual == ("*** remote logs\nremote\nlog\ncontent", {"end_of_log": True, "log_pos": 18})
assert actual == (
"*** remote logs\nremote\nlog\ncontent",
{"end_of_log": True, "log_pos": 18},
)

@pytest.mark.parametrize(
"remote_logs, local_logs, served_logs_checked",
Expand Down

0 comments on commit b101a8f

Please sign in to comment.