Skip to content

Commit

Permalink
[release] stream the full anyscale log to buildkite (ray-project#47808)
Browse files Browse the repository at this point in the history
Currently we only print 100 last lines of anyscale job log to buildkite.
This PR removes that limit and prints everything instead. CC:
@kouroshHakha

Test:
- CI

Signed-off-by: can <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
can-anyscale authored and ujjawal-khare committed Oct 15, 2024
1 parent e71d03c commit a5e69c4
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 6 deletions.
7 changes: 6 additions & 1 deletion release/ray_release/buildkite/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ def get_step(

step = copy.deepcopy(DEFAULT_STEP_TEMPLATE)

cmd = ["./release/run_release_test.sh", test["name"]]
cmd = [
"./release/run_release_test.sh",
test["name"],
"--log-streaming-limit",
"100",
]

for file in test_collection_file or []:
cmd += ["--test-collection-file", file]
Expand Down
4 changes: 3 additions & 1 deletion release/ray_release/cluster_manager/cluster_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
add_tags_to_aws_config,
RELEASE_AWS_RESOURCE_TYPES_TO_TRACK_FOR_BILLING,
)
from ray_release.anyscale_util import get_project_name
from ray_release.anyscale_util import get_project_name, LAST_LOGS_LENGTH
from ray_release.config import DEFAULT_AUTOSUSPEND_MINS, DEFAULT_MAXIMUM_UPTIME_MINS
from ray_release.test import Test
from ray_release.exception import CloudInfoError
Expand All @@ -24,13 +24,15 @@ def __init__(
project_id: str,
sdk: Optional["AnyscaleSDK"] = None,
smoke_test: bool = False,
log_streaming_limit: int = LAST_LOGS_LENGTH,
):
self.sdk = sdk or get_anyscale_sdk()

self.test = test
self.smoke_test = smoke_test
self.project_id = project_id
self.project_name = get_project_name(self.project_id, self.sdk)
self.log_streaming_limit = log_streaming_limit

self.cluster_name = (
f"{test.get_name()}{'-smoke-test' if smoke_test else ''}_{int(time.time())}"
Expand Down
6 changes: 5 additions & 1 deletion release/ray_release/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Optional, List, Tuple

from ray_release.alerts.handle import handle_result, require_result
from ray_release.anyscale_util import get_cluster_name
from ray_release.anyscale_util import get_cluster_name, LAST_LOGS_LENGTH
from ray_release.buildkite.output import buildkite_group, buildkite_open_last
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.cluster_manager.full import FullClusterManager
Expand Down Expand Up @@ -75,6 +75,7 @@ def _load_test_configuration(
smoke_test: bool = False,
no_terminate: bool = False,
test_definition_root: Optional[str] = None,
log_streaming_limit: int = LAST_LOGS_LENGTH,
) -> Tuple[ClusterManager, CommandRunner, str]:
logger.info(f"Test config: {test}")

Expand Down Expand Up @@ -129,6 +130,7 @@ def _load_test_configuration(
test,
anyscale_project,
smoke_test=smoke_test,
log_streaming_limit=log_streaming_limit,
)
command_runner = command_runner_cls(
cluster_manager,
Expand Down Expand Up @@ -390,6 +392,7 @@ def run_release_test(
cluster_env_id: Optional[str] = None,
no_terminate: bool = False,
test_definition_root: Optional[str] = None,
log_streaming_limit: int = LAST_LOGS_LENGTH,
) -> Result:
old_wd = os.getcwd()
start_time = time.monotonic()
Expand All @@ -407,6 +410,7 @@ def run_release_test(
smoke_test,
no_terminate,
test_definition_root,
log_streaming_limit,
)
buildkite_group(":nut_and_bolt: Setting up cluster environment")
(
Expand Down
8 changes: 6 additions & 2 deletions release/ray_release/job_manager/anyscale_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
CreateProductionJob,
HaJobStates,
)
from ray_release.anyscale_util import LAST_LOGS_LENGTH, get_cluster_name
from ray_release.anyscale_util import get_cluster_name
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.exception import (
CommandTimeout,
Expand Down Expand Up @@ -273,7 +273,11 @@ def _get_ray_logs(self) -> str:
"""
Obtain the last few logs
"""
return anyscale.job.get_logs(id=self.job_id, max_lines=LAST_LOGS_LENGTH)
if self.cluster_manager.log_streaming_limit == -1:
return anyscale.job.get_logs(id=self.job_id)
return anyscale.job.get_logs(
id=self.job_id, max_lines=self.cluster_manager.log_streaming_limit
)

def get_last_logs(self):
if not self.job_id:
Expand Down
9 changes: 9 additions & 0 deletions release/ray_release/scripts/run_release_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ray_release.reporter.ray_test_db import RayTestDBReporter
from ray_release.reporter.log import LogReporter
from ray_release.result import Result
from ray_release.anyscale_util import LAST_LOGS_LENGTH


@click.command()
Expand Down Expand Up @@ -89,6 +90,12 @@
type=str,
help="Root of the test definition files. Default is the root of the repo.",
)
@click.option(
"--log-streaming-limit",
default=LAST_LOGS_LENGTH,
type=int,
help="Limit of log streaming in number of lines. Set to -1 to stream all logs.",
)
def main(
test_name: str,
test_collection_file: Tuple[str],
Expand All @@ -100,6 +107,7 @@ def main(
global_config: str = "oss_config.yaml",
no_terminate: bool = False,
test_definition_root: Optional[str] = None,
log_streaming_limit: int = LAST_LOGS_LENGTH,
):
global_config_file = os.path.join(
os.path.dirname(__file__), "..", "configs", global_config
Expand Down Expand Up @@ -157,6 +165,7 @@ def main(
cluster_env_id=cluster_env_id,
no_terminate=no_terminate,
test_definition_root=test_definition_root,
log_streaming_limit=log_streaming_limit,
)
return_code = result.return_code
except ReleaseTestError as e:
Expand Down
9 changes: 8 additions & 1 deletion release/ray_release/tests/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,14 @@ def __init__(
project_id: str,
sdk=None,
smoke_test: bool = False,
log_streaming_limit: int = 100,
):
super(MockClusterManager, self).__init__(
test_name, project_id, this_sdk, smoke_test=smoke_test
test_name,
project_id,
this_sdk,
smoke_test=smoke_test,
log_streaming_limit=log_streaming_limit,
)
self.return_dict = this_cluster_manager_return
this_instances["cluster_manager"] = self
Expand Down Expand Up @@ -237,6 +242,7 @@ def _run(self, result: Result, **kwargs):
test=self.test,
anyscale_project=self.anyscale_project,
result=result,
log_streaming_limit=1000,
**kwargs
)

Expand Down Expand Up @@ -473,6 +479,7 @@ def testAlertFails(self):

self.assertEqual(result.return_code, ExitCode.COMMAND_ALERT.value)
self.assertEqual(result.status, "error")
self.assertEqual(self.instances["cluster_manager"].log_streaming_limit, 1000)

# Ensure cluster was terminated
self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)
Expand Down

0 comments on commit a5e69c4

Please sign in to comment.