From 2631109b021db7e48d2ad22dc0cf64b0c7fb82a7 Mon Sep 17 00:00:00 2001 From: scv119 Date: Mon, 3 Jan 2022 09:50:51 -0800 Subject: [PATCH 1/3] fix --- release/nightly_tests/dataset/ray_sgd_training.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/nightly_tests/dataset/ray_sgd_training.py b/release/nightly_tests/dataset/ray_sgd_training.py index b75b927c0e4a..0d941d469d23 100644 --- a/release/nightly_tests/dataset/ray_sgd_training.py +++ b/release/nightly_tests/dataset/ray_sgd_training.py @@ -36,7 +36,7 @@ def handle_result(self, results, **info): # TODO: fix type hint for logdir def start_training(self, logdir, **info): - mlflow.start_run(run_name=str(logdir.name)) + mlflow.start_run(run_name=logdir) mlflow.log_params(config) # TODO: Update TrainCallback to provide logdir in finish_training. From 81d0b91ad54853619d365695981052b7b220b3cd Mon Sep 17 00:00:00 2001 From: scv119 Date: Mon, 3 Jan 2022 09:56:49 -0800 Subject: [PATCH 2/3] fix test failure --- .../nightly_tests/dataset/ray_sgd_training.py | 31 +++---------------- 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/release/nightly_tests/dataset/ray_sgd_training.py b/release/nightly_tests/dataset/ray_sgd_training.py index 0d941d469d23..42fa8de1032a 100644 --- a/release/nightly_tests/dataset/ray_sgd_training.py +++ b/release/nightly_tests/dataset/ray_sgd_training.py @@ -18,35 +18,11 @@ from ray import train from ray.data.aggregate import Mean, Std from ray.data.dataset_pipeline import DatasetPipeline -from ray.train import Trainer, TrainingCallback +from ray.train import Trainer +from ray.train.callbacks.logging import MLflowLoggerCallback from ray.train.callbacks import TBXLoggerCallback -# TODO(amogkam): Upstream this into Ray Train. -class MLflowCallback(TrainingCallback): - def __init__(self, config): - self.config = config - - def handle_result(self, results, **info): - # For each result that's being reported by ``train.report()``, - # we get the result from the rank 0 worker (i.e. first worker) and - # report it to MLflow. - rank_zero_results = results[0] - mlflow.log_metrics(rank_zero_results) - - # TODO: fix type hint for logdir - def start_training(self, logdir, **info): - mlflow.start_run(run_name=logdir) - mlflow.log_params(config) - - # TODO: Update TrainCallback to provide logdir in finish_training. - self.logdir = logdir - - def finish_training(self, error: bool = False, **info): - # Save the Trainer checkpoints as artifacts to mlflow. - mlflow.log_artifacts(self.logdir) - - def read_dataset(path: str) -> ray.data.Dataset: print(f"reading data from {path}") return ray.data.read_parquet(path, _spread_resource_prefix="node:") \ @@ -593,7 +569,8 @@ def train(self): os.makedirs(tbx_runs_dir, exist_ok=True) callbacks = [ TBXLoggerCallback(logdir=tbx_runs_dir), - MLflowCallback(config) + MLflowLoggerCallback( + experiment_name="cuj-big-data-training", save_artifact=True) ] # Remove CPU resource so Datasets can be scheduled. From 8bcda5969800c01aef2b6d7099087aafffdd4ea8 Mon Sep 17 00:00:00 2001 From: Chen Shen Date: Mon, 3 Jan 2022 10:31:17 -0800 Subject: [PATCH 3/3] Update release/nightly_tests/dataset/ray_sgd_training.py Co-authored-by: matthewdeng --- release/nightly_tests/dataset/ray_sgd_training.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/release/nightly_tests/dataset/ray_sgd_training.py b/release/nightly_tests/dataset/ray_sgd_training.py index 42fa8de1032a..67c97c391dfc 100644 --- a/release/nightly_tests/dataset/ray_sgd_training.py +++ b/release/nightly_tests/dataset/ray_sgd_training.py @@ -19,8 +19,7 @@ from ray.data.aggregate import Mean, Std from ray.data.dataset_pipeline import DatasetPipeline from ray.train import Trainer -from ray.train.callbacks.logging import MLflowLoggerCallback -from ray.train.callbacks import TBXLoggerCallback +from ray.train.callbacks import MLflowLoggerCallback, TBXLoggerCallback def read_dataset(path: str) -> ray.data.Dataset: