
[RFC] Merging RABIT into XGBoost. #5995

Closed
6 of 7 tasks
trivialfis opened this issue Aug 8, 2020 · 17 comments

@trivialfis
Member

trivialfis commented Aug 8, 2020

Background

This is an RFC for merging RABIT into XGBoost. For a long time, RABIT has provided distributed training support for XGBoost and is integrated as a git submodule. Most of its tests run as part of XGBoost, and the XGBoost code base is tightly coupled to RABIT. For example, serialization is built on the rabit serializable interface, the NCCL unique ID is obtained through RABIT, and quantile merging between workers is based on RABIT's serializable allreduce handler. Because there are more mature MPI solutions like OpenMPI and UCX for CPU, RABIT did not get much attention beyond XGBoost. Eventually, maintaining RABIT in a separate repository creates more overhead for developers than actual benefit, which is one of the reasons RABIT is rarely updated. Also, we plan to sunset the allreduce implementation in RABIT in the future and utilize the other widely adopted MPI solutions listed previously. Merging RABIT into XGBoost first will allow us to achieve that incrementally with sufficient tests.
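Most of that coupling goes through rabit's allreduce call. As a point of reference, here is a minimal sketch of that usage based on the rabit C++ API; the histogram buffer and its size are illustrative and not XGBoost's actual internals:

```cpp
#include <rabit/rabit.h>

#include <vector>

int main(int argc, char* argv[]) {
  rabit::Init(argc, argv);

  // Each worker holds a partial statistic, e.g. a slice of a gradient histogram.
  std::vector<double> histogram(256, 1.0);

  // Sum the statistic across all workers in place; every worker sees the same result.
  rabit::Allreduce<rabit::op::Sum>(histogram.data(), histogram.size());

  rabit::Finalize();
  return 0;
}
```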

Plan

Future work

  • Rework the OpenMPI backend to better support a new backend (to be decided), probably adding NCCL as a new backend too. This way we can have both the old and new backends enabled for a smooth transition.

Concerns

If we are going to replace RABIT with another MPI solution and drop single point model recovery, would it be better not to merge it at all? That seems much cleaner, but as mentioned previously XGBoost is tightly coupled to RABIT, and every change to RABIT must be tested on XGBoost before merging anyway. The replacement won't be trivial, and I would like to do it incrementally and carefully.

@tqchen
Member

tqchen commented Aug 9, 2020

Let me put my 2 cents here.

The rabit protocol is important for integration with the existing open-source ecosystem, e.g. Dask, Spark and Flink. The fault tolerance part is important for scaling in real-world settings where a node can fail. The rabit protocol allows a single worker, or multiple workers, to fail and restart on other machines while the learning process continues.

Single worker failure and restart is quite common in real-world production environments, due to pre-emption or spot instances on the cloud. Normal MPI-based solutions usually require all nodes to be alive with no failures at all, which brings more cost in real-world production settings.

While NCCL or OpenMPI can be supported, they are only useful in an HPC-based setting, which is more expensive and less elastic overall.

For the above reasons, I think we should continue to support rabit as a first-class citizen, and it should remain the primary protocol we use for distributed training.
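For readers unfamiliar with the protocol, the fail-and-restart behaviour described above is driven by rabit's checkpoint API. Below is a minimal sketch along the lines of the rabit guide; the model contents and the iteration body are illustrative placeholders:

```cpp
#include <rabit/rabit.h>

#include <vector>

// A model rabit can checkpoint only needs Load/Save over a stream.
class Model : public rabit::Serializable {
 public:
  std::vector<float> weight;
  void Load(rabit::Stream* fi) override { fi->Read(&weight); }
  void Save(rabit::Stream* fo) const override { fo->Write(weight); }
};

int main(int argc, char* argv[]) {
  rabit::Init(argc, argv);
  Model model;
  // A restarted worker resumes from the last checkpointed iteration;
  // a fresh start returns version 0 and initializes from scratch.
  int version = rabit::LoadCheckPoint(&model);
  if (version == 0) model.weight.assign(16, 0.0f);

  const int kMaxIter = 100;
  for (int iter = version; iter < kMaxIter; ++iter) {
    // ... compute local statistics, rabit::Allreduce them, update the model ...
    rabit::CheckPoint(&model);  // mark this iteration as completed
  }
  rabit::Finalize();
  return 0;
}
```

Roughly speaking, if a worker dies mid-iteration and is restarted elsewhere, LoadCheckPoint hands it the last completed model version and the surviving workers help it recover the allreduce results it missed, which is what lets the job continue without a global restart.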

@trivialfis
Member Author

@tqchen Thanks for the comment. Thus far single point recovery doesn't work in XGBoost, as bootstrapping is still an unresolved issue. We haven't decided what to do with it, so we might continue extending the previous work on it or drop it from XGBoost completely. The best scenario from my perspective is that we can have it enabled on top of other MPI solutions. If we want more development on it, incubating it inside XGBoost seems better than developing it as a standalone project at this stage, as most of the tests run on XGBoost. From our perspective @CodingCat @hcho3, merging RABIT into XGBoost can ease the development a lot.

@tqchen
Member

tqchen commented Aug 9, 2020

Thanks for the clarification. My comment is mainly to clarify the goals and technical path, not the code merging.

I did see single point recovery work in previous cases, as long as the distributed protocol strictly follows the rabit protocol.

Hopefully the motivation is clear: in production systems single point failures can happen more often, and being able to handle them is critical to making the system scalable in a prod setting. I do recall the code working for single point failure before.

If it does not work in the current version, we should spend some effort to look into it and resolve the issue.

I am not against moving rabit into the codebase.

However, I do not agree that the path forward is to build the solution on top of existing MPI interfaces. Most MPI APIs bring requirements that go beyond allreduce, and it is impossible to resolve single point of failure in a common MPI-based setting — this was the motivation for the original rabit protocol.

One goal of distributed XGBoost is to be able to embed the job into common distributed runtimes with the minimum requirement of fail-restart for fault tolerance.

We should continue to make the rabit-based protocol the primary way to drive such needs, and only make MPI something that is optional.

@trivialfis
Member Author

Got it. The RABIT protocol is difficult to follow for most of the existing pipelines in XGBoost, as we need synchronization during DMatrix construction. So the real issue is how to recover the DMatrix in a multi-threaded (async) environment. I'm not sure about the case you have seen, as whoever ran that test did not make it part of XGBoost, and so far I haven't seen any working example. I believe it's possible to make it work in general XGBoost pipelines, but with a good amount of effort. Let me sort out a more detailed roadmap on how we proceed with this.

@trivialfis
Member Author

trivialfis commented Aug 9, 2020

@tqchen I just thought about the spot instance use case. I'm not sure how single point model recovery can help, since once a worker gets unplugged all the other workers cannot proceed until a replacement worker is available. So wouldn't it be better if we just checkpoint the model and stop/restart the training altogether? XGBoost does work, and is tested, for training continuation. For a concrete example, if we are training with 4 workers and 1 of them gets unplugged during the 3rd iteration, we can simply checkpoint the 2 constructed forests/trees and start training from the 3rd iteration again with re-partitioned data on the remaining workers, or on a new set of workers according to some scheduling algorithm.
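To make the restart-altogether flow concrete, here is a rough sketch using the XGBoost C API; the checkpoint file name and the surrounding driver logic are illustrative only, and error-code checking is omitted:

```cpp
#include <xgboost/c_api.h>

// Train while checkpointing after every boosting round, so that a full restart
// can continue from the last completed iteration instead of round 0.
void TrainWithCheckpoints(DMatrixHandle dtrain, int start_iter, int num_rounds) {
  BoosterHandle booster;
  XGBoosterCreate(&dtrain, 1, &booster);
  if (start_iter > 0) {
    // Resuming: load the trees built in previously completed iterations.
    XGBoosterLoadModel(booster, "checkpoint.model");
  }
  for (int iter = start_iter; iter < num_rounds; ++iter) {
    XGBoosterUpdateOneIter(booster, iter, dtrain);
    XGBoosterSaveModel(booster, "checkpoint.model");
  }
  XGBoosterFree(booster);
}
```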

What confuses me is that, to me, the only use case for single point model recovery with spot instances is to let all the other workers idle and wait for a new worker from the scheduler.

The question is only for better understanding of the use case, not for arguing about whether we should remove the feature.

@tqchen
Member

tqchen commented Aug 9, 2020

In most distributed execution envs (e.g. Spark, Hadoop, Dask), the data partitioning is done as part of the pipeline. In most such cases, the basic primitive is

mapGroup(data, some-func)

Within the map function, the data is partitioned (by the execution env) into several groups, and each group contains one partition of the dataset. Most of these frameworks also have a fail-restart mechanism built in, which means that if one of the workers, say worker0, gets unplugged due to pre-emption or failure, that specific part of the job will restart on another instance somewhere. The rabit protocol fits into such setups naturally.

While we could do things like repartitioning the data, or use other tricks, the data shuffling mechanism would need to be implemented in xgboost itself, which breaks the principle of natural integration with distributed exec envs.

More importantly, in cases where hundreds of workers are involved in the training, the cost of restarting a single worker is much smaller than restarting all the workers together. Imagine each worker has a probability p of being pre-empted during an iteration, and n is the number of workers. The success probability of the restart-all-together approach (which requires all workers to stay alive for a whole iteration) goes down as we increase n, which is a quite undesirable property. That means the bigger the dataset we want to train on, the larger the probability of failure.
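Spelling the probability argument out, assuming workers fail independently:

```latex
P(\text{all } n \text{ workers survive one iteration}) = (1 - p)^{n}
```

As an illustration (the numbers are made up): with p = 0.01 and n = 200, (0.99)^200 ≈ 0.13, so a restart-everything strategy would see at least one failure in most iterations, while single point recovery only pays for the expected np = 2 worker restarts per iteration.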

In the case of single point failure recovery, while it is true that other workers need to stay idle while waiting for a single worker to restart, the cost of restarting a single worker, or a small number of workers, is small, and that restart is handled by most distributed exec envs.

The problem of the initial statistics sync in DMatrix construction can be resolved by dividing the protocol into an initial phase and an iteration phase. This is exactly the problem we aim to tackle in a real prod setting. For example, I know there are some of Uber's internal use-cases that rely on this feature, so that the distributed learning process can scale on an internal cluster where pre-emption can easily happen.

@trivialfis
Member Author

trivialfis commented Aug 9, 2020

Thanks for the clarification; the failure-probability argument helped a lot, as did the point about possible cost savings. I will consider putting more effort into supporting single point model recovery in the future, and would also like to hear opinions from others.

But just to be clear, according to @CodingCat the feature is not used, and while the bootstrapping work done by @chenqin laid down a solid foundation, it still has unaddressed limitations, so the feature cannot be used yet. Hence I believe enabling the feature is not a matter of simple debugging but requires more thought and work.

I have some initial ideas on how to extend @chenqin's work on this, but before anything can happen I need to get opinions from other maintainers on whether they want to share the responsibility of maintaining it, as I personally don't want to maintain a comm primitive library. (Which is why I want this feature built on top of another mature MPI solution.)

@trivialfis
Member Author

Right now the Spark package kills the Spark context in the face of failure, which I believe is to avoid hanging caused by model recovery.

@tqchen
Member

tqchen commented Aug 9, 2020

Again, back to the point of being able to embed into distributed execution envs. Notably, there are two problems in relying on an existing MPI solution:

  • An MPI library may not be readily available, nor compatible with existing distributed execution envs like Spark/Dask. As a matter of fact, most MPI implementations present themselves as exclusive execution environments without fault tolerance baked in.
    • This is one of the main reasons we went with rabit: the primitive allows us to inject an allreduce mechanism that is compatible with these exec envs.
  • MPI contains a bigger collection of primitives that go beyond allreduce; as a result, it is impossible to design a similar single point fault tolerance protocol for an MPI-based solution (because of its generality).

@trivialfis
Member Author

Thanks for the detailed explanation. Let me dig into the issues you listed. :-)

@trivialfis
Member Author

trivialfis commented Aug 9, 2020

> As a matter of fact, most MPI implementations present themselves as exclusive execution environment

Indeed ...

@CodingCat
Member

My 2 cents here:

For us to use it in a production environment, fail-all and recover from a checkpoint is good enough, and many other frameworks like TF seem to go down this path too. Based on this, maintaining Rabit, whose complexity mostly comes from the recovery logic, is really an overhead.

However, there are cases where single point recovery is useful. E.g. I do sometimes see Spark executors get preempted but come back within 5 minutes; in this case, restarting the whole Spark application is less efficient than just letting all other executors hang there and wait for things to become stable...

I discussed with @trivialfis a similar story that happened in the Spark community. Originally Spark had Akka as the RPC framework, which looks just like rabit here: most of its charming features are about fault tolerance, etc., but Spark didn't use them at all while still paying the maintenance cost of the dependency. Later, Spark implemented its own RPC framework with the same interface as Akka. There were 1-2 versions where Akka and Spark-RPC co-existed for the convenience of falling back, until Spark-RPC fully replaced Akka.

For us, can we have a simple allreduce framework based on the same interface as Rabit, which would facilitate the co-existence of multiple frameworks? Even if someone wants to implement a new AllReduce framework like Tiger, as long as she/he implements this interface it can be integrated with XGB easily (it's easy to do in the JVM world, not sure about C++...). A rough sketch of what such an interface could look like follows below.
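To picture what such an interface could look like in C++, here is a hedged sketch; every name in it (AllreduceEngine, Op, CreateEngine, the backend string) is hypothetical and not something that exists in XGBoost today:

```cpp
#include <cstddef>
#include <memory>
#include <string>

// Hypothetical reduction operations every backend must support.
enum class Op { kSum, kMax, kMin };

// Minimal allreduce interface; rabit would be one implementation,
// and a future backend (NCCL, UCX, ...) another.
class AllreduceEngine {
 public:
  virtual ~AllreduceEngine() = default;
  virtual void Init(int argc, char** argv) = 0;
  virtual void Allreduce(double* data, std::size_t count, Op op) = 0;
  virtual void Broadcast(void* data, std::size_t size, int root) = 0;
  virtual int Rank() const = 0;
  virtual int WorldSize() const = 0;
  virtual void Finalize() = 0;
};

// XGBoost would pick the backend once at startup, keeping call sites unchanged,
// which lets two backends co-exist during a transition (the Akka/Spark-RPC pattern).
std::unique_ptr<AllreduceEngine> CreateEngine(const std::string& backend);
```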

@RAMitchell
Member

Some thoughts from me:

After @tqchen's explanation I see the benefit of single worker recovery. My concern is that we don't have enough contributors with expertise here to implement the feature and make it robust. I think it needs a lot of work, and even then it may not be very robust due to the many edge cases in xgboost, such as trying to recover with process_type=update or with GPU training.

If we remove single worker recovery and Windows support (both broken), as well as apply clang-tidy analysis, we have a chance to get the code base into something we can maintain.

@tqchen
Member

tqchen commented Aug 14, 2020

I think the bottom line is that we need to make sure the interface is compatible and can be embedded into distributed exec envs like Dask/Spark. There is a lot of value in the ability to embed an allreduce into the distributed execution env, and I see value in rabit beyond the single point recovery ability.

It is fine to drop in and replace it with other allreduce frameworks, as long as the interface is standardized around the rabit-style API, for cases like GPU (where NCCL makes more sense).

@trivialfis
Member Author

trivialfis commented Aug 14, 2020

It's fairly easy in C++, we just need to add a backend. MPI is a no-go, as mentioned by @tqchen. But something more primitive like UCX might still be a viable option; I will need to look into it in more detail. Single point recovery is a nice idea, especially in the face of growing data sizes. So if the RABIT protocol can be enabled as a thin layer on top of another comm library, I'm fine with it. But actually developing a comm library seems to be a bit too much for this feature.

@trivialfis
Member Author

#6112 Removes the single point recovery in XGBoost.

@trivialfis
Member Author

Merging RABIT is considered complete. Future development will continue as part of XGBoost as discussed. Thanks to everyone who participated; the suggestions and guidance are invaluable. @tqchen @CodingCat @RAMitchell
