Skip to content

Commit

Permalink
[Breaking][jvm-packages] Use barrier execution mode (dmlc#7836)
Browse files Browse the repository at this point in the history
With the introduction of the barrier execution mode. we don't need to kill SparkContext when some xgboost tasks failed. Instead, Spark will handle the errors for us. So in this PR, `killSparkContextOnWorkerFailure` parameter is deleted.
  • Loading branch information
wbo4958 authored and trivialfis committed Apr 29, 2022
1 parent f24b592 commit 98c3db2
Show file tree
Hide file tree
Showing 15 changed files with 60 additions and 516 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import scala.collection.mutable
import scala.util.Random
import scala.collection.JavaConverters._

import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, RabitTracker => PyRabitTracker}
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
import ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams
Expand All @@ -30,8 +31,9 @@ import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import org.apache.commons.io.FileUtils
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.fs.FileSystem

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkParallelismTracker, TaskContext}
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.sql.SparkSession

/**
Expand Down Expand Up @@ -79,8 +81,7 @@ private[scala] case class XGBoostExecutionParams(
earlyStoppingParams: XGBoostExecutionEarlyStoppingParams,
cacheTrainingSet: Boolean,
treeMethod: Option[String],
isLocal: Boolean,
killSparkContextOnWorkerFailure: Boolean) {
isLocal: Boolean) {

private var rawParamMap: Map[String, Any] = _

Expand Down Expand Up @@ -224,9 +225,6 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
val cacheTrainingSet = overridedParams.getOrElse("cache_training_set", false)
.asInstanceOf[Boolean]

val killSparkContext = overridedParams.getOrElse("kill_spark_context_on_worker_failure", true)
.asInstanceOf[Boolean]

val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
missing, allowNonZeroForMissing, trackerConf,
timeoutRequestWorkers,
Expand All @@ -235,8 +233,7 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
xgbExecEarlyStoppingParams,
cacheTrainingSet,
treeMethod,
isLocal,
killSparkContext)
isLocal)
xgbExecParam.setRawParamMap(overridedParams)
xgbExecParam
}
Expand Down Expand Up @@ -351,7 +348,11 @@ object XGBoost extends Serializable {
watches.toMap, metrics, obj, eval,
earlyStoppingRound = numEarlyStoppingRounds, prevBooster)
}
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
if (TaskContext.get().partitionId() == 0) {
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
} else {
Iterator.empty
}
} catch {
case xgbException: XGBoostError =>
logger.error(s"XGBooster worker $taskId has failed $attempt times due to ", xgbException)
Expand Down Expand Up @@ -409,15 +410,10 @@ object XGBoost extends Serializable {
// Train for every ${savingRound} rounds and save the partially completed booster
val tracker = startTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
val (booster, metrics) = try {
val parallelismTracker = new SparkParallelismTracker(sc,
xgbExecParams.timeoutRequestWorkers,
xgbExecParams.numWorkers,
xgbExecParams.killSparkContextOnWorkerFailure)

tracker.getWorkerEnvs().putAll(xgbRabitParams)
val rabitEnv = tracker.getWorkerEnvs

val boostersAndMetrics = trainingRDD.mapPartitions { iter => {
val boostersAndMetrics = trainingRDD.barrier().mapPartitions { iter => {
var optionWatches: Option[() => Watches] = None

// take the first Watches to train
Expand All @@ -430,24 +426,14 @@ object XGBoost extends Serializable {
xgbExecParams.eval, prevBooster)}
.getOrElse(throw new RuntimeException("No Watches to train"))

}}.cache()

val sparkJobThread = new Thread() {
override def run() {
// force the job
boostersAndMetrics.foreachPartition(() => _)
}
}
sparkJobThread.setUncaughtExceptionHandler(tracker)

val trackerReturnVal = parallelismTracker.execute {
sparkJobThread.start()
tracker.waitFor(0L)
}
}}

val (booster, metrics) = boostersAndMetrics.collect()(0)
val trackerReturnVal = tracker.waitFor(0L)
logger.info(s"Rabit returns with exit code $trackerReturnVal")
val (booster, metrics) = postTrackerReturnProcessing(trackerReturnVal,
boostersAndMetrics, sparkJobThread)
if (trackerReturnVal != 0) {
throw new XGBoostError("XGBoostModel training failed.")
}
(booster, metrics)
} finally {
tracker.stop()
Expand All @@ -467,42 +453,12 @@ object XGBoost extends Serializable {
case t: Throwable =>
// if the job was aborted due to an exception
logger.error("the job was aborted due to ", t)
if (xgbExecParams.killSparkContextOnWorkerFailure) {
sc.stop()
}
throw t
} finally {
optionalCachedRDD.foreach(_.unpersist())
}
}

private def postTrackerReturnProcessing(
trackerReturnVal: Int,
distributedBoostersAndMetrics: RDD[(Booster, Map[String, Array[Float]])],
sparkJobThread: Thread): (Booster, Map[String, Array[Float]]) = {
if (trackerReturnVal == 0) {
// Copies of the final booster and the corresponding metrics
// reside in each partition of the `distributedBoostersAndMetrics`.
// Any of them can be used to create the model.
// it's safe to block here forever, as the tracker has returned successfully, and the Spark
// job should have finished, there is no reason for the thread cannot return
sparkJobThread.join()
val (booster, metrics) = distributedBoostersAndMetrics.first()
distributedBoostersAndMetrics.unpersist(false)
(booster, metrics)
} else {
try {
if (sparkJobThread.isAlive) {
sparkJobThread.interrupt()
}
} catch {
case _: InterruptedException =>
logger.info("spark job thread is interrupted")
}
throw new XGBoostError("XGBoostModel training failed")
}
}

}

class Watches private[scala] (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,8 @@ private[spark] trait LearningTaskParams extends Params {

final def getMaximizeEvaluationMetrics: Boolean = $(maximizeEvaluationMetrics)

/**
* whether killing SparkContext when training task fails
*/
final val killSparkContextOnWorkerFailure = new BooleanParam(this,
"killSparkContextOnWorkerFailure", "whether killing SparkContext when training task fails")

setDefault(objective -> "reg:squarederror", baseScore -> 0.5, trainTestRatio -> 1.0,
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false, killSparkContextOnWorkerFailure -> true)
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false)
}

private[spark] object LearningTaskParams {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1 +1 @@
log4j.logger.org.apache.spark=ERROR
log4j.logger.org.apache.spark=ERROR
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,7 @@ package ml.dmlc.xgboost4j.scala.spark
import java.io.File

import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, ExternalCheckpointManager, XGBoost => SXGBoost}
import org.scalatest.{FunSuite, Ignore}
import org.scalatest.FunSuite
import org.apache.hadoop.fs.{FileSystem, Path}

class ExternalCheckpointManagerSuite extends FunSuite with TmpFolderPerSuite with PerTest {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,8 @@

package ml.dmlc.xgboost4j.scala.spark

import ml.dmlc.xgboost4j.java.XGBoostError
import org.apache.spark.Partitioner
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite
import org.apache.spark.sql.functions._

Expand Down Expand Up @@ -53,7 +51,7 @@ class FeatureSizeValidatingSuite extends FunSuite with PerTest {
"objective" -> "binary:logistic",
"num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true, "missing" -> 0)
import DataUtils._
val sparkSession = SparkSession.builder().getOrCreate()
val sparkSession = ss
import sparkSession.implicits._
val repartitioned = sc.parallelize(Synthetic.trainWithDiffFeatureSize, 2)
.map(lp => (lp.label, lp)).partitionBy(
Expand Down
Loading

0 comments on commit 98c3db2

Please sign in to comment.