Refactoring MetaDataObject out of DenseMatrix #758

Open · wants to merge 3 commits into base: main
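Judging from the call sites changed in this diff, the refactoring replaces DataPlacement's public members (`dp->allocation`, `dp->range`, `dp->dp_id`) with accessors that allow a list of allocation descriptors per placement (`dp->getAllocation(0)`, `dp->getRange()`, `dp->getID()`). A minimal sketch of that shape, using mock types rather than DAPHNE's actual headers:

```cpp
// Mock types illustrating the inferred DataPlacement API after this PR;
// only the accessor names are taken from the diff, everything else is ours.
#include <memory>
#include <vector>

struct IAllocationDescriptor {
    virtual ~IAllocationDescriptor() = default;
};

struct AllocationDescriptorMPI : IAllocationDescriptor {
    int rank = 0;
    int getRank() const { return rank; }
};

struct Range {
    size_t r_start = 0;
    size_t r_len = 0;
};

class DataPlacement {
    // A placement can now hold several allocation descriptors instead of one
    // public `allocation` member; callers index into it via getAllocation(i).
    std::vector<std::unique_ptr<IAllocationDescriptor>> allocations;
    Range range;
    size_t dp_id;

  public:
    DataPlacement(std::unique_ptr<IAllocationDescriptor> alloc, Range r, size_t id)
        : range(r), dp_id(id) {
        allocations.push_back(std::move(alloc));
    }
    IAllocationDescriptor *getAllocation(size_t i) { return allocations.at(i).get(); }
    const Range *getRange() const { return &range; }
    size_t getID() const { return dp_id; }
};

int main() {
    DataPlacement dp(std::make_unique<AllocationDescriptorMPI>(), Range{0, 100}, 42);
    // Old style: dynamic_cast<AllocationDescriptorMPI &>(*(dp.allocation)).getRank()
    // New style, as used throughout the diff:
    auto rank = dynamic_cast<AllocationDescriptorMPI *>(dp.getAllocation(0))->getRank();
    return rank;
}
```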
1 change: 1 addition & 0 deletions src/api/cli/DaphneUserConfig.h
@@ -39,6 +39,7 @@ struct DaphneUserConfig {
     bool use_cuda = false;
     bool use_vectorized_exec = false;
     bool use_distributed = false;
+    bool use_grpc_async = false;
     bool use_obj_ref_mgnt = true;
     bool use_ipa_const_propa = true;
     bool use_phy_op_selection = true;
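The diff only adds the `use_grpc_async` default here. A hypothetical sketch of how a coordinator could consult such a flag to choose between the DIST_GRPC_SYNC and DIST_GRPC_ASYNC kernels; the dispatch helper and enum below are illustrative, not DAPHNE's actual code:

```cpp
// Illustrative only: pickGrpcBackend() is a hypothetical helper, not part of
// this PR; it shows the intent of the new use_grpc_async flag.
#include <iostream>

struct DaphneUserConfig {
    bool use_distributed = false;
    bool use_grpc_async = false; // added in this PR, defaults to sync gRPC
};

enum class AllocationType { DIST_GRPC_SYNC, DIST_GRPC_ASYNC };

AllocationType pickGrpcBackend(const DaphneUserConfig &cfg) {
    return cfg.use_grpc_async ? AllocationType::DIST_GRPC_ASYNC
                              : AllocationType::DIST_GRPC_SYNC;
}

int main() {
    DaphneUserConfig cfg;
    cfg.use_grpc_async = true;
    std::cout << std::boolalpha
              << (pickGrpcBackend(cfg) == AllocationType::DIST_GRPC_ASYNC) << "\n";
}
```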
6 changes: 4 additions & 2 deletions src/parser/config/ConfigParser.cpp
@@ -31,8 +31,10 @@ int readLogLevel(const std::string &level) {
 bool ConfigParser::fileExists(const std::string &filename) {
     // Open the given config file.
     std::ifstream ifs(filename, std::ios::in);
-    if (!ifs.good())
-        throw std::runtime_error("could not open file '" + filename + "' for reading user config");
+    if (!ifs.good()) {
+        spdlog::warn("could not open file {} for reading user config", filename);
+        return false;
+    }
     return true;
 }

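The behavioral change here: a missing config file no longer aborts with `std::runtime_error`; `fileExists` logs a warning and returns `false`, letting callers fall back to defaults. A standalone sketch of the new behavior, assuming spdlog is available; the surrounding ConfigParser class is omitted:

```cpp
#include <fstream>
#include <string>
#include <spdlog/spdlog.h>

// Same logic as the patched ConfigParser::fileExists, extracted standalone.
static bool fileExists(const std::string &filename) {
    std::ifstream ifs(filename, std::ios::in);
    if (!ifs.good()) {
        spdlog::warn("could not open file {} for reading user config", filename);
        return false;
    }
    return true;
}

int main() {
    // Before this change, a missing file threw; now the caller can proceed.
    if (!fileExists("UserConfig.json"))
        spdlog::info("no user config found, using built-in defaults");
}
```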
25 changes: 13 additions & 12 deletions src/runtime/distributed/coordinator/kernels/Broadcast.h
@@ -76,9 +76,9 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT> {
         LoadPartitioningDistributed<DT, AllocationDescriptorMPI> partioner(DistributionSchema::BROADCAST, mat, dctx);
         while (partioner.HasNextChunk()) {
             auto dp = partioner.GetNextChunk();
-            auto rank = dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getRank();
+            auto rank = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getRank();

-            if (dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+            if (dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
                 continue;

             // Minimum chunk size
@@ -114,12 +114,13 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT> {
             WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank);
             std::string address = std::to_string(rank);
             DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
-            auto data = dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getDistributedData();
+            auto data = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getDistributedData();
             data.identifier = dataAcknowledgement.identifier;
             data.numRows = dataAcknowledgement.numRows;
             data.numCols = dataAcknowledgement.numCols;
             data.isPlacedAtWorker = true;
-            dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).updateDistributedData(data);
+            auto alloc = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0));
+            alloc->updateDistributedData(data);
         }
     }
 };
@@ -150,12 +151,12 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT> {

         while (partioner.HasNextChunk()) {
             auto dp = partioner.GetNextChunk();
-            if (dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+            if (dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
                 continue;

-            auto address = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getLocation();
+            auto address = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getLocation();

-            StoredInfo storedInfo({dp->dp_id});
+            StoredInfo storedInfo({dp->getID()});
             caller.asyncStoreCall(address, storedInfo);
             // Minimum chunk size
             auto min_chunk_size =
@@ -187,15 +188,15 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT> {
             auto dp_id = response.storedInfo.dp_id;
             auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id);

-            auto data = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData();
+            auto data = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData();

             auto storedData = response.result;
             data.identifier = storedData.identifier();
             data.numRows = storedData.num_rows();
             data.numCols = storedData.num_cols();
             data.isPlacedAtWorker = true;

-            dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).updateDistributedData(data);
+            dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->updateDistributedData(data);
         }
     };
 };
@@ -222,10 +223,10 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {

         while (partioner.HasNextChunk()) {
             auto dp = partioner.GetNextChunk();
-            if (dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+            if (dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
                 continue;

-            auto workerAddr = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getLocation();
+            auto workerAddr = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getLocation();
             std::thread t([=, &mat]() {
                 // TODO Consider saving channels inside DaphneContext
                 grpc::ChannelArguments ch_args;
@@ -260,7 +261,7 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {
                 newData.numRows = storedData.num_rows();
                 newData.numCols = storedData.num_cols();
                 newData.isPlacedAtWorker = true;
-                dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).updateDistributedData(newData);
+                dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->updateDistributedData(newData);
             });
             threads_vector.push_back(move(t));
         }
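One consequence of the new accessor pattern: the old code's reference `dynamic_cast` threw `std::bad_cast` on a type mismatch, whereas the pointer `dynamic_cast` on `getAllocation(0)` returns `nullptr`, which most hunks above dereference unchecked. A small sketch of the two failure modes, with mock descriptor types rather than DAPHNE's:

```cpp
#include <iostream>
#include <typeinfo>

struct IAllocationDescriptor { virtual ~IAllocationDescriptor() = default; };
struct AllocationDescriptorMPI : IAllocationDescriptor {};
struct AllocationDescriptorGRPC : IAllocationDescriptor {};

int main() {
    AllocationDescriptorMPI mpi;
    IAllocationDescriptor *base = &mpi; // actually an MPI descriptor

    try {
        // Old style: a mismatched reference cast throws immediately.
        auto &ref = dynamic_cast<AllocationDescriptorGRPC &>(*base);
        (void)ref;
    } catch (const std::bad_cast &) {
        std::cout << "reference cast: std::bad_cast\n";
    }

    // New style: a mismatched pointer cast yields nullptr; dereferencing it
    // without a check would be undefined behavior.
    auto *ptr = dynamic_cast<AllocationDescriptorGRPC *>(base);
    std::cout << "pointer cast: " << (ptr ? "valid" : "nullptr") << "\n";
}
```

The DIST_GRPC_SYNC hunk in Distribute.h below is the one place in this diff that guards the cast explicitly.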
43 changes: 23 additions & 20 deletions src/runtime/distributed/coordinator/kernels/Distribute.h
@@ -63,13 +63,12 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT> {

         while (partioner.HasNextChunk()) {
             DataPlacement *dp = partioner.GetNextChunk();
-            auto rank = dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getRank();
+            auto rank = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getRank();

-            if (dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+            if (dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
                 continue;

-            auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
-
+            auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);
             // Minimum chunk size
             auto min_chunk_size =
                 dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer<DT>::length(slicedMat)
@@ -91,12 +90,12 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT> {
             WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank);
             std::string address = std::to_string(rank);
             DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
-            auto data = dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getDistributedData();
+            auto data = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getDistributedData();
             data.identifier = dataAcknowledgement.identifier;
             data.numRows = dataAcknowledgement.numRows;
             data.numCols = dataAcknowledgement.numCols;
             data.isPlacedAtWorker = true;
-            dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).updateDistributedData(data);
+            dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->updateDistributedData(data);
         }
     }
 };
@@ -121,17 +120,17 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT> {
         while (partioner.HasNextChunk()) {
             auto dp = partioner.GetNextChunk();
             // Skip if already placed at workers
-            if (dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+            if (dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
                 continue;
             distributed::Data protoMsg;

             std::vector<char> buffer;

-            auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
+            auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);

-            StoredInfo storedInfo({dp->dp_id});
+            StoredInfo storedInfo({dp->getID()});

-            auto address = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getLocation();
+            auto address = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getLocation();

             caller.asyncStoreCall(address, storedInfo);
             // Minimum chunk size
@@ -161,12 +160,12 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT> {

             auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id);

-            auto data = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData();
+            auto data = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData();
             data.identifier = storedData.identifier();
             data.numRows = storedData.num_rows();
             data.numCols = storedData.num_cols();
             data.isPlacedAtWorker = true;
-            dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).updateDistributedData(data);
+            dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->updateDistributedData(data);
         }
     }
 };
@@ -187,23 +186,27 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {
         LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::DISTRIBUTE, mat, dctx);
         while (partioner.HasNextChunk()) {
             auto dp = partioner.GetNextChunk();
-            // Skip if already placed at workers
-            if (dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
-                continue;
-
+            if (auto grpc_alloc = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))) {
+                auto dist_data = grpc_alloc->getDistributedData();
+                // Skip if already placed at workers
+                if (dist_data.isPlacedAtWorker)
+                    continue;
+            } else
+                throw std::runtime_error("dynamic_cast<AllocationDescriptorGRPC*>(alloc) failed (returned nullptr)");
             std::vector<char> buffer;

-            auto workerAddr = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getLocation();
+            auto workerAddr = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getLocation();
             std::thread t([=, &mat]() {
                 auto stub = ctx->stubs[workerAddr].get();

                 distributed::StoredData storedData;
                 grpc::ClientContext grpc_ctx;

-                auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
+                auto slicedMat =
+                    mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);
                 auto serializer =
                     DaphneSerializerChunks<DT>(slicedMat, dctx->config.max_distributed_serialization_chunk_size);

                 distributed::Data protoMsg;

                 // Send chunks
@@ -222,10 +225,10 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {
                 newData.numRows = storedData.num_rows();
                 newData.numCols = storedData.num_cols();
                 newData.isPlacedAtWorker = true;
-                dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).updateDistributedData(newData);
+                dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->updateDistributedData(newData);
                 DataObjectFactory::destroy(slicedMat);
             });
-            threads_vector.push_back(move(t));
+            threads_vector.push_back(std::move(t));
         }
         for (auto &thread : threads_vector)
             thread.join();
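Beyond the accessor renames, this file also qualifies `move(t)` as `std::move(t)`; the unqualified form only compiles because argument-dependent lookup finds `std::move` through `std::thread`'s namespace. A minimal standalone sketch of the thread-per-chunk pattern these kernels use, where the lambda body is a stand-in for slicing and sending a chunk:

```cpp
#include <iostream>
#include <thread>
#include <utility>
#include <vector>

int main() {
    std::vector<std::thread> threads_vector;
    for (int chunk = 0; chunk < 4; ++chunk) {
        // Stand-in for serializing a row slice and calling the worker's stub.
        std::thread t([chunk]() { std::cout << "sending chunk " << chunk << "\n"; });
        threads_vector.push_back(std::move(t)); // std::thread is move-only
    }
    for (auto &thread : threads_vector)
        thread.join();
}
```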