Skip to content

Commit

Permalink
chore: write log shipper in python
Browse files Browse the repository at this point in the history
Instead of orchestrating multiple processes with bash and
confusing/complex process substitution, with all the trapping of signals
and `printf x > ...` nonsense, just write a wrapper process in a real
programming language.  This reduces the complexity of our logging
solution, and prevents the out-of-order bugs inherent in having two
separate log shippers, one for stdout and one for stderr.

The new ship_logs.py has the following features:
- it launches and monitors a child command
- it shares a log buffer for shipping stdout and stderr
- it has the same log parsing regexes as enrich_logging.py
- it converts carriage returns to newlines
- it forwards signals to its child process
- it exits after waiting at most DET_LOG_WAIT_TIME seconds (defaulting to 30) for logs to finish shipping
- it depends on the python interpreter, but only the interpreter (all
  imports are from the standard library)
- in the special case that the child process can't be started, it ships
  an explanation of what happened to the master and exits with standard
  bash exit codes
  • Loading branch information
rb-determined-ai committed Oct 2, 2023
1 parent e031a2f commit 6211fb3
Show file tree
Hide file tree
Showing 19 changed files with 940 additions and 375 deletions.
4 changes: 2 additions & 2 deletions e2e_tests/tests/cluster/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import re
from typing import Any, Callable, Dict, Iterable, Optional, Union

import _socket
import socket
import pytest

from determined.cli import command
Expand Down Expand Up @@ -117,7 +117,7 @@ def task_log_fields(follow: Optional[bool] = None) -> Iterable[LogFields]:
functools.partial(api.task_logs, session, task_id),
functools.partial(bindings.get_TaskLogsFields, session, taskId=task_id),
)
except _socket.timeout:
except socket.timeout:
raise TimeoutError(f"timed out waiting for {task_type} with id {task_id}")

finally:
Expand Down
334 changes: 334 additions & 0 deletions e2e_tests/tests/cluster/test_ship_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
import socket
import sys
import threading
import os
import logging
import json
import textwrap
import subprocess
import signal
import time


# ship_logs.py lives in the master's static assets directory, not anywhere on
# the normal python path, so temporarily prepend that directory to sys.path in
# order to import it.
here = os.path.dirname(__file__)
static_srv = os.path.join(here, "../../../master/static/srv")
old = sys.path
try:
    sys.path = [static_srv] + sys.path
    import ship_logs
finally:
    # Restore the original sys.path so the hack doesn't leak into other tests.
    sys.path = old


class ShipLogServer:
    """
    A tiny, hacky http server for testing ship logs.

    It's about the same amount of code as subclassing the stdlib's SimpleHTTPRequestHandler, but it
    shuts down much faster.

    Use as a context manager; every "log" field shipped by a client accumulates in self.logs.
    """

    def __init__(self):
        self.quit = False
        # One entry per shipped log line (the "log" field of each json record).
        self.logs = []

        # Listen on an ephemeral localhost port.
        self.listener = socket.socket()
        self.listener.bind(("127.0.0.1", 0))
        self.listener.listen(5)

        _, self.port = self.listener.getsockname()

        # Serve requests on a background thread so the test runs in the foreground.
        self.thread = threading.Thread(target=self.serve_requests)
        self.thread.start()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.quit = True
        # Wake up the accept() call so serve_requests() notices self.quit.
        try:
            with socket.socket() as s:
                s.connect(("127.0.0.1", self.port))
                s.send(b"quit")
        except Exception:
            logging.error("failed to wake up accept loop", exc_info=True)
        self.thread.join()
        self.listener.close()

    def serve_requests(self):
        """Accept-and-serve loop; runs on self.thread until __exit__() sets self.quit."""
        try:
            while not self.quit:
                # Accept a connection.
                s, _ = self.listener.accept()
                try:
                    if self.quit:
                        return
                    self.serve_one_request(s)
                except Exception:
                    logging.error("error reading request", exc_info=True)
                finally:
                    s.close()
        except Exception:
            # Fix: was `with_exc=True`, which logging.error() rejects with a TypeError;
            # exc_info=True is the correct kwarg for including the traceback.
            logging.error("server crashed", exc_info=True)

    def serve_one_request(self, s):
        """Read one HTTP request with a json body from socket s and record its logs."""
        # Receive headers.
        hdrs = b""
        while b"\r\n\r\n" not in hdrs:
            buf = s.recv(4096)
            if not buf:
                # EOF
                return
            hdrs += buf
        # Receive body until we have valid json.
        hdrs, body = hdrs.split(b"\r\n\r\n", maxsplit=1)
        while True:
            try:
                jbody = json.loads(body)
                break
            except json.decoder.JSONDecodeError:
                # Must not have the full body yet.
                pass
            buf = s.recv(4096)
            if not buf:
                # EOF
                return
            body += buf

        # Remember the logs we saw.
        self.logs.extend(record["log"] for record in jbody)

        # Send a response.
        s.sendall(b"HTTP/1.1 200 OK\r\n\r\n")

    def master_url(self):
        """Return the URL a shipper-under-test should use as its master address."""
        return f"http://127.0.0.1:{self.port}"

    def pop_logs(self):
        """Return all logs received so far and reset the collection."""
        out = self.logs
        self.logs = []
        return out


class TestShipLogs:
    """
    A suite of unit tests for ship_logs.py
    Yeah, it's a hack that these tests live in e2e tests. But since they test python code it's
    just easier this way.
    """

    def run_ship_logs(self, master_url, cmd, log_wait_time=30):
        """Run ship_logs.main() in-process against master_url and return its exit code."""
        return ship_logs.main(
            master_url=master_url,
            cert_name="",
            cert_file="",
            metadata={},
            token="token",
            emit_stdout_logs=False,
            cmd=cmd,
            log_wait_time=log_wait_time,
        )

    def popen_ship_logs(self, srv, cmd):
        """Run ship_logs.py as a subprocess shipping to srv; return the Popen handle."""
        env = {
            "DET_MASTER": srv.master_url(),
            "DET_SESSION_TOKEN": "token",
            "DET_TASK_LOGGING_METADATA": "{}",
            "DET_SHIPPER_EMIT_STDOUT_LOGS": "1",
        }
        fullcmd = [sys.executable, ship_logs.__file__] + cmd
        return subprocess.Popen(fullcmd, env=env, stdout=subprocess.PIPE)

    def mkcmd(self, script):
        """Wrap an inline python script into an argv suitable as a shipper child command."""
        # -u: don't buffer stdout/stderr.
        return [sys.executable, "-u", "-c", textwrap.dedent(script)]

    def test_preserve_exit(self) -> None:
        """The shipper must exit with the child's own exit code."""
        cmd = self.mkcmd(
            """
            import sys
            print("hi", file=sys.stdout, flush=True)
            print("bye", file=sys.stderr, flush=True)
            sys.exit(9)
            """
        )
        with ShipLogServer() as srv:
            exit_code = self.run_ship_logs(srv.master_url(), cmd)
            assert exit_code == 9, exit_code
            # Ordering of stdout vs stderr is non-deterministic.
            assert set(srv.logs) == {"hi\n", "bye\n"}, srv.logs

    def test_cr_to_lf(self) -> None:
        """Carriage returns in child output must be shipped as newlines."""
        cmd = self.mkcmd(
            r"""
            print("1\n", end="")
            print("2\r", end="")
            print("3\r\n", end="")
            """
        )
        with ShipLogServer() as srv:
            exit_code = self.run_ship_logs(srv.master_url(), cmd)
            assert exit_code == 0, exit_code
            assert "".join(srv.logs) == "1\n2\n3\n\n", srv.logs

    def test_stdout_stderr_ordering(self) -> None:
        # Stdout and stderr are collected on different threads, and therefore _can't_ be perfectly
        # synced. But they should be "approximately" synced; i.e. each 1-second batch should
        # contain both log types.
        #
        # Most dev machines probably will be fine with small timeouts, but CI machines might be
        # slower and we allow up to 0.2 seconds of slop.
        timeouts = [0.001, 0.2]
        for timeout in timeouts:
            cmd = self.mkcmd(
                f"""
                import sys
                import time
                print("1", file=sys.stdout, flush=True)
                time.sleep({timeout})
                print("2", file=sys.stderr, flush=True)
                time.sleep({timeout})
                print("3", file=sys.stdout, flush=True)
                time.sleep({timeout})
                print("4", file=sys.stderr, flush=True)
                time.sleep({timeout})
                print("5", file=sys.stdout, flush=True)
                time.sleep({timeout})
                print("6", file=sys.stderr, flush=True)
                """
            )
            with ShipLogServer() as srv:
                exit_code = self.run_ship_logs(srv.master_url(), cmd)
                assert exit_code == 0, exit_code
            if "".join(srv.logs) == "1\n2\n3\n4\n5\n6\n":
                # Success
                break
            elif timeout == timeouts[-1]:
                # Failed, even on the highest timeout
                raise ValueError("".join(srv.logs))

    def test_signal_forwarding(self) -> None:
        """SIGTERM sent to the shipper must be forwarded to the child process."""
        cmd = self.mkcmd(
            """
            import signal
            import time
            def handle_sigint(*arg):
                print("caught sigint")
                exit(7)
            signal.signal(signal.SIGTERM, handle_sigint)
            print("ready!", flush=True)
            time.sleep(5)
            """
        )
        with ShipLogServer() as srv:
            # Start a subprocess so we can signal it.
            env = {
                "DET_MASTER": srv.master_url(),
                "DET_SESSION_TOKEN": "token",
                "DET_TASK_LOGGING_METADATA": "{}",
                "DET_SHIPPER_EMIT_STDOUT_LOGS": "1",
            }
            fullcmd = [sys.executable, "-u", ship_logs.__file__] + cmd
            p = subprocess.Popen(fullcmd, env=env, stdout=subprocess.PIPE)
            try:
                # Wait for the grandchild log to indicate signals are set up.
                for line in p.stdout:
                    if b"ready!" in line:
                        break
                p.send_signal(signal.SIGTERM)
                exit_code = p.wait()
            finally:
                # Ensure the shipper is dead even if the assertions above fail.
                p.kill()
                p.wait()
            assert exit_code == 7, exit_code
            assert "".join(srv.logs) == "ready!\ncaught sigint\n", srv.logs

    def test_exit_wait_time(self) -> None:
        """The shipper must give up shipping after log_wait_time seconds."""
        cmd = self.mkcmd("print('hello world')")
        # We need a misbehaving server to guarantee the shipper times out.
        # This misbehaving server will listen without ever accepting.
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen(10)
            _, port = listener.getsockname()
            master_url = f"http://127.0.0.1:{port}"
            start = time.time()
            exit_code = self.run_ship_logs(master_url, cmd, log_wait_time=.1)
            end = time.time()
            assert exit_code == 0, exit_code
            assert end - start < 1, end - start

    def test_entrypoint_not_found(self) -> None:
        """A missing entrypoint ships an explanation and exits like bash would."""
        cmd = ["/does-not-exist"]
        with ShipLogServer() as srv:
            exit_code = self.run_ship_logs(srv.master_url(), cmd)
            # 127 is the standard bash exit code for file-not-found.
            assert exit_code == 127, exit_code
            assert "FileNotFoundError" in "".join(srv.logs), srv.logs

    def test_entrypoint_not_executable(self) -> None:
        """A non-executable entrypoint ships an explanation and exits like bash would."""
        cmd = ["/bin/"]
        with ShipLogServer() as srv:
            exit_code = self.run_ship_logs(srv.master_url(), cmd)
            # 126 is the standard bash exit code for permission failure.
            assert exit_code == 126, exit_code
            assert "PermissionError" in "".join(srv.logs), srv.logs

    def test_only_standard_library_dependences(self) -> None:
        """Importing ship_logs must pull in nothing beyond the python standard library."""
        cmd = self.mkcmd(
            """
            # ONLY STANDARD LIBRARY IMPORTS ARE ALLOWED
            import datetime
            import io
            import json
            import logging
            import os
            import queue
            import re
            import signal
            import ssl
            import subprocess
            import sys
            import threading
            import time
            import traceback
            import typing
            import urllib.request
            # END OF STANDARD LIBRARY IMPORTS
            # The only new module allowed after import is ship_logs itself.
            allowed_modules = set((*sys.modules, "ship_logs"))
            sys.path = ["%s"] + sys.path
            import ship_logs
            new_modules = set(sys.modules).difference(allowed_modules)
            for n in new_modules:
                print(n)
            exit(1 if new_modules else 0)
            """ % (static_srv)
        )
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        errmsgs = p.stdout.read().decode("utf8")
        assert p.wait() == 0, "\n" + errmsgs


if __name__ == "__main__":
    # Allow running this suite directly, without pytest.
    logging.basicConfig()
    suite = TestShipLogs()
    for test in (
        suite.test_preserve_exit,
        suite.test_cr_to_lf,
        suite.test_stdout_stderr_ordering,
        suite.test_signal_forwarding,
        suite.test_exit_wait_time,
        suite.test_entrypoint_not_found,
        suite.test_entrypoint_not_executable,
        suite.test_only_standard_library_dependences,
    ):
        test()
3 changes: 1 addition & 2 deletions harness/determined/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@
# LOG_FORMAT is the standard format for use with the logging module, which is required for the
# WebUI's log viewer to filter logs by log level.
#
# Dev note: if this format is changed,
# the enrich-task-logs.py log parsing must be updated as well.
# Dev note: if this format is changed, the ship_logs.py log parsing must be updated as well.
LOG_FORMAT = "%(levelname)s: [%(process)s] %(name)s: %(message)s"
2 changes: 0 additions & 2 deletions master/internal/rm/kubernetesrm/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,6 @@ func (p *pod) createPodSpec(scheduler string) error {

var sidecars []k8sV1.Container

envVars = append(envVars, k8sV1.EnvVar{Name: "DET_K8S_LOG_TO_FILE", Value: "true"})

container := k8sV1.Container{
Name: model.DeterminedK8ContainerName,
Command: spec.Entrypoint,
Expand Down
6 changes: 4 additions & 2 deletions master/pkg/etc/etc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ const (
NotebookIdleCheckResource = "check_idle.py"
// TaskCheckReadyLogsResource is the script to parse logs to check if a task is ready.
TaskCheckReadyLogsResource = "check_ready_logs.py"
// TaskEnrichLogsResource is the script to enrich logs for slurm (which doesn't run fluent).
TaskEnrichLogsResource = "enrich_task_logs.py"
// TaskShipLogsShellResource is the shell script to call the python script to ship logs.
TaskShipLogsShellResource = "ship_logs.sh"
// TaskShipLogsPythonResource is the python script to ship logs.
TaskShipLogsPythonResource = "ship_logs.py"
// TensorboardEntryScriptResource is the script to set up TensorBoard.
TensorboardEntryScriptResource = "tensorboard-entrypoint.sh"
// TrialEntrypointScriptResource is the script to set up a trial.
Expand Down
7 changes: 5 additions & 2 deletions master/pkg/tasks/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ const (
taskSignalHandlingScript = "task-signal-handling.sh"
taskSignalHandlingMode = 0o744

taskEnrichLogsScript = "enrich_task_logs.py"
taskEnrichLogsScriptMode = 0o744
taskShipLogsShell = "ship_logs.sh"
taskShipLogsShellMode = 0o755

taskShipLogsPython = "ship_logs.py"
taskShipLogsPythonMode = 0o755

// Put as many ssh-related files in /run/determined as possible. In particular, it is very
// important that we don't overwrite the user's host $HOME/.ssh/id_rsa, if the user happens to
Expand Down
Loading

0 comments on commit 6211fb3

Please sign in to comment.