chore: write log shipper in python [MLG-993]
Instead of orchestrating multiple processes with bash and
confusing/complex process substitution, with all the trapping of signals
and `printf x > ...` nonsense, just write a wrapper process in a real
programming language.  This reduces the complexity of our logging
solution, and prevents the out-of-order bugs inherent in having two
separate log shippers, one for stdout and one for stderr.

The new ship_logs.py has the following features:
- it launches and monitors a child command
- it shares a log buffer for shipping stdout and stderr
- it has the same log parsing regexes as enrich_logging.py
- it converts carriage returns to newlines
- it forwards signals to its child process
- it exits after a maximum of DET_LOG_WAIT_TIME or 30 seconds
- it depends on the python interpreter, but only the interpreter (all
  imports are from the standard library)
- in the special case that the child process can't be started, it ships
  an explanation of the failure to the master and exits with the standard
  bash exit codes
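
The feature list above can be illustrated with a minimal sketch. This is not the actual ship_logs.py: the names `pump` and `run_and_collect` are hypothetical, lines are collected in memory rather than shipped to the master, the log-parsing regexes are omitted, and reading DET_LOG_WAIT_TIME as an environment variable with a 30-second default is an assumption. The 126 and 127 codes are the usual bash conventions for "found but not executable" and "command not found".

```python
import os
import queue
import signal
import subprocess
import sys
import threading
import time
from typing import IO, List, Tuple

Entry = Tuple[str, str]  # (stream name, log line)


def pump(stream: IO[bytes], name: str, buf: "queue.Queue[Entry]") -> None:
    """Read one pipe and append its lines to the shared buffer."""
    for chunk in stream:
        # bytes.splitlines() breaks on \n, \r, and \r\n, so carriage-return
        # rewrites (progress bars) end up as separate lines.
        for line in chunk.splitlines():
            buf.put((name, line.decode("utf8", errors="replace")))
    stream.close()


def run_and_collect(cmd: List[str]) -> Tuple[int, List[Entry]]:
    """Run cmd and return (exit code, lines from stdout and stderr)."""
    try:
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except FileNotFoundError as e:
        return 127, [("shipper", f"command not found: {e}")]
    except PermissionError as e:
        return 126, [("shipper", f"command not executable: {e}")]

    # Forward signals to the child; handlers can only be set in the main thread.
    old_handlers = {}
    if threading.current_thread() is threading.main_thread():
        for sig in (signal.SIGTERM, signal.SIGINT):
            old_handlers[sig] = signal.signal(sig, lambda s, _f: child.send_signal(s))

    # One buffer shared by both streams, so there is a single ordering of lines.
    buf: "queue.Queue[Entry]" = queue.Queue()
    pumps = [
        threading.Thread(target=pump, args=(child.stdout, "stdout", buf), daemon=True),
        threading.Thread(target=pump, args=(child.stderr, "stderr", buf), daemon=True),
    ]
    for t in pumps:
        t.start()
    code = child.wait()

    # Assumption: wait at most DET_LOG_WAIT_TIME seconds (default 30) for the
    # remaining output once the child has exited.
    deadline = time.monotonic() + float(os.environ.get("DET_LOG_WAIT_TIME", "30"))
    for t in pumps:
        t.join(timeout=max(0.0, deadline - time.monotonic()))

    # Restore whatever handlers were installed before.
    for sig, handler in old_handlers.items():
        if handler is not None:
            signal.signal(sig, handler)

    lines: List[Entry] = []
    while not buf.empty():
        lines.append(buf.get_nowait())
    return code, lines


if __name__ == "__main__":
    exit_code, entries = run_and_collect([sys.executable, "-c", "print('hello')"])
    for stream_name, text in entries:
        print(f"[{stream_name}] {text}")
    print("exit code:", exit_code)
```

Because both reader threads push into one queue, a consumer sees stdout and stderr interleaved in arrival order, which is the property the two-shipper bash setup could not provide.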
rb-determined-ai committed Oct 4, 2023
1 parent d11f6c9 commit a9ba331
Showing 22 changed files with 1,216 additions and 534 deletions.
11 changes: 2 additions & 9 deletions e2e_tests/tests/cluster/test_logging.py
@@ -1,8 +1,8 @@
 import functools
 import re
+import socket
 from typing import Any, Callable, Dict, Iterable, Optional, Union

-import _socket
 import pytest

 from determined.cli import command
@@ -78,13 +78,6 @@ def test_task_logs(task_type: str, task_config: Dict[str, Any], log_regex: Any)
     rps = bindings.get_GetResourcePools(session)
     assert rps.resourcePools and len(rps.resourcePools) > 0, "missing resource pool"

-    if (
-        rps.resourcePools[0].type == bindings.v1ResourcePoolType.K8S
-        and task_type == command.TaskTypeCommand
-    ):
-        # TODO(DET-6712): Investigate intermittent slowness with K8s command logs.
-        pytest.skip("DET-6712: Investigate intermittent slowness with K8s command logs")
-
     if task_type == command.TaskTypeTensorBoard:
         exp_id = exp.run_basic_test(
             conf.fixtures_path("no_op/single.yaml"),
@@ -117,7 +110,7 @@ def task_log_fields(follow: Optional[bool] = None) -> Iterable[LogFields]:
             functools.partial(api.task_logs, session, task_id),
             functools.partial(bindings.get_TaskLogsFields, session, taskId=task_id),
         )
-    except _socket.timeout:
+    except socket.timeout:
         raise TimeoutError(f"timed out waiting for {task_type} with id {task_id}")

     finally:
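
The import change in this file swaps the private `_socket` module for the public `socket` one. A small sketch of why the `except` clause keeps catching the same exception, assuming CPython, where `socket` re-exports the names defined in `_socket`; the helper `recv_or_timeout` is hypothetical and only exists for this illustration:

```python
import _socket
import socket


def recv_or_timeout(seconds: float) -> str:
    """Try to read from a socket that never receives data."""
    a, b = socket.socketpair()
    try:
        a.settimeout(seconds)
        a.recv(1)  # nothing is ever sent on b, so this call times out
        return "data"
    except socket.timeout:
        return "timed out"
    finally:
        a.close()
        b.close()


# On CPython both names refer to the same exception class.
same_class = socket.timeout is _socket.timeout
```

Since Python 3.10 `socket.timeout` is a deprecated alias of the built-in `TimeoutError`; on earlier versions it is a separate subclass of `OSError`.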