diff --git a/e2e_tests/tests/cluster/test_logging.py b/e2e_tests/tests/cluster/test_logging.py index e10dfb55779c..bbb53588a38e 100644 --- a/e2e_tests/tests/cluster/test_logging.py +++ b/e2e_tests/tests/cluster/test_logging.py @@ -1,6 +1,7 @@ import functools import re -import socket +import sys +import threading from typing import Any, Callable, Dict, Iterable, Optional, Union import pytest @@ -59,7 +60,6 @@ def test_trial_logs() -> None: @pytest.mark.e2e_cpu_elastic @pytest.mark.e2e_cpu_cross_version @pytest.mark.e2e_gpu # Note, e2e_gpu and not gpu_required hits k8s cpu tests. -@pytest.mark.timeout(5 * 60) @pytest.mark.parametrize( "task_type,task_config,log_regex", [ @@ -105,13 +105,34 @@ def task_log_fields(follow: Optional[bool] = None) -> Iterable[LogFields]: return bindings.get_TaskLogsFields(session, taskId=task_id, follow=follow) try: - check_logs( - log_regex, - functools.partial(api.task_logs, session, task_id), - functools.partial(bindings.get_TaskLogsFields, session, taskId=task_id), - ) - except socket.timeout: - raise TimeoutError(f"timed out waiting for {task_type} with id {task_id}") + result: Optional[Exception] = None + + def do_check_logs() -> None: + nonlocal result + try: + check_logs( + log_regex, + functools.partial(api.task_logs, session, task_id), + functools.partial(bindings.get_TaskLogsFields, session, taskId=task_id), + ) + except Exception as e: + result = e + + thread = threading.Thread(target=do_check_logs, daemon=True) + thread.start() + thread.join(timeout=5 * 60) + if thread.is_alive(): + # The thread did not exit + raise ValueError("do_check_logs timed out") + elif isinstance(result, Exception): + # There was a failure on the thread. + raise result + except Exception: + print("============= test_task_logs_failed, logs from task =============") + for log in task_logs(): + print(log.log, end="", file=sys.stderr) + print("============= end of task logs =============") + raise finally: command._kill(master_url, task_type, task_id) @@ -127,9 +148,7 @@ def check_logs( if log_regex.match(log.message): break else: - for log in log_fn(follow=True): - print(log.message) - pytest.fail("ran out of logs without a match") + raise ValueError("ran out of logs without a match") # Just make sure these calls 200 and return some logs. assert any(log_fn(tail=10)), "tail returned no logs" diff --git a/e2e_tests/tests/experiment/experiment.py b/e2e_tests/tests/experiment/experiment.py index 956bf27b939d..d1bf3e9512ec 100644 --- a/e2e_tests/tests/experiment/experiment.py +++ b/e2e_tests/tests/experiment/experiment.py @@ -619,6 +619,24 @@ def assert_performed_final_checkpoint(exp_id: int) -> None: last_workload_matches_last_checkpoint(trials[0].workloads) +def run_cmd_and_print_on_error(cmd: List[str]) -> None: + """ + We run some commands to make sure they work, but we don't need their output polluting the logs. + """ + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = p.communicate() + ret = p.wait() + if ret != 0: + print(f"cmd failed: {cmd} exited {ret}", file=sys.stderr) + print("====== stdout from failed command ======", file=sys.stderr) + print(out.decode("utf8"), file=sys.stderr) + print("====== end of stdout ======", file=sys.stderr) + print("====== stderr from failed command ======", file=sys.stderr) + print(err.decode("utf8"), file=sys.stderr) + print("====== end of stderr ======", file=sys.stderr) + raise ValueError(f"cmd failed: {cmd} exited {ret}") + + def run_describe_cli_tests(experiment_id: int) -> None: """ Runs `det experiment describe` CLI command on a finished @@ -627,7 +645,7 @@ def run_describe_cli_tests(experiment_id: int) -> None: """ # "det experiment describe" without metrics. with tempfile.TemporaryDirectory() as tmpdir: - subprocess.check_call( + run_cmd_and_print_on_error( [ "det", "-m", @@ -646,7 +664,7 @@ def run_describe_cli_tests(experiment_id: int) -> None: # "det experiment describe" with metrics. with tempfile.TemporaryDirectory() as tmpdir: - subprocess.check_call( + run_cmd_and_print_on_error( [ "det", "-m", @@ -671,14 +689,13 @@ def run_list_cli_tests(experiment_id: int) -> None: exception if the CLI command encounters a traceback failure. """ - subprocess.check_call( + run_cmd_and_print_on_error( ["det", "-m", conf.make_master_url(), "experiment", "list-trials", str(experiment_id)] ) - - subprocess.check_call( + run_cmd_and_print_on_error( ["det", "-m", conf.make_master_url(), "experiment", "list-checkpoints", str(experiment_id)] ) - subprocess.check_call( + run_cmd_and_print_on_error( [ "det", "-m",