[Fleet Executor] Refine runtime graph #37703

Merged · 4 commits · Dec 2, 2021
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/fleet_executor/CMakeLists.txt
@@ -12,7 +12,7 @@ endif()

cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc task_node.cc runtime_graph.cc
            interceptor.cc compute_interceptor.cc amplifier_interceptor.cc interceptor_message_service.cc message_bus.cc
-           DEPS proto_desc fleet_executor_desc_proto interceptor_message_proto collective_helper
+           DEPS proto_desc fleet_executor_desc_proto interceptor_message_proto collective_helper op_registry
            ${BRPC_DEPS})

if(WITH_DISTRIBUTE)
6 changes: 1 addition & 5 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -31,9 +31,7 @@ FleetExecutor::FleetExecutor(const std::string& exe_desc_str) {
"Error occurs while parsing string to proto"));
}

-FleetExecutor::~FleetExecutor() {
-  // Destroy Executor
-}
+FleetExecutor::~FleetExecutor() { root_scope_->DropKids(); }

void FleetExecutor::Init(const framework::ProgramDesc& program_desc,
                         framework::Scope* scope,
@@ -113,8 +111,6 @@ void FleetExecutor::Run() {
carrier_instance.Start();
}

-void FleetExecutor::Release() { root_scope_->DropKids(); }

void FleetExecutor::CopyParameters(int microbatch_id,
                                   const framework::ProgramDesc& program) {
  auto& global_block = program.Block(0);
1 change: 0 additions & 1 deletion paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -39,7 +39,6 @@ class FleetExecutor final {
  void Init(const framework::ProgramDesc& program_desc, framework::Scope* scope,
            const platform::Place& place);
  void Run();
-  void Release();

private:
DISABLE_COPY_AND_ASSIGN(FleetExecutor);
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto
@@ -21,7 +21,7 @@ message RankInfo {
}

message FleetExecutorDesc {
-  optional string grain = 1 [ default = "coarse" ];
+  optional string strategy = 1 [ default = "Origin" ];
optional int64 cur_rank = 2 [ default = 0 ]; // Rank id of current processor
repeated RankInfo cluster_info = 3;
optional int32 dp_degree = 4 [ default = 1 ];
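For orientation, a minimal sketch of how the renamed field travels from Python: executor.py (later in this diff) builds this message, sets its fields, and hands the serialized bytes to core.FleetExecutor. The absolute module path below is an assumption mirroring the relative import used there.

    # Sketch: populate the renamed field; "strategy" replaces "grain" and
    # now defaults to "Origin" instead of "coarse".
    from paddle.distributed.fleet.proto import fleet_executor_desc_pb2

    desc = fleet_executor_desc_pb2.FleetExecutorDesc()
    desc.strategy = "1F1B"   # or leave unset to keep the "Origin" default
    desc.cur_rank = 0
    payload = desc.SerializeToString()  # consumed by core.FleetExecutor(payload)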
31 changes: 3 additions & 28 deletions paddle/fluid/distributed/fleet_executor/interceptor.cc
@@ -40,34 +40,9 @@ void Interceptor::Join() {
void Interceptor::RegisterMsgHandle(MsgHandle handle) { handle_ = handle; }

void Interceptor::Handle(const InterceptorMessage& msg) {
-  if (handle_) {
-    handle_(msg);
-  } else {
-    VLOG(3) << "Interceptor is using default message handler. This handler is "
-               "only used for test purpose. Check whether you init interceptor "
-               "in the proper way.";
-    if (msg.message_type() == DATA_IS_READY) {
-      if (node_->role() != 2) {
-        VLOG(3) << "Fake handler is sending DATA_IS_READY message to: "
-                << interceptor_id_ + 1 << ".";
-        InterceptorMessage data_is_ready_msg;
-        data_is_ready_msg.set_message_type(DATA_IS_READY);
-        Send(interceptor_id_ + 1, data_is_ready_msg);
-      } else {
-        // NOTE: max run time is reach for last interceptor
-        StopCarrier();
-      }
-    } else if (msg.message_type() == STOP) {
-      stop_ = true;
-      if (node_->role() != 2) {
-        VLOG(3) << "Fake handler is sending STOP message to: "
-                << interceptor_id_ + 1 << ".";
-        InterceptorMessage stop_msg;
-        stop_msg.set_message_type(STOP);
-        Send(interceptor_id_ + 1, stop_msg);
-      }
-    }
-  }
+  PADDLE_ENFORCE_NOT_NULL(handle_, platform::errors::PreconditionNotMet(
+                                       "Message handle is not registered."));
+  handle_(msg);
}

void Interceptor::StopCarrier() {
16 changes: 15 additions & 1 deletion paddle/fluid/distributed/fleet_executor/runtime_graph.cc
@@ -100,11 +100,25 @@ std::vector<OpRole> RuntimeGraph::functionality_order = {
RuntimeGraph::RuntimeGraph(const ProgramDesc& program,
                           const FleetExecutorDesc& exe_desc)
    : exe_desc_(exe_desc) {
-  if (exe_desc.grain() == "coarse") {
+  if (exe_desc.strategy() == "1F1B") {

[Inline review comment, Contributor] We could decide this based on the pp degree; going forward we may only need 1F1B and can drop FThenB. This can be refined step by step, and things like gradient merge can be added later as well. (A sketch of this idea follows this file's diff.)
    SplitProgramBasedFunctionality(program);
    AssignTaskToIntercepter();
    FakeDependence();
    FakeRuntimeInfo();
+  } else if (exe_desc.strategy() == "Origin") {
+    int64_t cur_rank = exe_desc_.cur_rank();
+    int64_t max_run_times = exe_desc_.num_micro_batches();
+    int64_t max_slot_nums = exe_desc_.num_slots();
+    auto task_node = std::make_unique<TaskNode>(program, cur_rank,
+                                                max_run_times, max_slot_nums);
+    task_node->SetType("Compute");
+    task_nodes_.emplace_back(std::move(task_node));
+    int64_t task_id = task_nodes_[0]->task_id();
+    intercepter_id_to_rank_.insert({task_id, cur_rank});
+    intercepter_id_to_node_.insert({task_id, task_nodes_[0].get()});
+  } else {
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "Strategy %s is None of 1F1B or Origin.", exe_desc.strategy()));
  }
}

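A hypothetical sketch of the reviewer's suggestion on the strategy branch above: derive the strategy from the pipeline-parallel degree instead of a user-chosen string. The helper name is illustrative and not part of this PR.

    from paddle.distributed.fleet.proto import fleet_executor_desc_pb2

    def choose_strategy(pp_degree):
        # pp_degree > 1 implies pipeline parallelism, which wants the 1F1B
        # schedule; otherwise run the whole program as one "Origin" task.
        return "1F1B" if pp_degree > 1 else "Origin"

    desc = fleet_executor_desc_pb2.FleetExecutorDesc()
    desc.pp_degree = 4
    desc.strategy = choose_strategy(desc.pp_degree)  # -> "1F1B"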
7 changes: 7 additions & 0 deletions paddle/fluid/distributed/fleet_executor/task_node.cc
@@ -13,6 +13,7 @@
// limitations under the License.

#include "paddle/fluid/distributed/fleet_executor/task_node.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"

namespace paddle {
@@ -30,6 +31,12 @@ TaskNode::TaskNode(const framework::ProgramDesc& program, int64_t rank,
  // Should be serially invoked, not thread-safe
  static int64_t task_node_cnt = 0;
  task_id_ = task_node_cnt++;
+  for (const auto& op_desc : program.Block(0).AllOps()) {
+    ops_vec_.emplace_back(framework::OpRegistry::CreateOp(*op_desc));
+  }
+  for (const auto& op : ops_vec_) {
+    ops_.emplace_back(op.get());
+  }
}

TaskNode::TaskNode(int32_t role, const std::vector<OperatorBase*>& ops,
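This constructor is the one exposed to Python in bind_fleet_executor.cc below; a hedged usage sketch, assuming (as executor.py does for FleetExecutor) that the binding lands on paddle.fluid.core:

    # Sketch: TaskNode(program_desc, rank, max_run_times, max_slot_nums);
    # with this PR it also instantiates every op of block 0 via
    # OpRegistry::CreateOp and keeps them alive in ops_vec_.
    from paddle.fluid import core
    task_node = core.TaskNode(program.desc, 0, 1, 1)  # `program` assumed in scope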
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/fleet_executor/task_node.h
@@ -76,10 +76,12 @@ class TaskNode final {
 private:
  DISABLE_COPY_AND_ASSIGN(TaskNode);
  TaskNode() = default;
+  // ops_ will be removed in the future
  std::vector<OperatorBase*> ops_;
  std::unordered_set<int64_t> upstream_;
  std::unordered_set<int64_t> downstream_;
  framework::ProgramDesc program_;
+  std::vector<std::unique_ptr<OperatorBase>> ops_vec_;
  int32_t role_;
  int64_t rank_;
  int64_t task_id_;
4 changes: 2 additions & 2 deletions paddle/fluid/pybind/bind_fleet_executor.cc
@@ -16,6 +16,7 @@
 #include <pybind11/stl.h>
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
+#include "paddle/fluid/framework/operator.h"
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/platform/place.h"
@@ -32,8 +33,7 @@ void BindFleetExecutor(py::module* m) {
  py::class_<FleetExecutor>(*m, "FleetExecutor")
      .def(py::init<const std::string&>())
      .def("init", &FleetExecutor::Init)
-      .def("run", &FleetExecutor::Run)
-      .def("release", &FleetExecutor::Release);
+      .def("run", &FleetExecutor::Run);

  py::class_<TaskNode>(*m, "TaskNode")
      .def(py::init<const framework::ProgramDesc&, int64_t, int64_t, int64_t>())
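With release() gone from the bindings, the Python call sequence shrinks to init/run; scope cleanup now happens when the executor is destroyed. A distilled sketch of the flow (desc, program, scope, and place are assumed to exist; see the executor.py hunk below for the real call site):

    from paddle.fluid import core

    fleet_exe = core.FleetExecutor(desc.SerializeToString())
    fleet_exe.init(program.desc, scope, place)
    fleet_exe.run()
    # no fleet_exe.release() anymore: ~FleetExecutor() calls root_scope_->DropKids()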
87 changes: 46 additions & 41 deletions python/paddle/fluid/executor.py
@@ -682,6 +682,8 @@ def __init__(self, place=None):
         self._enable_interpreter_core = _is_enable_standalone_executor()
         self._executor_cache = _ExecutorCache(self.place)

+        self._fleet_executor_cache = None
[Inline review comment, Contributor] Later on, let's change this into keyed caches such as program_caches / ctc_caches. (A sketch of that direction follows this file's diff.)

    def _get_scope_cache(self, program_cache_key):
        return self.scope_caches.get(program_cache_key, None)

@@ -1960,49 +1962,52 @@ def _run_using_fleet_executor(self,
                                  print_period=100,
                                  fetch_handler=None,
                                  use_program_cache=False):
-        scope, real_fetch_list, trainer_instance = \
-            self._prepare_pipeline_ctx(program, dataset, scope, thread,
-                                       is_infer, debug, fetch_list, fetch_info,
-                                       print_period, fetch_handler,
-                                       use_program_cache)
-        from ..distributed.fleet.proto import fleet_executor_desc_pb2
-        from google.protobuf import text_format
-        cur_rank = os.getenv("PADDLE_TRAINER_ID")
-        trainer_endpoints_str = os.getenv("PADDLE_TRAINER_ENDPOINTS")
-        fleet_exe_desc = fleet_executor_desc_pb2.FleetExecutorDesc()
-        nrank = 1
-        if cur_rank and trainer_endpoints_str:
-            fleet_exe_desc.cur_rank = int(cur_rank)
-            trainer_endpoints = trainer_endpoints_str.split(',')
-            for rank, endpoint in enumerate(trainer_endpoints):
-                rank_info = fleet_executor_desc_pb2.RankInfo()
-                rank_info.rank = rank
-                rank_info.ip_port = endpoint
-                fleet_exe_desc.cluster_info.append(rank_info)
-            nrank = len(trainer_endpoints)
-        else:
-            fleet_exe_desc.cur_rank = 0
-            rank_info = fleet_executor_desc_pb2.RankInfo()
-            rank_info.rank = 0
-            rank_info.ip_port = ''
-            fleet_exe_desc.cluster_info.append(rank_info)
-            logging.warning("Fleet Executor will run on single device only.")
-        fleet_opt = program._pipeline_opt["fleet_opt"]
-        if "dist_strategy" in fleet_opt:
-            fleet_exe_desc.dp_degree = fleet_opt["dist_strategy"]["dp_degree"]
-            fleet_exe_desc.mp_degree = fleet_opt["dist_strategy"]["mp_degree"]
-            fleet_exe_desc.pp_degree = fleet_opt["dist_strategy"]["pp_degree"]
-        if "num_micro_batches" in fleet_opt:
-            fleet_exe_desc.num_micro_batches = fleet_opt["num_micro_batches"]
-        num_of_gpu = fleet_exe_desc.dp_degree * fleet_exe_desc.mp_degree * fleet_exe_desc.pp_degree
-        assert nrank == num_of_gpu, "The number of rank is not equal to the number of gpu."
-        fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString())
-        place = core.Place()
-        place.set_place(self.place)
-        fleet_exe.init(program._pipeline_opt["section_program"].desc, scope,
-                       place)
-        fleet_exe.run()
-        fleet_exe.release()
+        if self._fleet_executor_cache is None:
+            from ..distributed.fleet.proto import fleet_executor_desc_pb2
+            from google.protobuf import text_format
+            cur_rank = os.getenv("PADDLE_TRAINER_ID")
+            trainer_endpoints_str = os.getenv("PADDLE_TRAINER_ENDPOINTS")
+            fleet_exe_desc = fleet_executor_desc_pb2.FleetExecutorDesc()
+            nrank = 1
+            if cur_rank and trainer_endpoints_str:
+                fleet_exe_desc.cur_rank = int(cur_rank)
+                trainer_endpoints = trainer_endpoints_str.split(',')
+                for rank, endpoint in enumerate(trainer_endpoints):
+                    rank_info = fleet_executor_desc_pb2.RankInfo()
+                    rank_info.rank = rank
+                    rank_info.ip_port = endpoint
+                    fleet_exe_desc.cluster_info.append(rank_info)
+                nrank = len(trainer_endpoints)
+            else:
+                fleet_exe_desc.cur_rank = 0
+                rank_info = fleet_executor_desc_pb2.RankInfo()
+                rank_info.rank = 0
+                rank_info.ip_port = ''
+                fleet_exe_desc.cluster_info.append(rank_info)
+                logging.warning(
+                    "Fleet Executor will run on single device only.")
+            fleet_opt = program._pipeline_opt["fleet_opt"]
+            if "dist_strategy" in fleet_opt:
+                fleet_exe_desc.dp_degree = fleet_opt["dist_strategy"][
+                    "dp_degree"]
+                fleet_exe_desc.mp_degree = fleet_opt["dist_strategy"][
+                    "mp_degree"]
+                fleet_exe_desc.pp_degree = fleet_opt["dist_strategy"][
+                    "pp_degree"]
+            if "num_micro_batches" in fleet_opt:
+                fleet_exe_desc.num_micro_batches = fleet_opt[
+                    "num_micro_batches"]
+            num_of_gpu = fleet_exe_desc.dp_degree * fleet_exe_desc.mp_degree * fleet_exe_desc.pp_degree
+            assert nrank == num_of_gpu, "The number of rank is not equal to the number of gpu."
+            fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString())
+            place = core.Place()
+            place.set_place(self.place)
+            if scope is None:
+                scope = global_scope()
+            fleet_exe.init(program._pipeline_opt["section_program"].desc, scope,
+                           place)
+            self._fleet_executor_cache = fleet_exe
+        self._fleet_executor_cache.run()
         return None

def _run_pipeline(self,
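Finally, a hypothetical sketch of the keyed-cache direction the reviewer asks for in the executor.py comment above: one FleetExecutor per program cache key, mirroring the existing scope_caches, rather than a single _fleet_executor_cache attribute. All names here are illustrative, not part of this PR.

    # Hypothetical per-program cache for FleetExecutor instances.
    class _FleetExecutorCaches:
        def __init__(self):
            self._caches = {}  # program_cache_key -> core.FleetExecutor

        def get(self, program_cache_key):
            return self._caches.get(program_cache_key, None)

        def add(self, program_cache_key, fleet_exe):
            self._caches[program_cache_key] = fleet_exe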