Initial implementation of DaphneSerializer (#467)
- Initial implementation of DaphneSerializer.
  - For Structure, DenseMatrix, CSRMatrix, and fundamental types (Frame still missing, see #545).
  - Variants for (de)serializing a data object as a whole or in chunks (for in-order and out-of-order deserialization); a usage sketch of the whole-object variant follows after this list.
- Updated WriteDaphne.h and ReadDaphne.h (the reader/writer of the DAPHNE binary data format) to use DaphneSerializer.
- Updated the distributed kernels to use DaphneSerializer.
  - The Distribute.h, Broadcast.h, and DistributedCollect.h kernels now serialize and deserialize via DaphneSerializer.
  - Updated the gRPC and MPI distributed backends (e.g., the workers).
  - Removed the DAPHNE ProtoDataConverter.
- Added test cases.
- Added API documentation in the source code.
- Fixed a small MPI bug: the MPI helper function called by the distribute kernel received the wrong rank.
- Contributes to #103, #465.
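
For orientation, here is a minimal round-trip sketch of the whole-object API as the updated kernels below use it (the chunked variants are not shown). The calls are modeled on the call sites in this diff, so the exact signatures in DaphneSerializer.h may differ slightly:

// Round-trip sketch (assumes a DAPHNE build; not part of this commit).
#include <runtime/local/datastructures/DataObjectFactory.h>
#include <runtime/local/datastructures/DenseMatrix.h>
#include <runtime/local/io/DaphneSerializer.h>

#include <vector>

void roundTrip(const DenseMatrix<double> *mat) {
    // Serialize the whole data object into a byte buffer; returns the length in bytes.
    std::vector<char> buffer;
    size_t length = DaphneSerializer<DenseMatrix<double>>::serialize(mat, buffer);

    // The first `length` bytes of `buffer` are what the kernels ship over gRPC/MPI.
    // On the receiving side, DF_deserialize turns the bytes back into a Structure*.
    auto *copy = dynamic_cast<DenseMatrix<double> *>(DF_deserialize(buffer));

    // ... use `copy` ...
    DataObjectFactory::destroy(copy);
    (void)length; // only referenced in comments in this sketch
}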

Co-authored-by: Stratos Psomadakis <[email protected]>
aristotelis96 and psomas authored Jun 14, 2023
1 parent 349bc39 commit 478ae7c
Showing 35 changed files with 2,594 additions and 1,297 deletions.
1 change: 0 additions & 1 deletion src/api/internal/CMakeLists.txt
@@ -27,7 +27,6 @@ set(LIBS
DaphneIrExecutor
DaphneConfigParser
DaphneMetaDataParser
ProtoDataConverter
Util
WorkerImpl
)
53 changes: 29 additions & 24 deletions src/runtime/distributed/coordinator/kernels/Broadcast.h
@@ -19,8 +19,8 @@
#include <runtime/local/context/DistributedContext.h>
#include <runtime/local/datastructures/DataObjectFactory.h>
#include <runtime/local/datastructures/DenseMatrix.h>
#include <runtime/distributed/proto/ProtoDataConverter.h>
#include <runtime/local/datastructures/DistributedAllocationHelpers.h>
#include <runtime/local/io/DaphneSerializer.h>

#include <runtime/local/datastructures/AllocationDescriptorGRPC.h>
#include <runtime/local/datastructures/DataPlacement.h>
#include <runtime/local/datastructures/Range.h>
@@ -32,7 +32,6 @@
#include <runtime/distributed/worker/MPIHelper.h>
#include <runtime/local/datastructures/AllocationDescriptorMPI.h>
#include <runtime/distributed/worker/MPIWorker.h>
#include <runtime/distributed/worker/MPISerializer.h>
#endif

#include <cassert>
@@ -72,9 +71,17 @@ struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT>
static void apply(DT *&mat, bool isScalar, DCTX(dctx))
{
size_t messageLength=0;
void * dataToSend;
//auto ptr = (double*)(&mat);
MPISerializer::serializeStructure<DT>(&dataToSend, mat, isScalar, &messageLength);
std::vector<char> dataToSend;
if (isScalar){
auto ptr = (double*)(&mat);
double val = *ptr;
mat = DataObjectFactory::create<DenseMatrix<double>>(0, 0, false);
dataToSend.reserve(sizeof(double));
messageLength = DaphneSerializer<double>::serialize(val, dataToSend);
}
else {
messageLength = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(mat, dataToSend);
}
std::vector<int> targetGroup; // We will not be able to take advantage of broadcast if some MPI processes have the data

LoadPartitioningDistributed<DT, AllocationDescriptorMPI> partioner(DistributionSchema::BROADCAST, mat, dctx);
@@ -85,36 +92,35 @@ struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT>
if (dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
{
//std::cout<<"data is already placed at rank "<<rank<<std::endl;
auto data =dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
MPIHelper::sendObjectIdentifier(data.identifier, rank);
// std::cout<<"Identifier ( "<<data.identifier<< " ) has been send to " <<(rank+1)<<std::endl;
continue;
}
targetGroup.push_back(rank);
}
if((int)targetGroup.size()==MPIHelper::getCommSize() - 1){ // exclude coordinator
MPIHelper::sendData(messageLength, dataToSend);
MPIHelper::sendData(messageLength, dataToSend.data());
// std::cout<<"data has been send to all "<<std::endl;
}
else{
for(int i=0;i<(int)targetGroup.size();i++){
MPIHelper::distributeData(messageLength, dataToSend, targetGroup.at(i));
MPIHelper::distributeData(messageLength, dataToSend.data(), targetGroup.at(i));
//std::cout<<"data has been send to rank "<<targetGroup.at(i)<<std::endl;
}
}
free(dataToSend);
for(int i=0;i<(int)targetGroup.size();i++)
{
//std::cout<<"From broadcast waiting for ack " << std::endl;

int rank=targetGroup.at(i);
if (rank==COORDINATOR)
int rank = targetGroup.at(i);
if (rank == COORDINATOR)
{

// std::cout<<"coordinator doe not need ack from itself" << std::endl;
continue;
}
WorkerImpl::StoredInfo dataAcknowledgement=MPIHelper::getDataAcknowledgement(&rank);
WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank);
//std::cout<<"received ack form worker " << rank<<std::endl;
std::string address=std::to_string(rank);
DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
@@ -148,22 +154,21 @@ struct Broadcast<ALLOCATION_TYPE::DIST_GRPC, DT>

distributed::Data protoMsg;

double *val;
std::vector<char> buffer;
if (isScalar) {
auto ptr = (double*)(&mat);
val = ptr;
auto protoVal = protoMsg.mutable_value();
protoVal->set_f64(*val);
auto ptr = (double*)(&mat);
double val = *ptr;
auto length = DaphneSerializer<double>::serialize(val, buffer);
protoMsg.set_bytes(buffer.data(), length);

// Need matrix for metadata, type of matrix does not really matter.
mat = DataObjectFactory::create<DenseMatrix<double>>(0, 0, false);
}
else { // Not scalar
assert(mat != nullptr && "Matrix to broadcast is nullptr");
auto denseMat = dynamic_cast<const DenseMatrix<double>*>(mat);
if (!denseMat){
throw std::runtime_error("Distribute grpc only supports DenseMatrix<double> for now");
}
ProtoDataConverter<DenseMatrix<double>>::convertToProto(denseMat, protoMsg.mutable_matrix());
// DT is const Structure, but we only provide template specialization for structure.
// TODO should we implement an additional specialization or remove constness from template parameter?
size_t length = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(mat, buffer);
protoMsg.set_bytes(buffer.data(), length);
}
LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::BROADCAST, mat, dctx);

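
Both Broadcast specializations above share the packing logic sketched below for the scalar case. This is a condensed illustration, not the verbatim kernel code; the names and includes are assumed to match the hunks above:

// Condensed sketch of the packing step in Broadcast (MPI and gRPC paths above).
// When isScalar is true, the kernel's pointer slot actually carries the bits of a
// double, so the value is serialized on its own and an empty 0x0 DenseMatrix is
// created afterwards purely so the placement metadata has an object to attach to.
template <class DT>
size_t packForBroadcast(DT *&mat, bool isScalar, std::vector<char> &buffer) {
    if (isScalar) {
        double val = *reinterpret_cast<double *>(&mat);
        size_t length = DaphneSerializer<double>::serialize(val, buffer);
        mat = DataObjectFactory::create<DenseMatrix<double>>(0, 0, false);
        return length;
    }
    // Non-scalar: serialize the data object itself (DT may be const-qualified).
    return DaphneSerializer<typename std::remove_const<DT>::type>::serialize(mat, buffer);
}

The returned length and buffer.data() are what MPIHelper::sendData/distributeData and protoMsg.set_bytes consume above.
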
32 changes: 12 additions & 20 deletions src/runtime/distributed/coordinator/kernels/Distribute.h
@@ -22,12 +22,10 @@
#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>

#include <runtime/local/datastructures/AllocationDescriptorGRPC.h>
#include <runtime/distributed/proto/ProtoDataConverter.h>
#include <runtime/distributed/proto/DistributedGRPCCaller.h>
#include <runtime/distributed/worker/WorkerImpl.h>

#ifdef USE_MPI
#include <runtime/distributed/worker/MPISerializer.h>
#include <runtime/distributed/worker/MPIHelper.h>
#endif

@@ -66,7 +64,7 @@ template<class DT>
struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT>
{
static void apply(DT *mat, DCTX(dctx)) {
void *dataToSend;
std::vector<char> dataToSend;
std::vector<int> targetGroup;

LoadPartitioningDistributed<DT, AllocationDescriptorMPI> partioner(DistributionSchema::DISTRIBUTE, mat, dctx);
@@ -84,11 +82,10 @@ struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT>
//std::cout<<"Identifier ( "<<data.identifier<< " ) has been send to " <<(rank+1)<<std::endl;
continue;
}
size_t messageLength;
MPISerializer::serializeStructure<DT>(&dataToSend, mat ,false, &messageLength, dp->range->r_start, dp->range->r_len, dp->range->c_start, dp->range->c_len);
MPIHelper::distributeData(messageLength, dataToSend,rank);
targetGroup.push_back(rank);
free(dataToSend);
auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
auto len = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(slicedMat, dataToSend);
MPIHelper::distributeData(len, dataToSend.data(),rank);
targetGroup.push_back(rank);
}
for(size_t i=0;i<targetGroup.size();i++)
{
@@ -140,19 +137,14 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC, DT>
if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
continue;
distributed::Data protoMsg;


// TODO: We need to handle different data types
// (this will be simplified when serialization is implemented)
auto denseMat = dynamic_cast<const DenseMatrix<double>*>(mat);
if (!denseMat){
throw std::runtime_error("Distribute grpc only supports DenseMatrix<double> for now");
}
ProtoDataConverter<DenseMatrix<double>>::convertToProto(denseMat, protoMsg.mutable_matrix(),
dp->range->r_start,
dp->range->r_start + dp->range->r_len,
dp->range->c_start,
dp->range->c_start + dp->range->c_len);
std::vector<char> buffer;

auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
// DT is const Structure, but we only provide template specialization for structure.
// TODO should we implement an additional specialization or remove constness from template parameter?
auto length = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(slicedMat, buffer);
protoMsg.set_bytes(buffer.data(), length);

StoredInfo storedInfo({dp->dp_id});
caller.asyncStoreCall(dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation(), storedInfo, protoMsg);
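
Seen side by side, the MPI and gRPC distribute paths above now share one per-worker pattern: slice the rows assigned to a placement, serialize the slice, ship the bytes. A simplified helper-style sketch (names follow the hunks above; the same includes are assumed and cleanup of the sliced object is omitted):

// Sketch: serialize only the row range [rStart, rStart + rLen) assigned to one worker.
template <class DT>
size_t serializeRowRange(const DT *mat, size_t rStart, size_t rLen, std::vector<char> &buffer) {
    auto slicedMat = mat->sliceRow(rStart, rStart + rLen);
    return DaphneSerializer<DT>::serialize(slicedMat, buffer);
}
// The caller then ships the bytes, e.g.
//   MPI:  MPIHelper::distributeData(length, buffer.data(), rank);
//   gRPC: protoMsg.set_bytes(buffer.data(), length); followed by caller.asyncStoreCall(...).
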
43 changes: 25 additions & 18 deletions src/runtime/distributed/coordinator/kernels/DistributedCollect.h
@@ -14,15 +14,14 @@
* limitations under the License.
*/

#ifndef SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDCOLLECT_H
#define SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDCOLLECT_H
#pragma once

#include <runtime/local/context/DaphneContext.h>
#include <runtime/local/datastructures/DataObjectFactory.h>
#include <runtime/local/datastructures/DenseMatrix.h>

#include <runtime/local/datastructures/AllocationDescriptorGRPC.h>
#include <runtime/distributed/proto/ProtoDataConverter.h>
#include <runtime/local/io/DaphneSerializer.h>
#include <runtime/distributed/proto/DistributedGRPCCaller.h>
#include <runtime/distributed/proto/worker.pb.h>
#include <runtime/distributed/proto/worker.grpc.pb.h>
@@ -78,11 +77,11 @@ struct DistributedCollect<ALLOCATION_TYPE::DIST_MPI, DT>
//if(rank==COORDINATOR)
// continue;
int target_rank;
distributed::Data protoMessage=MPIHelper::getResults(&target_rank);
std::vector<char> buffer = MPIHelper::getResults(&target_rank);
std::string address = std::to_string(target_rank);
auto dp=mat->getMetaDataObject()->getDataPlacementByLocation(address);
auto distributedData = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
if(std::stoi(address)==COORDINATOR)
if(std::stoi(address) == COORDINATOR)
continue;
//std::cout<<"from distributed collect address " <<address<< " rows from "<< dp->range->r_start<< " to "<< (dp->range->r_start + dp->range->r_len) <<" cols from " << dp->range->c_start << " to " << (dp->range->c_start + dp->range->c_len) <<std::endl;
auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
@@ -92,14 +91,17 @@ struct DistributedCollect<ALLOCATION_TYPE::DIST_MPI, DT>
throw std::runtime_error("Distribute grpc only supports DenseMatrix<double> for now");
}

//ProtoDataConverter<DenseMatrix<double>>::convertFromProto(protoMessage.matrix(),toDisplay);
//std::string message="coordinator got the following from (" + address +") ";
//MPIHelper::displayDataStructure(toDisplay,message);

ProtoDataConverter<DenseMatrix<double>>::convertFromProto(
protoMessage.matrix(), denseMat,
dp->range->r_start, dp->range->r_start + dp->range->r_len,
dp->range->c_start, dp->range->c_start + dp->range->c_len);
auto slicedMat = dynamic_cast<DenseMatrix<double>*>(DF_deserialize(buffer));
auto resValues = denseMat->getValues() + (dp->range->r_start * denseMat->getRowSkip());
auto slicedMatValues = slicedMat->getValues();
for (size_t r = 0; r < dp->range->r_len; r++) {
memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double));
resValues += denseMat->getRowSkip();
slicedMatValues += slicedMat->getRowSkip();
}
collectedDataItems+= dp->range->r_len * dp->range->c_len;
data.isPlacedAtWorker = false;
dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).updateDistributedData(data);
Expand All @@ -125,7 +127,7 @@ struct DistributedCollect<ALLOCATION_TYPE::DIST_GRPC, DT>
struct StoredInfo{
size_t dp_id;
};
DistributedGRPCCaller<StoredInfo, distributed::StoredData, distributed::Matrix> caller;
DistributedGRPCCaller<StoredInfo, distributed::StoredData, distributed::Data> caller;


auto dpVector = mat->getMetaDataObject()->getDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC);
@@ -152,18 +154,23 @@ struct DistributedCollect<ALLOCATION_TYPE::DIST_GRPC, DT>

auto matProto = response.result;

// TODO: We need to handle different data types
auto denseMat = dynamic_cast<DenseMatrix<double>*>(mat);
if (!denseMat){
throw std::runtime_error("Distribute grpc only supports DenseMatrix<double> for now");
}
ProtoDataConverter<DenseMatrix<double>>::convertFromProto(
matProto, denseMat,
dp->range->r_start, dp->range->r_start + dp->range->r_len,
dp->range->c_start, dp->range->c_start + dp->range->c_len);
}
// Zero copy buffer
std::vector<char> buf(static_cast<const char*>(matProto.bytes().data()), static_cast<const char*>(matProto.bytes().data()) + matProto.bytes().size());
auto slicedMat = dynamic_cast<DenseMatrix<double>*>(DF_deserialize(buf));
auto resValues = denseMat->getValues() + (dp->range->r_start * denseMat->getRowSkip());
auto slicedMatValues = slicedMat->getValues();
for (size_t r = 0; r < dp->range->r_len; r++){
memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double));
resValues += denseMat->getRowSkip();
slicedMatValues += slicedMat->getRowSkip();
}
data.isPlacedAtWorker = false;
dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(data);
}
};
};

#endif //SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDCOLLECT_H
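
After deserializing a worker's slice, both collect paths above perform the same copy-back into the full result. A standalone sketch of that step (double-only and simplified; the real kernels additionally update the placement metadata):

#include <cstring> // std::memcpy

// Copy a deserialized block (one worker's result slice) into the full result matrix
// at rows [rStart, rStart + rLen) and columns [cStart, cStart + cLen). The destination
// rows may be wider than the block's rows, hence the separate row-skip strides.
static void copyBlockBack(DenseMatrix<double> *resMat, const DenseMatrix<double> *block,
                          size_t rStart, size_t rLen, size_t cStart, size_t cLen) {
    double *resValues = resMat->getValues() + rStart * resMat->getRowSkip();
    const double *blockValues = block->getValues();
    for (size_t r = 0; r < rLen; r++) {
        std::memcpy(resValues + cStart, blockValues, cLen * sizeof(double));
        resValues += resMat->getRowSkip();
        blockValues += block->getRowSkip();
    }
}
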
28 changes: 10 additions & 18 deletions src/runtime/distributed/coordinator/kernels/DistributedCompute.h
@@ -23,7 +23,6 @@

#include <runtime/distributed/proto/worker.pb.h>
#include <runtime/distributed/proto/worker.grpc.pb.h>
#include <runtime/distributed/proto/ProtoDataConverter.h>
#include <runtime/distributed/proto/DistributedGRPCCaller.h>
#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>
#ifdef USE_MPI
@@ -80,30 +79,23 @@ struct DistributedCompute<ALLOCATION_TYPE::DIST_MPI, DTRes, const Structure>

LoadPartitioningDistributed<DTRes, AllocationDescriptorMPI>::SetOutputsMetadata(res, numOutputs, vectorCombine, dctx);

std::vector<char> taskBuffer;
void *taskToSend;
size_t messageLengths[worldSize];
std::vector<char> taskBuffer;
for (int rank=0;rank<worldSize;rank++) // we currently exclude the coordinator
{

distributed::Task task;
MPIHelper::Task task;
std::string addr= std::to_string(rank+1);
for (size_t i = 0; i < numOutputs; i++)
for (size_t i = 0; i < numInputs; i++)
{
auto dp = args[i]->getMetaDataObject()->getDataPlacementByLocation(addr);
auto distrData = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
distributed::StoredData protoData;
//std::cout<<"identifier " << distrData.identifier<<std::endl;
protoData.set_identifier(distrData.identifier);
protoData.set_num_cols(distrData.numCols);
protoData.set_num_rows(distrData.numRows);

*task.add_inputs()->mutable_stored() = protoData;

MPIHelper::StoredInfo storedData({distrData.identifier, distrData.numRows, distrData.numCols});
task.inputs.push_back(storedData);
}
task.set_mlir_code(mlirCode);
MPISerializer::serializeTask(&taskToSend, &messageLengths[rank], &task);
MPIHelper::distributeTask(messageLengths[rank], taskToSend,rank+1);
free(taskToSend);
task.mlir_code = mlirCode;
task.serialize(taskBuffer);
auto len = task.sizeInBytes();
MPIHelper::distributeTask(len, taskBuffer.data(),rank+1);
}

}
10 changes: 0 additions & 10 deletions src/runtime/distributed/proto/CMakeLists.txt
@@ -44,13 +44,3 @@ set(LIBS Proto)

add_library(CallData ${SOURCES})
target_link_libraries(CallData PRIVATE ${LIBS})

# *****************************************************************************
# ProtoDataConverter library
# *****************************************************************************

set(SOURCES ProtoDataConverter.cpp)
set(LIBS DataStructures Proto)

add_library(ProtoDataConverter ${SOURCES})
target_link_libraries(ProtoDataConverter PRIVATE ${LIBS})
4 changes: 2 additions & 2 deletions src/runtime/distributed/proto/CallData.cpp
@@ -82,9 +82,9 @@ void TransferCallData::Proceed() {

new TransferCallData(worker, cq_);

grpc::Status status = worker->TransferGRPC(&ctx_, &storedData, &matrix);
grpc::Status status = worker->TransferGRPC(&ctx_, &storedData, &data);

responder_.Finish(matrix, status, this);
responder_.Finish(data, status, this);
}
else
{
4 changes: 2 additions & 2 deletions src/runtime/distributed/proto/CallData.h
@@ -115,9 +115,9 @@ class TransferCallData final : public CallData
// What we get from the client.
distributed::StoredData storedData;
// What we send back to the client.
distributed::Matrix matrix;
distributed::Data data;
// The means to get back to the client.
grpc::ServerAsyncResponseWriter<distributed::Matrix> responder_;
grpc::ServerAsyncResponseWriter<distributed::Data> responder_;

// Let's implement a tiny state machine with the following states.
enum CallStatus
(The remaining changed files are not shown here.)
