From cfc5806c2d758e762b3a8a6257109f13edc5eea9 Mon Sep 17 00:00:00 2001 From: Antoni Baum Date: Wed, 14 Jul 2021 09:38:55 +0200 Subject: [PATCH] [release] LightGBM release tests (#17043) --- python/ray/util/lightgbm/BUILD | 36 +++++ python/ray/util/lightgbm/__init__.py | 18 +++ python/ray/util/lightgbm/release_test_util.py | 149 ++++++++++++++++++ python/ray/util/lightgbm/simple_example.py | 44 ++++++ python/ray/util/lightgbm/simple_tune.py | 95 +++++++++++ python/ray/util/lightgbm/tests/__init__.py | 0 python/ray/util/lightgbm/tests/test_client.py | 28 ++++ release/lightgbm_tests/README.rst | 24 +++ release/lightgbm_tests/app_config.yaml | 24 +++ .../lightgbm_tests/cluster_cpu_moderate.yaml | 38 +++++ release/lightgbm_tests/cluster_cpu_small.yaml | 38 +++++ release/lightgbm_tests/create_test_data.py | 58 +++++++ release/lightgbm_tests/lightgbm_tests.yaml | 83 ++++++++++ release/lightgbm_tests/requirements.txt | 3 + release/lightgbm_tests/setup_lightgbm.sh | 15 ++ release/lightgbm_tests/tpl_cpu_moderate.yaml | 15 ++ release/lightgbm_tests/tpl_cpu_small.yaml | 15 ++ release/lightgbm_tests/wait_cluster.py | 49 ++++++ .../workloads/distributed_api_test.py | 26 +++ .../workloads/ft_small_non_elastic.py | 51 ++++++ .../workloads/train_moderate.py | 49 ++++++ .../lightgbm_tests/workloads/train_small.py | 67 ++++++++ .../workloads/train_small_connect.py | 58 +++++++ release/lightgbm_tests/workloads/tune_32x4.py | 72 +++++++++ release/lightgbm_tests/workloads/tune_4x32.py | 72 +++++++++ .../lightgbm_tests/workloads/tune_small.py | 72 +++++++++ 26 files changed, 1199 insertions(+) create mode 100644 python/ray/util/lightgbm/BUILD create mode 100644 python/ray/util/lightgbm/__init__.py create mode 100644 python/ray/util/lightgbm/release_test_util.py create mode 100644 python/ray/util/lightgbm/simple_example.py create mode 100644 python/ray/util/lightgbm/simple_tune.py create mode 100644 python/ray/util/lightgbm/tests/__init__.py create mode 100644 python/ray/util/lightgbm/tests/test_client.py create mode 100644 release/lightgbm_tests/README.rst create mode 100755 release/lightgbm_tests/app_config.yaml create mode 100644 release/lightgbm_tests/cluster_cpu_moderate.yaml create mode 100644 release/lightgbm_tests/cluster_cpu_small.yaml create mode 100644 release/lightgbm_tests/create_test_data.py create mode 100644 release/lightgbm_tests/lightgbm_tests.yaml create mode 100644 release/lightgbm_tests/requirements.txt create mode 100755 release/lightgbm_tests/setup_lightgbm.sh create mode 100644 release/lightgbm_tests/tpl_cpu_moderate.yaml create mode 100644 release/lightgbm_tests/tpl_cpu_small.yaml create mode 100644 release/lightgbm_tests/wait_cluster.py create mode 100644 release/lightgbm_tests/workloads/distributed_api_test.py create mode 100644 release/lightgbm_tests/workloads/ft_small_non_elastic.py create mode 100644 release/lightgbm_tests/workloads/train_moderate.py create mode 100644 release/lightgbm_tests/workloads/train_small.py create mode 100644 release/lightgbm_tests/workloads/train_small_connect.py create mode 100644 release/lightgbm_tests/workloads/tune_32x4.py create mode 100644 release/lightgbm_tests/workloads/tune_4x32.py create mode 100644 release/lightgbm_tests/workloads/tune_small.py diff --git a/python/ray/util/lightgbm/BUILD b/python/ray/util/lightgbm/BUILD new file mode 100644 index 000000000000..3358a1e24f0a --- /dev/null +++ b/python/ray/util/lightgbm/BUILD @@ -0,0 +1,36 @@ +# -------------------------------------------------------------------- +# Tests from the 
python/ray/util/lightgbm directory. +# Please keep these sorted alphabetically. +# -------------------------------------------------------------------- +py_test( + name = "simple_example", + size = "small", + srcs = ["simple_example.py"], + deps = [":lgbm_lib"], + tags = ["exclusive"], +) + +py_test( + name = "simple_tune", + size="small", + srcs = ["simple_tune.py"], + deps = [":lgbm_lib"], + tags = ["exclusive"] +) + +py_test( + name = "test_client", + size = "small", + srcs = ["tests/test_client.py"], + deps = [":lgbm_lib"], + tags = ["exclusive", "client"] +) + +# This is a dummy test dependency that causes the above tests to be +# re-run if any of these files changes. +py_library( + name = "lgbm_lib", + srcs = glob(["**/*.py"]), +) + + diff --git a/python/ray/util/lightgbm/__init__.py b/python/ray/util/lightgbm/__init__.py new file mode 100644 index 000000000000..c09684c27f17 --- /dev/null +++ b/python/ray/util/lightgbm/__init__.py @@ -0,0 +1,18 @@ +import logging + +logger = logging.getLogger(__name__) + +train = None +predict = None +RayParams = None +RayDMatrix = None +RayFileType = None + +try: + from lightgbm_ray import train, predict, RayParams, RayDMatrix, RayFileType +except ImportError: + logger.info( + "lightgbm_ray is not installed. Please run " + "`pip install git+https://github.com/ray-project/lightgbm_ray`.") + +__all__ = ["train", "predict", "RayParams", "RayDMatrix", "RayFileType"] diff --git a/python/ray/util/lightgbm/release_test_util.py b/python/ray/util/lightgbm/release_test_util.py new file mode 100644 index 000000000000..630dc7415af0 --- /dev/null +++ b/python/ray/util/lightgbm/release_test_util.py @@ -0,0 +1,149 @@ +import glob +import os +import time + +import ray + +from lightgbm_ray import train, RayDMatrix, RayFileType, \ + RayParams, RayDeviceQuantileDMatrix +from lightgbm_ray.tune import _TuneLGBMRank0Mixin +from lightgbm.callback import CallbackEnv + +if "OMP_NUM_THREADS" in os.environ: + del os.environ["OMP_NUM_THREADS"] + + +@ray.remote +class FailureState: + def __init__(self): + self._failed_ids = set() + + def set_failed(self, id): + if id in self._failed_ids: + return False + self._failed_ids.add(id) + return True + + def has_failed(self, id): + return id in self._failed_ids + + +class FailureInjection(_TuneLGBMRank0Mixin): + def __init__(self, id, state, ranks, iteration): + self._id = id + self._state = state + self._ranks = ranks or [] + self._iteration = iteration + + def __call__(self, env: CallbackEnv): + if env.iteration == self._iteration: + rank = 0 if self.is_rank_0 else 1 + if rank in self._ranks: + if not ray.get(self._state.has_failed.remote(self._id)): + success = ray.get(self._state.set_failed.remote(self._id)) + if not success: + # Another rank is already about to fail + return + + pid = os.getpid() + print(f"Killing process: {pid} for actor rank {rank}") + time.sleep(1) + os.kill(pid, 9) + + order = 2 + + +class TrackingCallback(_TuneLGBMRank0Mixin): + def __call__(self, env: CallbackEnv): + if self.is_rank_0: + print(f"[Rank 0] I am at iteration {env.iteration}") + + order = 1 + + +def train_ray(path, + num_workers, + num_boost_rounds, + num_files=0, + regression=False, + use_gpu=False, + ray_params=None, + lightgbm_params=None, + **kwargs): + path = os.path.expanduser(path) + if not os.path.exists(path): + raise ValueError(f"Path does not exist: {path}") + + if num_files: + files = sorted(glob.glob(f"{path}/**/*.parquet")) + while num_files > len(files): + files = files + files + path = files[0:num_files] + + use_device_matrix = 
False + if use_gpu: + try: + import cupy # noqa: F401 + use_device_matrix = True + except ImportError: + use_device_matrix = False + + if use_device_matrix: + dtrain = RayDeviceQuantileDMatrix( + path, + num_actors=num_workers, + label="labels", + ignore=["partition"], + filetype=RayFileType.PARQUET) + else: + dtrain = RayDMatrix( + path, + num_actors=num_workers, + label="labels", + ignore=["partition"], + filetype=RayFileType.PARQUET) + + config = {"device": "cpu" if not use_gpu else "gpu"} + + if not regression: + # Classification + config.update({ + "objective": "binary", + "metric": ["binary_logloss", "binary_error"], + }) + else: + # Regression + config.update({ + "objective": "regression", + "metric": ["l2", "rmse"], + }) + + if lightgbm_params: + config.update(lightgbm_params) + + start = time.time() + evals_result = {} + additional_results = {} + bst = train( + config, + dtrain, + evals_result=evals_result, + additional_results=additional_results, + num_boost_round=num_boost_rounds, + ray_params=ray_params or RayParams( + max_actor_restarts=2, + num_actors=num_workers, + cpus_per_actor=2, + gpus_per_actor=0 if not use_gpu else 1), + evals=[(dtrain, "train")], + **kwargs) + taken = time.time() - start + print(f"TRAIN TIME TAKEN: {taken:.2f} seconds") + + out_file = os.path.expanduser( + "~/benchmark_{}.lgbm".format("cpu" if not use_gpu else "gpu")) + bst.booster_.save_model(out_file) + + print("Final training error: {:.4f}".format(evals_result["train"][ + "binary_error" if not regression else "rmse"][-1])) + return bst, additional_results, taken diff --git a/python/ray/util/lightgbm/simple_example.py b/python/ray/util/lightgbm/simple_example.py new file mode 100644 index 000000000000..c9d2e5c7509f --- /dev/null +++ b/python/ray/util/lightgbm/simple_example.py @@ -0,0 +1,44 @@ +from sklearn import datasets +from sklearn.model_selection import train_test_split + +from ray.util.lightgbm import RayDMatrix, RayParams, train + + +# __lightgbm_begin__ +def main(): + # Load dataset + data, labels = datasets.load_breast_cancer(return_X_y=True) + # Split into train and test set + train_x, test_x, train_y, test_y = train_test_split( + data, labels, test_size=0.25) + + train_set = RayDMatrix(train_x, train_y) + test_set = RayDMatrix(test_x, test_y) + + # Set config + config = { + "objective": "binary", + "metric": ["binary_logloss", "binary_error"], + "max_depth": 3, + } + + evals_result = {} + + # Train the classifier + bst = train( + config, + train_set, + evals=[(test_set, "eval")], + evals_result=evals_result, + ray_params=RayParams(max_actor_restarts=1, num_actors=1), + verbose_eval=False) + + bst.booster_.save_model("simple.lgbm") + print("Final validation error: {:.4f}".format( + evals_result["eval"]["binary_error"][-1])) + + +# __lightgbm_end__ + +if __name__ == "__main__": + main() diff --git a/python/ray/util/lightgbm/simple_tune.py b/python/ray/util/lightgbm/simple_tune.py new file mode 100644 index 000000000000..b05ddd13a481 --- /dev/null +++ b/python/ray/util/lightgbm/simple_tune.py @@ -0,0 +1,95 @@ +from sklearn import datasets +from sklearn.model_selection import train_test_split + +from ray.util.lightgbm import RayDMatrix, RayParams, train + +# __train_begin__ +num_cpus_per_actor = 2 +num_actors = 1 + + +def train_model(config): + # Load dataset + data, labels = datasets.load_breast_cancer(return_X_y=True) + # Split into train and test set + train_x, test_x, train_y, test_y = train_test_split( + data, labels, test_size=0.25) + + train_set = RayDMatrix(train_x, train_y) + 
test_set = RayDMatrix(test_x, test_y)
+
+    evals_result = {}
+    bst = train(
+        params=config,
+        dtrain=train_set,
+        evals=[(test_set, "eval")],
+        evals_result=evals_result,
+        verbose_eval=False,
+        ray_params=RayParams(
+            num_actors=num_actors, cpus_per_actor=num_cpus_per_actor))
+    bst.booster_.save_model("model.lgbm")
+
+
+# __train_end__
+
+
+# __load_begin__
+def load_best_model(best_logdir):
+    import lightgbm as lgbm
+    import os
+
+    best_bst = lgbm.Booster(model_file=os.path.join(best_logdir, "model.lgbm"))
+    return best_bst
+
+
+# __load_end__
+
+
+def main():
+    # __tune_begin__
+    from ray import tune
+
+    # Set config
+    config = {
+        "objective": "binary",
+        "metric": ["binary_logloss", "binary_error"],
+        "eta": tune.loguniform(1e-4, 1e-1),
+        "subsample": tune.uniform(0.5, 1.0),
+        "max_depth": tune.randint(1, 9)
+    }
+    # __tune_end__
+
+    # __tune_run_begin__
+    analysis = tune.run(
+        train_model,
+        config=config,
+        metric="eval-binary_error",
+        mode="min",
+        num_samples=4,
+        resources_per_trial={
+            "cpu": 1,
+            "extra_cpu": num_actors * num_cpus_per_actor
+        })
+
+    # Load in the best performing model.
+    best_bst = load_best_model(analysis.best_logdir)
+
+    # Use the following code block instead if using Ray Client.
+    # import ray
+    # if ray.util.client.ray.is_connected():
+    #     # If using Ray Client best_logdir is a directory on the server.
+    #     # So we want to make sure we wrap model loading in a task.
+    #     remote_load_fn = ray.remote(load_best_model)
+    #     best_bst = ray.get(remote_load_fn.remote(analysis.best_logdir))
+
+    # Do something with the best model.
+    _ = best_bst
+
+    accuracy = 1. - analysis.best_result["eval-binary_error"]
+    print(f"Best model parameters: {analysis.best_config}")
+    print(f"Best model total accuracy: {accuracy:.4f}")
+    # __tune_run_end__
+
+
+if __name__ == "__main__":
+    main()
diff --git a/python/ray/util/lightgbm/tests/__init__.py b/python/ray/util/lightgbm/tests/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/python/ray/util/lightgbm/tests/test_client.py b/python/ray/util/lightgbm/tests/test_client.py
new file mode 100644
index 000000000000..93ad58a3b28a
--- /dev/null
+++ b/python/ray/util/lightgbm/tests/test_client.py
+import pytest
+import sys
+
+import ray
+from ray.util.client.ray_client_helpers import ray_start_client_server
+
+
+@pytest.fixture
+def start_client_server():
+    with ray_start_client_server() as client:
+        yield client
+
+
+def test_simple_example(start_client_server):
+    assert ray.util.client.ray.is_connected()
+    from ray.util.lightgbm.simple_example import main
+    main()
+
+
+def test_simple_tune(start_client_server):
+    assert ray.util.client.ray.is_connected()
+    from ray.util.lightgbm.simple_tune import main
+    main()
+
+
+if __name__ == "__main__":
+    import pytest
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/release/lightgbm_tests/README.rst b/release/lightgbm_tests/README.rst
new file mode 100644
index 000000000000..ac5094ce97ed
--- /dev/null
+++ b/release/lightgbm_tests/README.rst
+LightGBM on Ray tests
+=====================
+
+This directory contains various LightGBM on Ray release tests.
+
+You should run these tests with the ``releaser`` tool.
+
+Overview
+--------
+There are four kinds of tests:
+
+1. ``distributed_api_test`` - checks general API functionality and should finish very quickly (< 1 minute).
+2. ``train_*`` - checks single-trial training on different setups.
+3. ``tune_*`` - checks multi-trial training via Ray Tune.
+4. ``ft_*`` - checks fault tolerance.
+ +Generally the releaser tool will run all tests in parallel, but if you do +it sequentially, be sure to do it in the order above. If ``train_*`` fails, +``tune_*`` will fail, too. + +Acceptance criteria +------------------- +These tests are considered passing when they throw no error at the end of +the output log. diff --git a/release/lightgbm_tests/app_config.yaml b/release/lightgbm_tests/app_config.yaml new file mode 100755 index 000000000000..ef94c62b78c5 --- /dev/null +++ b/release/lightgbm_tests/app_config.yaml @@ -0,0 +1,24 @@ +base_image: "anyscale/ray-ml:pinned-nightly-py37" +env_vars: {} +debian_packages: + - curl + +python: + pip_packages: + - pytest + - lightgbm_ray + - petastorm + - tblib + conda_packages: [] + +post_build_cmds: + - pip uninstall -y numpy ray || true + - sudo rm -rf /home/ray/anaconda3/lib/python3.7/site-packages/numpy + - pip3 install numpy || true + - pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }} + - pip3 install -U lightgbm_ray petastorm # Install latest releases + - sudo mkdir -p /data || true + - sudo chown ray:1000 /data || true + - rm -rf /data/classification.parquet || true + - curl -o create_test_data.py https://raw.githubusercontent.com/ray-project/ray/releases/1.3.0/release/xgboost_tests/create_test_data.py # XGBoost is intended + - python ./create_test_data.py /data/classification.parquet --seed 1234 --num-rows 1000000 --num-cols 40 --num-partitions 100 --num-classes 2 diff --git a/release/lightgbm_tests/cluster_cpu_moderate.yaml b/release/lightgbm_tests/cluster_cpu_moderate.yaml new file mode 100644 index 000000000000..04a5534236ec --- /dev/null +++ b/release/lightgbm_tests/cluster_cpu_moderate.yaml @@ -0,0 +1,38 @@ +cluster_name: ray-lightgbm-release-cpu-moderate + +max_workers: 32 + +upscaling_speed: 32 + +idle_timeout_minutes: 15 + +docker: + image: anyscale/ray:latest + container_name: ray_container + pull_before_run: true + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + cache_stopped_nodes: false + +available_node_types: + cpu_4_ondemand: + node_config: + InstanceType: m5.xlarge + resources: {"CPU": 4} + min_workers: 31 + max_workers: 31 + +auth: + ssh_user: ubuntu + +head_node_type: cpu_4_ondemand +worker_default_node_type: cpu_4_ondemand + +file_mounts: { + "~/lightgbm_tests": "." +} + +file_mounts_sync_continuously: false diff --git a/release/lightgbm_tests/cluster_cpu_small.yaml b/release/lightgbm_tests/cluster_cpu_small.yaml new file mode 100644 index 000000000000..c489abdae0c1 --- /dev/null +++ b/release/lightgbm_tests/cluster_cpu_small.yaml @@ -0,0 +1,38 @@ +cluster_name: ray-lightgbm-release-cpu-small + +max_workers: 4 + +upscaling_speed: 32 + +idle_timeout_minutes: 15 + +docker: + image: anyscale/ray:latest + container_name: ray_container + pull_before_run: true + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + cache_stopped_nodes: false + +available_node_types: + cpu_4_ondemand: + node_config: + InstanceType: m5.xlarge + resources: {"CPU": 4} + min_workers: 3 + max_workers: 3 + +auth: + ssh_user: ubuntu + +head_node_type: cpu_4_ondemand +worker_default_node_type: cpu_4_ondemand + +file_mounts: { + "~/lightgbm_tests": "." 
+} + +file_mounts_sync_continuously: false diff --git a/release/lightgbm_tests/create_test_data.py b/release/lightgbm_tests/create_test_data.py new file mode 100644 index 000000000000..21b1c8fdd6cb --- /dev/null +++ b/release/lightgbm_tests/create_test_data.py @@ -0,0 +1,58 @@ +import argparse +import numpy as np +import os + +from xgboost_ray.tests.utils import create_parquet + +if __name__ == "__main__": + if "OMP_NUM_THREADS" in os.environ: + del os.environ["OMP_NUM_THREADS"] + + parser = argparse.ArgumentParser(description="Create fake data.") + parser.add_argument( + "filename", type=str, default="/data/parted.parquet/", help="ray/dask") + parser.add_argument( + "-r", + "--num-rows", + required=False, + type=int, + default=1e8, + help="num rows") + parser.add_argument( + "-p", + "--num-partitions", + required=False, + type=int, + default=100, + help="num partitions") + parser.add_argument( + "-c", + "--num-cols", + required=False, + type=int, + default=4, + help="num columns (features)") + parser.add_argument( + "-C", + "--num-classes", + required=False, + type=int, + default=2, + help="num classes") + parser.add_argument( + "-s", + "--seed", + required=False, + type=int, + default=1234, + help="random seed") + + args = parser.parse_args() + + np.random.seed(args.seed) + create_parquet( + args.filename, + num_rows=int(args.num_rows), + num_partitions=int(args.num_partitions), + num_features=int(args.num_cols), + num_classes=int(args.num_classes)) diff --git a/release/lightgbm_tests/lightgbm_tests.yaml b/release/lightgbm_tests/lightgbm_tests.yaml new file mode 100644 index 000000000000..321f57d2ecbe --- /dev/null +++ b/release/lightgbm_tests/lightgbm_tests.yaml @@ -0,0 +1,83 @@ +- name: train_small + cluster: + app_config: app_config.yaml + compute_template: tpl_cpu_small.yaml + + run: + use_connect: True + timeout: 600 + prepare: python wait_cluster.py 4 600 + script: python workloads/train_small.py + +- name: train_moderate + cluster: + app_config: app_config.yaml + compute_template: tpl_cpu_moderate.yaml + + run: + timeout: 600 + prepare: python wait_cluster.py 32 600 + script: python workloads/train_moderate.py + +- name: train_gpu + cluster: + app_config: app_config_gpu.yaml + compute_template: tpl_gpu_small.yaml + + run: + timeout: 600 + prepare: python wait_cluster.py 5 600 + script: python workloads/train_gpu.py + +- name: distributed_api_test + cluster: + app_config: app_config.yaml + compute_template: tpl_cpu_small.yaml + results: + + run: + timeout: 600 + prepare: python wait_cluster.py 4 600 + script: python workloads/distributed_api_test.py + results: "" + +- name: ft_small_non_elastic + cluster: + app_config: app_config.yaml + compute_template: tpl_cpu_small.yaml + + run: + timeout: 900 + prepare: python wait_cluster.py 4 600 + script: python workloads/ft_small_non_elastic.py + results: "" + +- name: tune_small + cluster: + app_config: app_config.yaml + compute_template: tpl_cpu_small.yaml + + run: + timeout: 600 + prepare: python wait_cluster.py 4 600 + script: python workloads/tune_small.py + +- name: tune_32x4 + cluster: + app_config: app_config.yaml + compute_template: tpl_cpu_moderate.yaml + + run: + timeout: 900 + prepare: python wait_cluster.py 32 600 + script: python workloads/tune_32x4.py + +- name: tune_4x32 + cluster: + app_config: app_config.yaml + compute_template: tpl_cpu_moderate.yaml + + run: + timeout: 900 + prepare: python wait_cluster.py 32 600 + script: python workloads/tune_4x32.py diff --git a/release/lightgbm_tests/requirements.txt 
b/release/lightgbm_tests/requirements.txt new file mode 100644 index 000000000000..b72f1cf6edfb --- /dev/null +++ b/release/lightgbm_tests/requirements.txt @@ -0,0 +1,3 @@ +ray[tune] +lightgbm_ray +xgboost_ray \ No newline at end of file diff --git a/release/lightgbm_tests/setup_lightgbm.sh b/release/lightgbm_tests/setup_lightgbm.sh new file mode 100755 index 000000000000..f0e4d12b3848 --- /dev/null +++ b/release/lightgbm_tests/setup_lightgbm.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +pip install pytest +# Uninstall any existing lightgbm_ray repositories +pip uninstall -y lightgbm_ray || true + +# Install lightgbm package +pip install -U "${LIGHTGBM_RAY_PACKAGE:-lightgbm_ray}" + +# Create test dataset +sudo mkdir -p /data || true +sudo chown ray:1000 /data || true +rm -rf /data/classification.parquet || true +cp -R /tmp/ray_tmp_mount/lightgbm_tests ~/lightgbm_tests || echo "Copy failed" +python ~/lightgbm_tests/create_test_data.py /data/classification.parquet --seed 1234 --num-rows 1000000 --num-cols 40 --num-partitions 100 --num-classes 2 diff --git a/release/lightgbm_tests/tpl_cpu_moderate.yaml b/release/lightgbm_tests/tpl_cpu_moderate.yaml new file mode 100644 index 000000000000..786a4ccdb7e6 --- /dev/null +++ b/release/lightgbm_tests/tpl_cpu_moderate.yaml @@ -0,0 +1,15 @@ +cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} +region: us-west-2 + +max_workers: 31 + +head_node_type: + name: head_node + instance_type: m5.xlarge + +worker_node_types: + - name: worker_node + instance_type: m5.xlarge + min_workers: 31 + max_workers: 31 + use_spot: false diff --git a/release/lightgbm_tests/tpl_cpu_small.yaml b/release/lightgbm_tests/tpl_cpu_small.yaml new file mode 100644 index 000000000000..38a81bfd064f --- /dev/null +++ b/release/lightgbm_tests/tpl_cpu_small.yaml @@ -0,0 +1,15 @@ +cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} +region: us-west-2 + +max_workers: 3 + +head_node_type: + name: head_node + instance_type: m5.xlarge + +worker_node_types: + - name: worker_node + instance_type: m5.xlarge + min_workers: 3 + max_workers: 3 + use_spot: false diff --git a/release/lightgbm_tests/wait_cluster.py b/release/lightgbm_tests/wait_cluster.py new file mode 100644 index 000000000000..a0f8260bd139 --- /dev/null +++ b/release/lightgbm_tests/wait_cluster.py @@ -0,0 +1,49 @@ +import argparse +import time + +import ray + +ray.init(address="auto") + +parser = argparse.ArgumentParser() +parser.add_argument( + "num_nodes", + type=int, + help="Wait for this number of nodes (includes head)") + +parser.add_argument( + "max_time_s", type=int, help="Wait for this number of seconds") + +parser.add_argument( + "--feedback_interval_s", + type=int, + default=10, + help="Wait for this number of seconds") + +args = parser.parse_args() + +curr_nodes = 0 +start = time.time() +next_feedback = start +max_time = start + args.max_time_s +while not curr_nodes >= args.num_nodes: + now = time.time() + + if now >= max_time: + raise RuntimeError( + f"Maximum wait time reached, but only " + f"{curr_nodes}/{args.num_nodes} nodes came up. 
Aborting.") + + if now >= next_feedback: + passed = now - start + print(f"Waiting for more nodes to come up: " + f"{curr_nodes}/{args.num_nodes} " + f"({passed:.0f} seconds passed)") + next_feedback = now + args.feedback_interval_s + + time.sleep(5) + curr_nodes = len(ray.nodes()) + +passed = time.time() - start +print(f"Cluster is up: {curr_nodes}/{args.num_nodes} nodes online after " + f"{passed:.0f} seconds") diff --git a/release/lightgbm_tests/workloads/distributed_api_test.py b/release/lightgbm_tests/workloads/distributed_api_test.py new file mode 100644 index 000000000000..ee5c24f101f1 --- /dev/null +++ b/release/lightgbm_tests/workloads/distributed_api_test.py @@ -0,0 +1,26 @@ +"""Distributed LightGBM API test + +This test runs unit tests on a distributed cluster. This will confirm that +LightGBM API features like custom metrics/objectives work with remote +trainables. + +Test owner: Yard1 (primary), krfricke + +Acceptance criteria: Unit tests should pass (requires pytest). +""" + +import ray + +from lightgbm_ray.tests.test_lightgbm_api import LightGBMAPITest + + +class LightGBMDistributedAPITest(LightGBMAPITest): + def _init_ray(self): + if not ray.is_initialized(): + ray.init(address="auto") + + +if __name__ == "__main__": + import pytest + import sys + sys.exit(pytest.main(["-v", f"{__file__}::LightGBMDistributedAPITest"])) diff --git a/release/lightgbm_tests/workloads/ft_small_non_elastic.py b/release/lightgbm_tests/workloads/ft_small_non_elastic.py new file mode 100644 index 000000000000..0615df0e8823 --- /dev/null +++ b/release/lightgbm_tests/workloads/ft_small_non_elastic.py @@ -0,0 +1,51 @@ +"""Fault tolerance test (small cluster, non-elastic training) + +In this run, two training actors will die after some time. It is expected that +in both cases lightgbm_ray stops training, restarts the dead actors, and +continues training with all four actors. + +Test owner: Yard1 (primary), krfricke + +Acceptance criteria: Should run through and report final results. Intermediate +output should show that training halts wenn an actor dies and continues only +when all four actors are available again. The test will fail if fault +tolerance did not work correctly. + +Notes: This test seems to be somewhat flaky. This might be due to +race conditions in handling dead actors. This is likely a problem of +the lightgbm_ray implementation and not of this test. 
+""" +import ray + +from lightgbm_ray import RayParams + + +from ray.util.lightgbm.release_test_util import train_ray, \ + FailureState, FailureInjection, TrackingCallback + +if __name__ == "__main__": + ray.init(address="auto") + + failure_state = FailureState.remote() + + ray_params = RayParams( + max_actor_restarts=2, num_actors=4, cpus_per_actor=4, gpus_per_actor=0) + + _, additional_results, _ = train_ray( + path="/data/classification.parquet", + num_workers=4, + num_boost_rounds=100, + num_files=200, + regression=False, + use_gpu=False, + ray_params=ray_params, + lightgbm_params=None, + callbacks=[ + TrackingCallback(), + FailureInjection( + id="first_fail", state=failure_state, ranks=[1], iteration=14), + FailureInjection( + id="second_fail", state=failure_state, ranks=[0], iteration=34) + ]) + + print("PASSED.") diff --git a/release/lightgbm_tests/workloads/train_moderate.py b/release/lightgbm_tests/workloads/train_moderate.py new file mode 100644 index 000000000000..832b03313b24 --- /dev/null +++ b/release/lightgbm_tests/workloads/train_moderate.py @@ -0,0 +1,49 @@ +"""Moderate cluster training + +This training run will start 32 workers on 32 nodes (including head node). + +Test owner: Yard1 (primary), krfricke + +Acceptance criteria: Should run through and report final results. +""" +import json +import os +import time + +import ray +from lightgbm_ray import RayParams + +from ray.util.lightgbm.release_test_util import train_ray + +if __name__ == "__main__": + ray.init(address="auto") + + ray_params = RayParams( + elastic_training=False, + max_actor_restarts=2, + num_actors=32, + cpus_per_actor=4, + gpus_per_actor=0) + + start = time.time() + train_ray( + path="/data/classification.parquet", + num_workers=32, + num_boost_rounds=100, + num_files=128, + regression=False, + use_gpu=False, + ray_params=ray_params, + lightgbm_params=None, + ) + taken = time.time() - start + + result = { + "time_taken": taken, + } + test_output_json = os.environ.get("TEST_OUTPUT_JSON", + "/tmp/train_moderate.json") + with open(test_output_json, "wt") as f: + json.dump(result, f) + + print("PASSED.") diff --git a/release/lightgbm_tests/workloads/train_small.py b/release/lightgbm_tests/workloads/train_small.py new file mode 100644 index 000000000000..5eb1df9c8cce --- /dev/null +++ b/release/lightgbm_tests/workloads/train_small.py @@ -0,0 +1,67 @@ +"""Small cluster training + +This training run will start 4 workers on 4 nodes (including head node). + +Test owner: Yard1 (primary), krfricke + +Acceptance criteria: Should run through and report final results. 
+""" +import json +import os +import time + +import ray +from ray.test_utils import wait_for_num_nodes +from lightgbm_ray import RayParams + +from ray.util.lightgbm.release_test_util import train_ray + +if __name__ == "__main__": + addr = os.environ.get("RAY_ADDRESS") + job_name = os.environ.get("RAY_JOB_NAME", "train_small") + if addr.startswith("anyscale://"): + ray.client(address=addr).job_name(job_name).connect() + else: + ray.init(address="auto") + + wait_for_num_nodes( + int(os.environ.get("RAY_RELEASE_MIN_WORKERS", 0)) + 1, 600) + + output = os.environ["TEST_OUTPUT_JSON"] + state = os.environ["TEST_STATE_JSON"] + ray_params = RayParams( + elastic_training=False, + max_actor_restarts=2, + num_actors=4, + cpus_per_actor=4, + gpus_per_actor=0) + + start = time.time() + + @ray.remote + def train(): + os.environ["TEST_OUTPUT_JSON"] = output + os.environ["TEST_STATE_JSON"] = state + train_ray( + path="/data/classification.parquet", + num_workers=4, + num_boost_rounds=100, + num_files=25, + regression=False, + use_gpu=False, + ray_params=ray_params, + lightgbm_params=None, + ) + + ray.get(train.remote()) + taken = time.time() - start + + result = { + "time_taken": taken, + } + test_output_json = os.environ.get("TEST_OUTPUT_JSON", + "/tmp/train_small.json") + with open(test_output_json, "wt") as f: + json.dump(result, f) + + print("PASSED.") diff --git a/release/lightgbm_tests/workloads/train_small_connect.py b/release/lightgbm_tests/workloads/train_small_connect.py new file mode 100644 index 000000000000..c0e5ef10df5d --- /dev/null +++ b/release/lightgbm_tests/workloads/train_small_connect.py @@ -0,0 +1,58 @@ +"""Small cluster training + +This training run will start 4 workers on 4 nodes (including head node). + +Test owner: Yard1 (primary), krfricke + +Acceptance criteria: Should run through and report final results. +""" +import json +import os +import time + +import ray +from lightgbm_ray import RayParams + +from ray.util.lightgbm.release_test_util import train_ray + +if __name__ == "__main__": + addr = os.environ.get("RAY_ADDRESS") + job_name = os.environ.get("RAY_JOB_NAME", "train_small") + if addr.startswith("anyscale://"): + ray.client(address=addr).job_name(job_name).connect() + else: + ray.init(address="auto") + + ray_params = RayParams( + elastic_training=False, + max_actor_restarts=2, + num_actors=4, + cpus_per_actor=4, + gpus_per_actor=0) + + @ray.remote + def train(): + train_ray( + path="/data/classification.parquet", + num_workers=4, + num_boost_rounds=100, + num_files=25, + regression=False, + use_gpu=False, + ray_params=ray_params, + lightgbm_params=None, + ) + + start = time.time() + ray.get(train.remote()) + taken = time.time() - start + + result = { + "time_taken": taken, + } + test_output_json = os.environ.get("TEST_OUTPUT_JSON", + "/tmp/train_small.json") + with open(test_output_json, "wt") as f: + json.dump(result, f) + + print("PASSED.") diff --git a/release/lightgbm_tests/workloads/tune_32x4.py b/release/lightgbm_tests/workloads/tune_32x4.py new file mode 100644 index 000000000000..3ed5f15bb28d --- /dev/null +++ b/release/lightgbm_tests/workloads/tune_32x4.py @@ -0,0 +1,72 @@ +"""Moderate Ray Tune run (32 trials, 4 actors). + +This training run will start 32 Ray Tune trials, each starting 4 actors. +The cluster comprises 32 nodes. + +Test owner: Yard1 (primary), krfricke + +Acceptance criteria: Should run through and report final results, as well +as the Ray Tune results table. No trials should error. All trials should +run in parallel. 
+""" +from collections import Counter +import json +import os +import time + +import ray +from ray import tune + +from lightgbm_ray import RayParams + +from ray.util.lightgbm.release_test_util import train_ray + + +def train_wrapper(config, ray_params): + train_ray( + path="/data/classification.parquet", + num_workers=4, + num_boost_rounds=100, + num_files=64, + regression=False, + use_gpu=False, + ray_params=ray_params, + lightgbm_params=config, + ) + + +if __name__ == "__main__": + search_space = { + "eta": tune.loguniform(1e-4, 1e-1), + "subsample": tune.uniform(0.5, 1.0), + "max_depth": tune.randint(1, 9) + } + + ray.init(address="auto") + + ray_params = RayParams( + elastic_training=False, + max_actor_restarts=2, + num_actors=4, + cpus_per_actor=1, + gpus_per_actor=0) + + start = time.time() + analysis = tune.run( + tune.with_parameters(train_wrapper, ray_params=ray_params), + config=search_space, + num_samples=32, + resources_per_trial=ray_params.get_tune_resources()) + taken = time.time() - start + + result = { + "time_taken": taken, + "trial_states": dict( + Counter([trial.status for trial in analysis.trials])) + } + test_output_json = os.environ.get("TEST_OUTPUT_JSON", + "/tmp/tune_32x4.json") + with open(test_output_json, "wt") as f: + json.dump(result, f) + + print("PASSED.") diff --git a/release/lightgbm_tests/workloads/tune_4x32.py b/release/lightgbm_tests/workloads/tune_4x32.py new file mode 100644 index 000000000000..5e48998093a3 --- /dev/null +++ b/release/lightgbm_tests/workloads/tune_4x32.py @@ -0,0 +1,72 @@ +"""Moderate Ray Tune run (4 trials, 32 actors). + +This training run will start 4 Ray Tune trials, each starting 32 actors. +The cluster comprises 32 nodes. + +Test owner: Yard1 (primary), krfricke + +Acceptance criteria: Should run through and report final results, as well +as the Ray Tune results table. No trials should error. All trials should +run in parallel. +""" +from collections import Counter +import json +import os +import time + +import ray +from ray import tune + +from lightgbm_ray import RayParams + +from ray.util.lightgbm.release_test_util import train_ray + + +def train_wrapper(config, ray_params): + train_ray( + path="/data/classification.parquet", + num_workers=32, + num_boost_rounds=100, + num_files=128, + regression=False, + use_gpu=False, + ray_params=ray_params, + lightgbm_params=config, + ) + + +if __name__ == "__main__": + search_space = { + "eta": tune.loguniform(1e-4, 1e-1), + "subsample": tune.uniform(0.5, 1.0), + "max_depth": tune.randint(1, 9) + } + + ray.init(address="auto") + + ray_params = RayParams( + elastic_training=False, + max_actor_restarts=2, + num_actors=32, + cpus_per_actor=1, + gpus_per_actor=0) + + start = time.time() + analysis = tune.run( + tune.with_parameters(train_wrapper, ray_params=ray_params), + config=search_space, + num_samples=4, + resources_per_trial=ray_params.get_tune_resources()) + taken = time.time() - start + + result = { + "time_taken": taken, + "trial_states": dict( + Counter([trial.status for trial in analysis.trials])) + } + test_output_json = os.environ.get("TEST_OUTPUT_JSON", + "/tmp/tune_4x32.json") + with open(test_output_json, "wt") as f: + json.dump(result, f) + + print("PASSED.") diff --git a/release/lightgbm_tests/workloads/tune_small.py b/release/lightgbm_tests/workloads/tune_small.py new file mode 100644 index 000000000000..57df4b886f89 --- /dev/null +++ b/release/lightgbm_tests/workloads/tune_small.py @@ -0,0 +1,72 @@ +"""Small Ray Tune run (4 trials, 4 actors). 
+ +This training run will start 4 Ray Tune Trials, each starting 4 actors. +The cluster comprises 4 nodes. + +Test owner: Yard1 (primary), krfricke + +Acceptance criteria: Should run through and report final results, as well +as the Ray Tune results table. No trials should error. All trials should +run in parallel. +""" +from collections import Counter +import json +import os +import time + +import ray +from ray import tune + +from lightgbm_ray import RayParams + +from ray.util.lightgbm.release_test_util import train_ray + + +def train_wrapper(config, ray_params): + train_ray( + path="/data/classification.parquet", + num_workers=4, + num_boost_rounds=100, + num_files=25, + regression=False, + use_gpu=False, + ray_params=ray_params, + lightgbm_params=config, + ) + + +if __name__ == "__main__": + search_space = { + "eta": tune.loguniform(1e-4, 1e-1), + "subsample": tune.uniform(0.5, 1.0), + "max_depth": tune.randint(1, 9) + } + + ray.init(address="auto") + + ray_params = RayParams( + elastic_training=False, + max_actor_restarts=2, + num_actors=4, + cpus_per_actor=1, + gpus_per_actor=0) + + start = time.time() + analysis = tune.run( + tune.with_parameters(train_wrapper, ray_params=ray_params), + config=search_space, + num_samples=4, + resources_per_trial=ray_params.get_tune_resources()) + taken = time.time() - start + + result = { + "time_taken": taken, + "trial_states": dict( + Counter([trial.status for trial in analysis.trials])) + } + test_output_json = os.environ.get("TEST_OUTPUT_JSON", + "/tmp/tune_small.json") + with open(test_output_json, "wt") as f: + json.dump(result, f) + + print("PASSED.")
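
All of the ``train_*``, ``tune_*`` and ``ft_*`` workloads above funnel into the shared ``train_ray()`` helper added in ``python/ray/util/lightgbm/release_test_util.py``. The snippet below is a minimal local sketch (not part of the patch) of calling that helper directly; the dataset path, actor count, and boost-round count are illustrative assumptions rather than values taken from any specific workload, and the parquet dataset is assumed to have been generated beforehand with ``create_test_data.py``.

    import ray
    from lightgbm_ray import RayParams

    from ray.util.lightgbm.release_test_util import train_ray

    # Start a local Ray cluster; the release workloads instead connect to an
    # existing multi-node cluster with ray.init(address="auto").
    ray.init()

    # Train a small binary classifier on the generated parquet dataset
    # (assumed path; create it with create_test_data.py first).
    bst, additional_results, time_taken = train_ray(
        path="/data/classification.parquet",
        num_workers=2,
        num_boost_rounds=10,
        num_files=4,
        regression=False,
        use_gpu=False,
        ray_params=RayParams(
            max_actor_restarts=2, num_actors=2, cpus_per_actor=1),
    )
    print(f"Training finished in {time_taken:.2f} seconds")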