Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve StatefulOp/FCompute storage fallback #134

Merged
merged 2 commits into from
Aug 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions src/c_api/c_api_ndarray.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,28 +301,28 @@ void PushFCompute(const FCompute& fn,
RunContext rctx,
engine::CallbackOnComplete on_complete) {
std::vector<TBlob> input_blobs, output_blobs;
std::vector<NDArray> temp_in, temp_out;
std::vector<NDArray> temp_in_src, temp_in_dst, temp_out_src, temp_out_dst;
OpContext opctx{is_train, rctx,
engine::CallbackOnComplete(),
requested};
GetDefaultBlobs(ndinputs, &input_blobs, &temp_in_src, &temp_in_dst);
GetDefaultBlobs(ndoutputs, &output_blobs, &temp_out_src, &temp_out_dst);
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
if (ctx.dev_mask() == gpu::kDevMask) {
#if MXNET_USE_CUDA
GetDefaultBlobs<gpu>(ndinputs, &input_blobs, &temp_in, opctx);
GetDefaultBlobs<gpu>(ndoutputs, &output_blobs, &temp_out, opctx);
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
CastNonDefaultStorage<gpu>(temp_in_src, temp_in_dst, opctx);
fn(attrs, opctx, input_blobs, req, output_blobs);
// cast to original storage type, if necessary
CastNonDefaultStorage<gpu>(ndoutputs, temp_out, opctx);
CastNonDefaultStorage<gpu>(temp_out_dst, temp_out_src, opctx);
rctx.get_stream<gpu>()->Wait();
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
GetDefaultBlobs<cpu>(ndinputs, &input_blobs, &temp_in, opctx);
GetDefaultBlobs<cpu>(ndoutputs, &output_blobs, &temp_out, opctx);
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
CastNonDefaultStorage<cpu>(temp_in_src, temp_in_dst, opctx);
fn(attrs, opctx, input_blobs, req, output_blobs);
CastNonDefaultStorage<cpu>(ndoutputs, temp_out, opctx);
// cast to original storage type, if necessary
CastNonDefaultStorage<cpu>(temp_out_dst, temp_out_src, opctx);
}
on_complete();
}, ctx, read_vars, write_vars, FnProperty::kNormal,
Expand Down Expand Up @@ -383,25 +383,24 @@ void PushOperator(const OpStatePtr& state,
RunContext rctx,
engine::CallbackOnComplete on_complete) {
OpContext opctx{is_train, rctx, on_complete, requested};

std::vector<TBlob> input_blobs, output_blobs;
std::vector<NDArray> temp_in, temp_out;
std::vector<NDArray> temp_in_src, temp_in_dst, temp_out_src, temp_out_dst;
GetDefaultBlobs(ndinputs, &input_blobs, &temp_in_src, &temp_in_dst);
GetDefaultBlobs(ndoutputs, &output_blobs, &temp_out_src, &temp_out_dst);
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
if (rctx.get_ctx().dev_mask() == gpu::kDevMask) {
#if MXNET_USE_CUDA
GetDefaultBlobs<gpu>(ndinputs, &input_blobs, &temp_in, opctx);
GetDefaultBlobs<gpu>(ndoutputs, &output_blobs, &temp_out, opctx);
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
CastNonDefaultStorage<gpu>(temp_in_src, temp_in_dst, opctx);
fcompute(state, opctx, input_blobs, req, output_blobs);
// cast to original storage type, if necessary
CastNonDefaultStorage<gpu>(ndoutputs, temp_out, opctx);
        CastNonDefaultStorage<gpu>(temp_out_dst, temp_out_src, opctx);
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
GetDefaultBlobs<cpu>(ndinputs, &input_blobs, &temp_in, opctx);
GetDefaultBlobs<cpu>(ndoutputs, &output_blobs, &temp_out, opctx);
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
CastNonDefaultStorage<cpu>(temp_in_src, temp_in_dst, opctx);
fcompute(state, opctx, input_blobs, req, output_blobs);
CastNonDefaultStorage<cpu>(ndoutputs, temp_out, opctx);
CastNonDefaultStorage<cpu>(temp_out_dst, temp_out_src, opctx);
}
if (exec_type == ExecType::kSync) {
if (rctx.get_ctx().dev_mask() == gpu::kDevMask) {
Expand Down
70 changes: 27 additions & 43 deletions src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,82 +31,66 @@ template<typename xpu>
void CastStorageDispatch(const OpContext& ctx, const NDArray& input, const NDArray& output);

/*
* \brief Get the corresponding tensor blobs from default storage NDArrays.
* If any NDArray is of non-default storage, it is casted to default storage and
* the temporary NDArrays are stored in `temps`. When storage_fallback is false,
* and `MXNET_EXEC_STORAGE_FALLBACK` == 0, storage fallback is disallowed.
* \return true if any input is casted
* \brief get the corresponding tensor blobs from default storage NDArrays.
* If any NDArray is of non-default storage, it will be added to `temp_src`
* \return true if any input storage needs to be casted
*/
template <typename xpu>
inline bool GetDefaultBlobs(const std::vector<NDArray>& nds,
inline bool GetDefaultBlobs(const std::vector<NDArray>& src,
std::vector<TBlob> *blobs,
std::vector<NDArray> *temps,
const OpContext& ctx,
bool storage_fallback = false) {
bool casted = false;
if (storage_fallback == false) {
storage_fallback = dmlc::GetEnv("MXNET_EXEC_STORAGE_FALLBACK", true);
}
for (auto& nd : nds) {
std::vector<NDArray> *temp_src,
std::vector<NDArray> *temp_dst) {
bool require_cast = false;
for (size_t i = 0; i < src.size(); i++) {
auto& nd = src[i];
if (nd.storage_type() != kDefaultStorage) {
if (storage_fallback == false) {
LOG(FATAL) << "Storage type conversion detected during execution. "
<< "You are probably executing an operator which "
<< "doesn't support NDArray inputs with non-default storage.";
}
NDArray temp(nd.shape(), nd.ctx(), false);
CastStorageDispatch<xpu>(ctx, nd, temp);
temps->push_back(temp);
blobs->push_back(temp.data());
casted = true;
temp_src->emplace_back(nd);
temp_dst->emplace_back(temp);
blobs->emplace_back(temp.data());
require_cast = true;
} else {
blobs->push_back(nd.data());
}
}
return casted;
return require_cast;
}

/*
* \brief Cast the NDArrays in `src` according to the storage types of the NDArrays
* in `dst`. The ones with default storage in `dst` are ignored.
* \brief cast the NDArrays in `src` to NDArrays in `dst`. This is only used
* for storage fallback mechanism in executor.
* When storage_fallback is false, and `MXNET_EXEC_STORAGE_FALLBACK` == 0,
* storage fallback is disallowed.
*/
template <typename xpu>
inline void CastNonDefaultStorage(const std::vector<NDArray>& dst,
const std::vector<NDArray>& src,
inline void CastNonDefaultStorage(const std::vector<NDArray>& src,
const std::vector<NDArray>& dst,
const OpContext& ctx,
bool storage_fallback = false) {
CHECK_GE(dst.size(), src.size());
if (src.size() == 0) return;
if (storage_fallback == false) {
storage_fallback = dmlc::GetEnv("MXNET_EXEC_STORAGE_FALLBACK", true);
}
size_t src_idx = 0;
for (size_t i = 0; i < dst.size(); i++) {
auto stype = dst[i].storage_type();
if (stype != kDefaultStorage) {
if (storage_fallback == false) {
LOG(FATAL) << "Storage type conversion detected during execution. "
<< "You are probably executing an operator which "
<< "doesn't support NDArray inputs with non-default storage.";
}
CastStorageDispatch<xpu>(ctx, src[src_idx++], dst[i]);
}
if (storage_fallback == false) {
LOG(FATAL) << "Storage type conversion detected during execution. "
<< "You are probably executing an operator which "
<< "doesn't support NDArray inputs with non-default storage.";
}
for (size_t i = 0; i < src.size(); i++) {
CastStorageDispatch<xpu>(ctx, src[i], dst[i]);
}
CHECK_EQ(src_idx, src.size()) << "Not all src NDArrays are casted";
}

// Check if any storage type is not default storage
inline bool ContainsNonDefaultStorage(const StorageTypeVector& vstorage) {
for (auto& i : vstorage) {
for (const auto& i : vstorage) {
if (i != kUndefinedStorage && i != kDefaultStorage) return true;
}
return false;
}

inline bool ContainsDefaultStorage(const std::vector<NDArray>& ndarrays) {
for (auto &nd : ndarrays) {
for (const auto &nd : ndarrays) {
if (nd.storage_type() == kDefaultStorage) {
return true;
}
Expand Down
56 changes: 30 additions & 26 deletions src/executor/attach_op_execs_pass.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,36 @@ class StatefulComputeExecutor : public OpExecutor {
public:
void Run(RunContext rctx, bool is_gpu) override {
using namespace common;
// TODO(haibin) avoid repeating this if all inputs are already in default-storage
op_ctx.run_ctx = rctx;
in_data_.clear();
out_data_.clear();
temp_in_.clear();
temp_out_.clear();
if (is_gpu) {
#if MXNET_USE_CUDA
GetDefaultBlobs<gpu>(in_array, &in_data_, &temp_in_, op_ctx);
GetDefaultBlobs<gpu>(out_array, &out_data_, &temp_out_, op_ctx);
CastNonDefaultStorage<gpu>(temp_in_src_, temp_in_dst_, op_ctx);
CastNonDefaultStorage<gpu>(temp_out_src_, temp_out_dst_, op_ctx);
fcompute_(state_, op_ctx, in_data_, req, out_data_);
CastNonDefaultStorage<gpu>(out_array, temp_out_, op_ctx);
CastNonDefaultStorage<gpu>(temp_out_dst_, temp_out_src_, op_ctx);
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
GetDefaultBlobs<cpu>(in_array, &in_data_, &temp_in_, op_ctx);
GetDefaultBlobs<cpu>(out_array, &out_data_, &temp_out_, op_ctx);
CastNonDefaultStorage<cpu>(temp_in_src_, temp_in_dst_, op_ctx);
CastNonDefaultStorage<cpu>(temp_out_src_, temp_out_dst_, op_ctx);
fcompute_(state_, op_ctx, in_data_, req, out_data_);
CastNonDefaultStorage<cpu>(out_array, temp_out_, op_ctx);
CastNonDefaultStorage<cpu>(temp_out_dst_, temp_out_src_, op_ctx);
}
#if MKL_EXPERIMENTAL == 1
mkl_tblobs_prv_to_cpu(in_data_);
mkl_tblobs_prv_to_cpu(out_data_);
#endif
}

void Setup() override {}
void Setup() override {
using namespace common;
in_data_.clear(); out_data_.clear();
temp_in_src_.clear(); temp_in_dst_.clear();
temp_out_src_.clear(); temp_out_dst_.clear();
GetDefaultBlobs(in_array, &in_data_, &temp_in_src_, &temp_in_dst_);
GetDefaultBlobs(out_array, &out_data_, &temp_out_src_, &temp_out_dst_);
}

ExecType exec_type() const override {
return exec_type_;
Expand All @@ -76,7 +78,7 @@ class StatefulComputeExecutor : public OpExecutor {
FStatefulCompute fcompute_;
ExecType exec_type_;
std::vector<TBlob> in_data_, out_data_;
std::vector<NDArray> temp_in_, temp_out_;
std::vector<NDArray> temp_in_src_, temp_in_dst_, temp_out_src_, temp_out_dst_;
};


Expand Down Expand Up @@ -118,33 +120,35 @@ class FComputeExecutor : public OpExecutor {
using namespace common;
// TODO(haibin) avoid repeating this if all inputs are already in default-storage
op_ctx.run_ctx = rctx;
in_data_.clear();
out_data_.clear();
temp_in_.clear();
temp_out_.clear();
if (is_gpu) {
#if MXNET_USE_CUDA
GetDefaultBlobs<gpu>(in_array, &in_data_, &temp_in_, op_ctx);
GetDefaultBlobs<gpu>(out_array, &out_data_, &temp_out_, op_ctx);
CastNonDefaultStorage<gpu>(temp_in_src_, temp_in_dst_, op_ctx);
CastNonDefaultStorage<gpu>(temp_out_src_, temp_out_dst_, op_ctx);
fcompute_(attrs_, op_ctx, in_data_, req, out_data_);
CastNonDefaultStorage<gpu>(out_array, temp_out_, op_ctx);
CastNonDefaultStorage<gpu>(temp_out_dst_, temp_out_src_, op_ctx);
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
GetDefaultBlobs<cpu>(in_array, &in_data_, &temp_in_, op_ctx);
GetDefaultBlobs<cpu>(out_array, &out_data_, &temp_out_, op_ctx);
CastNonDefaultStorage<cpu>(temp_in_src_, temp_in_dst_, op_ctx);
CastNonDefaultStorage<cpu>(temp_out_src_, temp_out_dst_, op_ctx);
fcompute_(attrs_, op_ctx, in_data_, req, out_data_);
CastNonDefaultStorage<cpu>(out_array, temp_out_, op_ctx);
CastNonDefaultStorage<cpu>(temp_out_dst_, temp_out_src_, op_ctx);
}
#if MKL_EXPERIMENTAL == 1
// TODO(haibin) handle MKL mem with non-default NDArray
mkl_tblobs_prv_to_cpu(in_data_);
mkl_tblobs_prv_to_cpu(out_data_);
#endif
}

void Setup() override {}
void Setup() override {
using namespace common;
in_data_.clear(); out_data_.clear();
temp_in_src_.clear(); temp_in_dst_.clear();
temp_out_src_.clear(); temp_out_dst_.clear();
GetDefaultBlobs(in_array, &in_data_, &temp_in_src_, &temp_in_dst_);
GetDefaultBlobs(out_array, &out_data_, &temp_out_src_, &temp_out_dst_);
}

ExecType exec_type() const override {
return exec_type_;
Expand All @@ -160,7 +164,7 @@ class FComputeExecutor : public OpExecutor {
FCompute fcompute_;
ExecType exec_type_;
std::vector<TBlob> in_data_, out_data_;
std::vector<NDArray> temp_in_, temp_out_;
std::vector<NDArray> temp_in_src_, temp_in_dst_, temp_out_src_, temp_out_dst_;
};

// fcompute_ex executor
Expand Down
9 changes: 5 additions & 4 deletions src/operator/operator_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,12 @@ void FCompExFallback(const nnvm::NodeAttrs& attrs,
const std::string& fname) {
using namespace mxnet::common;
std::vector<TBlob> in_blobs, out_blobs;
std::vector<NDArray> temp_in, temp_out;
GetDefaultBlobs<xpu>(inputs, &in_blobs, &temp_in, ctx, true);
GetDefaultBlobs<xpu>(outputs, &out_blobs, &temp_out, ctx, true);
std::vector<NDArray> temp_in_src, temp_in_dst, temp_out_src, temp_out_dst;
GetDefaultBlobs(inputs, &in_blobs, &temp_in_src, &temp_in_dst);
GetDefaultBlobs(outputs, &out_blobs, &temp_out_src, &temp_out_dst);
CastNonDefaultStorage<xpu>(temp_in_src, temp_in_dst, ctx, true);
fcompute(attrs, ctx, in_blobs, req, out_blobs);
CastNonDefaultStorage<xpu>(outputs, temp_out, ctx, true);
CastNonDefaultStorage<xpu>(temp_out_dst, temp_out_src, ctx, true);
}

#define CHECK_RSP_ALL_ROWS_NON_ZERO(rsp, func, param) \
Expand Down
Loading