Reduce verbosity for ray events #226

Merged 4 commits on Aug 15, 2022
48 changes: 38 additions & 10 deletions xgboost_ray/main.py
@@ -366,6 +366,8 @@ class RayParams:
Defaults to 0 (no retries). Set to -1 for unlimited retries.
checkpoint_frequency (int): How often to save checkpoints. Defaults
to ``5`` (every 5th iteration).
verbose (Optional[bool]): Whether to output Ray-specific info messages
during training/prediction. Defaults to ``None``, which enables these
messages except when running inside a Ray Tune session.
"""
# Actor scheduling
num_actors: int = 0
@@ -382,6 +384,8 @@
# Distributed callbacks
distributed_callbacks: Optional[List[DistributedCallback]] = None

verbose: Optional[bool] = None

def get_tune_resources(self):
"""Return the resources to use for xgboost_ray training with Tune."""
if self.cpus_per_actor <= 0 or self.num_actors <= 0:
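
For context, the new flag is set on ``RayParams`` like any other field. A minimal usage sketch (the data path, label column, and parameter values below are illustrative, not part of this diff):

```python
# Usage sketch (illustrative): silence the "[RayXGBoost] ..." info
# messages by passing the new `verbose` flag on RayParams.
from xgboost_ray import RayDMatrix, RayParams, train

dtrain = RayDMatrix("example.parquet", label="target")  # hypothetical dataset
bst = train(
    {"objective": "binary:logistic"},
    dtrain,
    ray_params=RayParams(num_actors=2, verbose=False),
)
```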
@@ -422,6 +426,9 @@ def _validate_ray_params(ray_params: Union[None, RayParams, dict]) \
warnings.warn(
f"`num_actors` in `ray_params` is smaller than 2 "
f"({ray_params.num_actors}). XGBoost will NOT be distributed!")
if ray_params.verbose is None:
# In Tune sessions, reduce verbosity
ray_params.verbose = not is_session_enabled()
return ray_params
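
The effect of the ``None`` default can be summarized in a small standalone sketch; ``in_tune_session`` stands in for the ``is_session_enabled()`` check used above:

```python
def resolve_verbose(verbose, in_tune_session):
    # Mirrors the default above: an explicit value wins; otherwise be
    # verbose only when NOT running inside a Ray Tune session.
    if verbose is None:
        return not in_tune_session
    return verbose

assert resolve_verbose(None, in_tune_session=True) is False   # quiet under Tune
assert resolve_verbose(None, in_tune_session=False) is True   # chatty standalone
assert resolve_verbose(True, in_tune_session=True) is True    # explicit override
```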


@@ -928,6 +935,9 @@ def _train(params: Dict,
from xgboost_ray.elastic import _maybe_schedule_new_actors, \
_update_scheduled_actor_states, _get_actor_alive_status

# Do not modify original parameters
params = params.copy()

# Un-schedule possible scheduled restarts
_training_state.restart_training_at = None

@@ -942,6 +952,13 @@
params["nthread"] = cpus_per_actor
params["n_jobs"] = cpus_per_actor

if ray_params.verbose:
maybe_log = logger.info
params.setdefault("verbosity", 1)
else:
maybe_log = logger.debug
params.setdefault("verbosity", 0)

# This is a callback that handles actor failures.
# We identify the rank of the failed actor, add this to a set of
# failed actors (which we might want to restart later), and set its
Expand Down Expand Up @@ -977,9 +994,10 @@ def handle_actor_failure(actor_id):
newly_created += 1

alive_actors = sum(1 for a in _training_state.actors if a is not None)
logger.info(f"[RayXGBoost] Created {newly_created} new actors "
f"({alive_actors} total actors). Waiting until actors "
f"are ready for training.")

maybe_log(f"[RayXGBoost] Created {newly_created} new actors "
f"({alive_actors} total actors). Waiting until actors "
f"are ready for training.")

# For distributed datasets (e.g. Modin), this will initialize
# (and fix) the assignment of data shards to actor ranks
@@ -1022,7 +1040,7 @@ def handle_actor_failure(actor_id):
_get_actor_alive_status(_training_state.actors, handle_actor_failure)
raise RayActorError from exc

logger.info("[RayXGBoost] Starting XGBoost training.")
maybe_log("[RayXGBoost] Starting XGBoost training.")

# Start Rabit tracker for gradient sharing
rabit_process, env = _start_rabit_tracker(alive_actors)
@@ -1513,10 +1531,15 @@ def _wrapped(*args, **kwargs):
train_additional_results["training_time_s"] = total_training_time
train_additional_results["total_time_s"] = total_time

logger.info("[RayXGBoost] Finished XGBoost training on training data "
"with total N={total_n:,} in {total_time_s:.2f} seconds "
"({training_time_s:.2f} pure XGBoost training time).".format(
**train_additional_results))
if ray_params.verbose:
maybe_log = logger.info
else:
maybe_log = logger.debug

maybe_log("[RayXGBoost] Finished XGBoost training on training data "
"with total N={total_n:,} in {total_time_s:.2f} seconds "
"({training_time_s:.2f} pure XGBoost training time).".format(
**train_additional_results))

_shutdown(
actors=actors,
@@ -1538,6 +1561,11 @@ def _predict(model: xgb.Booster, data: RayDMatrix, ray_params: RayParams,
**kwargs):
_assert_ray_support()

if ray_params.verbose:
maybe_log = logger.info
else:
maybe_log = logger.debug

if not ray.is_initialized():
ray.init()

@@ -1553,7 +1581,7 @@ def _predict(model: xgb.Booster, data: RayDMatrix, ray_params: RayParams,
distributed_callbacks=ray_params.distributed_callbacks)
for i in range(ray_params.num_actors)
]
logger.info(f"[RayXGBoost] Created {len(actors)} remote actors.")
maybe_log(f"[RayXGBoost] Created {len(actors)} remote actors.")

# Split data across workers
wait_load = []
@@ -1570,7 +1598,7 @@ def _predict(model: xgb.Booster, data: RayDMatrix, ray_params: RayParams,
# Put model into object store
model_ref = ray.put(model)

logger.info("[RayXGBoost] Starting XGBoost prediction.")
maybe_log("[RayXGBoost] Starting XGBoost prediction.")

# Predict
fut = [actor.predict.remote(model_ref, data, **kwargs) for actor in actors]
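
Prediction honors the same flag. A hedged usage sketch (the booster and data source are illustrative):

```python
# Usage sketch (illustrative): with verbose=False, the
# "[RayXGBoost] Created ... remote actors." message goes to DEBUG.
from xgboost_ray import RayDMatrix, RayParams, predict

preds = predict(
    bst,                            # a trained xgb.Booster, e.g. from train()
    RayDMatrix("example.parquet"),  # hypothetical data source
    ray_params=RayParams(num_actors=2, verbose=False),
)
```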