
[jvm-packages] add a new trainDistributed API for dataset #5950

Closed

Conversation


@wbo4958 wbo4958 commented Jul 28, 2020

This PR adds a new trainDistributed API for Dataset and creates a
TrainIntf trait with four different implementations (train for ranking with eval sets,
ranking without eval sets, non-ranking with eval sets, and non-ranking without eval sets).

This PR does not change any logic of the training pipeline; it is essentially a code reorganization.


wbo4958 commented Jul 28, 2020

Hi @CodingCat @trivialfis @hcho3 @sperlingxx,

Since gpu_hist #5171 has been merged, the JVM packages can also be accelerated by GPUs on Spark 3.

Looking at the whole XGBoost workflow (data loading, ETL, training), only the training step can currently be accelerated by GPUs. But training models or doing prediction is just a small portion of the whole work, especially now that training is GPU-accelerated. Most of the time is spent on data preparation (data loading, ETL), so data preparation can slow down the whole pipeline.

So the question is can we also accelerate the ETL pipeline?
The answer is yes!

As of Apache Spark 3.0, users can schedule GPU resources and replace the backend of many SQL and DataFrame operations so that they are accelerated by GPUs. Concretely, it is the RAPIDS plugin / cuDF (https://github.com/NVIDIA/spark-rapids) that accelerates SQL/DataFrame processing.

What I am trying to say is that we can leverage the RAPIDS plugin to accelerate XGBoost4J-Spark end to end, from data loading to training. The RAPIDS plugin loads data into GPU memory in columnar format and executes operations like join/filter/groupBy on GPUs, which is really fast. After ETL the data is still stored in GPU memory, so we can build a device DMatrix directly in GPU memory; the XGBoost library already supports the cudf_interface way to build a DMatrix.
In fact, we have some internal tests showing the typical speedup for XGBoost4J: https://cloud.google.com/blog/products/data-analytics/ml-with-xgboost-gets-faster-with-dataproc-on-gpus and https://databricks.com/blog/2020/05/15/shrink-training-time-and-cost-using-nvidia-gpu-accelerated-xgboost-and-apache-spark-on-databricks.html.
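
For illustration only, here is a rough sketch of what this end-to-end flow looks like from a user's point of view, assuming the spark-rapids plugin is enabled on the cluster so that supported DataFrame operations run on GPUs. The paths, column names, and parameter values are made up, and the columnar plugin path proposed in this PR would additionally avoid the row conversion that VectorAssembler implies:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.ml.feature.VectorAssembler
    import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

    object EndToEndSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("xgb-rapids-sketch").getOrCreate()
        import spark.implicits._

        // Data loading + ETL: with the spark-rapids plugin enabled, these
        // SQL/DataFrame operators can run on the GPU in columnar format.
        val raw = spark.read.parquet("/path/to/train")          // made-up path
        val cleaned = raw.filter($"label".isNotNull)

        // Assemble feature columns into a vector column (made-up column names).
        val assembled = new VectorAssembler()
          .setInputCols(Array("f0", "f1", "f2"))
          .setOutputCol("features")
          .transform(cleaned)

        // Train with the GPU histogram algorithm (gpu_hist from #5171).
        val model = new XGBoostClassifier(Map(
            "tree_method" -> "gpu_hist",
            "num_round" -> 100,
            "num_workers" -> 2))
          .setFeaturesCol("features")
          .setLabelCol("label")
          .fit(assembled)

        model.write.overwrite().save("/path/to/model")          // made-up path
      }
    }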

The RAPIDS plugin / cuDF is, as far as I know, still developing rapidly, so I would not like to add it as a direct dependency. Instead, I'd like to add an XGBoost plugin mechanism to support this.

In order to support plugins for XGBoost, I'd like to have two PRs.

The first PR (this PR) groups the RDD transforms together, so that the whole RDD transform can later be replaced by a plugin.

To achieve this,

1. deprecate the existing trainDistributed for RDD

  /**
   * @return A tuple of the booster and the metrics used to build training summary
   */
  @deprecated("use trainDistributed for Dataset instead of RDD", "1.2.0")
  @throws(classOf[XGBoostError])
  private[spark] def trainDistributed(
      trainingData: RDD[XGBLabeledPoint],
      params: Map[String, Any],
      hasGroup: Boolean = false,
      evalSetsMap: Map[String, RDD[XGBLabeledPoint]] = Map()):
    (Booster, Map[String, Array[Float]]) = {

2. add a new trainDistributed for Dataset[_]

  /**
   * @return A tuple of the booster and the metrics used to build training summary
   */
  @throws(classOf[XGBoostError])
  private[spark] def trainDistributed(
      trainingData: Dataset[_],
      dsToRddParams: DataFrameToRDDParams,
      params: Map[String, Any],
      hasGroup: Boolean,
      evalData: Map[String, Dataset[_]]): (Booster, Map[String, Array[Float]]) = {

2.1. add a function in trainDistributed to choose the train implementation

val trainImpl = chooseTrainImpl(hasGroup, xgbExecParams, evalData)

e.g., if an XGBoost plugin is detected, trainImpl points to the plugin implementation; otherwise it falls back to the default CPU pipeline (see the sketch below).
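
This is a hedged sketch of what that selection could look like; the reflection-based detection, the plugin class name, and the names of the four CPU implementation classes are illustrative assumptions, not the PR's actual code:

    // Hypothetical sketch: pick the plugin implementation when one is found on the
    // classpath, otherwise fall back to one of the four default CPU implementations.
    private def chooseTrainImpl(
        hasGroup: Boolean,
        xgbExecParams: XGBoostExecutionParams,
        evalData: Map[String, Dataset[_]]): TrainIntf = {
      val pluginClassName = "ml.dmlc.xgboost4j.scala.spark.rapids.GpuTrainImpl" // assumed name
      try {
        Class.forName(pluginClassName)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[TrainIntf]
      } catch {
        case _: ClassNotFoundException =>
          // No plugin detected: default CPU pipeline (class names are hypothetical).
          (hasGroup, evalData.nonEmpty) match {
            case (true, true)   => new RankWithEvalTrain(xgbExecParams, evalData)
            case (true, false)  => new RankTrain(xgbExecParams)
            case (false, true)  => new NonRankWithEvalTrain(xgbExecParams, evalData)
            case (false, false) => new NonRankTrain(xgbExecParams)
          }
      }
    }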

2.2. convert the Dataset to an RDD and build Watches

        val boostersAndMetrics = trainImpl.datasetToRDD(trainingData, dsToRddParams)
          .mapPartitions { iter =>
            val watches = trainImpl.buildWatches(iter)
            buildDistributedBooster(watches, xgbExecParams, rabitEnv,
              xgbExecParams.obj, xgbExecParams.eval, prevBooster)
          }.cache()

The code above is the main framework; trainingData is a Dataset[_].

The main framework does not care how trainImpl is implemented, whether by a plugin or by the default CPU implementation; in effect, it treats everything as a plugin.

So whether it is the plugin or the CPU pipeline, both must implement datasetToRDD and buildWatches (a sketch of this contract follows below).
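
To make the contract concrete, here is a hedged sketch of the TrainIntf trait implied by the framework above; the exact signatures and element types in the PR may differ. Watches is the existing internal class in xgboost4j-spark, DataFrameToRDDParams comes from the signature shown above, and cleanUpDriver appears in the diff:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Dataset

    // Sketch of the contract both the plugin and the default CPU pipeline implement.
    trait TrainIntf {
      // Convert the high-level Dataset into an RDD whose partitions carry whatever
      // this implementation needs to build Watches (rows for the CPU pipeline,
      // columnar batches for a GPU plugin).
      def datasetToRDD(trainingData: Dataset[_], params: DataFrameToRDDParams): RDD[_]

      // Build the Watches (training/eval DMatrix set) from one partition's data.
      def buildWatches(iter: Iterator[_]): Watches

      // Driver-side cleanup after training (seen as trainImpl.cleanUpDriver() in the diff).
      def cleanUpDriver(): Unit = {}
    }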

BTW, this PR mainly re-architects the CPU pipeline and does not change the original logic.

The second PR will handle plugin detection and replacement. That should be simple.

Talk is cheap; please check the code. Many thanks for your precious time.

Any suggestions and comments are appreciated. Thanks in advance.

@CodingCat CodingCat self-requested a review July 31, 2020 20:25

@CodingCat CodingCat left a comment


Why does this have to be done in XGBoost/XGBoost.scala? XGBoost.scala exposes an RDD-based API for XGBoostClassifier/Regressor to use, not for end users.

You can use various approaches to accelerate the construction of the DataFrame and pass the resulting DF to XGBoostClassifier/Regressor... and then start training with gpu_hist.

I don't understand why the data preprocessing should leak into XGBoost.scala, and even into XGBoost itself.


wbo4958 commented Aug 2, 2020

Hi @CodingCat, thanks a lot for the review.

Why does this have to be done in XGBoost/XGBoost.scala? XGBoost.scala exposes an RDD-based API for XGBoostClassifier/Regressor to use, not for end users.

Yeah, it seems the RDD-based API of XGBoost.scala only works with RDD[XGBLabeledPoint], and some of the RDD transforms are also based on XGBLabeledPoint; finally, the booster is built from XGBLabeledPoint as well. So as you can see, XGBoost.scala is tightly coupled with XGBLabeledPoint.

The plugin would like XGBoost.scala to focus on the role of building the DMatrix, passing the DMatrix to the XGBoost library for training, and finally returning the model. As for how to build the DMatrix, different plugins may have different implementations. For example, the CPU pipeline uses XGDMatrixCreateFromDataIter, which builds the DMatrix from an Iterator, while the GPU pipeline may need XGBoosterPredictFromArrayInterfaceColumns, which is based on the array interface JSON format and no longer on LabeledPoint.

You can use various approaches to accelerate the construction of the DataFrame and pass the resulting DF to XGBoostClassifier/Regressor... and then start training with gpu_hist.

I don't understand why the data preprocessing should leak into XGBoost.scala, and even into XGBoost itself.

Yeah, consider the situation where we use spark-rapids to implement the plugin: spark-rapids (a Spark plugin that can accelerate most SQL/DataFrame operators) loads the data into GPU memory in columnar format from the very beginning and then performs operations like filter/agg/join in GPU memory.

If we still use RDD[XGBLabeledPoint], we need to convert the columnar data to rows and construct LabeledPoints, which causes the data to be copied from GPU to CPU; then the DMatrix is built on the CPU, and finally training runs on the GPU.

But if we use the plugin approach and XGBoosterPredictFromArrayInterfaceColumns to build the DMatrix, the data never falls back to CPU memory and the DMatrix is also built on the GPU, which is really fast.

                         sql/dataframe    building DMatrix    train
RDD[XGBLabeledPoint]     cpu              cpu                 gpu
Plugin                   gpu              gpu                 gpu


CodingCat commented Aug 2, 2020

@wbo4958 thanks for the explanation! Now I understand why you chose to open this PR...

regarding your last comparison

                         sql/dataframe    building DMatrix    train
RDD[XGBLabeledPoint]     cpu              cpu                 gpu
Plugin                   gpu              gpu                 gpu

If we use RDD[XGBLabeledPoint], the sql/dataframe stage is not necessarily on the CPU. As I said, you can build the DataFrame with the GPU, and then only building the DMatrix would be on the CPU (though I agree there is an extra memory-copy overhead).

  1. The major concern here is that we SHOULD AVOID building a data-preprocessing interface framework in XGBoost. It is hard to imagine what will happen after a few years; if any change is needed, that means another round of API deprecation and renewal.

Why don't we just add a new API in XGBoost.scala that accepts RDD[DMatrix]? We can do some type checking in XGBoost.scala: if an RDD[DMatrix] is passed in, we can skip all the transformations in the current implementation and go directly to the training phase.

In this way, we leave complete flexibility to different hardware vendors to have their own implementations of data transformers; for XGBoost, we only care about the DMatrix.
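
For illustration only, a hedged sketch of what such an RDD[DMatrix] entry point could look like; this is not code from this PR or from #5972, and the signature, metric handling, and one-DMatrix-per-partition layout are assumptions:

    // Hypothetical overload (would live inside object XGBoost): the caller hands in
    // partitions that already contain a DMatrix; XGBoost.scala only sets up Rabit and trains.
    import ml.dmlc.xgboost4j.java.Rabit
    import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, XGBoost => ScalaXGBoost}
    import org.apache.spark.rdd.RDD
    import scala.collection.JavaConverters._

    private[spark] def trainDistributed(
        trainingData: RDD[DMatrix],
        params: Map[String, Any],
        numRounds: Int,
        rabitEnv: Map[String, String]): RDD[Booster] = {
      trainingData.mapPartitions { iter =>
        Rabit.init(rabitEnv.asJava)                 // join the Rabit tracker
        try {
          val dmat = iter.next()                    // assumes one DMatrix per partition
          val booster = ScalaXGBoost.train(dmat, params, numRounds)
          Iterator(booster)
        } finally {
          Rabit.shutdown()
        }
      }
    }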

@CodingCat

In XGBoostClassifier/XGBoostRegressor, it seems we could also accept Dataset[DMatrix] and skip all the logic that currently exists just for the CPU?

trainImpl.cleanUpDriver()
}
}


A lot of the code here is just copy-pasted from the original method?

At the very least we should provide some abstraction to avoid duplicated code.


CodingCat commented Aug 3, 2020

After checking the PR, I think there is some lack of understanding of the architecture of XGBoost-Spark. I would like to highlight the design principle of XGBoost.scala: this file should be kept very low level; it integrates Spark and XGBoost's JVM binding through their lowest-level interfaces. Many years ago, when we started this project, that interface was RDD[XGBoostLabeledPoint], which was the only option at that moment.

Now that new hardware is surging, it seems this level of abstraction is partial, as XGBoostLabeledPoint is essentially an abstraction of XGBoost input data in host RAM. I think we should adjust the abstraction provided by this file.

Following this path, we should either improve XGBoostLabeledPoint to cover device memory, which doesn't seem to be a good solution for many reasons, or provide integration with RDD[DMatrix], as DMatrix is the unified interface for interacting with XGBoost data.

In the current PR, Dataset is a high-level abstraction of Spark's distributed structured data (higher level than RDD), which places more restrictions on how this file can be used, and the _ type parameter means mixing various layers in a layered design. Both of these violate the original design principles.


wbo4958 commented Aug 3, 2020

Hi @CodingCat, many thanks for taking the time to review.

After checking the PR, I think there is some lack of understanding of the architecture of XGBoost-Spark. I would like to highlight the design principle of XGBoost.scala: this file should be kept very low level; it integrates Spark and XGBoost's JVM binding through their lowest-level interfaces. Many years ago, when we started this project, that interface was RDD[XGBoostLabeledPoint], which was the only option at that moment.

Thanks for sharing the original design principle; the design seems quite reasonable.

Now that new hardware is surging, it seems this level of abstraction is partial, as XGBoostLabeledPoint is essentially an abstraction of XGBoost input data in host RAM. I think we should adjust the abstraction provided by this file.

Following this path, we should either improve XGBoostLabeledPoint to cover device memory, which doesn't seem to be a good solution for many reasons, or provide integration with RDD[DMatrix], as DMatrix is the unified interface for interacting with XGBoost data.

In the current PR, Dataset is a high-level abstraction of Spark's distributed structured data (higher level than RDD), which places more restrictions on how this file can be used, and the _ type parameter means mixing various layers in a layered design. Both of these violate the original design principles.

For your two concerns, let me offer my own explanation.

  1. Extract the XGBLabeledPoint-related code from XGBoost.scala into a separate file, because it is the CPU pipeline implementation and no longer seems to belong in XGBoost.scala if we plan to support plugins. (I didn't extract the XGBLabeledPoint code in this PR, because that would be a little aggressive; it could change a lot of code.)
  2. Let XGBoost.scala behave like a bridge that connects Spark and the XGBoost JVM binding.
    [image: xgboost-plugin]

So if XGBoost.scala behaves like a bridge, the main framework of the bridge is: convert the Dataset to an RDD, build the DMatrix from the RDD, and finally train. The bridge (XGBoost.scala) doesn't care how each step of that framework is implemented; it only cares that the framework works.

That said, RDD[DMatrix] is a brilliant idea, and I have an initial PR for an RDD[DMatrix] implementation in #5972. There may be one issue: when building a DMatrix, the XGBoost native library may need to do some synchronization between different workers (right now the sync is not enabled, but I suppose it will be enabled soon; @trivialfis please correct me). So the DMatrix building should be placed inside the Rabit environment, but with the RDD[DMatrix] approach the DMatrix is built by the caller beforehand, so it seems we can't move the DMatrix building into the Rabit environment.
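
To make this concern concrete, here is a rough illustration of the ordering issue; dataRdd, train, and buildDMatrixFromColumns are hypothetical names used only to show where Rabit.init happens relative to DMatrix construction:

    // This PR's pipeline: each worker enters the Rabit environment first, then builds
    // its DMatrix (Watches), so construction-time sync between workers would be possible.
    dataRdd.mapPartitions { iter =>
      Rabit.init(rabitEnv)                           // rabitEnv: worker env map from the tracker
      try {
        val watches = trainImpl.buildWatches(iter)   // DMatrix built inside the Rabit env
        Iterator(train(watches))                     // hypothetical training helper
      } finally {
        Rabit.shutdown()
      }
    }

    // RDD[DMatrix] approach: the DMatrix is built by the caller before XGBoost.scala
    // ever initializes Rabit, i.e. outside the Rabit environment.
    val dmatrixRdd: RDD[DMatrix] = buildDMatrixFromColumns(df)  // hypothetical builder, no Rabit yet
    // XGBoost.scala would only call Rabit.init later, inside its own mapPartitions,
    // after the DMatrix objects in dmatrixRdd have already been constructed.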

@CodingCat

this can be closed since we are in favor of #5972?

@CodingCat

close as the latest contribution is at #5972
