[WIP] Use Spark barrier mode to ensure all XGBoost workers are launched in parallel #5625

Closed
wants to merge 5 commits

Conversation

@WeichenXu123 (Contributor) commented May 1, 2020

Currently, xgboost4j-spark uses SparkParallelismTracker to wait and check whether there are enough task slots. This approach has two issues:

  • If the Spark cluster's executors are set to auto-scale mode, this approach won't work: when there are not enough workers, it does not trigger allocation of more executors.
  • Suppose two XGBoost training jobs start in parallel and there are not enough task slots to run both. Each job may allocate part of the slots and then both get stuck in a deadlock.

Spark 2.4 introduced barrier mode, which ensures that a Spark job stage launches all of its tasks in parallel. This addresses the issues above.

In this PR, I remove SparkParallelismTracker and update the code to use Spark barrier mode.

Note: we can no longer set the XGBoost parameter timeout_request_workers per job. Instead, we have to set spark.scheduler.barrier.maxConcurrentTasksCheck.interval and spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures in the Spark cluster config. The timeout to wait for enough workers is then spark.scheduler.barrier.maxConcurrentTasksCheck.interval * spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures.
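
For reference, a minimal sketch of what the barrier-mode pattern and the timeout configs look like; this is not the code in this PR, and the app name, numbers, and placeholder training step are made up:

```scala
import org.apache.spark.BarrierTaskContext
import org.apache.spark.sql.SparkSession

object BarrierModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("barrier-mode-sketch")
      // The effective wait-for-enough-slots timeout is interval * maxFailures
      // (here 15s * 40 = 600s); both keys are cluster-level Spark configs.
      .config("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "15s")
      .config("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "40")
      .getOrCreate()

    val numWorkers = 4
    val rdd = spark.sparkContext.parallelize(0 until numWorkers, numWorkers)

    // barrier() makes this a barrier stage: either all numWorkers tasks are
    // launched together, or the stage fails once the checks are exhausted.
    val gathered = rdd.barrier().mapPartitions { iter =>
      val ctx = BarrierTaskContext.get()
      // a real implementation would start the Rabit worker / XGBoost training here
      ctx.barrier() // block until every task in the stage reaches this point
      iter
    }.collect()

    println(gathered.mkString(", "))
    spark.stop()
  }
}
```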

I will add tests soon, but it is ready for a first-pass review.

@CodingCat (Member)

Are there any battle-tested cases of barrier execution in Spark?

I don't see enough ROI for such a fundamental change... (and I don't see changes covering the original behavior, e.g., stopping the application after a single worker fails).

@WeichenXu123 (Contributor, Author) commented May 2, 2020

@CodingCat

... stopping the application after a single worker fails

In barrier mode, if one worker fails, all the other workers are killed as well and the Spark job fails.

ROI for such a fundamental change?

The main benefit is the auto-scaling case: with the old approach, if there are not enough workers, nothing triggers the allocation of more executors. Barrier mode addresses this; it triggers the Spark cluster to create more executors.

What about keeping the old behavior, but adding an option to enable the new barrier mode?

@CodingCat (Member)

If the Spark cluster's executors are set to auto-scale mode, this approach won't work: when there are not enough workers, it does not trigger allocation of more executors.

You mean dynamic allocation? Then just check whether the user sets minExecutors in dynamic allocation to be no smaller than numWorkers.

Suppose two XGBoost training jobs start in parallel and there are not enough task slots to run both. Each job may allocate part of the slots and then both get stuck in a deadlock.

We never officially supported such cases (due to some issues in the Rabit layer)... even if we did support them, working around this only takes a few lines of detection:

with static allocation, when launching a training job, always check whether it is possible to get enough resources given the configured number of executors;

with dynamic allocation, I think we can do something similar (but I really don't think we should spend too much time on dynamic allocation... at Uber, dynamic allocation leads to many difficulties for tuning).
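
A rough sketch of the pre-flight check described above; the helper name and the way it combines the conf keys are assumptions, not code from xgboost4j-spark:

```scala
import org.apache.spark.sql.SparkSession

object ResourceCheckSketch {
  // Returns true if the configured cluster can, in principle, supply numWorkers task slots.
  def hasEnoughSlots(spark: SparkSession, numWorkers: Int): Boolean = {
    val conf = spark.sparkContext.getConf
    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
      // dynamic allocation: require minExecutors to cover the requested workers
      conf.getInt("spark.dynamicAllocation.minExecutors", 0) >= numWorkers
    } else {
      // static allocation: compare total task slots with the requested workers
      val executors = conf.getInt("spark.executor.instances", 1)
      val coresPerExecutor = conf.getInt("spark.executor.cores", 1)
      val cpusPerTask = conf.getInt("spark.task.cpus", 1)
      executors * (coresPerExecutor / cpusPerTask) >= numWorkers
    }
  }
}
```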

@WeichenXu123 (Contributor, Author)

@CodingCat

You mean dynamic allocation? Then just check whether the user sets minExecutors in dynamic allocation to be no smaller than numWorkers.

I don't think this addresses the issue. For example, if one XGBoost job occupies the current executors' resources and a new XGBoost job is started on the same Spark cluster, Spark should dynamically allocate more executors rather than wait for task slots on the current executors to become idle.

@trivialfis (Member)

@CodingCat

due to some issues in the Rabit layer

Could you please elaborate a bit on this?

@liangz1 left a comment

Some nits

@@ -548,9 +546,6 @@ object XGBoost extends Serializable {
// Train for every ${savingRound} rounds and save the partially completed booster
val tracker = startTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
val (booster, metrics) = try {
val parallelismTracker = new SparkParallelismTracker(sc,
xgbExecParams.timeoutRequestWorkers,

timeoutRequestWorkers can be removed from XGBoostExecutionParams.

logger.info(s"Rabit returns with exit code $trackerReturnVal")
val (booster, metrics) = postTrackerReturnProcessing(trackerReturnVal,

L665-692 private def postTrackerReturnProcessing(... is no longer used.

@WeichenXu123 (Contributor, Author)

@CodingCat

Suppose two XGBoost training jobs start in parallel and there are not enough task slots to run both. Each job may allocate part of the slots and then both get stuck in a deadlock.

We never officially supported such cases (due to some issues in the Rabit layer)... even if we did support them, working around this only takes a few lines of detection.

There is a scenario where Spark CrossValidator is used on an XGBoost estimator: it launches XGBoost jobs in parallel. If we do not address this issue, deadlock may happen. And seemingly there is no way to work around it; "adding several lines to detect" does not work, because if two threads check the resources and both see them as available, they then start their own XGBoost jobs in parallel, and the deadlock still happens (see the sketch below).
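
To illustrate that race, a hypothetical, self-contained simulation (none of these names or numbers come from this repo): both jobs pass the resource check, and each can end up holding only part of the slots it needs.

```scala
import java.util.concurrent.atomic.AtomicInteger

object SlotRaceSketch {
  // Hypothetical setup: 6 free task slots, two jobs that each need 4 workers,
  // and a check-then-launch pattern with no coordination between the jobs.
  private val freeSlots = new AtomicInteger(6)
  private val numWorkers = 4

  private def tryAcquireOneSlot(): Boolean = {
    val n = freeSlots.get()
    n > 0 && freeSlots.compareAndSet(n, n - 1)
  }

  private def launchXGBoostJob(id: Int): Unit = {
    var held = 0
    while (held < numWorkers) {          // the job only proceeds once all its workers hold a slot
      if (tryAcquireOneSlot()) held += 1 // when both jobs race, each can end up with only 3 slots
    }                                    // and neither ever leaves this loop
    println(s"job $id started with $numWorkers workers")
  }

  def main(args: Array[String]): Unit = {
    val jobs = (1 to 2).map { id =>
      new Thread(new Runnable {
        override def run(): Unit = {
          // both threads can observe 6 >= 4 here, so the check alone is not enough
          if (freeSlots.get() >= numWorkers) launchXGBoostJob(id)
        }
      })
    }
    jobs.foreach(_.start())
    jobs.foreach(_.join()) // never returns in the interleaving described above
  }
}
```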

And what is the issue you mentioned in "due to some issues in the Rabit layer"?

What do you think?
Thanks!
