Add enable_categorical, better detection for being in a PG (#235)
XGBoost >= 1.5 added a new DMatrix parameter, `enable_categorical`. This PR adds support for it here.
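
As a minimal usage sketch (not taken from this commit's tests), passing the new flag through `RayDMatrix` might look like the following; the data, parameters, and actor counts are illustrative and assume the public `RayDMatrix`/`train`/`RayParams` API:

```python
# Hypothetical sketch: forward `enable_categorical` from RayDMatrix to the
# underlying xgb.DMatrix built on each actor. Requires XGBoost >= 1.5 and a
# hist-based tree method for categorical splits.
import pandas as pd
from xgboost_ray import RayDMatrix, RayParams, train

df = pd.DataFrame({
    "color": pd.Categorical(["red", "green", "blue", "green"]),
    "size": [1.0, 2.0, 3.0, 4.0],
    "label": [0, 1, 0, 1],
})

dtrain = RayDMatrix(df, label="label", enable_categorical=True)

bst = train(
    {"objective": "binary:logistic", "tree_method": "hist"},
    dtrain,
    ray_params=RayParams(num_actors=2, cpus_per_actor=1),
)
```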

This PR also broadens the detection of already being inside a placement group so it is no longer limited to Tune. This allows xgboost-ray to be run in nested tasks that are not related to Tune (this should really be two PRs, but I needed one branch to patch it for the hackathon).
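
A sketch of the newly supported pattern (hypothetical, not from this commit): xgboost-ray called from a task that is already scheduled inside a placement group, using Ray's `placement_group` and `PlacementGroupSchedulingStrategy` APIs; bundle shapes and actor counts are illustrative:

```python
# Hypothetical sketch: with this change, xgboost-ray detects the surrounding
# placement group via ray.util.get_current_placement_group() even when Tune
# is not involved, and schedules its actors into that group.
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


@ray.remote
def train_in_pg():
    import pandas as pd
    from xgboost_ray import RayDMatrix, RayParams, train

    df = pd.DataFrame({"x": [1.0, 2.0, 3.0, 4.0], "y": [0, 1, 0, 1]})
    dtrain = RayDMatrix(df, label="y")
    return train(
        {"objective": "binary:logistic"},
        dtrain,
        ray_params=RayParams(num_actors=2, cpus_per_actor=1),
    )


ray.init()
pg = placement_group([{"CPU": 2}] * 2)
ray.get(pg.ready())

# Capture child tasks/actors so the xgboost-ray actors land in the same group.
bst = ray.get(
    train_in_pg.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_capture_child_tasks=True,
        )
    ).remote()
)
```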


Signed-off-by: Antoni Baum <[email protected]>
Yard1 authored Sep 20, 2022
1 parent 705a592 commit 536b702
Showing 2 changed files with 33 additions and 18 deletions.
38 changes: 21 additions & 17 deletions xgboost_ray/main.py
@@ -315,6 +315,10 @@ def _get_dmatrix(data: RayDMatrix, param: Dict) -> xgb.DMatrix:
"feature_types": data.feature_types,
"missing": data.missing,
}

if data.enable_categorical is not None:
dm_param["enable_categorical"] = data.enable_categorical

param.update(dm_param)
it = RayDataIter(**param)
matrix = xgb.DeviceQuantileDMatrix(it, **dm_param)
@@ -342,6 +346,9 @@ def _get_dmatrix(data: RayDMatrix, param: Dict) -> xgb.DMatrix:
if "qid" not in inspect.signature(xgb.DMatrix).parameters:
param.pop("qid", None)

if data.enable_categorical is not None:
param["enable_categorical"] = data.enable_categorical

matrix = xgb.DMatrix(**param)

if not LEGACY_MATRIX:
@@ -855,25 +862,22 @@ def _create_placement_group(cpus_per_actor, gpus_per_actor,


def _create_communication_processes(added_tune_callback: bool = False):
# Create Queue and Event actors and make sure to colocate with driver node.
node_ip = get_node_ip_address()
# Have to explicitly set num_cpus to 0.
placement_option = {"num_cpus": 0}
if added_tune_callback:
# If Tune is using placement groups, then we force Queue and
current_pg = get_current_placement_group()
if current_pg is not None:
# If we are already in a placement group, let's use it
# Also, if we are specifically in Tune, let's
# ensure that we force Queue and
# StopEvent onto same bundle as the Trainable.
# This forces all 3 to be on the same node.
current_pg = get_current_placement_group()
if current_pg is None:
# This means the user is not using Tune PGs after all -
# e.g. via setting an environment variable.
placement_option.update({"resources": {f"node:{node_ip}": 0.01}})
else:
placement_option.update({
"placement_group": current_pg,
"placement_group_bundle_index": 0
})
placement_option.update({
"placement_group": current_pg,
"placement_group_bundle_index": 0 if added_tune_callback else -1
})
else:
# Create Queue and Event actors and make sure to colocate with
# driver node.
node_ip = get_node_ip_address()
placement_option.update({"resources": {f"node:{node_ip}": 0.01}})
queue = Queue(actor_options=placement_option) # Queue actor
stop_event = Event(actor_options=placement_option) # Stop event actor
@@ -1327,7 +1331,7 @@ def _wrapped(*args, **kwargs):
"Please disable elastic_training in RayParams in "
"order to use xgboost_ray with Tune.")

if added_tune_callback:
if added_tune_callback or get_current_placement_group():
# Don't autodetect resources when used with Tune.
cpus_per_actor = ray_params.cpus_per_actor
gpus_per_actor = max(0, ray_params.gpus_per_actor)
@@ -1408,7 +1412,7 @@ def _wrapped(*args, **kwargs):

placement_strategy = None
if not ray_params.elastic_training:
if added_tune_callback:
if added_tune_callback or get_current_placement_group():
# Tune is using placement groups, so the strategy has already
# been set. Don't create an additional placement_group here.
placement_strategy = None
13 changes: 12 additions & 1 deletion xgboost_ray/matrix.py
@@ -83,6 +83,7 @@ def __init__(
label_upper_bound: List[Optional[Data]],
feature_names: Optional[List[str]],
feature_types: Optional[List[np.dtype]],
enable_categorical: Optional[bool],
):
super(RayDataIter, self).__init__()

@@ -98,6 +99,7 @@ def __init__(
self._label_upper_bound = label_upper_bound
self._feature_names = feature_names
self._feature_types = feature_types
self._enable_categorical = enable_categorical

self._iter = 0

@@ -129,7 +131,8 @@ def next(self, input_data: Callable):
label_lower_bound=self._prop(self._label_lower_bound),
label_upper_bound=self._prop(self._label_upper_bound),
feature_names=self._feature_names,
feature_types=self._feature_types)
feature_types=self._feature_types,
enable_categorical=self._enable_categorical)
self._iter += 1
return 1

@@ -146,6 +149,7 @@ def __init__(self,
feature_names: Optional[List[str]] = None,
feature_types: Optional[List[np.dtype]] = None,
qid: Optional[Data] = None,
enable_categorical: Optional[bool] = None,
filetype: Optional[RayFileType] = None,
ignore: Optional[List[str]] = None,
**kwargs):
@@ -159,6 +163,7 @@ def __init__(self,
self.feature_names = feature_names
self.feature_types = feature_types
self.qid = qid
self.enable_categorical = enable_categorical

self.data_source = None
self.actor_shards = None
@@ -655,6 +660,7 @@ def __init__(self,
feature_names: Optional[List[str]] = None,
feature_types: Optional[List[np.dtype]] = None,
qid: Optional[Data] = None,
enable_categorical: Optional[bool] = None,
num_actors: Optional[int] = None,
filetype: Optional[RayFileType] = None,
ignore: Optional[List[str]] = None,
@@ -677,6 +683,7 @@ def __init__(self,
self.feature_names = feature_names
self.feature_types = feature_types
self.qid = qid
self.enable_categorical = enable_categorical
self.missing = missing

self.num_actors = num_actors
@@ -706,6 +713,7 @@ def __init__(self,
label_upper_bound=label_upper_bound,
feature_names=feature_names,
feature_types=feature_types,
enable_categorical=enable_categorical,
filetype=filetype,
ignore=ignore,
qid=qid,
@@ -721,6 +729,7 @@ def __init__(self,
label_upper_bound=label_upper_bound,
feature_names=feature_names,
feature_types=feature_types,
enable_categorical=enable_categorical,
filetype=filetype,
ignore=ignore,
qid=qid,
@@ -829,6 +838,7 @@ def __init__(self,
feature_names: Optional[List[str]] = None,
feature_types: Optional[List[np.dtype]] = None,
qid: Optional[Data] = None,
enable_categorical: Optional[bool] = None,
*args,
**kwargs):
if cp is None:
@@ -852,6 +862,7 @@ def __init__(self,
feature_names=feature_names,
feature_types=feature_types,
qid=qid,
enable_categorical=enable_categorical,
*args,
**kwargs)

