Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support column-split in row partitioner #8828

Merged
merged 10 commits into from
Feb 25, 2023
100 changes: 82 additions & 18 deletions src/common/partition_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ namespace common {
// BlockSize is template to enable memory alignment easily with C++11 'alignas()' feature
template<size_t BlockSize>
class PartitionBuilder {
using BitVector = RBitField8;

public:
template<typename Func>
void Init(const size_t n_tasks, size_t n_nodes, Func funcNTask) {
Expand Down Expand Up @@ -121,27 +123,11 @@ class PartitionBuilder {
bool default_left = tree[nid].DefaultLeft();
bool is_cat = tree.GetSplitTypes()[nid] == FeatureType::kCategorical;
auto node_cats = tree.NodeCats(nid);

auto const& index = gmat.index;
auto const& cut_values = gmat.cut.Values();
auto const& cut_ptrs = gmat.cut.Ptrs();

auto gidx_calc = [&](auto ridx) {
auto begin = gmat.RowIdx(ridx);
if (gmat.IsDense()) {
return static_cast<bst_bin_t>(index[begin + fid]);
}
auto end = gmat.RowIdx(ridx + 1);
auto f_begin = cut_ptrs[fid];
auto f_end = cut_ptrs[fid + 1];
// bypassing the column matrix as we need the cut value instead of bin idx for categorical
// features.
return BinarySearchBin(begin, end, index, f_begin, f_end);
};

auto pred_hist = [&](auto ridx, auto bin_id) {
if (any_cat && is_cat) {
auto gidx = gidx_calc(ridx);
auto gidx = gmat.GetGindex(ridx, fid);
bool go_left = default_left;
if (gidx > -1) {
go_left = Decision(node_cats, cut_values[gidx]);
Expand All @@ -153,7 +139,7 @@ class PartitionBuilder {
};

auto pred_approx = [&](auto ridx) {
auto gidx = gidx_calc(ridx);
auto gidx = gmat.GetGindex(ridx, fid);
bool go_left = default_left;
if (gidx > -1) {
if (is_cat) {
Expand Down Expand Up @@ -199,6 +185,84 @@ class PartitionBuilder {
SetNRightElems(node_in_set, range.begin(), n_right);
}

/**
* @brief When data is split by column, we don't have all the features locally on the current
* worker, so we go through all the rows and mark the bit vectors on whether the decision is made
* to go right, or if the feature value used for the split is missing.
*/
void MaskRows(const size_t node_in_set, std::vector<xgboost::tree::CPUExpandEntry> const &nodes,
const common::Range1d range, GHistIndexMatrix const& gmat,
const common::ColumnMatrix& column_matrix,
const RegTree& tree, const size_t* rid,
BitVector* decision_bits, BitVector* missing_bits) {
common::Span<const size_t> rid_span(rid + range.begin(), rid + range.end());
std::size_t nid = nodes[node_in_set].nid;
bst_feature_t fid = tree[nid].SplitIndex();
bool is_cat = tree.GetSplitTypes()[nid] == FeatureType::kCategorical;
auto node_cats = tree.NodeCats(nid);
auto const& cut_values = gmat.cut.Values();

if (!column_matrix.IsInitialized()) {
for (auto row_id : rid_span) {
auto gidx = gmat.GetGindex(row_id, fid);
if (gidx > -1) {
bool go_left = false;
if (is_cat) {
go_left = Decision(node_cats, cut_values[gidx]);
} else {
go_left = cut_values[gidx] <= nodes[node_in_set].split.split_value;
}
if (go_left) {
decision_bits->Set(row_id - gmat.base_rowid);
}
} else {
missing_bits->Set(row_id - gmat.base_rowid);
}
}
} else {
LOG(FATAL) << "Column data split is only supported for the `approx` tree method";
}
}

/**
* @brief Once we've aggregated the decision and missing bits from all the workers, we can then
* use them to partition the rows accordingly.
*/
void PartitionByMask(const size_t node_in_set,
std::vector<xgboost::tree::CPUExpandEntry> const& nodes,
const common::Range1d range, GHistIndexMatrix const& gmat,
const common::ColumnMatrix& column_matrix, const RegTree& tree,
const size_t* rid, BitVector const& decision_bits,
BitVector const& missing_bits) {
common::Span<const size_t> rid_span(rid + range.begin(), rid + range.end());
common::Span<size_t> left = GetLeftBuffer(node_in_set, range.begin(), range.end());
common::Span<size_t> right = GetRightBuffer(node_in_set, range.begin(), range.end());
std::size_t nid = nodes[node_in_set].nid;
bool default_left = tree[nid].DefaultLeft();

auto pred_approx = [&](auto ridx) {
bool go_left = default_left;
bool is_missing = missing_bits.Check(ridx - gmat.base_rowid);
if (!is_missing) {
go_left = decision_bits.Check(ridx - gmat.base_rowid);
}
return go_left;
};

std::pair<size_t, size_t> child_nodes_sizes;
if (!column_matrix.IsInitialized()) {
child_nodes_sizes = PartitionRangeKernel(rid_span, left, right, pred_approx);
} else {
LOG(FATAL) << "Column data split is only supported for the `approx` tree method";
}

const size_t n_left = child_nodes_sizes.first;
const size_t n_right = child_nodes_sizes.second;

SetNLeftElems(node_in_set, range.begin(), n_left);
SetNRightElems(node_in_set, range.begin(), n_right);
}

// allocate thread local memory, should be called for each specific task
void AllocateForTask(size_t id) {
if (mem_blocks_[id].get() == nullptr) {
Expand Down
18 changes: 13 additions & 5 deletions src/data/gradient_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,24 @@ common::ColumnMatrix const &GHistIndexMatrix::Transpose() const {
return *columns_;
}

bst_bin_t GHistIndexMatrix::GetGindex(size_t ridx, size_t fidx) const {
auto begin = RowIdx(ridx);
if (IsDense()) {
return static_cast<bst_bin_t>(index[begin + fidx]);
}
auto end = RowIdx(ridx + 1);
auto const& cut_ptrs = cut.Ptrs();
auto f_begin = cut_ptrs[fidx];
auto f_end = cut_ptrs[fidx + 1];
return BinarySearchBin(begin, end, index, f_begin, f_end);
}

float GHistIndexMatrix::GetFvalue(size_t ridx, size_t fidx, bool is_cat) const {
auto const &values = cut.Values();
auto const &mins = cut.MinValues();
auto const &ptrs = cut.Ptrs();
if (is_cat) {
auto f_begin = ptrs[fidx];
auto f_end = ptrs[fidx + 1];
auto begin = RowIdx(ridx);
auto end = RowIdx(ridx + 1);
auto gidx = BinarySearchBin(begin, end, index, f_begin, f_end);
auto gidx = GetGindex(ridx, fidx);
if (gidx == -1) {
return std::numeric_limits<float>::quiet_NaN();
}
Expand Down
2 changes: 2 additions & 0 deletions src/data/gradient_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ class GHistIndexMatrix {

common::ColumnMatrix const& Transpose() const;

bst_bin_t GetGindex(size_t ridx, size_t fidx) const;

float GetFvalue(size_t ridx, size_t fidx, bool is_cat) const;

private:
Expand Down
103 changes: 87 additions & 16 deletions src/tree/common_row_partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,92 @@
#include <limits> // std::numeric_limits
#include <vector>

#include "../collective/communicator-inl.h"
#include "../common/numeric.h" // Iota
#include "../common/partition_builder.h"
#include "hist/expand_entry.h" // CPUExpandEntry
#include "xgboost/context.h" // Context

namespace xgboost {
namespace tree {
class CommonRowPartitioner {
static constexpr size_t kPartitionBlockSize = 2048;
common::PartitionBuilder<kPartitionBlockSize> partition_builder_;
common::RowSetCollection row_set_collection_;

static constexpr size_t kPartitionBlockSize = 2048;

class ColumnSplitHelper {
public:
ColumnSplitHelper() = default;

ColumnSplitHelper(bst_row_t num_row,
common::PartitionBuilder<kPartitionBlockSize>* partition_builder,
common::RowSetCollection* row_set_collection)
: partition_builder_{partition_builder}, row_set_collection_{row_set_collection} {
decision_storage_.resize(num_row);
decision_bits_ = BitVector(common::Span<BitVector::value_type>(decision_storage_));
missing_storage_.resize(num_row);
missing_bits_ = BitVector(common::Span<BitVector::value_type>(missing_storage_));
}

void Partition(common::BlockedSpace2d const& space, std::int32_t n_threads,
GHistIndexMatrix const& gmat, common::ColumnMatrix const& column_matrix,
std::vector<CPUExpandEntry> const& nodes, RegTree const* p_tree) {
// When data is split by column, we don't have all the feature values in the local worker, so
// we first collect all the decisions and whether the feature is missing into bit vectors.
std::fill(decision_storage_.begin(), decision_storage_.end(), 0);
std::fill(missing_storage_.begin(), missing_storage_.end(), 0);
common::ParallelFor2d(space, n_threads, [&](size_t node_in_set, common::Range1d r) {
const int32_t nid = nodes[node_in_set].nid;
partition_builder_->MaskRows(node_in_set, nodes, r, gmat, column_matrix, *p_tree,
(*row_set_collection_)[nid].begin, &decision_bits_,
&missing_bits_);
});

// Then aggregate the bit vectors across all the workers.
collective::Allreduce<collective::Operation::kBitwiseOR>(decision_storage_.data(),
decision_storage_.size());
collective::Allreduce<collective::Operation::kBitwiseAND>(missing_storage_.data(),
missing_storage_.size());

// Finally use the bit vectors to partition the rows.
common::ParallelFor2d(space, n_threads, [&](size_t node_in_set, common::Range1d r) {
size_t begin = r.begin();
const int32_t nid = nodes[node_in_set].nid;
const size_t task_id = partition_builder_->GetTaskIdx(node_in_set, begin);
partition_builder_->AllocateForTask(task_id);
partition_builder_->PartitionByMask(node_in_set, nodes, r, gmat, column_matrix, *p_tree,
(*row_set_collection_)[nid].begin, decision_bits_,
missing_bits_);
});
}

private:
using BitVector = RBitField8;
std::vector<BitVector::value_type> decision_storage_{};
BitVector decision_bits_{};
std::vector<BitVector::value_type> missing_storage_{};
BitVector missing_bits_{};
common::PartitionBuilder<kPartitionBlockSize>* partition_builder_;
common::RowSetCollection* row_set_collection_;
};

class CommonRowPartitioner {
public:
bst_row_t base_rowid = 0;

CommonRowPartitioner() = default;
CommonRowPartitioner(Context const* ctx, bst_row_t num_row, bst_row_t _base_rowid)
: base_rowid{_base_rowid} {
CommonRowPartitioner(Context const* ctx, bst_row_t num_row, bst_row_t _base_rowid,
bool is_col_split)
: base_rowid{_base_rowid}, is_col_split_{is_col_split} {
row_set_collection_.Clear();
std::vector<size_t>& row_indices = *row_set_collection_.Data();
row_indices.resize(num_row);

std::size_t* p_row_indices = row_indices.data();
common::Iota(ctx, p_row_indices, p_row_indices + row_indices.size(), base_rowid);
row_set_collection_.Init();

if (is_col_split_) {
column_split_helper_ = ColumnSplitHelper{num_row, &partition_builder_, &row_set_collection_};
}
}

void FindSplitConditions(const std::vector<CPUExpandEntry>& nodes, const RegTree& tree,
Expand Down Expand Up @@ -156,16 +217,20 @@ class CommonRowPartitioner {

// 2.3 Split elements of row_set_collection_ to left and right child-nodes for each node
// Store results in intermediate buffers from partition_builder_
common::ParallelFor2d(space, ctx->Threads(), [&](size_t node_in_set, common::Range1d r) {
size_t begin = r.begin();
const int32_t nid = nodes[node_in_set].nid;
const size_t task_id = partition_builder_.GetTaskIdx(node_in_set, begin);
partition_builder_.AllocateForTask(task_id);
bst_bin_t split_cond = column_matrix.IsInitialized() ? split_conditions[node_in_set] : 0;
partition_builder_.template Partition<BinIdxType, any_missing, any_cat>(
node_in_set, nodes, r, split_cond, gmat, column_matrix, *p_tree,
row_set_collection_[nid].begin);
});
if (is_col_split_) {
column_split_helper_.Partition(space, ctx->Threads(), gmat, column_matrix, nodes, p_tree);
} else {
common::ParallelFor2d(space, ctx->Threads(), [&](size_t node_in_set, common::Range1d r) {
size_t begin = r.begin();
const int32_t nid = nodes[node_in_set].nid;
const size_t task_id = partition_builder_.GetTaskIdx(node_in_set, begin);
partition_builder_.AllocateForTask(task_id);
bst_bin_t split_cond = column_matrix.IsInitialized() ? split_conditions[node_in_set] : 0;
partition_builder_.template Partition<BinIdxType, any_missing, any_cat>(
node_in_set, nodes, r, split_cond, gmat, column_matrix, *p_tree,
row_set_collection_[nid].begin);
});
}

// 3. Compute offsets to copy blocks of row-indexes
// from partition_builder_ to row_set_collection_
Expand Down Expand Up @@ -205,6 +270,12 @@ class CommonRowPartitioner {
ctx, tree, this->Partitions(), p_out_position,
[&](size_t idx) -> bool { return gpair[idx].GetHess() - .0f == .0f; });
}

private:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should extract these into a different class or struct with its own utility member functions? Also, the CommonRowPartitioner has private members, please place them together.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

common::PartitionBuilder<kPartitionBlockSize> partition_builder_;
common::RowSetCollection row_set_collection_;
bool is_col_split_;
ColumnSplitHelper column_split_helper_;
};

} // namespace tree
Expand Down
2 changes: 1 addition & 1 deletion src/tree/updater_approx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class GloablApproxBuilder {
} else {
CHECK_EQ(n_total_bins, page.cut.TotalBins());
}
partitioner_.emplace_back(this->ctx_, page.Size(), page.base_rowid);
partitioner_.emplace_back(this->ctx_, page.Size(), page.base_rowid, p_fmat->IsColumnSplit());
n_batches_++;
}

Expand Down
2 changes: 1 addition & 1 deletion src/tree/updater_quantile_hist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ void QuantileHistMaker::Builder::InitData(DMatrix *fmat, const RegTree &tree,
} else {
CHECK_EQ(n_total_bins, page.cut.TotalBins());
}
partitioner_.emplace_back(this->ctx_, page.Size(), page.base_rowid);
partitioner_.emplace_back(this->ctx_, page.Size(), page.base_rowid, fmat->IsColumnSplit());
++page_id;
}
histogram_builder_->Reset(n_total_bins, HistBatch(param_), ctx_->Threads(), page_id,
Expand Down
Loading