Support vertical federated learning #8932

Merged · 9 commits · Mar 22, 2023 · Changes from 7 commits
26 changes: 20 additions & 6 deletions include/xgboost/data.h
@@ -171,6 +171,15 @@ class MetaInfo {
*/
void Extend(MetaInfo const& that, bool accumulate_rows, bool check_column);

/**
* @brief Synchronize the number of columns across all workers.
*
* Normally we just need to find the maximum number of columns across all workers, but
* in vertical federated learning, since each worker loads its own list of columns,
* we need to sum them.
*/
void SynchronizeNumberOfColumns();

private:
void SetInfoFromHost(Context const& ctx, StringView key, Json arr);
void SetInfoFromCUDA(Context const& ctx, StringView key, Json arr);
@@ -325,6 +334,10 @@ class SparsePage {
* \brief Check wether the column index is sorted.
*/
bool IsIndicesSorted(int32_t n_threads) const;
/**
* \brief Reindex the column index with an offset.
*/
void Reindex(uint64_t feature_offset, int32_t n_threads);

void SortRows(int32_t n_threads);

@@ -559,17 +572,18 @@ class DMatrix {
* \brief Creates a new DMatrix from an external data adapter.
*
* \tparam AdapterT Type of the adapter.
* \param [in,out] adapter View onto an external data.
* \param missing Values to count as missing.
* \param nthread Number of threads for construction.
* \param cache_prefix (Optional) The cache prefix for external memory.
* \param page_size (Optional) Size of the page.
* \param [in,out] adapter View onto an external data.
* \param missing Values to count as missing.
* \param nthread Number of threads for construction.
* \param cache_prefix (Optional) The cache prefix for external memory.
* \param data_split_mode (Optional) Data split mode.
*
* \return a Created DMatrix.
*/
template <typename AdapterT>
static DMatrix* Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix = "");
const std::string& cache_prefix = "",
DataSplitMode data_split_mode = DataSplitMode::kRow);

/**
* \brief Create a new Quantile based DMatrix used for histogram based algorithm.
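To make the SynchronizeNumberOfColumns contract above concrete, here is a minimal standalone sketch (not part of this PR, and using hard-coded per-worker column counts instead of XGBoost's collective::Allreduce). Row-wise splits reduce the column count with max, since every worker already loads the full schema; vertical federated splits reduce with sum, since each worker loads only its own slice of the features.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

int main() {
  // Hypothetical per-worker column counts (not taken from the PR).
  std::vector<std::uint64_t> worker_cols{5, 3, 2};

  // Row-wise split: every worker loads the full schema, so a max reduction suffices.
  std::uint64_t row_split_cols = *std::max_element(worker_cols.begin(), worker_cols.end());

  // Vertical federated split: each worker loads its own disjoint columns,
  // so the global width is the sum of the local widths.
  std::uint64_t col_split_cols =
      std::accumulate(worker_cols.begin(), worker_cols.end(), std::uint64_t{0});

  std::cout << "row-wise split num_col: " << row_split_cols << "\n";  // 5
  std::cout << "vertical split num_col: " << col_split_cols << "\n";  // 10
  return 0;
}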
57 changes: 38 additions & 19 deletions src/data/data.cc
@@ -703,6 +703,14 @@ void MetaInfo::Extend(MetaInfo const& that, bool accumulate_rows, bool check_col
}
}

void MetaInfo::SynchronizeNumberOfColumns() {
if (collective::IsFederated() && data_split_mode == DataSplitMode::kCol) {
collective::Allreduce<collective::Operation::kSum>(&num_col_, 1);
} else {
collective::Allreduce<collective::Operation::kMax>(&num_col_, 1);
}
}

void MetaInfo::Validate(std::int32_t device) const {
if (group_ptr_.size() != 0 && weights_.Size() != 0) {
CHECK_EQ(group_ptr_.size(), weights_.Size() + 1)
@@ -870,7 +878,7 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
dmlc::Parser<uint32_t>::Create(fname.c_str(), partid, npart, file_format.c_str()));
data::FileAdapter adapter(parser.get());
dmat = DMatrix::Create(&adapter, std::numeric_limits<float>::quiet_NaN(), Context{}.Threads(),
cache_file);
cache_file, data_split_mode);
} else {
data::FileIterator iter{fname, static_cast<uint32_t>(partid), static_cast<uint32_t>(npart),
file_format};
@@ -906,11 +914,6 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
LOG(FATAL) << "Encountered parser error:\n" << e.what();
}

/* sync up number of features after matrix loaded.
* partitioned data will fail the train/val validation check
* since partitioned data not knowing the real number of features. */
collective::Allreduce<collective::Operation::kMax>(&dmat->Info().num_col_, 1);

if (need_split && data_split_mode == DataSplitMode::kCol) {
if (!cache_file.empty()) {
LOG(FATAL) << "Column-wise data split is not support for external memory.";
@@ -920,7 +923,6 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
delete dmat;
return sliced;
} else {
dmat->Info().data_split_mode = data_split_mode;
return dmat;
}
}
@@ -957,39 +959,49 @@ template DMatrix *DMatrix::Create<DataIterHandle, DMatrixHandle,
XGDMatrixCallbackNext *next, float missing, int32_t n_threads, std::string);

template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread, const std::string&) {
return new data::SimpleDMatrix(adapter, missing, nthread);
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread, const std::string&,
DataSplitMode data_split_mode) {
return new data::SimpleDMatrix(adapter, missing, nthread, data_split_mode);
}

template DMatrix* DMatrix::Create<data::DenseAdapter>(data::DenseAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::ArrayAdapter>(data::ArrayAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSRAdapter>(data::CSRAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSCAdapter>(data::CSCAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::DataTableAdapter>(data::DataTableAdapter* adapter,
float missing, std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::FileAdapter>(data::FileAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSRArrayAdapter>(data::CSRArrayAdapter* adapter,
float missing, std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSCArrayAdapter>(data::CSCArrayAdapter* adapter,
float missing, std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create(
data::IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext, XGBoostBatchCSR>* adapter,
float missing, int nthread, const std::string& cache_prefix);
float missing, int nthread, const std::string& cache_prefix, DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::RecordBatchesIterAdapter>(
data::RecordBatchesIterAdapter* adapter, float missing, int nthread, const std::string&);
data::RecordBatchesIterAdapter* adapter, float missing, int nthread, const std::string&,
DataSplitMode data_split_mode);

SparsePage SparsePage::GetTranspose(int num_columns, int32_t n_threads) const {
SparsePage transpose;
@@ -1051,6 +1063,13 @@ void SparsePage::SortIndices(int32_t n_threads) {
});
}

void SparsePage::Reindex(uint64_t feature_offset, int32_t n_threads) {
auto& h_data = this->data.HostVector();
Member: This potentially pulls data from the device to the host.

Contributor Author: Agreed, but it's not much different from some of the other methods there.

common::ParallelFor(h_data.size(), n_threads, [&](auto i) {
h_data[i].index += feature_offset;
});
}

void SparsePage::SortRows(int32_t n_threads) {
auto& h_offset = this->offset.HostVector();
auto& h_data = this->data.HostVector();
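As a rough illustration of what SparsePage::Reindex is for, here is a simplified standalone sketch (not the XGBoost implementation; Entry is a stand-in for xgboost::Entry and the loop is sequential rather than a common::ParallelFor): every stored feature index is shifted by the worker's global feature offset so that locally 0-based columns line up with the global feature space. The reviewer's caveat above still applies to the real method, which operates on the host copy of the data.

#include <cstdint>
#include <iostream>
#include <vector>

// Stand-in for xgboost::Entry: a (feature index, value) pair in CSR storage.
struct Entry {
  std::uint64_t index;
  float fvalue;
};

// Shift every stored feature index by feature_offset.
void Reindex(std::vector<Entry>* data, std::uint64_t feature_offset) {
  for (auto& e : *data) {
    e.index += feature_offset;
  }
}

int main() {
  // A worker that loaded 3 local features (0..2) but owns global features 5..7.
  std::vector<Entry> data{{0, 1.5f}, {2, -0.5f}, {1, 3.0f}};
  Reindex(&data, 5);
  for (const auto& e : data) {
    std::cout << e.index << ":" << e.fvalue << " ";  // 5:1.5 7:-0.5 6:3
  }
  std::cout << "\n";
  return 0;
}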
8 changes: 4 additions & 4 deletions src/data/data.cu
@@ -170,17 +170,17 @@ void MetaInfo::SetInfoFromCUDA(Context const& ctx, StringView key, Json array) {

template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix) {
const std::string& cache_prefix, DataSplitMode data_split_mode) {
CHECK_EQ(cache_prefix.size(), 0)
<< "Device memory construction is not currently supported with external "
"memory.";
return new data::SimpleDMatrix(adapter, missing, nthread);
return new data::SimpleDMatrix(adapter, missing, nthread, data_split_mode);
}

template DMatrix* DMatrix::Create<data::CudfAdapter>(
data::CudfAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix);
const std::string& cache_prefix, DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CupyAdapter>(
data::CupyAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix);
const std::string& cache_prefix, DataSplitMode data_split_mode);
} // namespace xgboost
2 changes: 1 addition & 1 deletion src/data/iterative_dmatrix.cc
@@ -190,7 +190,7 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
// From here on Info() has the correct data shape
Info().num_row_ = accumulated_rows;
Info().num_nonzero_ = nnz;
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
Info().SynchronizeNumberOfColumns();
CHECK(std::none_of(column_sizes.cbegin(), column_sizes.cend(), [&](auto f) {
return f > accumulated_rows;
})) << "Something went wrong during iteration.";
2 changes: 1 addition & 1 deletion src/data/iterative_dmatrix.cu
@@ -166,7 +166,7 @@ void IterativeDMatrix::InitFromCUDA(DataIterHandle iter_handle, float missing,

iter.Reset();
// Synchronise worker columns
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.SynchronizeNumberOfColumns();
}

BatchSet<EllpackPage> IterativeDMatrix::GetEllpackBatches(BatchParam const& param) {
56 changes: 42 additions & 14 deletions src/data/simple_dmatrix.cc
@@ -73,6 +73,19 @@ DMatrix* SimpleDMatrix::SliceCol(int num_slices, int slice_id) {
return out;
}

void SimpleDMatrix::ReindexFeatures() {
if (collective::IsFederated() && info_.data_split_mode == DataSplitMode::kCol) {
std::vector<uint64_t> buffer(collective::GetWorldSize());
buffer[collective::GetRank()] = info_.num_col_;
collective::Allgather(buffer.data(), buffer.size() * sizeof(uint64_t));
auto offset = std::accumulate(buffer.cbegin(), buffer.cbegin() + collective::GetRank(), 0);
if (offset == 0) {
return;
}
sparse_page_->Reindex(offset, ctx_.Threads());
}
}

BatchSet<SparsePage> SimpleDMatrix::GetRowBatches() {
// since csr is the default data structure so `source_` is always available.
auto begin_iter = BatchIterator<SparsePage>(
@@ -151,7 +164,8 @@ BatchSet<ExtSparsePage> SimpleDMatrix::GetExtBatches(BatchParam const&) {
}

template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread,
DataSplitMode data_split_mode) {
this->ctx_.nthread = nthread;

std::vector<uint64_t> qids;
@@ -217,7 +231,9 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {


// Synchronise worker columns
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.data_split_mode = data_split_mode;
ReindexFeatures();
info_.SynchronizeNumberOfColumns();

if (adapter->NumRows() == kAdapterUnknownSize) {
using IteratorAdapterT
@@ -272,22 +288,31 @@ void SimpleDMatrix::SaveToLocalFile(const std::string& fname) {
fo->Write(sparse_page_->data.HostVector());
}

template SimpleDMatrix::SimpleDMatrix(DenseAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(ArrayAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSRAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSRArrayAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSCArrayAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSCAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(DataTableAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(FileAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(DenseAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(ArrayAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSRAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSRArrayAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSCArrayAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSCAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(DataTableAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(FileAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(
IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext, XGBoostBatchCSR>
*adapter,
float missing, int nthread);
float missing, int nthread, DataSplitMode data_split_mode);

template <>
SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, int nthread) {
ctx_.nthread = nthread;
SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode) {
ctx_.nthread = nthread;

auto& offset_vec = sparse_page_->offset.HostVector();
auto& data_vec = sparse_page_->data.HostVector();
@@ -346,7 +371,10 @@ SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, i
}
// Synchronise worker columns
info_.num_col_ = adapter->NumColumns();
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.data_split_mode = data_split_mode;
ReindexFeatures();
info_.SynchronizeNumberOfColumns();

info_.num_row_ = total_batch_size;
info_.num_nonzero_ = data_vec.size();
CHECK_EQ(offset_vec.back(), info_.num_nonzero_);
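The offset arithmetic in ReindexFeatures can be shown in isolation with a small standalone sketch (not the PR code; the per-worker column counts are hard-coded here rather than gathered with collective::Allgather). A worker's offset is the sum of the column counts of all lower-ranked workers, so rank 0 keeps its indices while rank 2 shifts by the combined width of ranks 0 and 1.

#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

// Global feature offset for `rank`, given the per-worker column counts that
// the real code obtains with an Allgather across workers.
std::uint64_t FeatureOffset(const std::vector<std::uint64_t>& worker_cols, int rank) {
  return std::accumulate(worker_cols.cbegin(), worker_cols.cbegin() + rank, std::uint64_t{0});
}

int main() {
  std::vector<std::uint64_t> worker_cols{5, 3, 2};  // hypothetical workers 0, 1, 2
  for (int rank = 0; rank < 3; ++rank) {
    std::cout << "rank " << rank << " offset: " << FeatureOffset(worker_cols, rank) << "\n";
  }
  // rank 0 offset: 0, rank 1 offset: 5, rank 2 offset: 8
  return 0;
}

After this shift, SynchronizeNumberOfColumns sums the local widths, so in this example every worker ends up agreeing that num_col_ is 10.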
11 changes: 7 additions & 4 deletions src/data/simple_dmatrix.cu
@@ -15,7 +15,8 @@ namespace data {
// Current implementation assumes a single batch. More batches can
// be supported in future. Does not currently support inferring row/column size
template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int32_t /*nthread*/) {
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int32_t /*nthread*/,
DataSplitMode data_split_mode) {
auto device = (adapter->DeviceIdx() < 0 || adapter->NumRows() == 0) ? dh::CurrentDevice()
: adapter->DeviceIdx();
CHECK_GE(device, 0);
@@ -35,12 +36,14 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int32_t /*nthread
info_.num_col_ = adapter->NumColumns();
info_.num_row_ = adapter->NumRows();
// Synchronise worker columns
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.data_split_mode = data_split_mode;
ReindexFeatures();
Member: Let's mark it not implemented for now. This may pull the data back to CPU.

Contributor Author: Done.

info_.SynchronizeNumberOfColumns();
}

template SimpleDMatrix::SimpleDMatrix(CudfAdapter* adapter, float missing,
int nthread);
int nthread, DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CupyAdapter* adapter, float missing,
int nthread);
int nthread, DataSplitMode data_split_mode);
} // namespace data
} // namespace xgboost
12 changes: 11 additions & 1 deletion src/data/simple_dmatrix.h
@@ -22,7 +22,8 @@ class SimpleDMatrix : public DMatrix {
public:
SimpleDMatrix() = default;
template <typename AdapterT>
explicit SimpleDMatrix(AdapterT* adapter, float missing, int nthread);
explicit SimpleDMatrix(AdapterT* adapter, float missing, int nthread,
DataSplitMode data_split_mode = DataSplitMode::kRow);

explicit SimpleDMatrix(dmlc::Stream* in_stream);
~SimpleDMatrix() override = default;
@@ -61,6 +62,15 @@ class SimpleDMatrix : public DMatrix {
bool GHistIndexExists() const override { return static_cast<bool>(gradient_index_); }
bool SparsePageExists() const override { return true; }

/**
* \brief Reindex the features based on a global view.
*
* In some cases (e.g. vertical federated learning), features are loaded locally with indices
* starting from 0. However, all the algorithms assume the features are globally indexed, so we
* reindex the features based on the offset needed to obtain the global view.
*/
void ReindexFeatures();

private:
Context ctx_;
};
2 changes: 1 addition & 1 deletion src/data/sparse_page_dmatrix.cc
@@ -96,7 +96,7 @@ SparsePageDMatrix::SparsePageDMatrix(DataIterHandle iter_handle, DMatrixHandle p
this->info_.num_col_ = n_features;
this->info_.num_nonzero_ = nnz;

collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.SynchronizeNumberOfColumns();
CHECK_NE(info_.num_col_, 0);
}
