
Merge pull request #1753 from TeslaZhao/develop
Distributed inference for large models
TeslaZhao authored Apr 25, 2022
2 parents 89f24dd + 447d970 commit 984f969
Showing 31 changed files with 1,164 additions and 188 deletions.
4 changes: 2 additions & 2 deletions cmake/paddlepaddle.cmake
@@ -30,7 +30,7 @@ message( "WITH_GPU = ${WITH_GPU}")
# Paddle Version should be one of:
# latest: latest develop build
# version number like 1.5.2
SET(PADDLE_VERSION "2.2.2")
SET(PADDLE_VERSION "2.3.0-rc0")
if (WITH_GPU)
message("CUDA: ${CUDA_VERSION}, CUDNN_MAJOR_VERSION: ${CUDNN_MAJOR_VERSION}")
# cuda 11.0 is not supported, 11.2 would be added.
@@ -177,7 +177,7 @@ if (NOT WITH_MKLML)
endif()

ADD_LIBRARY(paddle_inference STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.a)
SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.so)
if (WITH_ASCEND_CL)
SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.so)
endif()
27 changes: 23 additions & 4 deletions core/configure/proto/server_configure.proto
100755 → 100644
@@ -66,9 +66,27 @@ message EngineDesc {
optional bool enable_overrun = 32 [ default = false ];
optional bool allow_split_request = 33 [ default = true ];
optional int32 min_subgraph_size = 34 [ default = 3 ];
map<string,string> min_input_shape = 35;
map<string,string> max_input_shape = 36;
map<string,string> opt_input_shape = 37;
map<string, string> min_input_shape = 35;
map<string, string> max_input_shape = 36;
map<string, string> opt_input_shape = 37;

/*
* Distributed inference params
* "enable_dist_model": enable distributed model, disabled by default.
* "dist_carrier_id": id that marks the carrier of this engine.
* "dist_cfg_file": file name of the distributed configuration.
* "dist_nranks": number of distributed nodes.
* "dist_endpoints": all endpoints (ip:port) of the distributed nodes.
* "dist_subgraph_index": distributed subgraph index, auto-incremented from 0.
* It is used to select the endpoint of the current shard in the distributed
* model.
*/
optional bool enable_dist_model = 40 [ default = false ];
optional string dist_carrier_id = 41 [ default = "inference" ];
optional string dist_cfg_file = 42;
optional int32 dist_nranks = 43 [ default = 0 ];
repeated string dist_endpoints = 44;
optional int32 dist_subgraph_index = 45 [ default = 0 ];
};

// model_toolkit conf
@@ -100,7 +118,8 @@ message DAGNodeDependency {
message DAGNode {
required string name = 1;
required string type = 2;
repeated DAGNodeDependency dependencies = 3;
repeated string address = 3;
repeated DAGNodeDependency dependencies = 4;
};

// workflow entry
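
The distributed-inference fields above are ordinary EngineDesc options, so a config generator can set them through the protobuf-generated setters like any other engine option. Below is a minimal sketch, assuming the proto package is baidu.paddle_serving.configure and a generated header named server_configure.pb.h; the endpoints, rank count, and config file name are illustrative placeholders only.

```cpp
#include "server_configure.pb.h"  // assumed name of the header generated from server_configure.proto

// Sketch: populate the new distributed-inference options on an EngineDesc.
void FillDistOptions(baidu::paddle_serving::configure::EngineDesc* desc) {
  desc->set_enable_dist_model(true);             // enable distributed model
  desc->set_dist_carrier_id("inference");        // matches the field's default
  desc->set_dist_cfg_file("dist_config.pbtxt");  // hypothetical config file name
  desc->set_dist_nranks(2);                      // two distributed nodes
  desc->add_dist_endpoints("127.0.0.1:9292");    // illustrative endpoints (ip:port)
  desc->add_dist_endpoints("127.0.0.1:9293");
  desc->set_dist_subgraph_index(0);              // index of this shard's subgraph
}
```
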
24 changes: 15 additions & 9 deletions core/general-server/op/general_reader_op.cpp
@@ -80,8 +80,8 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") var num: " << var_num
<< ") start to call load general model_conf op";
if (var_num < 1) {
LOG(ERROR) << "(logid=" << log_id << ") Failed get feed_var, var_num="
<< var_num;
LOG(ERROR) << "(logid=" << log_id
<< ") Failed get feed_var, var_num=" << var_num;
return -1;
}

@@ -98,7 +98,7 @@ int GeneralReaderOp::inference() {
int64_t elem_type = 0;
int64_t elem_size = 0;
int64_t databuf_size = 0;
const void* src_ptr = nullptr;
const void *src_ptr = nullptr;
for (int i = 0; i < var_num; ++i) {
paddle::PaddleTensor paddleTensor;
const Tensor &tensor = req->tensor(i);
@@ -107,7 +107,7 @@ int GeneralReaderOp::inference() {
elem_size = 0;
databuf_size = 0;
elem_type = tensor.elem_type();
src_ptr = nullptr ;
src_ptr = nullptr;
if (elem_type == P_INT64) { // int64
elem_size = sizeof(int64_t);
paddleTensor.dtype = paddle::PaddleDType::INT64;
@@ -157,18 +157,24 @@ int GeneralReaderOp::inference() {
<< "dtype=" << paddleTensor.dtype << ";"
<< "data_len=" << data_len;
if (src_ptr == nullptr) {
LOG(ERROR) << "Not support var[" << i << "] with elem_type["
<< elem_type << "]";
LOG(ERROR) << "Not support var[" << i << "] with elem_type[" << elem_type
<< "]";
continue;
}
// implement lod tensor here
// only support 1-D lod
// TODO(HexToString): support 2-D lod
if (tensor.lod_size() > 0) {
VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
paddleTensor.lod.resize(1);
int lod_index = -1;
for (int k = 0; k < tensor.lod_size(); ++k) {
paddleTensor.lod[0].push_back(tensor.lod(k));
if (tensor.lod(k) == 0) {
lod_index++;
paddleTensor.lod.resize(lod_index + 1);
}
paddleTensor.lod[lod_index].push_back(tensor.lod(k));
VLOG(2) << "(logid=" << log_id << ") lod[" << lod_index
<< "]=" << tensor.lod(k);
}
}

@@ -191,7 +197,7 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] has lod_tensor and len=" << out->at(i).lod[0].back();
}
void* dst_ptr = out->at(i).data.data();
void *dst_ptr = out->at(i).data.data();
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
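
The reworked LoD handling in this file no longer assumes a single LoD level: every 0 encountered in the flattened request lod starts a new level. A standalone sketch of that splitting rule (hypothetical helper, not part of this PR; assumes a well-formed lod that starts with 0):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Split a flattened request lod into LoD levels; each 0 starts a new level.
// Example: {0, 3, 5, 0, 2, 4} -> {{0, 3, 5}, {0, 2, 4}} (two levels).
std::vector<std::vector<size_t>> SplitLodLevels(const std::vector<int64_t>& flat_lod) {
  std::vector<std::vector<size_t>> lod;
  int level = -1;
  for (int64_t v : flat_lod) {
    if (v == 0 || level < 0) {  // guard level < 0 in case the first element is not 0
      ++level;
      lod.resize(level + 1);
    }
    lod[level].push_back(static_cast<size_t>(v));
  }
  return lod;
}
```
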
6 changes: 4 additions & 2 deletions core/general-server/proto/general_model_service.proto
@@ -92,11 +92,13 @@ message Request {
message Response {
repeated ModelOutput outputs = 1;
repeated int64 profile_time = 2;
bool profile_server = 3;
uint64 log_id = 4;
// Error code
int32 err_no = 3;
int32 err_no = 5;

// Error messages
string err_msg = 4;
string err_msg = 6;
};

message ModelOutput {
2 changes: 1 addition & 1 deletion core/predictor/framework/cache.cpp
@@ -55,7 +55,7 @@ int CubeCache::reload_data(const std::string& cache_path) {

// loading data from cache files
if (stat(cache_path.c_str(), &st) < 0 || !S_ISDIR(st.st_mode)) {
LOG(ERROR) << "invalid cache path " << cache_path;
LOG(WARNING) << "No cube cache directory " << cache_path << " provided, ignore it";
return -1;
}
if ((dp = opendir(cache_path.c_str())) == nullptr) {
7 changes: 6 additions & 1 deletion core/predictor/framework/dag.cpp
@@ -129,6 +129,10 @@ int Dag::init(const configure::Workflow& conf, const std::string& name) {
node->id = i + 1; // 0 is reserved for begginer-op
node->name = conf.nodes(i).name();
node->type = conf.nodes(i).type();
for (int add_index = 0; add_index < conf.nodes(i).address_size();
++add_index) {
node->address.push_back(conf.nodes(i).address(add_index));
}
uint32_t depend_size = conf.nodes(i).dependencies_size();
for (uint32_t j = 0; j < depend_size; j++) {
const configure::DAGNodeDependency& depend =
@@ -159,7 +163,8 @@ int Dag::init(const configure::Workflow& conf, const std::string& name) {
for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
DagNode* node = _index_nodes[nid];
LOG(INFO) << "OP-" << node->id << "-" << node->name << "-" << node->type
<< " depends: " << node->depends.size();
<< " depends: " << node->depends.size()
<< " address: " << node->address.size();

boost::unordered_map<std::string, EdgeMode>::iterator it;
for (it = node->depends.begin(); it != node->depends.end(); it++) {
1 change: 1 addition & 0 deletions core/predictor/framework/dag.h
@@ -29,6 +29,7 @@ struct DagNode {
std::string name; // opname
std::string full_name; // workflow_stageindex_opname
std::string type;
std::vector<std::string> address;
void* conf;
boost::unordered_map<std::string, EdgeMode> depends;
};
1 change: 1 addition & 0 deletions core/predictor/framework/dag_view.cpp
@@ -90,6 +90,7 @@ int DagView::init(Dag* dag,
node->name,
node->type,
node->conf,
node->address,
log_id) != 0) {
LOG(WARNING) << "(logid=" << log_id
<< ") Failed init op, type:" << node->type;
15 changes: 8 additions & 7 deletions core/predictor/framework/infer.h
@@ -32,7 +32,8 @@
#include "core/predictor/framework/memory.h"
#include "core/predictor/framework/predictor_metric.h"
#include "paddle_inference_api.h" // NOLINT
#include "experimental/float16.h"
//#include "experimental/float16.h"
#include "experimental/phi/common/float16.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
@@ -548,9 +549,9 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
int8_t* data = static_cast<int8_t*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else if ((*tensorVector_in_pointer)[i].dtype ==
paddle::PaddleDType::FLOAT16) {
paddle::platform::float16* data =
static_cast<paddle::platform::float16*>(origin_data);
paddle::PaddleDType::FLOAT16) {
phi::dtype::float16* data =
static_cast<phi::dtype::float16*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else {
LOG(ERROR) << "Inference not support type["
@@ -646,14 +647,14 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
} else if (dataType == paddle::PaddleDType::FLOAT16) {
databuf_size = out_num * sizeof(paddle::platform::float16);
databuf_size = out_num * sizeof(phi::dtype::float16);
databuf_data = MempoolWrapper::instance().malloc(databuf_size);
if (!databuf_data) {
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
}
paddle::platform::float16* data_out =
reinterpret_cast<paddle::platform::float16*>(databuf_data);
phi::dtype::float16* data_out =
reinterpret_cast<phi::dtype::float16*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
}
4 changes: 4 additions & 0 deletions core/predictor/framework/server.cpp
@@ -96,6 +96,10 @@ int ServerManager::start_and_wait() {
LOG(ERROR) << "Failed to start Paddle Inference Server";
return -1;
}

std::cout << "C++ Serving service started successfully!" << std::endl;
LOG(INFO) << "C++ Serving service started successfully!";

_server.RunUntilAskedToQuit();

ServerManager::stop_reloader();
10 changes: 8 additions & 2 deletions core/predictor/op/op.cpp
@@ -36,12 +36,14 @@ int Op::init(Bus* bus,
const std::string& name,
const std::string& type,
void* conf,
const std::vector<std::string>& address,
const uint64_t log_id) {
_bus = bus;
_dag = dag;
_id = id;
_name = name;
_type = type;
_address = address;
set_config(conf);

_timer = butil::get_object<TimerFlow>();
@@ -110,11 +112,13 @@ int Op::process(const uint64_t log_id, bool debug) {
return ERR_INTERNAL_FAILURE;
}

/*
if (_has_calc) {
LOG(INFO) << "(logid=" << log_id << ") Op: " << _name
<< " already processed before";
return ERR_OK;
}
*/

// 1. dependency inference
/*
@@ -147,8 +151,10 @@
}

// 3. share output to bus
Channel* channel = mutable_channel();
channel->share_to_bus(_bus, log_id);
if (!_has_calc) {
Channel* channel = mutable_channel();
channel->share_to_bus(_bus, log_id);
}

// 4. mark has calculated
_has_calc = true;
4 changes: 4 additions & 0 deletions core/predictor/op/op.h
@@ -114,6 +114,7 @@ class Op {
const std::string& name,
const std::string& type,
void* conf,
const std::vector<std::string>& address,
const uint64_t log_id);

int deinit();
@@ -135,6 +136,8 @@

const std::string& full_name() const { return _full_name; }

const std::vector<std::string>& address() const { return _address; }

const std::vector<std::string>& pre_names() const { return _pre_node_names; }

void set_full_name(const std::string full_name) { _full_name = full_name; }
@@ -206,6 +209,7 @@ class Op {
std::string _name;
std::string _full_name; // service_workflow_stageindex_opname
std::string _type;
std::vector<std::string> _address;
bool _has_calc;
bool _has_init;
TimerFlow* _timer;
6 changes: 4 additions & 2 deletions core/sdk-cpp/proto/general_model_service.proto
@@ -92,11 +92,13 @@ message Request {
message Response {
repeated ModelOutput outputs = 1;
repeated int64 profile_time = 2;
bool profile_server = 3;
uint64 log_id = 4;
// Error code
int32 err_no = 3;
int32 err_no = 5;

// Error messages
string err_msg = 4;
string err_msg = 6;
};

message ModelOutput {
2 changes: 1 addition & 1 deletion doc/Offical_Docs/7-0_Python_Pipeline_Int_CN.md
@@ -6,7 +6,7 @@ Paddle Serving implements a general programming framework for multi-model composed services, Python

For Python Pipeline usage examples, please read [Python Pipeline quick deployment example](./3-2_QuickStart_Pipeline_OCR_CN.md)

Read the following to master the Python Pipeline design, advanced usage, optimization guide, and more:
Read the following to master the Python Pipeline core features and usage, advanced feature usage, performance optimization guide, and more:
- [Python Pipeline Framework Design](7-1_Python_Pipeline_Design_CN.md)
- [Python Pipeline Advanced Usage](7-2_Python_Pipeline_Senior_CN.md)
- [Python Pipeline Optimization Guide](7-3_Python_Pipeline_Optimize_CN.md)