diff --git a/xgboost_ray/main.py b/xgboost_ray/main.py
index 24c66aaa..cef66350 100644
--- a/xgboost_ray/main.py
+++ b/xgboost_ray/main.py
@@ -315,6 +315,10 @@ def _get_dmatrix(data: RayDMatrix, param: Dict) -> xgb.DMatrix:
             "feature_types": data.feature_types,
             "missing": data.missing,
         }
+
+        if data.enable_categorical is not None:
+            dm_param["enable_categorical"] = data.enable_categorical
+
         param.update(dm_param)
         it = RayDataIter(**param)
         matrix = xgb.DeviceQuantileDMatrix(it, **dm_param)
@@ -342,6 +346,9 @@ def _get_dmatrix(data: RayDMatrix, param: Dict) -> xgb.DMatrix:
         if "qid" not in inspect.signature(xgb.DMatrix).parameters:
             param.pop("qid", None)
 
+        if data.enable_categorical is not None:
+            param["enable_categorical"] = data.enable_categorical
+
         matrix = xgb.DMatrix(**param)
 
     if not LEGACY_MATRIX:
@@ -855,25 +862,22 @@ def _create_placement_group(cpus_per_actor, gpus_per_actor,
 
 
 def _create_communication_processes(added_tune_callback: bool = False):
-    # Create Queue and Event actors and make sure to colocate with driver node.
-    node_ip = get_node_ip_address()
     # Have to explicitly set num_cpus to 0.
     placement_option = {"num_cpus": 0}
-    if added_tune_callback:
-        # If Tune is using placement groups, then we force Queue and
+    current_pg = get_current_placement_group()
+    if current_pg is not None:
+        # If we are already in a placement group, let's use it.
+        # Also, if we are specifically in Tune, let's
+        # ensure that we force Queue and
         # StopEvent onto same bundle as the Trainable.
-        # This forces all 3 to be on the same node.
-        current_pg = get_current_placement_group()
-        if current_pg is None:
-            # This means the user is not using Tune PGs after all -
-            # e.g. via setting an environment variable.
-            placement_option.update({"resources": {f"node:{node_ip}": 0.01}})
-        else:
-            placement_option.update({
-                "placement_group": current_pg,
-                "placement_group_bundle_index": 0
-            })
+        placement_option.update({
+            "placement_group": current_pg,
+            "placement_group_bundle_index": 0 if added_tune_callback else -1
+        })
     else:
+        # Create Queue and Event actors and make sure to colocate with
+        # driver node.
+        node_ip = get_node_ip_address()
         placement_option.update({"resources": {f"node:{node_ip}": 0.01}})
     queue = Queue(actor_options=placement_option)  # Queue actor
     stop_event = Event(actor_options=placement_option)  # Stop event actor
@@ -1327,7 +1331,7 @@ def _wrapped(*args, **kwargs):
                 "Please disable elastic_training in RayParams in "
                 "order to use xgboost_ray with Tune.")
 
-    if added_tune_callback:
+    if added_tune_callback or get_current_placement_group():
         # Don't autodetect resources when used with Tune.
         cpus_per_actor = ray_params.cpus_per_actor
         gpus_per_actor = max(0, ray_params.gpus_per_actor)
@@ -1408,7 +1412,7 @@ def _wrapped(*args, **kwargs):
 
     placement_strategy = None
     if not ray_params.elastic_training:
-        if added_tune_callback:
+        if added_tune_callback or get_current_placement_group():
             # Tune is using placement groups, so the strategy has already
             # been set. Don't create an additional placement_group here.
             placement_strategy = None
diff --git a/xgboost_ray/matrix.py b/xgboost_ray/matrix.py
index 914a8ca3..bc1410c4 100644
--- a/xgboost_ray/matrix.py
+++ b/xgboost_ray/matrix.py
@@ -83,6 +83,7 @@ def __init__(
             label_upper_bound: List[Optional[Data]],
             feature_names: Optional[List[str]],
             feature_types: Optional[List[np.dtype]],
+            enable_categorical: Optional[bool],
     ):
         super(RayDataIter, self).__init__()
 
@@ -98,6 +99,7 @@ def __init__(
         self._label_upper_bound = label_upper_bound
         self._feature_names = feature_names
         self._feature_types = feature_types
+        self._enable_categorical = enable_categorical
 
         self._iter = 0
 
@@ -129,7 +131,8 @@ def next(self, input_data: Callable):
             label_lower_bound=self._prop(self._label_lower_bound),
             label_upper_bound=self._prop(self._label_upper_bound),
             feature_names=self._feature_names,
-            feature_types=self._feature_types)
+            feature_types=self._feature_types,
+            enable_categorical=self._enable_categorical)
         self._iter += 1
         return 1
 
@@ -146,6 +149,7 @@ def __init__(self,
                 feature_names: Optional[List[str]] = None,
                 feature_types: Optional[List[np.dtype]] = None,
                 qid: Optional[Data] = None,
+                enable_categorical: Optional[bool] = None,
                 filetype: Optional[RayFileType] = None,
                 ignore: Optional[List[str]] = None,
                 **kwargs):
@@ -159,6 +163,7 @@ def __init__(self,
         self.feature_names = feature_names
         self.feature_types = feature_types
         self.qid = qid
+        self.enable_categorical = enable_categorical
 
         self.data_source = None
         self.actor_shards = None
@@ -655,6 +660,7 @@ def __init__(self,
                 feature_names: Optional[List[str]] = None,
                 feature_types: Optional[List[np.dtype]] = None,
                 qid: Optional[Data] = None,
+                enable_categorical: Optional[bool] = None,
                 num_actors: Optional[int] = None,
                 filetype: Optional[RayFileType] = None,
                 ignore: Optional[List[str]] = None,
@@ -677,6 +683,7 @@ def __init__(self,
         self.feature_names = feature_names
         self.feature_types = feature_types
         self.qid = qid
+        self.enable_categorical = enable_categorical
         self.missing = missing
 
         self.num_actors = num_actors
@@ -706,6 +713,7 @@ def __init__(self,
                 label_upper_bound=label_upper_bound,
                 feature_names=feature_names,
                 feature_types=feature_types,
+                enable_categorical=enable_categorical,
                 filetype=filetype,
                 ignore=ignore,
                 qid=qid,
@@ -721,6 +729,7 @@ def __init__(self,
                 label_upper_bound=label_upper_bound,
                 feature_names=feature_names,
                 feature_types=feature_types,
+                enable_categorical=enable_categorical,
                 filetype=filetype,
                 ignore=ignore,
                 qid=qid,
@@ -829,6 +838,7 @@ def __init__(self,
                 feature_names: Optional[List[str]] = None,
                 feature_types: Optional[List[np.dtype]] = None,
                 qid: Optional[Data] = None,
+                enable_categorical: Optional[bool] = None,
                 *args,
                 **kwargs):
         if cp is None:
@@ -852,6 +862,7 @@ def __init__(self,
             feature_names=feature_names,
             feature_types=feature_types,
             qid=qid,
+            enable_categorical=enable_categorical,
             *args,
             **kwargs)
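
A minimal usage sketch of the `enable_categorical` flag added by this diff. It is not part of the change itself; the DataFrame, column names, and training parameters are illustrative assumptions, and categorical support also depends on the installed XGBoost version (older releases only support it with `tree_method="gpu_hist"`).

```python
# Illustrative sketch only -- data and params are assumptions, not part of the diff.
import pandas as pd

from xgboost_ray import RayDMatrix, RayParams, train

df = pd.DataFrame({
    "color": pd.Categorical(["red", "green", "red", "blue"]),
    "size": [1.0, 2.5, 3.2, 0.7],
    "label": [0, 1, 1, 0],
})

# The flag is stored on the RayDMatrix and forwarded by _get_dmatrix to
# xgb.DMatrix / xgb.DeviceQuantileDMatrix whenever it is not None.
dtrain = RayDMatrix(df, label="label", enable_categorical=True)

bst = train(
    {"objective": "binary:logistic", "tree_method": "hist"},
    dtrain,
    num_boost_round=10,
    ray_params=RayParams(num_actors=2),
)
```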