[AIR output] UX issues of new AIR output for release test: air_benchmark_xgboost_cpu_10 #33822

Closed · scottsun94 opened this issue Mar 28, 2023 · 3 comments
Labels: bug (Something that is supposed to be working; but isn't), P2 (Important issue, but not time-critical), ray-team-created (Ray Team created)

scottsun94 (Contributor) commented Mar 28, 2023

What happened + What you expected to happen

  1. I saw this warning. It seems irrelevant in this case; can it be suppressed?
2023-03-28 13:51:14,756 WARNING trial_runner.py:1576 -- The maximum number of pending trials has been automatically set to the number of available cluster CPUs, which is high (176 CPUs/pending trials). If you're running an experiment with a large number of trials, this could lead to scheduling overhead. In this case, consider setting the `TUNE_MAX_PENDING_TRIALS_PG` environment variable to the desired maximum number of concurrent trials.
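
For reference, a minimal sketch of setting the environment variable that the warning points at (the value 1 is only an illustrative choice for a single-trial benchmark, and it has to be set before the trainer starts):

import os

# Cap the number of pending trials; the variable name comes from the warning
# above, and the value here is illustrative for a run with one trial.
os.environ["TUNE_MAX_PENDING_TRIALS_PG"] = "1"
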
  2. An empty config is printed:
Training started with configuration:
{}

Let's follow your suggestion from #33810 (comment) in this case:

# If there is no config available to print:
XGBoostTrainer started

# If there are configs we can print:
TorchTrainer started with the following configuration:
train_loop_config:
    batch_size: 128
    num_epochs: 20
    num_features: num_features
    lr: 0.001
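
A rough sketch of the formatting being suggested, written as a hypothetical helper (the function name is made up for illustration; this is not actual reporter code):

def print_trainer_start(trainer_name: str, config: dict) -> None:
    # Skip the config block entirely when there is nothing to show.
    if not config:
        print(f"{trainer_name} started")
        return
    print(f"{trainer_name} started with the following configuration:")
    for key, value in config.items():
        if isinstance(value, dict):
            # Nested configs (e.g. train_loop_config) get indented entries.
            print(f"{key}:")
            for sub_key, sub_value in value.items():
                print(f"    {sub_key}: {sub_value}")
        else:
            print(f"{key}: {value}")
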
  3. Some warnings might be confusing because they mention trials, a concept that users never deal with in their script.
2023-03-28 14:04:06,409 WARNING util.py:244 -- The `process_trial_save` operation took 3.927 s, which may be a performance bottleneck.
2023-03-28 14:04:06,409 WARNING trial_runner.py:887 -- Consider turning off forced head-worker trial checkpoint syncs by setting sync_on_checkpoint=False. Note that this may result in faulty trial restoration if a failure occurs while the checkpoint is being synced from the worker to the head node.
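
For anyone who wants to act on that warning, a sketch of where the flag would go, assuming sync_on_checkpoint is still exposed through tune.SyncConfig and the trainer's RunConfig on this nightly:

from ray import tune
from ray.air.config import RunConfig

# Assumption: sync_on_checkpoint is a SyncConfig field on this Ray version.
# The warning above suggests setting it to False to skip the forced
# head-worker checkpoint sync.
run_config = RunConfig(sync_config=tune.SyncConfig(sync_on_checkpoint=False))
# This run_config would then be passed as XGBoostTrainer(..., run_config=run_config).
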
  4. I'm not sure why I see this every 30 seconds when I set AIR_VERBOSITY=1:
(XGBoostTrainer pid=1608, ip=10.0.40.81) 2023-03-28 13:53:38,085        INFO main.py:1175 -- Training in progress (60 seconds since last restart).
(XGBoostTrainer pid=1608, ip=10.0.40.81) 2023-03-28 13:54:08,166        INFO main.py:1175 -- Training in progress (90 seconds since last restart).
(XGBoostTrainer pid=1608, ip=10.0.40.81) 2023-03-28 13:54:38,253        INFO main.py:1175 -- Training in progress (120 seconds since last restart).
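
For context, the new output engine was enabled via the environment variable referenced above; a minimal sketch (it needs to be set before the run starts):

import os

# Opt in to the new AIR console output for this run.
os.environ["AIR_VERBOSITY"] = "1"
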
  5. Why is checkpoint info only printed for iteration 11?
Training finished iter 9 at 2023-03-28 14:02:49 (running for 00:11:34.47)
time_this_iter_s: 68.1873939037323
time_total_s: 691.2493581771851
train-error: 0.15105200000000002
train-logloss: 0.42479700000000004
training_iteration: 9

(XGBoostTrainer pid=1608, ip=10.0.40.81) 2023-03-28 14:03:10,289        INFO main.py:1175 -- Training in progress (632 seconds since last restart).
(XGBoostTrainer pid=1608, ip=10.0.40.81) 2023-03-28 14:03:40,472        INFO main.py:1175 -- Training in progress (663 seconds since last restart).
Training finished iter 10 at 2023-03-28 14:04:00 (running for 00:12:45.82)
time_this_iter_s: 71.34764289855957
time_total_s: 762.5970010757446
train-error: 0.14691899999999997
train-logloss: 0.41702300000000003
training_iteration: 10

Training finished iter 11 at 2023-03-28 14:04:02 (running for 00:12:47.72)
time_this_iter_s: 1.8985095024108887
time_total_s: 764.4955105781555
train-error: 0.14691899999999997
train-logloss: 0.41702300000000003
training_iteration: 11

Training saved checkpoint for iter 11 at /home/ray/ray_results/XGBoostTrainer_2023-03-28_13-51-13/XGBoostTrainer_46e21_00000_0_2023-03-28_13-51-14/checkpoint_000010

2023-03-28 14:04:06,409 WARNING util.py:244 -- The `process_trial_save` operation took 3.927 s, which may be a performance bottleneck.
2023-03-28 14:04:06,409 WARNING trial_runner.py:887 -- Consider turning off forced head-worker trial checkpoint syncs by setting sync_on_checkpoint=False. Note that this may result in faulty trial restoration if a failure occurs while the checkpoint is being synced from the worker to the head node.
Training (11 iters) finished at 2023-03-28 14:04:06 (running for 00:12:51.66)
time_this_iter_s: 1.8985095024108887
time_total_s: 764.4955105781555
train-error: 0.14691899999999997
train-logloss: 0.41702300000000003
training_iteration: 11

Versions / Dependencies

Nightly

Reproduction script

xgboost_benchmark.py from the release test:

from functools import wraps
import json
import multiprocessing
from multiprocessing import Process
import os
import time
import traceback
import xgboost as xgb

import ray
from ray import data
from ray.train.xgboost import (
    XGBoostTrainer,
    XGBoostCheckpoint,
    XGBoostPredictor,
)
from ray.train.batch_predictor import BatchPredictor
from ray.air.config import ScalingConfig

_XGB_MODEL_PATH = "model.json"
_TRAINING_TIME_THRESHOLD = 1000
_PREDICTION_TIME_THRESHOLD = 450

_EXPERIMENT_PARAMS = {
    "smoke_test": {
        "data": (
            "https://air-example-data-2.s3.us-west-2.amazonaws.com/"
            "10G-xgboost-data.parquet/8034b2644a1d426d9be3bbfa78673dfa_000000.parquet"
        ),
        "num_workers": 1,
        "cpus_per_worker": 1,
    },
    "10G": {
        "data": "s3://air-example-data-2/10G-xgboost-data.parquet/",
        "num_workers": 1,
        "cpus_per_worker": 12,
    },
    "100G": {
        "data": "s3://air-example-data-2/100G-xgboost-data.parquet/",
        "num_workers": 10,
        "cpus_per_worker": 12,
    },
}


def run_and_time_it(f):
    """Runs f in a separate process and times it."""

    @wraps(f)
    def wrapper(*args, **kwargs):
        class MyProcess(Process):
            def __init__(self, *args, **kwargs):
                super(MyProcess, self).__init__(*args, **kwargs)
                self._pconn, self._cconn = multiprocessing.Pipe()
                self._exception = None

            def run(self):
                try:
                    super(MyProcess, self).run()
                except Exception as e:
                    tb = traceback.format_exc()
                    print(tb)
                    self._cconn.send(e)

            @property
            def exception(self):
                if self._pconn.poll():
                    self._exception = self._pconn.recv()
                return self._exception

        p = MyProcess(target=f, args=args, kwargs=kwargs)
        start = time.monotonic()
        p.start()
        p.join()
        if p.exception:
            raise p.exception
        time_taken = time.monotonic() - start
        print(f"{f.__name__} takes {time_taken} seconds.")
        return time_taken

    return wrapper


@run_and_time_it
def run_xgboost_training(data_path: str, num_workers: int, cpus_per_worker: int):
    ds = data.read_parquet(data_path)
    params = {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    }

    trainer = XGBoostTrainer(
        scaling_config=ScalingConfig(
            num_workers=num_workers,
            resources_per_worker={"CPU": cpus_per_worker},
        ),
        label_column="labels",
        params=params,
        datasets={"train": ds},
    )
    result = trainer.fit()
    checkpoint = XGBoostCheckpoint.from_checkpoint(result.checkpoint)
    xgboost_model = checkpoint.get_model()
    xgboost_model.save_model(_XGB_MODEL_PATH)
    ray.shutdown()


@run_and_time_it
def run_xgboost_prediction(model_path: str, data_path: str):
    model = xgb.Booster()
    model.load_model(model_path)
    ds = data.read_parquet(data_path)
    ckpt = XGBoostCheckpoint.from_model(booster=model)
    batch_predictor = BatchPredictor.from_checkpoint(ckpt, XGBoostPredictor)
    result = batch_predictor.predict(
        ds.drop_columns(["labels"]),
        # Improve prediction throughput for xgboost with larger
        # batch size than default 4096
        batch_size=8192,
    )

    for _ in result.iter_batches():
        pass

    return result


def main(args):
    experiment = args.size if not args.smoke_test else "smoke_test"
    experiment_params = _EXPERIMENT_PARAMS[experiment]

    data_path, num_workers, cpus_per_worker = (
        experiment_params["data"],
        experiment_params["num_workers"],
        experiment_params["cpus_per_worker"],
    )
    print("Running xgboost training benchmark...")
    training_time = run_xgboost_training(data_path, num_workers, cpus_per_worker)
    print("Running xgboost prediction benchmark...")
    prediction_time = run_xgboost_prediction(_XGB_MODEL_PATH, data_path)
    result = {
        "training_time": training_time,
        "prediction_time": prediction_time,
    }
    print("Results:", result)
    test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/result.json")
    with open(test_output_json, "wt") as f:
        json.dump(result, f)

    if not args.disable_check:
        if training_time > _TRAINING_TIME_THRESHOLD:
            raise RuntimeError(
                f"Training on XGBoost is taking {training_time} seconds, "
                f"which is longer than expected ({_TRAINING_TIME_THRESHOLD} seconds)."
            )

        if prediction_time > _PREDICTION_TIME_THRESHOLD:
            raise RuntimeError(
                f"Batch prediction on XGBoost is taking {prediction_time} seconds, "
                f"which is longer than expected ({_PREDICTION_TIME_THRESHOLD} seconds)."
            )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--size", type=str, choices=["10G", "100G"], default="100G")
    # Add a flag for disabling the timeout error.
    # Use case: running the benchmark as a documented example, in infra settings
    # different from the formal benchmark's EC2 setup.
    parser.add_argument(
        "--disable-check",
        action="store_true",
        help="disable runtime error on benchmark timeout",
    )
    parser.add_argument("--smoke-test", action="store_true")
    args = parser.parse_args()
    main(args)
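
As a usage note, the smoke-test configuration can also be exercised directly; a sketch that covers only the training half of the benchmark:

# Equivalent to the training portion of `--smoke-test`;
# run_and_time_it prints the elapsed time for the wrapped call.
params = _EXPERIMENT_PARAMS["smoke_test"]
run_xgboost_training(
    params["data"], params["num_workers"], params["cpus_per_worker"]
)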

Issue Severity

Low: It annoys or frustrates me.

scottsun94 added the bug and triage labels on Mar 28, 2023
xwjiang2010 added this to the Tune Console Output milestone on Mar 28, 2023
xwjiang2010 added the P2, air, and ray-team-created labels and removed the triage label on Mar 28, 2023
xwjiang2010 (Contributor) commented:

"Why is checkpoint info only printed for iteration 11?"

It's actually iteration 10. Checkpointing is user-configured: when the user doesn't ask for a checkpoint, the output won't include one.
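
For anyone landing here later, a sketch of where that checkpoint cadence is typically configured (assuming CheckpointConfig and RunConfig are the relevant knobs on this Ray version; the frequency value is illustrative):

from ray.air.config import CheckpointConfig, RunConfig

# Assumption: the trainer checkpoints according to checkpoint_frequency;
# without it, only the end-of-training checkpoint is reported, which matches
# seeing the checkpoint line only at the final iteration.
run_config = RunConfig(checkpoint_config=CheckpointConfig(checkpoint_frequency=10))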

xwjiang2010 (Contributor) commented:

For 2 (the empty config), this will be fixed by #33811.

scottsun94 (Contributor, author) commented:

Created separate issues for them.
