diff --git a/src/api/cli/DaphneUserConfig.h b/src/api/cli/DaphneUserConfig.h index d33a18e30..6738dd755 100644 --- a/src/api/cli/DaphneUserConfig.h +++ b/src/api/cli/DaphneUserConfig.h @@ -39,6 +39,7 @@ struct DaphneUserConfig { bool use_cuda = false; bool use_vectorized_exec = false; bool use_distributed = false; + bool use_grpc_async = false; bool use_obj_ref_mgnt = true; bool use_ipa_const_propa = true; bool use_phy_op_selection = true; diff --git a/src/runtime/distributed/coordinator/kernels/Broadcast.h b/src/runtime/distributed/coordinator/kernels/Broadcast.h index 1d34470c6..80004acab 100644 --- a/src/runtime/distributed/coordinator/kernels/Broadcast.h +++ b/src/runtime/distributed/coordinator/kernels/Broadcast.h @@ -76,9 +76,9 @@ template struct Broadcast { LoadPartitioningDistributed partioner(DistributionSchema::BROADCAST, mat, dctx); while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); - auto rank = dynamic_cast(*(dp->allocation)).getRank(); + auto rank = dynamic_cast(dp->getAllocation(0))->getRank(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; // Minimum chunk size @@ -114,12 +114,13 @@ template struct Broadcast { WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank); std::string address = std::to_string(rank); DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = dataAcknowledgement.identifier; data.numRows = dataAcknowledgement.numRows; data.numCols = dataAcknowledgement.numCols; data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + auto alloc = dynamic_cast(dp->getAllocation(0)); + alloc->updateDistributedData(data); } } }; @@ -150,12 +151,12 @@ template struct Broadcast { while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; - auto address = dynamic_cast(*(dp->allocation)).getLocation(); + auto address = dynamic_cast(dp->getAllocation(0))->getLocation(); - StoredInfo storedInfo({dp->dp_id}); + StoredInfo storedInfo({dp->getID()}); caller.asyncStoreCall(address, storedInfo); // Minimum chunk size auto min_chunk_size = @@ -187,7 +188,7 @@ template struct Broadcast { auto dp_id = response.storedInfo.dp_id; auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); auto storedData = response.result; data.identifier = storedData.identifier(); @@ -195,7 +196,7 @@ template struct Broadcast { data.numCols = storedData.num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } }; }; @@ -222,10 +223,10 @@ template struct Broadcast { while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; - auto workerAddr = dynamic_cast(*(dp->allocation)).getLocation(); + auto workerAddr = 
dynamic_cast(dp->getAllocation(0))->getLocation(); std::thread t([=, &mat]() { // TODO Consider saving channels inside DaphneContext grpc::ChannelArguments ch_args; @@ -260,7 +261,7 @@ template struct Broadcast { newData.numRows = storedData.num_rows(); newData.numCols = storedData.num_cols(); newData.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(newData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(newData); }); threads_vector.push_back(move(t)); } diff --git a/src/runtime/distributed/coordinator/kernels/Distribute.h b/src/runtime/distributed/coordinator/kernels/Distribute.h index 0d734395b..cb32266c3 100644 --- a/src/runtime/distributed/coordinator/kernels/Distribute.h +++ b/src/runtime/distributed/coordinator/kernels/Distribute.h @@ -63,13 +63,12 @@ template struct Distribute { while (partioner.HasNextChunk()) { DataPlacement *dp = partioner.GetNextChunk(); - auto rank = dynamic_cast(*(dp->allocation)).getRank(); + auto rank = dynamic_cast(dp->getAllocation(0))->getRank(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; - auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len); - + auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); // Minimum chunk size auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer
::length(slicedMat) @@ -91,12 +90,12 @@ template struct Distribute { WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank); std::string address = std::to_string(rank); DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = dataAcknowledgement.identifier; data.numRows = dataAcknowledgement.numRows; data.numCols = dataAcknowledgement.numCols; data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } }; @@ -121,17 +120,17 @@ template struct Distribute { while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); // Skip if already placed at workers - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; distributed::Data protoMsg; std::vector buffer; - auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len); + auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); - StoredInfo storedInfo({dp->dp_id}); + StoredInfo storedInfo({dp->getID()}); - auto address = dynamic_cast(*(dp->allocation)).getLocation(); + auto address = dynamic_cast(dp->getAllocation(0))->getLocation(); caller.asyncStoreCall(address, storedInfo); // Minimum chunk size @@ -161,12 +160,12 @@ template struct Distribute { auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = storedData.identifier(); data.numRows = storedData.num_rows(); data.numCols = storedData.num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } }; @@ -188,22 +187,21 @@ template struct Distribute { while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); // Skip if already placed at workers - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; std::vector buffer; - auto workerAddr = dynamic_cast(*(dp->allocation)).getLocation(); + auto workerAddr = dynamic_cast(dp->getAllocation(0))->getLocation(); std::thread t([=, &mat]() { auto stub = ctx->stubs[workerAddr].get(); distributed::StoredData storedData; grpc::ClientContext grpc_ctx; - auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len); + auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); auto serializer = DaphneSerializerChunks
(slicedMat, dctx->config.max_distributed_serialization_chunk_size); - distributed::Data protoMsg; // Send chunks @@ -222,10 +220,10 @@ template struct Distribute { newData.numRows = storedData.num_rows(); newData.numCols = storedData.num_cols(); newData.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(newData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(newData); DataObjectFactory::destroy(slicedMat); }); - threads_vector.push_back(move(t)); + threads_vector.push_back(std::move(t)); } for (auto &thread : threads_vector) thread.join(); diff --git a/src/runtime/distributed/coordinator/kernels/DistributedCollect.h b/src/runtime/distributed/coordinator/kernels/DistributedCollect.h index cee446f17..2432c82c0 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedCollect.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedCollect.h @@ -61,6 +61,7 @@ template void distributedCollect(DT *&mat, const // ---------------------------------------------------------------------------- #ifdef USE_MPI template struct DistributedCollect { + static void apply(DT *&mat, const VectorCombine &combine, DCTX(dctx)) { if (mat == nullptr) throw std::runtime_error("DistributedCollect gRPC: result matrix must be already " @@ -73,7 +74,7 @@ template struct DistributedCollect { std::string address = std::to_string(rank); auto dp = mat->getMetaDataObject()->getDataPlacementByLocation(address); - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); WorkerImpl::StoredInfo info = {distributedData.identifier, distributedData.numRows, distributedData.numCols}; MPIHelper::requestData(rank, info); @@ -98,21 +99,22 @@ template struct DistributedCollect { if (combine == VectorCombine::ADD) { ewBinaryMat(BinaryOpCode::ADD, denseMat, slicedMat, denseMat, nullptr); } else { - auto resValues = denseMat->getValues() + (dp->range->r_start * denseMat->getRowSkip()); + auto resValues = denseMat->getValues() + (dp->getRange()->r_start * denseMat->getRowSkip()); auto slicedMatValues = slicedMat->getValues(); - for (size_t r = 0; r < dp->range->r_len; r++) { - memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double)); + for (size_t r = 0; r < dp->getRange()->r_len; r++) { + memcpy(resValues + dp->getRange()->c_start, slicedMatValues, + dp->getRange()->c_len * sizeof(double)); resValues += denseMat->getRowSkip(); slicedMatValues += slicedMat->getRowSkip(); } } DataObjectFactory::destroy(slicedMat); - collectedDataItems += dp->range->r_len * dp->range->c_len; + collectedDataItems += dp->getRange()->r_len * dp->getRange()->c_len; - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributedData.isPlacedAtWorker = false; - dynamic_cast(*(dp->allocation)).updateDistributedData(distributedData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(distributedData); // this is to handle the case when not all workers participate in // the computation, i.e., number of workers is larger than of the // work items @@ -128,6 +130,7 @@ template struct DistributedCollect { // ---------------------------------------------------------------------------- template struct DistributedCollect { + static void apply(DT *&mat, const VectorCombine &combine, DCTX(dctx)) { if (mat == nullptr) throw std::runtime_error("DistributedCollect 
gRPC: result matrix must be already " @@ -139,12 +142,12 @@ template struct DistributedCollect caller(dctx); - auto dpVector = mat->getMetaDataObject()->getDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); + auto dpVector = mat->getMetaDataObject()->getRangeDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); for (auto &dp : *dpVector) { - auto address = dp->allocation->getLocation(); + auto address = dp->getAllocation(0)->getLocation(); - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); - StoredInfo storedInfo({dp->dp_id}); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); + StoredInfo storedInfo({dp->getID()}); distributed::StoredData protoData; protoData.set_identifier(distributedData.identifier); protoData.set_num_rows(distributedData.numRows); @@ -157,7 +160,7 @@ template struct DistributedCollectgetMetaDataObject()->getDataPlacementByID(dp_id); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); auto matProto = response.result; @@ -174,10 +177,11 @@ template struct DistributedCollectgetValues() + (dp->range->r_start * denseMat->getRowSkip()); + auto resValues = denseMat->getValues() + (dp->getRange()->r_start * denseMat->getRowSkip()); auto slicedMatValues = slicedMat->getValues(); - for (size_t r = 0; r < dp->range->r_len; r++) { - memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double)); + for (size_t r = 0; r < dp->getRange()->r_len; r++) { + memcpy(resValues + dp->getRange()->c_start, slicedMatValues, + dp->getRange()->c_len * sizeof(double)); resValues += denseMat->getRowSkip(); slicedMatValues += slicedMat->getRowSkip(); } @@ -185,7 +189,7 @@ template struct DistributedCollect(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } }; }; @@ -195,6 +199,7 @@ template struct DistributedCollect struct DistributedCollect { + static void apply(DT *&mat, const VectorCombine &combine, DCTX(dctx)) { if (mat == nullptr) throw std::runtime_error("DistributedCollect gRPC: result matrix must be already " @@ -205,11 +210,11 @@ template struct DistributedCollect threads_vector; std::mutex lock; - auto dpVector = mat->getMetaDataObject()->getDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); + auto dpVector = mat->getMetaDataObject()->getRangeDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); for (auto &dp : *dpVector) { - auto address = dp->allocation->getLocation(); + auto address = dp->getAllocation(0)->getLocation(); - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributed::StoredData protoData; protoData.set_identifier(distributedData.identifier); protoData.set_num_rows(distributedData.numRows); @@ -236,19 +241,20 @@ template struct DistributedCollectgetValues() + (dp->range->r_start * denseMat->getRowSkip()); + auto resValues = denseMat->getValues() + (dp->getRange()->r_start * denseMat->getRowSkip()); auto slicedMatValues = slicedMat->getValues(); - for (size_t r = 0; r < dp->range->r_len; r++) { - memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double)); + for (size_t r = 0; r < dp->getRange()->r_len; r++) { + memcpy(resValues + dp->getRange()->c_start, slicedMatValues, + dp->getRange()->c_len * sizeof(double)); resValues += denseMat->getRowSkip(); slicedMatValues += slicedMat->getRowSkip(); } } 
DataObjectFactory::destroy(slicedMat); distributedData.isPlacedAtWorker = false; - dynamic_cast(*(dp->allocation)).updateDistributedData(distributedData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(distributedData); }); - threads_vector.push_back(move(t)); + threads_vector.push_back(std::move(t)); } for (auto &thread : threads_vector) thread.join(); diff --git a/src/runtime/distributed/coordinator/kernels/DistributedCompute.h b/src/runtime/distributed/coordinator/kernels/DistributedCompute.h index f909906c2..508303c37 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedCompute.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedCompute.h @@ -14,8 +14,7 @@ * limitations under the License. */ -#ifndef SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDCOMPUTE_H -#define SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDCOMPUTE_H +#pragma once #include #include @@ -75,7 +74,7 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto distrData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distrData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); MPIHelper::StoredInfo storedData({distrData.identifier, distrData.numRows, distrData.numCols}); task.inputs.push_back(storedData); @@ -94,12 +93,12 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(std::to_string(rank)); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = info.identifier; data.numRows = info.numRows; data.numCols = info.numCols; data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } } @@ -126,13 +125,13 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto distrData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distrData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributed::StoredData protoData; protoData.set_identifier(distrData.identifier); @@ -143,7 +142,7 @@ template struct DistributedCompute struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = computeResult.outputs()[o].stored().identifier(); data.numRows = computeResult.outputs()[o].stored().num_rows(); data.numCols = computeResult.outputs()[o].stored().num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } } @@ -192,13 +191,13 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto distrData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distrData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributed::StoredData protoData; protoData.set_identifier(distrData.identifier); @@ -222,19 +221,17 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = computeResult.outputs()[o].stored().identifier(); data.numRows = computeResult.outputs()[o].stored().num_rows(); data.numCols 
= computeResult.outputs()[o].stored().num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } }); - threads_vector.push_back(move(t)); + threads_vector.push_back(std::move(t)); } for (auto &thread : threads_vector) thread.join(); } }; - -#endif // SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDCOMPUTE_H diff --git a/src/runtime/distributed/coordinator/kernels/DistributedRead.h b/src/runtime/distributed/coordinator/kernels/DistributedRead.h index 0ab6f72a5..1a0fa2e33 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedRead.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedRead.h @@ -68,7 +68,9 @@ template void distributedRead(DTRes *&res, const char *filename, D // MPI // ---------------------------------------------------------------------------- template struct DistributedRead { - static void apply(DTRes *&res, const char *filename, DCTX(dctx)) { throw std::runtime_error("not implemented"); } + static void apply(DTRes *&res, const char *filename, DCTX(dctx)) { + throw std::runtime_error("DistributedRead not implemented"); + } }; #endif @@ -77,7 +79,9 @@ template struct DistributedRead // ---------------------------------------------------------------------------- template struct DistributedRead { - static void apply(DTRes *&res, const char *filename, DCTX(dctx)) { throw std::runtime_error("not implemented"); } + static void apply(DTRes *&res, const char *filename, DCTX(dctx)) { + throw std::runtime_error("DistributedRead not implemented"); + } }; // ---------------------------------------------------------------------------- @@ -96,39 +100,45 @@ template struct DistributedRead threads_vector; LoadPartitioningDistributed partioner(DistributionSchema::DISTRIBUTE, res, dctx); + auto hdfsFn = std::string(filename); + while (partioner.HasNextChunk()) { - auto hdfsFn = std::string(filename); auto dp = partioner.GetNextChunk(); - auto workerAddr = dynamic_cast(dp->allocation.get())->getLocation(); - std::thread t([=, &res]() { - auto stub = ctx->stubs[workerAddr].get(); - - distributed::HDFSFile fileData; - fileData.set_filename(hdfsFn); - fileData.set_start_row(dp->range->r_start); - fileData.set_num_rows(dp->range->r_len); - fileData.set_num_cols(dp->range->c_len); - - grpc::ClientContext grpc_ctx; - distributed::StoredData response; - - auto status = stub->ReadHDFS(&grpc_ctx, fileData, &response); - if (!status.ok()) - throw std::runtime_error(status.error_message()); - - DistributedData newData; - newData.identifier = response.identifier(); - newData.numRows = response.num_rows(); - newData.numCols = response.num_cols(); - newData.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(newData); - }); - threads_vector.push_back(move(t)); + if (auto grpc_alloc = dynamic_cast(dp->getAllocation(0))) { + auto workerAddr = grpc_alloc->getLocation(); + std::thread t([=, &res]() { + auto stub = ctx->stubs[workerAddr].get(); + + distributed::HDFSFile fileData; + fileData.set_filename(hdfsFn); + fileData.set_start_row(dp->getRange()->r_start); + fileData.set_num_rows(dp->getRange()->r_len); + fileData.set_num_cols(dp->getRange()->c_len); + + grpc::ClientContext grpc_ctx; + distributed::StoredData response; + + auto status = stub->ReadHDFS(&grpc_ctx, fileData, &response); + if (!status.ok()) + throw std::runtime_error(status.error_message()); + + DistributedData newData; + newData.identifier = response.identifier(); + 
newData.numRows = response.num_rows(); + newData.numCols = response.num_cols(); + newData.isPlacedAtWorker = true; + grpc_alloc->updateDistributedData(newData); + }); + threads_vector.push_back(move(t)); + } else + throw std::runtime_error("dynamic_cast(alloc) failed (returned nullptr)"); } for (auto &thread : threads_vector) thread.join(); +#else + throw std::runtime_error("DistributedRead not implemented"); #endif } }; diff --git a/src/runtime/distributed/coordinator/kernels/DistributedWrite.h b/src/runtime/distributed/coordinator/kernels/DistributedWrite.h index 3e4488520..248cd4d62 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedWrite.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedWrite.h @@ -118,41 +118,45 @@ template struct DistributedWritegetMetaDataObject()->getDataPlacementByLocation(workerAddr))) { - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); - if (data.isPlacedAtWorker) { - std::thread t([=, &mat]() { - auto stub = ctx->stubs[workerAddr].get(); - - distributed::HDFSWriteInfo fileData; - fileData.mutable_matrix()->set_identifier(data.identifier); - fileData.mutable_matrix()->set_num_rows(data.numRows); - fileData.mutable_matrix()->set_num_cols(data.numCols); - - fileData.set_dirname(hdfsfilename.c_str()); - fileData.set_segment(std::to_string(chunkId).c_str()); - - grpc::ClientContext grpc_ctx; - distributed::Empty empty; - - auto status = stub->WriteHDFS(&grpc_ctx, fileData, &empty); - if (!status.ok()) - throw std::runtime_error(status.error_message()); - }); - threads_vector.push_back(move(t)); - } else { - auto slicedMat = - mat->sliceRow(dp->range.get()->r_start, dp->range.get()->r_start + dp->range.get()->r_len); - if (extension == ".csv") { - writeHDFSCsv(slicedMat, hdfsfilename.c_str(), dctx); - } else if (extension == ".dbdf") { - writeDaphneHDFS(slicedMat, hdfsfilename.c_str(), dctx); + if (auto grpc_alloc = dynamic_cast(dp->getAllocation(0))) { + auto data = grpc_alloc->getDistributedData(); + + if (data.isPlacedAtWorker) { + std::thread t([=, &mat]() { + auto stub = ctx->stubs[workerAddr].get(); + + distributed::HDFSWriteInfo fileData; + fileData.mutable_matrix()->set_identifier(data.identifier); + fileData.mutable_matrix()->set_num_rows(data.numRows); + fileData.mutable_matrix()->set_num_cols(data.numCols); + + fileData.set_dirname(hdfsfilename.c_str()); + fileData.set_segment(std::to_string(chunkId).c_str()); + + grpc::ClientContext grpc_ctx; + distributed::Empty empty; + + auto status = stub->WriteHDFS(&grpc_ctx, fileData, &empty); + if (!status.ok()) + throw std::runtime_error(status.error_message()); + }); + threads_vector.push_back(move(t)); + } else { + auto slicedMat = + mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); + if (extension == ".csv") { + writeHDFSCsv(slicedMat, hdfsfilename.c_str(), dctx); + } else if (extension == ".dbdf") { + writeDaphneHDFS(slicedMat, hdfsfilename.c_str(), dctx); + } } + } else { + continue; } - } else { - continue; - } - // TODO we should also store ranges that did not have a - // dataplacement associated with them + // TODO we should also store ranges that did not have a + // dataplacement associated with them + } else + throw std::runtime_error("dynamic_cast(alloc) failed (returned nullptr)"); } for (auto &thread : threads_vector) thread.join(); diff --git a/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h b/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h index 
a05a4bc30..362fe9c92 100644 --- a/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h +++ b/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h @@ -16,7 +16,8 @@ #pragma once -#include +#include + #include #include #include @@ -54,8 +55,7 @@ template class LoadPartitioningDistributed { // Here we provide the different implementations. // Another solution would be to make sure that every constructor is similar // so this would not be needed. - static ALLOCATOR CreateAllocatorDescriptor(DaphneContext *ctx, const std::string &addr, - const DistributedData &data) { + static ALLOCATOR CreateAllocatorDescriptor(DaphneContext* ctx, const std::string &addr, DistributedData& data) { if constexpr (std::is_same_v) return AllocationDescriptorMPI(std::stoi(addr), ctx, data); else if constexpr (std::is_same_v) @@ -106,25 +106,25 @@ template class LoadPartitioningDistributed { DataPlacement *dp; if ((dp = mat->getMetaDataObject()->getDataPlacementByLocation(workerAddr))) { - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); // Check if existing placement matches the same ranges we currently // need if (data.isPlacedAtWorker) { - auto existingRange = dp->range.get(); + auto existingRange = dp->getRange(); if (*existingRange == range) data.isPlacedAtWorker = true; else { - mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); + mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); data.isPlacedAtWorker = false; } } else - mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); + mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); // TODO Currently we do not support distributing/splitting // by columns. When we do, this should be changed (e.g. 
Index(0, // taskIndex)) This can be decided based on DistributionSchema data.ix = GetDistributedIndex(); - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } else { // Else, create new object metadata entry DistributedData data; // TODO Currently we do not support distributing/splitting @@ -132,7 +132,9 @@ template class LoadPartitioningDistributed { // taskIndex)) data.ix = GetDistributedIndex(); auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data); - dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range); + std::vector> allocations; + allocations.emplace_back(&allocationDescriptor); + dp = mat->getMetaDataObject()->addDataPlacement(allocations, &range); } taskIndex++; return dp; @@ -192,11 +194,13 @@ template class LoadPartitioningDistributed { // If dp already exists for this worker, update the range and // data if (auto dp = (*outputs[i])->getMetaDataObject()->getDataPlacementByLocation(workerAddr)) { - (*outputs[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + (*outputs[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } else { // else create new dp entry auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data); - ((*outputs[i]))->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range); + std::vector> allocations; + allocations.emplace_back(&allocationDescriptor); + ((*outputs[i]))->getMetaDataObject()->addDataPlacement(allocations, &range); } } } diff --git a/src/runtime/distributed/worker/MPICoordinator.h b/src/runtime/distributed/worker/MPICoordinator.h index 4563af582..5162b6f45 100644 --- a/src/runtime/distributed/worker/MPICoordinator.h +++ b/src/runtime/distributed/worker/MPICoordinator.h @@ -95,11 +95,14 @@ class MPICoordinator { std::string addr = std::to_string(COORDINATOR); // If dp already exists for this worker, update the range and data if (auto dp = (*res[i])->getMetaDataObject()->getDataPlacementByLocation(addr)) { - (*res[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + (*res[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); + } else { // else create new dp entry - AllocationDescriptorMPI allocationDescriptor(dctx, COORDINATOR, data); - ((*res[i]))->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range); + AllocationDescriptorMPI allocationDescriptor(COORDINATOR, dctx, data); + std::vector> allocations; + allocations.emplace_back(&allocationDescriptor); + ((*res[i]))->getMetaDataObject()->addDataPlacement(allocations, &range); } } // solver.Compute(&outputsStoredInfo, inputsStoredInfo, mlirCode); diff --git a/src/runtime/distributed/worker/main.cpp b/src/runtime/distributed/worker/main.cpp index ec9eeb6f9..a77cc8193 100644 --- a/src/runtime/distributed/worker/main.cpp +++ b/src/runtime/distributed/worker/main.cpp @@ -21,25 +21,28 @@ #include "WorkerImplGRPCSync.h" #include +// global logger handle for this executable +static std::unique_ptr daphneLogger; + int main(int argc, char *argv[]) { DaphneUserConfig user_config{}; std::string configFile = "WorkerConfig.json"; - if (argc == 3) { + // initialize logging facility + daphneLogger 
= std::make_unique(user_config); + auto log = daphneLogger->getDefaultLogger(); + + if (argc == 3) configFile = argv[2]; - } - try { - if (ConfigParser::fileExists(configFile)) { + if (ConfigParser::fileExists(configFile)) { + try { ConfigParser::readUserConfig(configFile, user_config); + } catch (std::exception &e) { + log->warn("Parser error while reading worker config from {}:\n{}", configFile, e.what()); } - } catch (std::exception &e) { - spdlog::error("Parser error while reading worker config:\n{}", e.what()); - spdlog::error("You can create a WorkerConfig.json to configure the worker.\n"); } - user_config.resolveLibDir(); - auto logger = std::make_unique(user_config); if (argc < 2 || argc > 3) { std::cout << "Usage: " << argv[0] << " [ConfigFile]" << std::endl; @@ -47,11 +50,13 @@ int main(int argc, char *argv[]) { } auto addr = argv[1]; - // TODO choose specific implementation based on arguments or config file - WorkerImpl *service = new WorkerImplGRPCSync(addr, user_config); + std::unique_ptr service; + if (user_config.use_grpc_async) + service = std::make_unique(addr, user_config); + else + service = std::make_unique(addr, user_config); - std::cout << "Started Distributed Worker on `" << addr << "`\n"; + log->info(fmt::format("Started Distributed Worker on {}", addr)); service->Wait(); - return 0; } \ No newline at end of file diff --git a/src/runtime/local/context/CUDAContext.cpp b/src/runtime/local/context/CUDAContext.cpp index dddd530bb..c56012383 100644 --- a/src/runtime/local/context/CUDAContext.cpp +++ b/src/runtime/local/context/CUDAContext.cpp @@ -52,7 +52,6 @@ void CUDAContext::init() { logger->info("Using CUDA device {}: {}\n\tAvailable mem: {} Total mem: {} " "using {}% -> {}", device_id, device_properties.name, available, total, mem_usage * 100, mem_budget); - CHECK_CUBLAS(cublasCreate(&cublas_handle)); CHECK_CUSPARSE(cusparseCreate(&cusparse_handle)); CHECK_CUDNN(cudnnCreate(&cudnn_handle)); @@ -69,7 +68,6 @@ void CUDAContext::init() { CHECK_CUSOLVER(cusolverDnSetStream(cusolver_handle, cusolver_stream)); getCUDNNWorkspace(64 * 1024 * 1024); - // CHECK_CUBLAS(cublasLtCreate(&cublaslt_Handle)); // CHECK_CUDART(cudaMalloc(&cublas_workspace, cublas_workspace_size)); } @@ -134,4 +132,4 @@ void CUDAContext::free(size_t id) { allocations.erase(id); } -int CUDAContext::getMaxNumThreads() { return device_properties.maxThreadsPerBlock; } +int CUDAContext::getMaxNumThreads() const { return device_properties.maxThreadsPerBlock; } diff --git a/src/runtime/local/context/CUDAContext.h b/src/runtime/local/context/CUDAContext.h index 15c03d9a2..faf6882d6 100644 --- a/src/runtime/local/context/CUDAContext.h +++ b/src/runtime/local/context/CUDAContext.h @@ -77,7 +77,7 @@ class CUDAContext final : public IContext { void *getCUDNNWorkspace(size_t size); [[nodiscard]] size_t getMemBudget() const { return mem_budget; } - int getMaxNumThreads(); + int getMaxNumThreads() const; static CUDAContext *get(DaphneContext *ctx, size_t id) { return dynamic_cast(ctx->getCUDAContext(id)); } diff --git a/src/runtime/local/datastructures/AllocationDescriptorCUDA.h b/src/runtime/local/datastructures/AllocationDescriptorCUDA.h index 48fb9c099..bd79c175e 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorCUDA.h +++ b/src/runtime/local/datastructures/AllocationDescriptorCUDA.h @@ -28,8 +28,6 @@ class AllocationDescriptorCUDA : public IAllocationDescriptor { size_t alloc_id{}; public: - AllocationDescriptorCUDA() = delete; - AllocationDescriptorCUDA(DaphneContext *ctx, uint32_t device_id) : 
device_id(device_id), dctx(ctx) {} ~AllocationDescriptorCUDA() override { @@ -45,9 +43,12 @@ class AllocationDescriptorCUDA : public IAllocationDescriptor { // [[nodiscard]] uint32_t getLocation() const { return device_id; } [[nodiscard]] std::string getLocation() const override { return std::to_string(device_id); } - void createAllocation(size_t size, bool zero) override { + [[nodiscard]] std::unique_ptr createAllocation(size_t size, bool zero) const override { + auto new_alloc = std::make_unique(dctx, device_id); auto ctx = CUDAContext::get(dctx, device_id); - data = ctx->malloc(size, zero, alloc_id); + new_alloc->data = ctx->malloc(size, zero, new_alloc->alloc_id); + new_alloc->size = size; + return new_alloc; } std::shared_ptr getData() override { return data; } diff --git a/src/runtime/local/datastructures/AllocationDescriptorGRPC.h b/src/runtime/local/datastructures/AllocationDescriptorGRPC.h index 00e0a7658..0eb7aa147 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorGRPC.h +++ b/src/runtime/local/datastructures/AllocationDescriptorGRPC.h @@ -33,15 +33,17 @@ class AllocationDescriptorGRPC : public IAllocationDescriptor { std::shared_ptr data; public: - AllocationDescriptorGRPC(){}; + AllocationDescriptorGRPC() {}; AllocationDescriptorGRPC(DaphneContext *ctx, const std::string &address, const DistributedData &data) - : ctx(ctx), workerAddress(address), distributedData(data){}; + : ctx(ctx), workerAddress(address), distributedData(data) {}; - ~AllocationDescriptorGRPC() override{}; + ~AllocationDescriptorGRPC() override {}; [[nodiscard]] ALLOCATION_TYPE getType() const override { return type; }; std::string getLocation() const override { return workerAddress; }; - void createAllocation(size_t size, bool zero) override {} + std::unique_ptr createAllocation(size_t size, bool zero) const override { + return nullptr; + } // TODO: Implement transferTo and transferFrom functions std::shared_ptr getData() override { throw std::runtime_error("TransferTo/From functions are not implemented yet."); diff --git a/src/runtime/local/datastructures/AllocationDescriptorHost.h b/src/runtime/local/datastructures/AllocationDescriptorHost.h index af7e71e67..96ca3dcaa 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorHost.h +++ b/src/runtime/local/datastructures/AllocationDescriptorHost.h @@ -18,6 +18,7 @@ #include "DataPlacement.h" #include +#include class AllocationDescriptorHost : public IAllocationDescriptor { ALLOCATION_TYPE type = ALLOCATION_TYPE::HOST; @@ -25,14 +26,40 @@ class AllocationDescriptorHost : public IAllocationDescriptor { public: ~AllocationDescriptorHost() override = default; + [[nodiscard]] ALLOCATION_TYPE getType() const override { return type; } - void createAllocation(size_t size, bool zero) override {} - std::string getLocation() const override { return "Host"; } + + static std::unique_ptr createHostAllocation(std::shared_ptr data, size_t size, + bool zero) { + auto new_alloc = std::make_unique(); + new_alloc->data = std::move(data); + new_alloc->size = size; + if (zero) + memset(new_alloc->data.get(), 0, size); + return new_alloc; + } + + [[nodiscard]] std::unique_ptr createAllocation(size_t size, bool zero) const override { + auto new_alloc = std::make_unique(); + new_alloc->size = size; + new_alloc->data = std::shared_ptr(new std::byte[size], std::default_delete()); + + if (zero) + memset(new_alloc->data.get(), 0, size); + + return new_alloc; + } + + [[nodiscard]] std::string getLocation() const override { return "Host"; } std::shared_ptr 
getData() override { return data; } + + void setData(std::shared_ptr &_data) { data = _data; } + void transferTo(std::byte *src, size_t size) override {} void transferFrom(std::byte *dst, size_t size) override {} [[nodiscard]] std::unique_ptr clone() const override { return std::make_unique(*this); } + bool operator==(const IAllocationDescriptor *other) const override { return (getType() == other->getType()); } }; diff --git a/src/runtime/local/datastructures/AllocationDescriptorMPI.h b/src/runtime/local/datastructures/AllocationDescriptorMPI.h index 8f9d0cf72..ff7f26f7a 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorMPI.h +++ b/src/runtime/local/datastructures/AllocationDescriptorMPI.h @@ -14,8 +14,7 @@ * limitations under the License. */ -#ifndef SRC_RUNTIME_LOCAL_DATASTRUCTURE_ALLOCATION_DESCRIPTORMPH_H -#define SRC_RUNTIME_LOCAL_DATASTRUCTURE_ALLOCATION_DESCRIPTORMPH_H +#pragma once #include #include @@ -26,23 +25,24 @@ class AllocationDescriptorMPI : public IAllocationDescriptor { ALLOCATION_TYPE type = ALLOCATION_TYPE::DIST_MPI; - int processRankID; - DaphneContext *ctx; + int processRankID{}; + DaphneContext *ctx{}; DistributedData distributedData; std::shared_ptr data; public: - AllocationDescriptorMPI(){}; - AllocationDescriptorMPI(int id, DaphneContext *ctx, DistributedData data) - : processRankID(id), ctx(ctx), distributedData(data){}; + AllocationDescriptorMPI() = default; + AllocationDescriptorMPI(int id, DaphneContext *ctx, DistributedData &data) + : processRankID(id), ctx(ctx), distributedData(data) {} - ~AllocationDescriptorMPI() override{}; + ~AllocationDescriptorMPI() override = default; [[nodiscard]] ALLOCATION_TYPE getType() const override { return type; }; - std::string getLocation() const override { return std::to_string(processRankID); }; + [[nodiscard]] std::string getLocation() const override { return std::to_string(processRankID); }; + + std::unique_ptr createAllocation(size_t size, bool zero) const override { return nullptr; } - void createAllocation(size_t size, bool zero) override {}; std::shared_ptr getData() override { return nullptr; }; bool operator==(const IAllocationDescriptor *other) const override { @@ -58,10 +58,11 @@ class AllocationDescriptorMPI : public IAllocationDescriptor { void transferTo(std::byte *src, size_t size) override { /* TODO */ }; void transferFrom(std::byte *src, size_t size) override { /* TODO */ }; - const DistributedIndex getDistributedIndex() { return distributedData.ix; } - const DistributedData getDistributedData() { return distributedData; } - void updateDistributedData(DistributedData data_) { distributedData = data_; } - int getRank() { return processRankID; } -}; + DistributedIndex getDistributedIndex() const { return distributedData.ix; } + + DistributedData getDistributedData() { return distributedData; } -#endif // SRC_RUNTIME_LOCAL_DATASTRUCTURE_ALLOCATION_DESCRIPTORMPH_H + void updateDistributedData(DistributedData &data_) { distributedData = data_; } + + int getRank() const { return processRankID; } +}; diff --git a/src/runtime/local/datastructures/CSRMatrix.cpp b/src/runtime/local/datastructures/CSRMatrix.cpp index e49f88ff0..41c1a4ec3 100644 --- a/src/runtime/local/datastructures/CSRMatrix.cpp +++ b/src/runtime/local/datastructures/CSRMatrix.cpp @@ -18,6 +18,56 @@ #include "CSRMatrix.h" +template +CSRMatrix::CSRMatrix(size_t maxNumRows, size_t numCols, size_t maxNumNonZeros, bool zero, + IAllocationDescriptor *allocInfo) + : Matrix(maxNumRows, numCols), numRowsAllocated(maxNumRows), 
isRowAllocatedBefore(false), is_view(false), + maxNumNonZeros(maxNumNonZeros), lastAppendedRowIdx(0) { + auto val_buf_size = maxNumNonZeros * sizeof(ValueType); + auto cidx_buf_size = maxNumNonZeros * sizeof(size_t); + auto rptr_buf_size = (numRows + 1) * sizeof(size_t); + + std::unique_ptr val_alloc; + std::unique_ptr cidx_alloc; + std::unique_ptr rptr_alloc; + + if (!allocInfo) { + values = std::shared_ptr(new ValueType[maxNumNonZeros], std::default_delete()); + auto bytes = std::reinterpret_pointer_cast(values); + val_alloc = AllocationDescriptorHost::createHostAllocation(bytes, val_buf_size, zero); + colIdxs = std::shared_ptr(new size_t[maxNumNonZeros], std::default_delete()); + bytes = std::reinterpret_pointer_cast(colIdxs); + cidx_alloc = AllocationDescriptorHost::createHostAllocation(bytes, cidx_buf_size, zero); + rowOffsets = std::shared_ptr(new size_t[numRows + 1], std::default_delete()); + bytes = std::reinterpret_pointer_cast(rowOffsets); + rptr_alloc = AllocationDescriptorHost::createHostAllocation(bytes, rptr_buf_size, zero); + } else { + val_alloc = allocInfo->createAllocation(val_buf_size, zero); + cidx_alloc = allocInfo->createAllocation(cidx_buf_size, zero); + rptr_alloc = allocInfo->createAllocation(rptr_buf_size, zero); + + // ToDo: refactor data storage into memory management + if (allocInfo->getType() == ALLOCATION_TYPE::HOST) { + values = std::reinterpret_pointer_cast(val_alloc->getData()); + colIdxs = std::reinterpret_pointer_cast(cidx_alloc->getData()); + rowOffsets = std::reinterpret_pointer_cast(rptr_alloc->getData()); + } + } + + spdlog::debug( + "Creating {} x {} sparse matrix of type: {}. Required memory: vals={}, cidxs={}, rptrs={}, total={} Mb", + numRows, numCols, val_alloc->getTypeName(), static_cast(val_buf_size) / (1048576), + static_cast(cidx_buf_size) / (1048576), static_cast(rptr_buf_size) / (1048576), + static_cast(val_buf_size + cidx_buf_size + rptr_buf_size) / (1048576)); + + std::vector> allocations; + allocations.emplace_back(std::move(val_alloc)); + allocations.emplace_back(std::move(cidx_alloc)); + allocations.emplace_back(std::move(rptr_alloc)); + auto p = this->mdo->addDataPlacement(allocations); + this->mdo->addLatest(p->getID()); +} + template size_t CSRMatrix::serialize(std::vector &buf) const { return DaphneSerializer>::serialize(this, buf); } diff --git a/src/runtime/local/datastructures/CSRMatrix.h b/src/runtime/local/datastructures/CSRMatrix.h index 08dc4a3e1..9741780b0 100644 --- a/src/runtime/local/datastructures/CSRMatrix.h +++ b/src/runtime/local/datastructures/CSRMatrix.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -60,6 +61,7 @@ template class CSRMatrix : public Matrix { bool isRowAllocatedBefore; + const bool is_view; /** * @brief The maximum number of non-zero values this matrix was allocated * to accommodate. @@ -88,18 +90,8 @@ template class CSRMatrix : public Matrix { * @param zero Whether the allocated memory of the internal arrays shall be * initialized to zeros (`true`), or be left uninitialized (`false`). 
*/ - CSRMatrix(size_t maxNumRows, size_t numCols, size_t maxNumNonZeros, bool zero) - : Matrix(maxNumRows, numCols), numRowsAllocated(maxNumRows), isRowAllocatedBefore(false), - maxNumNonZeros(maxNumNonZeros), values(new ValueType[maxNumNonZeros], std::default_delete()), - colIdxs(new size_t[maxNumNonZeros], std::default_delete()), - rowOffsets(new size_t[numRows + 1], std::default_delete()), lastAppendedRowIdx(0) { - if (zero) { - memset(values.get(), 0, maxNumNonZeros * sizeof(ValueType)); - memset(colIdxs.get(), 0, maxNumNonZeros * sizeof(size_t)); - memset(rowOffsets.get(), 0, (numRows + 1) * sizeof(size_t)); - } - } - + CSRMatrix(size_t maxNumRows, size_t numCols, size_t maxNumNonZeros, bool zero, + IAllocationDescriptor *allocInfo = nullptr); /** * @brief Creates a `CSRMatrix` around a sub-matrix of another `CSRMatrix` * without copying the data. @@ -112,7 +104,7 @@ template class CSRMatrix : public Matrix { */ CSRMatrix(const CSRMatrix *src, size_t rowLowerIncl, size_t rowUpperExcl) : Matrix(rowUpperExcl - rowLowerIncl, src->numCols), - numRowsAllocated(src->numRowsAllocated - rowLowerIncl), isRowAllocatedBefore(rowLowerIncl > 0), + numRowsAllocated(src->numRowsAllocated - rowLowerIncl), isRowAllocatedBefore(rowLowerIncl > 0), is_view(true), lastAppendedRowIdx(0) { if (!src) throw std::runtime_error("CSRMatrix: src must not be null"); @@ -123,16 +115,18 @@ template class CSRMatrix : public Matrix { if (rowLowerIncl >= rowUpperExcl) throw std::runtime_error("CSRMatrix: rowLowerIncl must be lower than rowUpperExcl"); + this->row_offset = rowLowerIncl; maxNumNonZeros = src->maxNumNonZeros; values = src->values; colIdxs = src->colIdxs; rowOffsets = std::shared_ptr(src->rowOffsets, src->rowOffsets.get() + rowLowerIncl); + // ToDo: temporary fix to directly assign mdo + this->mdo = src->mdo; } - virtual ~CSRMatrix() { - // nothing to do - } + [[nodiscard]] size_t offset() const { return this->row_offset; } + virtual ~CSRMatrix() = default; void fillNextPosUntil(size_t nextPos, size_t rowIdx) { if (rowIdx > lastAppendedRowIdx) { for (size_t r = lastAppendedRowIdx + 2; r <= rowIdx + 1; r++) @@ -153,13 +147,16 @@ template class CSRMatrix : public Matrix { this->numRows = numRows; } - size_t getMaxNumNonZeros() const { return maxNumNonZeros; } - size_t getNumNonZeros() const { return rowOffsets.get()[numRows] - rowOffsets.get()[0]; } + [[nodiscard]] size_t getMaxNumNonZeros() const { return maxNumNonZeros; } + [[nodiscard]] size_t getNumNonZeros() const { return getRowOffsets()[numRows] - getRowOffsets()[0]; } - size_t getNumNonZeros(size_t rowIdx) const { + [[nodiscard]] size_t getNumNonZeros(size_t rowIdx) const { if (rowIdx >= numRows) throw std::runtime_error("CSRMatrix (getNumNonZeros): rowIdx is out of bounds"); - return rowOffsets.get()[rowIdx + 1] - rowOffsets.get()[rowIdx]; + auto roff = getRowOffsets(); + auto rownext = roff[rowIdx + 1]; + auto row = roff[rowIdx]; + return rownext - row; } void shrinkNumNonZeros(size_t numNonZeros) { @@ -170,39 +167,59 @@ template class CSRMatrix : public Matrix { // colIdxs arrays. 
} - ValueType *getValues() { return values.get(); } + ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) { + return reinterpret_cast(this->mdo->getData(0, alloc_desc, range)); + } - const ValueType *getValues() const { return values.get(); } + const ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const { + return reinterpret_cast(this->mdo->getData(0, alloc_desc, range)); + } - ValueType *getValues(size_t rowIdx) { + ValueType *getRowValues(size_t rowIdx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) { // We allow equality here to enable retrieving a pointer to the end. if (rowIdx > numRows) throw std::runtime_error("CSRMatrix (getValues): rowIdx is out of bounds"); - return values.get() + rowOffsets.get()[rowIdx]; + return &(reinterpret_cast(getValues(alloc_desc, range))[getRowOffsets()[rowIdx]]); } - const ValueType *getValues(size_t rowIdx) const { - return const_cast *>(this)->getValues(rowIdx); + const ValueType *getRowValues(size_t rowIdx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) const { + return &(reinterpret_cast(getValues(alloc_desc, range))[getRowOffsets()[rowIdx]]); } - size_t *getColIdxs() { return colIdxs.get(); } + size_t *getColIdxs(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) { + return reinterpret_cast(this->mdo->getData(1, alloc_desc, range)); + } - const size_t *getColIdxs() const { return colIdxs.get(); } + const size_t *getColIdxs(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const { + return reinterpret_cast(this->mdo->getData(1, alloc_desc, range)); + } - size_t *getColIdxs(size_t rowIdx) { + size_t *getColIdxsOfRow(size_t rowIdx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) { // We allow equality here to enable retrieving a pointer to the end. 
if (rowIdx > numRows) throw std::runtime_error("CSRMatrix (getColIdxs): rowIdx is out of bounds"); - return colIdxs.get() + rowOffsets.get()[rowIdx]; + auto data = this->mdo->getData(1, alloc_desc, range); + return &(reinterpret_cast(data)[getRowOffsets()[rowIdx]]); } - const size_t *getColIdxs(size_t rowIdx) const { - return const_cast *>(this)->getColIdxs(rowIdx); + const size_t *getColIdxsOfRow(size_t rowIdx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) const { + auto data = this->mdo->getData(1, alloc_desc, range); + return &(reinterpret_cast(data)[getRowOffsets()[rowIdx]]); } - size_t *getRowOffsets() { return rowOffsets.get(); } + size_t *getRowOffsets(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) { + auto ptr = reinterpret_cast(this->mdo->getData(2, alloc_desc, range)); + return ptr + offset(); + } - const size_t *getRowOffsets() const { return rowOffsets.get(); } + const size_t *getRowOffsets(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const { + auto ptr = reinterpret_cast(this->mdo->getData(2, alloc_desc, range)); + return ptr + offset(); + } ValueType get(size_t rowIdx, size_t colIdx) const override { if (rowIdx >= numRows) @@ -210,8 +227,8 @@ template class CSRMatrix : public Matrix { if (colIdx >= numCols) throw std::runtime_error("CSRMatrix (get): colIdx is out of bounds"); - const size_t *rowColIdxsBeg = getColIdxs(rowIdx); - const size_t *rowColIdxsEnd = getColIdxs(rowIdx + 1); + const size_t *rowColIdxsBeg = getColIdxsOfRow(rowIdx); + const size_t *rowColIdxsEnd = getColIdxsOfRow(rowIdx + 1); const size_t *ptrExpected = std::lower_bound(rowColIdxsBeg, rowColIdxsEnd, colIdx); if (ptrExpected == rowColIdxsEnd || *ptrExpected != colIdx) @@ -219,7 +236,7 @@ template class CSRMatrix : public Matrix { return ValueType(0); else // Entry for the given coordinates present. - return getValues(rowIdx)[ptrExpected - rowColIdxsBeg]; + return getRowValues(rowIdx)[ptrExpected - rowColIdxsBeg]; } void set(size_t rowIdx, size_t colIdx, ValueType value) override { @@ -228,13 +245,13 @@ template class CSRMatrix : public Matrix { if (colIdx >= numCols) throw std::runtime_error("CSRMatrix (set): colIdx is out of bounds"); - size_t *rowColIdxsBeg = getColIdxs(rowIdx); - size_t *rowColIdxsEnd = getColIdxs(rowIdx + 1); + size_t *rowColIdxsBeg = getColIdxsOfRow(rowIdx); + size_t *rowColIdxsEnd = getColIdxsOfRow(rowIdx + 1); const size_t *ptrExpected = std::lower_bound(rowColIdxsBeg, rowColIdxsEnd, colIdx); const size_t posExpected = ptrExpected - rowColIdxsBeg; const size_t posEnd = colIdxs.get() + rowOffsets.get()[numRowsAllocated] - rowColIdxsBeg; - ValueType *rowValuesBeg = getValues(rowIdx); + ValueType *rowValuesBeg = getRowValues(rowIdx); if (ptrExpected == rowColIdxsEnd || *ptrExpected != colIdx) { // No entry for the given coordinates present. @@ -306,7 +323,10 @@ template class CSRMatrix : public Matrix { void finishAppend() override { fillNextPosUntil(rowOffsets.get()[lastAppendedRowIdx + 1], numRows - 1); } - bool isView() const { return (numRowsAllocated > numRows || isRowAllocatedBefore); } + [[nodiscard]] bool isView() const { + // return (numRowsAllocated > numRows || isRowAllocatedBefore); + return is_view; + } void printValue(std::ostream &os, ValueType val) const { switch (ValueTypeUtils::codeFor) { @@ -327,12 +347,12 @@ template class CSRMatrix : public Matrix { << std::endl; // Note that, in general, the values within one row might not be sorted // by column index. 
Thus, the following is a little complicated. - ValueType *oneRow = new ValueType[numCols]; + auto oneRow = new ValueType[numCols]; for (size_t r = 0; r < numRows; r++) { memset(oneRow, 0, numCols * sizeof(ValueType)); const size_t rowNumNonZeros = getNumNonZeros(r); - const size_t *rowColIdxs = getColIdxs(r); - const ValueType *rowValues = getValues(r); + const size_t *rowColIdxs = getColIdxsOfRow(r); + const ValueType *rowValues = getRowValues(r); for (size_t i = 0; i < rowNumNonZeros; i++) oneRow[rowColIdxs[i]] = rowValues[i]; for (size_t c = 0; c < numCols; c++) { @@ -402,11 +422,10 @@ template class CSRMatrix : public Matrix { if (numRows != rhs.getNumRows() || numCols != rhs.getNumCols()) return false; - const ValueType *valuesBegLhs = this->getValues(0); - const ValueType *valuesEndLhs = this->getValues(numRows); - const ValueType *valuesBegRhs = rhs.getValues(0); - const ValueType *valuesEndRhs = rhs.getValues(numRows); - + const ValueType *valuesBegLhs = this->getRowValues(0); + const ValueType *valuesEndLhs = this->getRowValues(numRows); + const ValueType *valuesBegRhs = rhs.getRowValues(0); + const ValueType *valuesEndRhs = rhs.getRowValues(numRows); const size_t nnzLhs = valuesEndLhs - valuesBegLhs; const size_t nnzRhs = valuesEndRhs - valuesBegRhs; @@ -421,7 +440,7 @@ template class CSRMatrix : public Matrix { const size_t *colIdxsBegRhs = rhs.getColIdxs(0); if (colIdxsBegLhs != colIdxsBegRhs) - if (memcmp(colIdxsBegLhs, colIdxsBegRhs, nnzLhs * sizeof(size_t))) + if (memcmp(colIdxsBegLhs, colIdxsBegRhs, nnzLhs * sizeof(size_t)) != 0) return false; return true; diff --git a/src/runtime/local/datastructures/DataPlacement.h b/src/runtime/local/datastructures/DataPlacement.h index d99a96879..8f6352add 100644 --- a/src/runtime/local/datastructures/DataPlacement.h +++ b/src/runtime/local/datastructures/DataPlacement.h @@ -23,20 +23,32 @@ #include /** - * The DataPlacement struct binds an allocation descriptor to a range + * The DataPlacement struct binds allocation descriptors to a range * description and stores an ID of an instantiated object. 
+ * + * The number of allocations varys depending on the data type (1 allocation of values of a dense matrix + * three allocations for values/col_idxs/row_ptrs of a CSR matrix) */ -struct DataPlacement { +class DataPlacement { size_t dp_id; // used to generate object IDs static std::atomic_size_t instance_count; - std::unique_ptr allocation{}; + std::vector> allocations{}; std::unique_ptr range{}; + public: DataPlacement() = delete; - DataPlacement(std::unique_ptr _a, std::unique_ptr _r) - : dp_id(instance_count++), allocation(std::move(_a)), range(std::move(_r)) {} + DataPlacement(std::vector> _a, std::unique_ptr _r) + : dp_id(instance_count++), allocations(std::move(_a)), range(std::move(_r)) {} + + [[nodiscard]] size_t getID() const { return dp_id; } + std::string getLocation() { return allocations.front()->getLocation(); } + Range *getRange() { return range.get(); } + void setRange(Range *r) { range.reset(r); } + void setRange(std::unique_ptr r) { range = std::move(r); } + uint32_t getNumAllocations() { return allocations.size(); } + IAllocationDescriptor *getAllocation(uint32_t num) { return allocations[num].get(); } }; diff --git a/src/runtime/local/datastructures/DenseMatrix.cpp b/src/runtime/local/datastructures/DenseMatrix.cpp index 463a5ca0c..6177a8304 100644 --- a/src/runtime/local/datastructures/DenseMatrix.cpp +++ b/src/runtime/local/datastructures/DenseMatrix.cpp @@ -68,172 +68,74 @@ template DenseMatrix::DenseMatrix(size_t maxNumRows, size_t numCols, bool zero, IAllocationDescriptor *allocInfo) : Matrix(maxNumRows, numCols), is_view(false), rowSkip(numCols), bufferSize(numRows * numCols * sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { - DataPlacement *new_data_placement; - if (allocInfo != nullptr) { - spdlog::debug("Creating {} x {} dense matrix of type: {}. Required memory: {} Mb", numRows, numCols, - static_cast(allocInfo->getType()), static_cast(getBufferSize()) / (1048576)); - - new_data_placement = this->mdo->addDataPlacement(allocInfo); - new_data_placement->allocation->createAllocation(getBufferSize(), zero); + std::unique_ptr val_alloc; + if (!allocInfo) { + alloc_shared_values(zero); + auto bytes = std::reinterpret_pointer_cast(values); + val_alloc = AllocationDescriptorHost::createHostAllocation(bytes, getBufferSize(), zero); } else { - AllocationDescriptorHost myHostAllocInfo; - alloc_shared_values(); + val_alloc = allocInfo->createAllocation(getBufferSize(), zero); - if (zero) - std::fill(values.get(), values.get() + maxNumRows * numCols, ValueTypeUtils::defaultValue); - new_data_placement = this->mdo->addDataPlacement(&myHostAllocInfo); + // ToDo: refactor data storage into memory management + if (allocInfo->getType() == ALLOCATION_TYPE::HOST) { + alloc_shared_values(zero); + + auto bytes = std::reinterpret_pointer_cast(values); + dynamic_cast(val_alloc.get())->setData(bytes); + } } - this->mdo->addLatest(new_data_placement->dp_id); + spdlog::debug("Creating {} x {} dense matrix of type: {}. 
Required memory: {} Mb", numRows, numCols, + val_alloc->getTypeName(), static_cast(getBufferSize()) / (1048576)); + + std::vector> allocations; + allocations.emplace_back(std::move(val_alloc)); + auto p = this->mdo->addDataPlacement(allocations); + this->mdo->addLatest(p->getID()); } template DenseMatrix::DenseMatrix(size_t numRows, size_t numCols, std::shared_ptr &values) : Matrix(numRows, numCols), is_view(false), rowSkip(numCols), values(values), bufferSize(numRows * numCols * sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { - AllocationDescriptorHost myHostAllocInfo; - DataPlacement *new_data_placement = this->mdo->addDataPlacement(&myHostAllocInfo); - this->mdo->addLatest(new_data_placement->dp_id); + auto bytes = std::reinterpret_pointer_cast(values); + + std::vector> allocations; + auto vec_alloc = AllocationDescriptorHost::createHostAllocation(bytes, getBufferSize(), false); + allocations.emplace_back(std::move(vec_alloc)); + + auto p = this->mdo->addDataPlacement(allocations); + this->mdo->addLatest(p->getID()); } template DenseMatrix::DenseMatrix(const DenseMatrix *src, int64_t rowLowerIncl, int64_t rowUpperExcl, int64_t colLowerIncl, int64_t colUpperExcl) - : Matrix(rowUpperExcl - rowLowerIncl, colUpperExcl - colLowerIncl), is_view(true), + : Matrix(rowUpperExcl - rowLowerIncl, colUpperExcl - colLowerIncl), is_view(true), rowSkip(src->rowSkip), bufferSize(numRows * numCols * sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { validateArgs(src, rowLowerIncl, rowUpperExcl, colLowerIncl, colUpperExcl); - this->row_offset = rowLowerIncl; - this->col_offset = colLowerIncl; - rowSkip = src->rowSkip; + this->row_offset = isView() ? src->row_offset + rowLowerIncl : rowLowerIncl; + this->col_offset = isView() ? src->col_offset + colLowerIncl : colLowerIncl; // ToDo: manage host mem (values) in a data placement if (src->values) { - alloc_shared_values(src->values, offset()); bufferSize = numRows * rowSkip * sizeof(ValueType); + this->values = src->values; } - this->clone_mdo(src); + this->mdo = src->mdo; } template DenseMatrix::DenseMatrix(size_t numRows, size_t numCols, const DenseMatrix *src) - : Matrix(numRows, numCols), is_view(false), rowSkip(numCols), + : Matrix(numRows, numCols), is_view(src->is_view), rowSkip(src->getRowSkip()), bufferSize(numRows * numCols * sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { - if (src->values) - values = src->values; - this->clone_mdo(src); -} + if (!is_view) + rowSkip = numCols; -template -auto DenseMatrix::getValuesInternal(const IAllocationDescriptor *alloc_desc, - const Range *range) -> std::tuple { - // If no range information is provided we assume the full range that this - // matrix covers - if (range == nullptr || *range == Range(*this)) { - if (alloc_desc) { - auto ret = this->mdo->findDataPlacementByType(alloc_desc, range); - if (!ret) { - // find other allocation type X (preferably host allocation) to - // transfer from in latest_version - - // tuple content: - std::tuple result = std::make_tuple(false, 0, nullptr); - auto latest = this->mdo->getLatest(); - DataPlacement *placement; - for (auto &placement_id : latest) { - placement = this->mdo->getDataPlacementByID(placement_id); - if (placement->range == nullptr || - *(placement->range) == Range{0, 0, this->getNumRows(), this->getNumCols()}) { - std::get<0>(result) = true; - std::get<1>(result) = placement->dp_id; - // prefer host allocation - if (placement->allocation->getType() == ALLOCATION_TYPE::HOST) { - std::get<2>(result) = 
reinterpret_cast(values.get()); - break; - } - } - } - - // if we found a data placement that is not in host memory, - // transfer it there before returning - if (std::get<0>(result) == true && std::get<2>(result) == nullptr) { - AllocationDescriptorHost myHostAllocInfo; - if (!values) - this->alloc_shared_values(); - this->mdo->addDataPlacement(&myHostAllocInfo); - placement->allocation->transferFrom(reinterpret_cast(startAddress()), getBufferSize()); - std::get<2>(result) = startAddress(); - } - - // create new data placement - auto new_data_placement = const_cast *>(this)->mdo->addDataPlacement(alloc_desc); - new_data_placement->allocation->createAllocation(getBufferSize(), false); - - // transfer to requested data placement - new_data_placement->allocation->transferTo(reinterpret_cast(startAddress()), - getBufferSize()); - return std::make_tuple(false, new_data_placement->dp_id, - reinterpret_cast(new_data_placement->allocation->getData().get())); - } else { - bool latest = this->mdo->isLatestVersion(ret->dp_id); - if (!latest) { - ret->allocation->transferTo(reinterpret_cast(startAddress()), getBufferSize()); - } - return std::make_tuple(latest, ret->dp_id, - reinterpret_cast(ret->allocation->getData().get())); - } - } else { - // if no alloc info was provided we try to get/create a full host - // allocation and return that - std::tuple result = std::make_tuple(false, 0, nullptr); - auto latest = this->mdo->getLatest(); - DataPlacement *placement; - for (auto &placement_id : latest) { - placement = this->mdo->getDataPlacementByID(placement_id); - - // only consider allocations covering full range of matrix - if (placement->range == nullptr || - *(placement->range) == - Range{this->row_offset, this->col_offset, this->getNumRows(), this->getNumCols()}) { - std::get<0>(result) = true; - std::get<1>(result) = placement->dp_id; - // prefer host allocation - if (placement->allocation->getType() == ALLOCATION_TYPE::HOST) { - std::get<2>(result) = reinterpret_cast(startAddress()); - break; - } - } - } - - // if we found a data placement that is not in host memory, transfer - // it there before returning - if (std::get<0>(result) == true && std::get<2>(result) == nullptr) { - AllocationDescriptorHost myHostAllocInfo; - - // ToDo: this needs fixing in the context of matrix views - if (!values) - const_cast *>(this)->alloc_shared_values(); - - this->mdo->addDataPlacement(&myHostAllocInfo); - // std::cout << "bufferSize: " << getBufferSize() - // << " RxRS: " << this->getNumRows() * - // this->getRowSkip() * sizeof(ValueType) << - // std::endl; if(getBufferSize() != - // this->getNumRows() * this->getRowSkip()) { - // placement->allocation->transferFrom(reinterpret_cast(startAddress()), this->getNumRows() * - // this->getRowSkip() * sizeof(ValueType)); - // } - // else - placement->allocation->transferFrom(reinterpret_cast(startAddress()), getBufferSize()); - std::get<2>(result) = startAddress(); - } - if (std::get<2>(result) == nullptr) - throw std::runtime_error("No object meta data in matrix"); - else - return result; - } - } else - throw std::runtime_error("Range support under construction"); + this->row_offset = src->row_offset; + this->col_offset = src->col_offset; + this->values = src->values; + this->mdo = src->mdo; } template void DenseMatrix::printValue(std::ostream &os, ValueType val) const { @@ -251,14 +153,50 @@ template <> [[maybe_unused]] void DenseMatrix::printValue(std::ostream & } template -void DenseMatrix::alloc_shared_values(std::shared_ptr src, size_t offset) { +void 
DenseMatrix::alloc_shared_values(bool zero, std::shared_ptr src, size_t offset) { // correct since C++17: Calls delete[] instead of simple delete if (src) { values = std::shared_ptr(src, src.get() + offset); - } else - // values = std::shared_ptr(new - // ValueType[numRows*numCols]); - values = std::shared_ptr(new ValueType[numRows * getRowSkip()]); + } else { + values = std::shared_ptr(new ValueType[getBufferSize()]); + if (zero) + std::fill(values.get(), values.get() + this->getNumRows() * this->getNumCols(), + ValueTypeUtils::defaultValue); + } +} + +template bool DenseMatrix::operator==(const DenseMatrix &rhs) const { + // Note that we do not use the generic `get` interface to matrices here since + // this operator is meant to be used for writing tests for, besides others, + // those generic interfaces. + + if (this == &rhs) + return true; + + const size_t numRows = this->getNumRows(); + const size_t numCols = this->getNumCols(); + + if (numRows != rhs.getNumRows() || numCols != rhs.getNumCols()) + return false; + + const ValueType *valuesLhs = this->getValues(); + const ValueType *valuesRhs = rhs.getValues(); + + const size_t rowSkipLhs = this->getRowSkip(); + const size_t rowSkipRhs = rhs.getRowSkip(); + + if (valuesLhs == valuesRhs && rowSkipLhs == rowSkipRhs) + return true; + + for (size_t r = 0; r < numRows; ++r) { + for (size_t c = 0; c < numCols; ++c) { + if (*(valuesLhs + c) != *(valuesRhs + c)) + return false; + } + valuesLhs += rowSkipLhs; + valuesRhs += rowSkipRhs; + } + return true; } template size_t DenseMatrix::serialize(std::vector &buf) const { @@ -269,6 +207,40 @@ template <> size_t DenseMatrix::serialize(std::vector &buf) const { throw std::runtime_error("DenseMatrix serialization not implemented"); } +template +const ValueType *DenseMatrix::getValues(const IAllocationDescriptor *alloc_desc, const Range *range) const { + const ValueType *ptr; + if (this->isPinned(alloc_desc)) { + ptr = reinterpret_cast(this->pinned_mem); + } else { + this->pinned_mem = this->mdo->getData(0, alloc_desc, range); + ptr = reinterpret_cast(this->pinned_mem); + const_cast *>(this)->pin(alloc_desc); + } + + if (isView()) + return ptr + offset(); + else + return ptr; +} + +template +ValueType *DenseMatrix::getValues(const IAllocationDescriptor *alloc_desc, const Range *range) { + ValueType *ptr; + if (this->isPinned(alloc_desc)) { + ptr = reinterpret_cast(this->pinned_mem); + } else { + this->pinned_mem = this->mdo->getData(0, alloc_desc, range); + ptr = reinterpret_cast(this->pinned_mem); + this->pin(alloc_desc); + } + + if (isView()) + return ptr + offset(); + else + return ptr; +} + // ---------------------------------------------------------------------------- // const char* specialization DenseMatrix::DenseMatrix(size_t maxNumRows, size_t numCols, bool zero, size_t strBufferCapacity_, diff --git a/src/runtime/local/datastructures/DenseMatrix.h b/src/runtime/local/datastructures/DenseMatrix.h index ec4c9bd05..19a2f7d08 100644 --- a/src/runtime/local/datastructures/DenseMatrix.h +++ b/src/runtime/local/datastructures/DenseMatrix.h @@ -101,12 +101,13 @@ template class DenseMatrix : public Matrix { : DenseMatrix(src, rowLowerIncl, rowUpperExcl, 0, src->numCols){}; /** - * @brief Creates a `DenseMatrix` around an existing array of values without + * @brief Creates a `DenseMatrix` around an existing `DenseMatrix` without * copying the data. + * This is used in transpose and reshape operations for example; * * @param numRows The exact number of rows. 
* @param numCols The exact number of columns. - * @param values A `std::shared_ptr` to an existing array of values. + * @param src A source matrix */ DenseMatrix(size_t numRows, size_t numCols, const DenseMatrix *src); @@ -126,12 +127,11 @@ template class DenseMatrix : public Matrix { const size_t endPosExcl = pos(rowIdx, colIdx); if (startPosIncl < endPosExcl) - std::fill(values.get() + startPosIncl, values.get() + endPosExcl, + std::fill(getValues() + startPosIncl, values.get() + endPosExcl, ValueTypeUtils::defaultValue); } else { - auto v = values.get() + lastAppendedRowIdx * rowSkip; + auto v = getValues() + lastAppendedRowIdx * rowSkip; std::fill(v + lastAppendedColIdx + 1, v + numCols, ValueTypeUtils::defaultValue); - v += rowSkip; for (size_t r = lastAppendedRowIdx + 1; r < rowIdx; r++) { @@ -145,42 +145,10 @@ template class DenseMatrix : public Matrix { void printValue(std::ostream &os, ValueType val) const; - void alloc_shared_values(std::shared_ptr src = nullptr, size_t offset = 0); - - /** - * @brief The getValuesInternal method fetches a pointer to an allocation of - * values. Optionally a sub range can be specified. - * - * This method is called by the public getValues() methods either in const - * or non-const fashion. The const version returns a data pointer that is - * meant to be read-only. Read only access updates the list of up-to-date - * allocations (the latest_versions "list"). This way several copies of the - * data in various allocations can be kept without invalidating their copy. - * If the read write version of getValues() is used, the latest_versions - * list is cleared because a write to an allocation is assumed, which - * renders the other allocations of the same data out of sync. - * - * @param alloc_desc An instance of an IAllocationDescriptor derived class - * that is used to specify what type of allocation is requested. If no - * allocation descriptor is provided, a host allocation (plain main memory) - * is assumed by default. - * - * @param range An optional range describing which rows and columns of a - * matrix-like structure are requested. By default this is null and means - * all rows and columns. - * @return A tuple of three values is returned: - * 1: bool - is the returend allocation in the latest_versions list - * 2: size_t - the ID of the data placement (a structure relating an - * allocation to a range) 3: ValueType* - the pointer to the actual data - */ - - auto getValuesInternal(const IAllocationDescriptor *alloc_desc = nullptr, - const Range *range = nullptr) -> std::tuple; + void alloc_shared_values(bool zero = false, std::shared_ptr src = nullptr, size_t offset = 0); [[nodiscard]] size_t offset() const { return this->row_offset * rowSkip + this->col_offset; } - ValueType *startAddress() const { return isPartialBuffer() ? values.get() + offset() : values.get(); } - public: template using WithValueType = DenseMatrix; @@ -218,12 +186,7 @@ template class DenseMatrix : public Matrix { * of a data structure. 
* @return A pointer to the data in the requested memory space */ - const ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const { - auto [isLatest, id, ptr] = const_cast *>(this)->getValuesInternal(alloc_desc, range); - if (!isLatest) - this->mdo->addLatest(id); - return ptr; - } + const ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const; /** * @brief Fetch a pointer to the data held by this structure meant for @@ -237,21 +200,17 @@ template class DenseMatrix : public Matrix { * memory is requested (e.g. main memory in the current system, memory in an * accelerator card or memory in another host) * + * * @param range A Range object describing optionally requesting a sub range * of a data structure. * @return A pointer to the data in the requested memory space */ - ValueType *getValues(IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) { - auto [isLatest, id, ptr] = const_cast *>(this)->getValuesInternal(alloc_desc, range); - if (!isLatest) - this->mdo->setLatest(id); - return ptr; - } + ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr); std::shared_ptr getValuesSharedPtr() const { return values; } - ValueType get(size_t rowIdx, size_t colIdx) const override { - return getValues()[pos(rowIdx, colIdx, isPartialBuffer())]; + auto position = pos(rowIdx, colIdx, isPartialBuffer()); + return getValues()[position]; } void set(size_t rowIdx, size_t colIdx, ValueType value) override { @@ -262,7 +221,7 @@ template class DenseMatrix : public Matrix { void prepareAppend() override { // The matrix might be empty. if (numRows != 0 && numCols != 0) - values.get()[0] = ValueType(ValueTypeUtils::defaultValue); + getValues()[0] = ValueType(ValueTypeUtils::defaultValue); lastAppendedRowIdx = 0; lastAppendedColIdx = 0; } @@ -271,7 +230,7 @@ template class DenseMatrix : public Matrix { // Set all cells since the last one that was appended to zero. fillZeroUntil(rowIdx, colIdx); // Set the specified cell. - values.get()[pos(rowIdx, colIdx)] = value; + getValues()[pos(rowIdx, colIdx)] = value; // Update append state. lastAppendedRowIdx = rowIdx; lastAppendedColIdx = colIdx; @@ -308,39 +267,7 @@ template class DenseMatrix : public Matrix { [[nodiscard]] size_t getBufferSize() const { return bufferSize; } - bool operator==(const DenseMatrix &rhs) const { - // Note that we do not use the generic `get` interface to matrices here - // since this operator is meant to be used for writing tests for, - // besides others, those generic interfaces. 
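Usage sketch (illustrative, not part of the patch): with getValuesInternal() removed, both getValues() overloads resolve their pointer through MetaDataObject::getData() and cache it via the new pin()/isPinned() mechanism; the read-write overload marks the touched placement as the latest version, while the read-only overload merely adds it to the latest-version list so other up-to-date copies stay valid. The include path and helper functions below are assumptions made for the example only.

    #include <runtime/local/datastructures/DenseMatrix.h>

    // Read-write access: pins the resolved buffer and marks its placement as latest.
    template <typename VT> void scaleInPlace(DenseMatrix<VT> *mat, VT factor) {
        VT *vals = mat->getValues();
        for (size_t r = 0; r < mat->getNumRows(); ++r)
            for (size_t c = 0; c < mat->getNumCols(); ++c)
                vals[r * mat->getRowSkip() + c] *= factor;
    }

    // Read-only access: same lookup, but other allocations of the data remain valid.
    template <typename VT> VT topLeft(const DenseMatrix<VT> *mat) {
        return mat->getValues()[0];
    }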
- - if (this == &rhs) - return true; - - const size_t numRows = this->getNumRows(); - const size_t numCols = this->getNumCols(); - - if (numRows != rhs.getNumRows() || numCols != rhs.getNumCols()) - return false; - - const ValueType *valuesLhs = this->getValues(); - const ValueType *valuesRhs = rhs.getValues(); - - const size_t rowSkipLhs = this->getRowSkip(); - const size_t rowSkipRhs = rhs.getRowSkip(); - - if (valuesLhs == valuesRhs && rowSkipLhs == rowSkipRhs) - return true; - - for (size_t r = 0; r < numRows; ++r) { - for (size_t c = 0; c < numCols; ++c) { - if (*(valuesLhs + c) != *(valuesRhs + c)) - return false; - } - valuesLhs += rowSkipLhs; - valuesRhs += rowSkipRhs; - } - return true; - } + bool operator==(const DenseMatrix &rhs) const; size_t serialize(std::vector &buf) const override; }; diff --git a/src/runtime/local/datastructures/IAllocationDescriptor.h b/src/runtime/local/datastructures/IAllocationDescriptor.h index eb17eb8f8..fce62c950 100644 --- a/src/runtime/local/datastructures/IAllocationDescriptor.h +++ b/src/runtime/local/datastructures/IAllocationDescriptor.h @@ -17,6 +17,7 @@ #pragma once #include +#include // An alphabetically sorted wishlist of supported allocation types ;-) // Supporting all of that is probably unmaintainable :-/ @@ -36,6 +37,11 @@ enum class ALLOCATION_TYPE { NUM_ALLOC_TYPES }; +// string representation of enum +static std::string_view allocation_types[] = { + "DIST_GRPC", "DIST_GRPC_ASYNC", "DIST_GRPC_SYNC", "DIST_MPI", "DIST_SPARK", "GPU_CUDA", "GPU_HIP", + "HOST", "HOST_PINNED_CUDA", "FPGA_INT", "FPGA_XLX", "ONEAPI", "NUM_ALLOC_TYPES"}; + /** * @brief The IAllocationDescriptor interface class describes an abstract * interface to handle memory allocations @@ -50,14 +56,20 @@ enum class ALLOCATION_TYPE { * by the allocator. */ class IAllocationDescriptor { + protected: + size_t size = 0; + public: virtual ~IAllocationDescriptor() = default; [[nodiscard]] virtual ALLOCATION_TYPE getType() const = 0; - virtual void createAllocation(size_t size, bool zero) = 0; + [[nodiscard]] virtual std::unique_ptr createAllocation(size_t size, bool zero) const = 0; [[nodiscard]] virtual std::string getLocation() const = 0; virtual std::shared_ptr getData() = 0; virtual void transferTo(std::byte *src, size_t size) = 0; virtual void transferFrom(std::byte *dst, size_t size) = 0; [[nodiscard]] virtual std::unique_ptr clone() const = 0; virtual bool operator==(const IAllocationDescriptor *other) const { return (getType() == other->getType()); } + [[nodiscard]] virtual size_t getSize() const { return size; }; + + virtual std::string_view getTypeName() const { return allocation_types[static_cast(getType())]; } }; diff --git a/src/runtime/local/datastructures/MetaDataObject.cpp b/src/runtime/local/datastructures/MetaDataObject.cpp index 5f8b36028..ea8131ba2 100644 --- a/src/runtime/local/datastructures/MetaDataObject.cpp +++ b/src/runtime/local/datastructures/MetaDataObject.cpp @@ -17,21 +17,35 @@ #include "MetaDataObject.h" #include "DataPlacement.h" -DataPlacement *MetaDataObject::addDataPlacement(const IAllocationDescriptor *allocInfo, Range *r) { - data_placements[static_cast(allocInfo->getType())].emplace_back( - std::make_unique(allocInfo->clone(), r == nullptr ? 
nullptr : r->clone())); - return data_placements[static_cast(allocInfo->getType())].back().get(); +#include + +DataPlacement *MetaDataObject::addDataPlacement(std::vector> &allocInfos, + Range *r) { + if (r) { + auto type = static_cast(allocInfos.front()->getType()); + range_data_placements[type].emplace_back(std::make_unique(std::move(allocInfos), r->clone())); + return range_data_placements[type].back().get(); + } else { + auto type = static_cast(allocInfos.front()->getType()); + data_placements[type] = std::make_unique(std::move(allocInfos), nullptr); + return data_placements[type].get(); + } } -auto MetaDataObject::getDataPlacementByType(ALLOCATION_TYPE type) const +auto MetaDataObject::getRangeDataPlacementByType(ALLOCATION_TYPE type) const -> const std::vector> * { - return &(data_placements[static_cast(type)]); + return &(range_data_placements[static_cast(type)]); +} + +auto MetaDataObject::getDataPlacementByType(ALLOCATION_TYPE type) const -> DataPlacement * { + return data_placements[static_cast(type)].get(); } DataPlacement *MetaDataObject::getDataPlacementByLocation(const std::string &location) const { - for (const auto &_omdType : data_placements) { + // ToDo: no range? + for (const auto &_omdType : range_data_placements) { for (auto &_omd : _omdType) { - if (_omd->allocation->getLocation() == location) + if (_omd->getLocation() == location) return const_cast(_omd.get()); } } @@ -39,10 +53,10 @@ DataPlacement *MetaDataObject::getDataPlacementByLocation(const std::string &loc } void MetaDataObject::updateRangeDataPlacementByID(size_t id, Range *r) { - for (auto &_omdType : data_placements) { + for (auto &_omdType : range_data_placements) { for (auto &_omd : _omdType) { - if (_omd->dp_id == id) { - _omd->range = r->clone(); + if (_omd->getID() == id) { + _omd->setRange(r->clone()); return; } } @@ -50,31 +64,19 @@ void MetaDataObject::updateRangeDataPlacementByID(size_t id, Range *r) { } DataPlacement *MetaDataObject::getDataPlacementByID(size_t id) const { - for (const auto &_omdType : data_placements) { - for (auto &_omd : _omdType) { - if (_omd->dp_id == id) - return const_cast(_omd.get()); - } + for (const auto &dp : data_placements) { + if (dp) + if (dp->getID() == id) + return const_cast(dp.get()); } - return nullptr; -} - -const DataPlacement *MetaDataObject::findDataPlacementByType(const IAllocationDescriptor *alloc_desc, - const Range *range) const { - auto res = getDataPlacementByType(alloc_desc->getType()); - if (res->empty()) - return nullptr; - else { - for (size_t i = 0; i < res->size(); ++i) { - if ((*res)[i]->allocation->operator==(alloc_desc)) { - if (((*res)[i]->range == nullptr && range == nullptr) || - ((*res)[i]->range != nullptr && (*res)[i]->range->operator==(range))) { - return (*res)[i].get(); - } - } + for (const auto &rdp_by_type : range_data_placements) { + for (auto &rdp : rdp_by_type) { + if (rdp) + if (rdp->getID() == id) + return const_cast(rdp.get()); } - return nullptr; } + return nullptr; } bool MetaDataObject::isLatestVersion(size_t placement) const { @@ -89,3 +91,70 @@ void MetaDataObject::setLatest(size_t id) { } auto MetaDataObject::getLatest() const -> std::vector { return latest_version; } + +DataPlacement *MetaDataObject::getDataPlacement(const IAllocationDescriptor *alloc_desc) { + auto dp = getDataPlacementByType(alloc_desc->getType()); + if (!dp) { + // find other allocation type X (preferably host allocation) to transfer from in latest_version + auto latest = getLatest(); + DataPlacement *placement = 
getDataPlacementByID(latest.front()); + + std::vector> allocations; + for (uint32_t i = 0; i < placement->getNumAllocations(); ++i) { + auto other_alloc = placement->getAllocation(i); + auto new_alloc = alloc_desc->createAllocation(other_alloc->getSize(), false); + + // transfer to requested data placement + // ToDo: this works for the HOST <-> CUDA use case for now + // ToDo: update *Matrix own shared ptrs when transferring back from other storage + if (new_alloc->getType() == ALLOCATION_TYPE::HOST) + other_alloc->transferFrom(new_alloc->getData().get(), new_alloc->getSize()); + else + new_alloc->transferTo(other_alloc->getData().get(), other_alloc->getSize()); + allocations.emplace_back(std::move(new_alloc)); + } + return addDataPlacement(allocations); + } else { + bool isLatest = isLatestVersion(dp->getID()); + if (!isLatest) { + auto latest = getDataPlacementByID(getLatest().front()); + for (uint32_t i = 0; i < latest->getNumAllocations(); ++i) { + auto other_alloc = latest->getAllocation(i); + dp->getAllocation(i)->transferTo(other_alloc->getData().get(), other_alloc->getSize()); + } + } + return dp; + } +} + +std::pair +MetaDataObject::getDataInternal(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc, const Range *range) { + DataPlacement *dp; + if (!range) { + if (!alloc_desc) { + auto ad = AllocationDescriptorHost(); + dp = getDataPlacement(&ad); + } else + dp = getDataPlacement(alloc_desc); + + return std::make_pair(dp->getID(), dp->getAllocation(alloc_idx)->getData().get()); + } else + throw std::runtime_error("Range support under construction"); +} + +const std::byte *MetaDataObject::getData(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc, + const Range *range) const { + const std::lock_guard lock(mtx); + + auto [id, ptr] = const_cast(this)->getDataInternal(alloc_idx, alloc_desc, range); + const_cast(this)->addLatest(id); + return ptr; +} + +std::byte *MetaDataObject::getData(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc, const Range *range) { + const std::lock_guard lock(mtx); + + auto [id, ptr] = getDataInternal(alloc_idx, alloc_desc, range); + setLatest(id); + return ptr; +} \ No newline at end of file diff --git a/src/runtime/local/datastructures/MetaDataObject.h b/src/runtime/local/datastructures/MetaDataObject.h index 994b0ca73..5a8d3b591 100644 --- a/src/runtime/local/datastructures/MetaDataObject.h +++ b/src/runtime/local/datastructures/MetaDataObject.h @@ -38,20 +38,33 @@ struct DataPlacement; */ class MetaDataObject { std::array>, static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES)> - data_placements; + range_data_placements; + + std::array, static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES)> data_placements; + mutable std::mutex mtx{}; std::vector latest_version; + std::pair getDataInternal(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc, + const Range *range); public: - DataPlacement *addDataPlacement(const IAllocationDescriptor *allocInfo, Range *r = nullptr); - const DataPlacement *findDataPlacementByType(const IAllocationDescriptor *alloc_desc, const Range *range) const; + DataPlacement *addDataPlacement(std::vector> &allocInfos, + Range *r = nullptr); [[nodiscard]] DataPlacement *getDataPlacementByID(size_t id) const; [[nodiscard]] DataPlacement *getDataPlacementByLocation(const std::string &location) const; [[nodiscard]] auto - getDataPlacementByType(ALLOCATION_TYPE type) const -> const std::vector> *; + getRangeDataPlacementByType(ALLOCATION_TYPE type) const -> const std::vector> *; + [[nodiscard]] auto 
getDataPlacementByType(ALLOCATION_TYPE type) const -> DataPlacement *; void updateRangeDataPlacementByID(size_t id, Range *r); + DataPlacement *getDataPlacement(const IAllocationDescriptor *alloc_desc); + [[nodiscard]] bool isLatestVersion(size_t placement) const; void addLatest(size_t id); void setLatest(size_t id); [[nodiscard]] auto getLatest() const -> std::vector; + + const std::byte *getData(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) const; + std::byte *getData(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr); }; diff --git a/src/runtime/local/datastructures/Structure.cpp b/src/runtime/local/datastructures/Structure.cpp index bdef9f240..36f6678f6 100644 --- a/src/runtime/local/datastructures/Structure.cpp +++ b/src/runtime/local/datastructures/Structure.cpp @@ -19,19 +19,12 @@ Structure::Structure(size_t numRows, size_t numCols) : refCounter(1), numRows(numRows), numCols(numCols) { mdo = std::make_shared(); -}; +} -void Structure::clone_mdo(const Structure *src) { - // FIXME: This clones the meta data to avoid locking (thread synchronization - // for data copy) - for (int i = 0; i < static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES); i++) { - auto placements = src->mdo->getDataPlacementByType(static_cast(i)); - for (auto it = placements->begin(); it != placements->end(); it++) { - auto src_alloc = it->get()->allocation.get(); - auto src_range = it->get()->range.get(); - auto new_data_placement = this->mdo->addDataPlacement(src_alloc, src_range); - if (src->mdo->isLatestVersion(it->get()->dp_id)) - this->mdo->addLatest(new_data_placement->dp_id); - } - } +bool Structure::isPinned(const IAllocationDescriptor *alloc_desc) const { + return alloc_desc ? pinned_allocation == alloc_desc->getType() : pinned_allocation == ALLOCATION_TYPE::HOST; +} + +void Structure::pin(const IAllocationDescriptor *alloc_desc) { + alloc_desc ? pinned_allocation = alloc_desc->getType() : pinned_allocation = ALLOCATION_TYPE::HOST; } \ No newline at end of file diff --git a/src/runtime/local/datastructures/Structure.h b/src/runtime/local/datastructures/Structure.h index 13f9a7cf5..42f6fc8da 100644 --- a/src/runtime/local/datastructures/Structure.h +++ b/src/runtime/local/datastructures/Structure.h @@ -37,12 +37,16 @@ class Structure { size_t col_offset{}; size_t numRows; size_t numCols; + mutable ALLOCATION_TYPE pinned_allocation{}; + mutable std::byte *pinned_mem{}; Structure(size_t numRows, size_t numCols); mutable std::shared_ptr mdo; - void clone_mdo(const Structure *src); + [[nodiscard]] bool isPinned(const IAllocationDescriptor *alloc_desc) const; + + void pin(const IAllocationDescriptor *alloc_desc); public: virtual ~Structure() = default; diff --git a/src/runtime/local/io/DaphneSerializer.h b/src/runtime/local/io/DaphneSerializer.h index ad43c8700..f7f4da72a 100644 --- a/src/runtime/local/io/DaphneSerializer.h +++ b/src/runtime/local/io/DaphneSerializer.h @@ -608,7 +608,7 @@ template struct DaphneSerializer, false> { if (chunkSize <= bufferIdx) return bufferIdx; - const size_t *colIdxs = arg->getColIdxs(0); + const size_t *colIdxs = arg->getColIdxsOfRow(0); if (serializeFromByte < serializationIdx + nzb * sizeof(size_t)) { size_t startOffset = serializeFromByte > serializationIdx ? 
serializeFromByte - serializationIdx : 0; size_t bytesToCopy = 0; @@ -630,7 +630,7 @@ template struct DaphneSerializer, false> { if (chunkSize <= bufferIdx) return bufferIdx; - const VT *vals = arg->getValues(0); + const VT *vals = arg->getRowValues(0); if (serializeFromByte < serializationIdx + nzb * sizeof(VT)) { size_t startOffset = serializeFromByte > serializationIdx ? serializeFromByte - serializationIdx : 0; size_t bytesToCopy = 0; diff --git a/src/runtime/local/kernels/AggAll.h b/src/runtime/local/kernels/AggAll.h index 14b375057..924066631 100644 --- a/src/runtime/local/kernels/AggAll.h +++ b/src/runtime/local/kernels/AggAll.h @@ -135,14 +135,14 @@ template struct AggAll> EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(opCode)); - return aggArray(arg->getValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), func, + return aggArray(arg->getRowValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), func, AggOpCodeUtils::isSparseSafe(opCode), AggOpCodeUtils::template getNeutral(opCode), ctx); } else { // The op-code is either MEAN or STDDEV or VAR. EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(AggOpCode::SUM)); - auto agg = aggArray(arg->getValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), func, - true, VTRes(0), ctx); + auto agg = aggArray(arg->getRowValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), + func, true, VTRes(0), ctx); agg = agg / (arg->getNumRows() * arg->getNumCols()); if (opCode == AggOpCode::MEAN) return agg; diff --git a/src/runtime/local/kernels/AggCol.h b/src/runtime/local/kernels/AggCol.h index f096c0c94..290181436 100644 --- a/src/runtime/local/kernels/AggCol.h +++ b/src/runtime/local/kernels/AggCol.h @@ -202,9 +202,8 @@ template struct AggCol, CSRM // for MEAN and STDDDEV, we need to sum func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(AggOpCode::SUM)); - const VTArg *valuesArg = arg->getValues(0); - const size_t *colIdxsArg = arg->getColIdxs(0); - + const VTArg *valuesArg = arg->getRowValues(0); + const size_t *colIdxsArg = arg->getColIdxsOfRow(0); const size_t numNonZeros = arg->getNumNonZeros(); if (AggOpCodeUtils::isSparseSafe(opCode)) { diff --git a/src/runtime/local/kernels/AggCum.h b/src/runtime/local/kernels/AggCum.h index 3481511e8..59647befc 100644 --- a/src/runtime/local/kernels/AggCum.h +++ b/src/runtime/local/kernels/AggCum.h @@ -66,18 +66,21 @@ template struct AggCum, Dens EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(opCode)); + auto arg_inc = arg->isView() ? arg->getNumCols() : arg->getRowSkip(); + auto res_inc = res->isView() ? res->getNumCols() : res->getRowSkip(); + // First row: copy from arg to res. for (size_t c = 0; c < numCols; c++) valuesResCur[c] = valuesArg[c]; - valuesArg += arg->getRowSkip(); - valuesResCur += res->getRowSkip(); + valuesArg += arg_inc; + valuesResCur += res_inc; // Remaining rows: calculate from previous res row and current arg row. 
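A note on the CSRMatrix renaming used throughout these kernels: getRowValues(r) and getColIdxsOfRow(r) are the row-indexed accessors (formerly getValues(r)/getColIdxs(r)), and row 0 still yields the start of the complete values/colIdxs arrays, which is what AggAll, AggCol and the serializer rely on. A small sketch under that assumption; the helper names and include path are illustrative, not part of the patch.

    #include <runtime/local/datastructures/CSRMatrix.h>

    // Flat traversal: row 0 points at the first stored value and the
    // argument-less getNumNonZeros() is the total non-zero count.
    template <typename VT> VT sumNonZeros(const CSRMatrix<VT> *m) {
        VT sum = VT(0);
        const VT *vals = m->getRowValues(0);
        for (size_t i = 0; i < m->getNumNonZeros(); ++i)
            sum += vals[i];
        return sum;
    }

    // Row-wise traversal: per-row value pointer plus per-row non-zero count.
    template <typename VT> VT sumNonZerosRowWise(const CSRMatrix<VT> *m) {
        VT sum = VT(0);
        for (size_t r = 0; r < m->getNumRows(); ++r) {
            const VT *rowVals = m->getRowValues(r);
            for (size_t i = 0; i < m->getNumNonZeros(r); ++i)
                sum += rowVals[i];
        }
        return sum;
    }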
for (size_t r = 1; r < numRows; r++) { for (size_t c = 0; c < numCols; c++) valuesResCur[c] = func(valuesResPrv[c], valuesArg[c], ctx); - valuesArg += arg->getRowSkip(); - valuesResPrv += res->getRowSkip(); - valuesResCur += res->getRowSkip(); + valuesArg += arg_inc; + valuesResPrv += res_inc; + valuesResCur += res_inc; } } }; diff --git a/src/runtime/local/kernels/AggRow.h b/src/runtime/local/kernels/AggRow.h index 8b3da82ce..4c2fd28d7 100644 --- a/src/runtime/local/kernels/AggRow.h +++ b/src/runtime/local/kernels/AggRow.h @@ -183,7 +183,7 @@ template struct AggRow, CSRM const VTRes neutral = AggOpCodeUtils::template getNeutral(opCode); for (size_t r = 0; r < numRows; r++) { - *valuesRes = AggAll>::aggArray(arg->getValues(r), arg->getNumNonZeros(r), + *valuesRes = AggAll>::aggArray(arg->getRowValues(r), arg->getNumNonZeros(r), numCols, func, isSparseSafe, neutral, ctx); valuesRes += res->getRowSkip(); } @@ -197,7 +197,7 @@ template struct AggRow, CSRM EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(AggOpCode::SUM)); for (size_t r = 0; r < numRows; r++) { - *valuesRes = AggAll>::aggArray(arg->getValues(r), arg->getNumNonZeros(r), + *valuesRes = AggAll>::aggArray(arg->getRowValues(r), arg->getNumNonZeros(r), numCols, func, isSparseSafe, neutral, ctx); const VTArg *valuesArg = arg->getValues(0); const size_t numNonZeros = arg->getNumNonZeros(r); diff --git a/src/runtime/local/kernels/CMakeLists.txt b/src/runtime/local/kernels/CMakeLists.txt index 2fa6a2cc6..50c3a74dc 100644 --- a/src/runtime/local/kernels/CMakeLists.txt +++ b/src/runtime/local/kernels/CMakeLists.txt @@ -108,6 +108,7 @@ set(SOURCES_cpp_kernels ${PROJECT_SOURCE_DIR}/src/runtime/local/vectorized/MTWrapper_sparse.cpp ${PROJECT_SOURCE_DIR}/src/runtime/local/vectorized/Tasks.cpp ${PROJECT_SOURCE_DIR}/src/runtime/local/vectorized/WorkerCPU.h + ) # The library of pre-compiled kernels. Will be linked into the JIT-compiled user program. 
add_library(KernelObjLib OBJECT ${SOURCES_cpp_kernels} ${HEADERS_cpp_kernels}) diff --git a/src/runtime/local/kernels/CheckEqApprox.h b/src/runtime/local/kernels/CheckEqApprox.h index 72faf166b..4d6d020a7 100644 --- a/src/runtime/local/kernels/CheckEqApprox.h +++ b/src/runtime/local/kernels/CheckEqApprox.h @@ -137,8 +137,8 @@ template struct CheckEqApprox> { return false; for (size_t r = 0; r < numRows; r++) { - const VT *valuesLhs = lhs->getValues(r); - const VT *valuesRhs = rhs->getValues(r); + const VT *valuesLhs = lhs->getRowValues(r); + const VT *valuesRhs = rhs->getRowValues(r); const size_t nnzElementsLhs = lhs->getNumNonZeros(r); const size_t nnzElementsRhs = rhs->getNumNonZeros(r); if (nnzElementsLhs != nnzElementsRhs) diff --git a/src/runtime/local/kernels/DiagMatrix.h b/src/runtime/local/kernels/DiagMatrix.h index 489c4190d..581f35416 100644 --- a/src/runtime/local/kernels/DiagMatrix.h +++ b/src/runtime/local/kernels/DiagMatrix.h @@ -133,7 +133,7 @@ template struct DiagMatrix, CSRMatrix> { for (size_t r = 0, pos = 0; r < numRowsCols; r++) { if (arg->getNumNonZeros(r)) { - valuesRes[pos] = *(arg->getValues(r)); + valuesRes[pos] = *(arg->getRowValues(r)); colIdxsRes[pos++] = r; } rowOffsetsRes[r + 1] = pos; diff --git a/src/runtime/local/kernels/EwBinaryMat.h b/src/runtime/local/kernels/EwBinaryMat.h index 5330c71d7..331791523 100644 --- a/src/runtime/local/kernels/EwBinaryMat.h +++ b/src/runtime/local/kernels/EwBinaryMat.h @@ -148,12 +148,12 @@ template struct EwBinaryMat, CSRMatrix, CSRMatri size_t nnzRowRhs = rhs->getNumNonZeros(rowIdx); if (nnzRowLhs && nnzRowRhs) { // merge within row - const VT *valuesRowLhs = lhs->getValues(rowIdx); - const VT *valuesRowRhs = rhs->getValues(rowIdx); - VT *valuesRowRes = res->getValues(rowIdx); - const size_t *colIdxsRowLhs = lhs->getColIdxs(rowIdx); - const size_t *colIdxsRowRhs = rhs->getColIdxs(rowIdx); - size_t *colIdxsRowRes = res->getColIdxs(rowIdx); + const VT *valuesRowLhs = lhs->getRowValues(rowIdx); + const VT *valuesRowRhs = rhs->getRowValues(rowIdx); + VT *valuesRowRes = res->getRowValues(rowIdx); + const size_t *colIdxsRowLhs = lhs->getColIdxsOfRow(rowIdx); + const size_t *colIdxsRowRhs = rhs->getColIdxsOfRow(rowIdx); + size_t *colIdxsRowRes = res->getColIdxsOfRow(rowIdx); size_t posLhs = 0; size_t posRhs = 0; size_t posRes = 0; @@ -191,13 +191,13 @@ template struct EwBinaryMat, CSRMatrix, CSRMatri rowOffsetsRes[rowIdx + 1] = rowOffsetsRes[rowIdx] + posRes + restRowLhs + restRowRhs; } else if (nnzRowLhs) { // copy from left - memcpy(res->getValues(rowIdx), lhs->getValues(rowIdx), nnzRowLhs * sizeof(VT)); - memcpy(res->getColIdxs(rowIdx), lhs->getColIdxs(rowIdx), nnzRowLhs * sizeof(size_t)); + memcpy(res->getRowValues(rowIdx), lhs->getRowValues(rowIdx), nnzRowLhs * sizeof(VT)); + memcpy(res->getColIdxsOfRow(rowIdx), lhs->getColIdxsOfRow(rowIdx), nnzRowLhs * sizeof(size_t)); rowOffsetsRes[rowIdx + 1] = rowOffsetsRes[rowIdx] + nnzRowLhs; } else if (nnzRowRhs) { // copy from right - memcpy(res->getValues(rowIdx), rhs->getValues(rowIdx), nnzRowRhs * sizeof(VT)); - memcpy(res->getColIdxs(rowIdx), rhs->getColIdxs(rowIdx), nnzRowRhs * sizeof(size_t)); + memcpy(res->getRowValues(rowIdx), rhs->getRowValues(rowIdx), nnzRowRhs * sizeof(VT)); + memcpy(res->getColIdxsOfRow(rowIdx), rhs->getColIdxsOfRow(rowIdx), nnzRowRhs * sizeof(size_t)); rowOffsetsRes[rowIdx + 1] = rowOffsetsRes[rowIdx] + nnzRowRhs; } else // empty row in result @@ -211,12 +211,12 @@ template struct EwBinaryMat, CSRMatrix, CSRMatri size_t nnzRowRhs = 
rhs->getNumNonZeros(rowIdx); if (nnzRowLhs && nnzRowRhs) { // intersect within row - const VT *valuesRowLhs = lhs->getValues(rowIdx); - const VT *valuesRowRhs = rhs->getValues(rowIdx); - VT *valuesRowRes = res->getValues(rowIdx); - const size_t *colIdxsRowLhs = lhs->getColIdxs(rowIdx); - const size_t *colIdxsRowRhs = rhs->getColIdxs(rowIdx); - size_t *colIdxsRowRes = res->getColIdxs(rowIdx); + const VT *valuesRowLhs = lhs->getRowValues(rowIdx); + const VT *valuesRowRhs = rhs->getRowValues(rowIdx); + VT *valuesRowRes = res->getRowValues(rowIdx); + const size_t *colIdxsRowLhs = lhs->getColIdxsOfRow(rowIdx); + const size_t *colIdxsRowRhs = rhs->getColIdxsOfRow(rowIdx); + size_t *colIdxsRowRes = res->getColIdxsOfRow(rowIdx); size_t posLhs = 0; size_t posRhs = 0; size_t posRes = 0; @@ -286,10 +286,10 @@ template struct EwBinaryMat, CSRMatrix, DenseMat size_t nnzRowLhs = lhs->getNumNonZeros(rowIdx); if (nnzRowLhs) { // intersect within row - const VT *valuesRowLhs = lhs->getValues(rowIdx); - VT *valuesRowRes = res->getValues(rowIdx); - const size_t *colIdxsRowLhs = lhs->getColIdxs(rowIdx); - size_t *colIdxsRowRes = res->getColIdxs(rowIdx); + const VT *valuesRowLhs = lhs->getRowValues(rowIdx); + VT *valuesRowRes = res->getRowValues(rowIdx); + const size_t *colIdxsRowLhs = lhs->getColIdxsOfRow(rowIdx); + size_t *colIdxsRowRes = res->getColIdxsOfRow(rowIdx); auto rhsRow = (rhs->getNumRows() == 1 ? 0 : rowIdx); size_t posRes = 0; for (size_t posLhs = 0; posLhs < nnzRowLhs; ++posLhs) { diff --git a/src/runtime/local/kernels/Gemv.h b/src/runtime/local/kernels/Gemv.h index a46c3afcc..0edc3ded6 100644 --- a/src/runtime/local/kernels/Gemv.h +++ b/src/runtime/local/kernels/Gemv.h @@ -100,8 +100,8 @@ template struct Gemv, CSRMatrix, DenseMatrixgetNumNonZeros(r); - const size_t *rowColIdxs = mat->getColIdxs(r); - const VT *rowValues = mat->getValues(r); + const size_t *rowColIdxs = mat->getColIdxsOfRow(r); + const VT *rowValues = mat->getRowValues(r); const size_t rowIdxRes = r * rowSkipRes; for (size_t i = 0; i < rowNumNonZeros; i++) { diff --git a/src/runtime/local/kernels/HasSpecialValue.h b/src/runtime/local/kernels/HasSpecialValue.h index 274412095..73e0c2314 100644 --- a/src/runtime/local/kernels/HasSpecialValue.h +++ b/src/runtime/local/kernels/HasSpecialValue.h @@ -99,8 +99,8 @@ template struct HasSpecialValue, auto numCols = arg->getNumCols(); auto numNonZeros = arg->getNumNonZeros(); auto numElements = numRows * numCols; - auto vBegin = arg->getValues(0); - auto vEnd = arg->getValues(numRows); + auto vBegin = arg->getRowValues(0); + auto vEnd = arg->getRowValues(numRows); auto hasZeroes = numNonZeros < numElements; auto zero = VT(0); diff --git a/src/runtime/local/kernels/IsSymmetric.h b/src/runtime/local/kernels/IsSymmetric.h index 60e4d7222..a63e1df48 100644 --- a/src/runtime/local/kernels/IsSymmetric.h +++ b/src/runtime/local/kernels/IsSymmetric.h @@ -104,8 +104,8 @@ template struct IsSymmetric> { for (size_t rowIdx = 0; rowIdx < numRows; rowIdx++) { - const VT *rowA = arg->getValues(rowIdx); - const size_t *colIdxsA = arg->getColIdxs(rowIdx); + const VT *rowA = arg->getRowValues(rowIdx); + const size_t *colIdxsA = arg->getColIdxsOfRow(rowIdx); const size_t numNonZerosA = arg->getNumNonZeros(rowIdx); for (size_t idx = 0; idx < numNonZerosA; idx++) { @@ -119,8 +119,8 @@ template struct IsSymmetric> { VT valA = rowA[idx]; // B references the transposed element to compare for symmetry. 
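The Gemv and MatMul hunks follow the same sparse-times-dense pattern with the renamed accessors; extracted here as a stand-alone sketch for reviewing convenience. The function name and includes are not part of the patch, and y is assumed to be a freshly allocated numRows x 1 result matrix.

    #include <runtime/local/datastructures/CSRMatrix.h>
    #include <runtime/local/datastructures/DenseMatrix.h>

    // y = A * x, with A sparse (CSR) and x a dense column vector.
    template <typename VT>
    void spmv(DenseMatrix<VT> *y, const CSRMatrix<VT> *A, const DenseMatrix<VT> *x) {
        const VT *xVals = x->getValues();
        VT *yVals = y->getValues();
        for (size_t r = 0; r < A->getNumRows(); ++r) {
            const size_t *colIdxs = A->getColIdxsOfRow(r); // column indices of row r
            const VT *vals = A->getRowValues(r);           // non-zero values of row r
            VT acc = VT(0);
            for (size_t i = 0; i < A->getNumNonZeros(r); ++i)
                acc += vals[i] * xVals[colIdxs[i]];
            yVals[r] = acc;
        }
    }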
- const VT *rowB = arg->getValues(colIdxA); - const size_t *colIdxsB = arg->getColIdxs(colIdxA); + const VT *rowB = arg->getRowValues(colIdxA); + const size_t *colIdxsB = arg->getColIdxsOfRow(colIdxA); const size_t numNonZerosB = arg->getNumNonZeros(colIdxA); positions[colIdxA]++; // colIdxA is rowIdxB diff --git a/src/runtime/local/kernels/MatMul.h b/src/runtime/local/kernels/MatMul.h index 7e9710125..b5466b853 100644 --- a/src/runtime/local/kernels/MatMul.h +++ b/src/runtime/local/kernels/MatMul.h @@ -77,8 +77,8 @@ template struct MatMul, CSRMatrix, DenseMatrix memset(valuesRes, VT(0), sizeof(VT) * nr1 * nc2); for (size_t r = 0; r < nr1; r++) { const size_t rowNumNonZeros = lhs->getNumNonZeros(r); - const size_t *rowColIdxs = lhs->getColIdxs(r); - const VT *rowValues = lhs->getValues(r); + const size_t *rowColIdxs = lhs->getColIdxsOfRow(r); + const VT *rowValues = lhs->getRowValues(r); const size_t rowIdxRes = r * rowSkipRes; for (size_t i = 0; i < rowNumNonZeros; i++) { diff --git a/src/runtime/local/kernels/NumDistinctApprox.h b/src/runtime/local/kernels/NumDistinctApprox.h index 936aea425..64ff0a841 100644 --- a/src/runtime/local/kernels/NumDistinctApprox.h +++ b/src/runtime/local/kernels/NumDistinctApprox.h @@ -125,7 +125,7 @@ template struct NumDistinctApprox> { } for (size_t rowIdx = 0; rowIdx < numRows; rowIdx++) { - const VT *values = arg->getValues(rowIdx); + const VT *values = arg->getRowValues(rowIdx); const size_t numNonZerosInRow = arg->getNumNonZeros(rowIdx); for (size_t colIdx = 0; colIdx < numNonZerosInRow; colIdx++) { diff --git a/src/runtime/local/kernels/Replace.h b/src/runtime/local/kernels/Replace.h index b642e7b9c..bc8cca861 100644 --- a/src/runtime/local/kernels/Replace.h +++ b/src/runtime/local/kernels/Replace.h @@ -190,8 +190,8 @@ template struct Replace, CSRMatrix, VT> { //--------main logic -------------------------- if (pattern != pattern) { // pattern is NaN for (size_t r = 0; r < numRows; r++) { - const VT *allValues = arg->getValues(r); - VT *allUpdatedValues = res->getValues(r); + const VT *allValues = arg->getRowValues(r); + VT *allUpdatedValues = res->getRowValues(r); const size_t nnzElementsRes = arg->getNumNonZeros(r); for (size_t c = 0; c < nnzElementsRes; c++) { if (allValues[c] != allValues[c]) { @@ -201,8 +201,8 @@ template struct Replace, CSRMatrix, VT> { } } else { for (size_t r = 0; r < numRows; r++) { - const VT *allValues = arg->getValues(r); - VT *allUpdatedValues = res->getValues(r); + const VT *allValues = arg->getRowValues(r); + VT *allUpdatedValues = res->getRowValues(r); const size_t nnzElementsRes = arg->getNumNonZeros(r); for (size_t c = 0; c < nnzElementsRes; c++) { if (allValues[c] == pattern) { diff --git a/src/runtime/local/kernels/Reshape.h b/src/runtime/local/kernels/Reshape.h index de7edeb55..17efaf2da 100644 --- a/src/runtime/local/kernels/Reshape.h +++ b/src/runtime/local/kernels/Reshape.h @@ -56,9 +56,9 @@ template struct Reshape, DenseMatrix> { if (numRows * numCols != arg->getNumRows() * arg->getNumCols()) throw std::runtime_error("reshape must retain the number of cells"); - if (arg->getRowSkip() == arg->getNumCols() && res == nullptr) - res = DataObjectFactory::create>(numRows, numCols, arg->getValuesSharedPtr()); - else { + if (arg->getRowSkip() == arg->getNumCols() && arg->isView() == false) { + res = DataObjectFactory::create>(numRows, numCols, arg); + } else { if (res == nullptr) res = DataObjectFactory::create>(numRows, numCols, false); diff --git a/src/runtime/local/kernels/Tri.h b/src/runtime/local/kernels/Tri.h 
index b57b29bc5..b73d19c79 100644 --- a/src/runtime/local/kernels/Tri.h +++ b/src/runtime/local/kernels/Tri.h @@ -126,8 +126,8 @@ template struct Tri> { rowOffsetsRes[0] = 0; for (size_t r = 0, pos = 0; r < numRows; r++, (*inc)++) { const size_t rowNumNonZeros = arg->getNumNonZeros(r); - const size_t *rowColIdxs = arg->getColIdxs(r); - const VT *rowValues = arg->getValues(r); + const size_t *rowColIdxs = arg->getColIdxsOfRow(r); + const VT *rowValues = arg->getRowValues(r); for (size_t i = 0; i < rowNumNonZeros; i++) { const size_t c = rowColIdxs[i]; diff --git a/src/runtime/local/kernels/kernels.json b/src/runtime/local/kernels/kernels.json index 22e285d77..ee3c4bf2b 100644 --- a/src/runtime/local/kernels/kernels.json +++ b/src/runtime/local/kernels/kernels.json @@ -3750,6 +3750,7 @@ [["DenseMatrix", "uint8_t"]], [["CSRMatrix", "double"]], [["CSRMatrix", "float"]], + [["CSRMatrix", "int64_t"]], ["Frame"] ] }, diff --git a/test/runtime/local/datastructures/CSRMatrixTest.cpp b/test/runtime/local/datastructures/CSRMatrixTest.cpp index 827679e89..4cb9a4e70 100644 --- a/test/runtime/local/datastructures/CSRMatrixTest.cpp +++ b/test/runtime/local/datastructures/CSRMatrixTest.cpp @@ -34,7 +34,7 @@ TEMPLATE_TEST_CASE("CSRMatrix allocates enough space", TAG_DATASTRUCTURES, ALL_V const size_t numCols = 2000; const size_t numNonZeros = 500; - CSRMatrix *m = DataObjectFactory::create>(numRows, numCols, numNonZeros, false); + auto m = DataObjectFactory::create>(numRows, numCols, numNonZeros, false); ValueType *values = m->getValues(); size_t *colIdxs = m->getColIdxs(); @@ -60,10 +60,8 @@ TEST_CASE("CSRMatrix sub-matrix works properly", TAG_DATASTRUCTURES) { const size_t numColsOrig = 7; const size_t numNonZeros = 3; - CSRMatrix *mOrig = - DataObjectFactory::create>(numRowsOrig, numColsOrig, numNonZeros, true); - CSRMatrix *mSub = DataObjectFactory::create>(mOrig, 3, 5); - + auto mOrig = DataObjectFactory::create>(numRowsOrig, numColsOrig, numNonZeros, true); + auto mSub = DataObjectFactory::create>(mOrig, 3, 5); // Sub-matrix dimensions are as expected. CHECK(mSub->getNumRows() == 2); CHECK(mSub->getNumCols() == numColsOrig); diff --git a/test/runtime/local/datastructures/DenseMatrixTest.cpp b/test/runtime/local/datastructures/DenseMatrixTest.cpp index 8314439f4..e1b3c44de 100644 --- a/test/runtime/local/datastructures/DenseMatrixTest.cpp +++ b/test/runtime/local/datastructures/DenseMatrixTest.cpp @@ -167,9 +167,8 @@ TEST_CASE("DenseMatrix sub-matrix works properly", TAG_DATASTRUCTURES) { const size_t numColsOrig = 7; const size_t numCellsOrig = numRowsOrig * numColsOrig; - DenseMatrix *mOrig = DataObjectFactory::create>(numRowsOrig, numColsOrig, true); - DenseMatrix *mSub = DataObjectFactory::create>(mOrig, 3, 5, 1, 4); - + auto mOrig = DataObjectFactory::create>(numRowsOrig, numColsOrig, true); + auto mSub = DataObjectFactory::create>(mOrig, 3, 5, 1, 4); // Sub-matrix dimensions are as expected. 
CHECK(mSub->getNumRows() == 2); CHECK(mSub->getNumCols() == 3); diff --git a/test/runtime/local/kernels/CheckEqTest.cpp b/test/runtime/local/kernels/CheckEqTest.cpp index ed550a53f..7a0e940d1 100644 --- a/test/runtime/local/kernels/CheckEqTest.cpp +++ b/test/runtime/local/kernels/CheckEqTest.cpp @@ -296,8 +296,8 @@ TEST_CASE("CheckEq, frames", TAG_KERNELS) { } SECTION("diff inst, same schema, same cont, same labels") { auto c3 = genGivenVals>(numRows, {VT2(8.8), VT2(9.9), VT2(1.0), VT2(2.0)}); - std::string *labels1 = new std::string[3]{"ab", "cde", "fghi"}; - std::string *labels2 = new std::string[3]{"ab", "cde", "fghi"}; + auto labels1 = new std::string[3]{"ab", "cde", "fghi"}; + auto labels2 = new std::string[3]{"ab", "cde", "fghi"}; frame1 = DataObjectFactory::create(cols, labels1); std::vector cols2 = {c0, c1, c3}; auto frame2 = DataObjectFactory::create(cols2, labels2); @@ -307,8 +307,8 @@ TEST_CASE("CheckEq, frames", TAG_KERNELS) { } SECTION("diff inst, same schema, same cont, diff labels") { auto c3 = genGivenVals>(numRows, {VT2(8.8), VT2(9.9), VT2(1.0), VT2(2.0)}); - std::string *labels1 = new std::string[3]{"ab", "cde", "fghi"}; - std::string *labels2 = new std::string[3]{"ab", "cde", "fxyz"}; + auto labels1 = new std::string[3]{"ab", "cde", "fghi"}; + auto labels2 = new std::string[3]{"ab", "cde", "fxyz"}; frame1 = DataObjectFactory::create(cols, labels1); std::vector cols2 = {c0, c1, c3}; auto frame2 = DataObjectFactory::create(cols2, labels2);
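Because DenseMatrix::operator== now walks each operand with its own rowSkip (see the definition moved into DenseMatrix.cpp above), a column slice taken as a view can compare equal to a packed matrix with the same content. A hypothetical check in the spirit of these tests; genGivenVals, the include paths and the function are assumptions, not part of the change set.

    #include <runtime/local/datagen/GenGivenVals.h>
    #include <runtime/local/datastructures/DataObjectFactory.h>
    #include <runtime/local/datastructures/DenseMatrix.h>
    #include <cassert>

    void viewEqualityExample() {
        // 2x3 matrix; the view selects its first two columns and keeps rowSkip = 3.
        auto big = genGivenVals<DenseMatrix<double>>(2, {1, 2, 3, 4, 5, 6});
        auto view = DataObjectFactory::create<DenseMatrix<double>>(big, 0, 2, 0, 2);
        // Packed 2x2 matrix with the same content, rowSkip = 2.
        auto packed = genGivenVals<DenseMatrix<double>>(2, {1, 2, 4, 5});
        // operator== compares element-wise, advancing each side by its own rowSkip.
        assert(*view == *packed);
        DataObjectFactory::destroy(view, packed, big);
    }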
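To summarize the reworked metadata API: a DataPlacement now carries one or more allocations (one for the values of a DenseMatrix, three for a CSRMatrix) behind getNumAllocations()/getAllocation(i), and MetaDataObject::getData() lazily creates and transfers a placement of the requested type before updating the latest-version list. A debugging-style sketch that uses only accessors introduced in this patch; the helper function itself is illustrative.

    #include <runtime/local/datastructures/DataPlacement.h>
    #include <runtime/local/datastructures/DenseMatrix.h>
    #include <spdlog/spdlog.h>

    template <typename VT> void dumpPlacements(DenseMatrix<VT> *mat) {
        // Touch the host buffer so at least one placement is in the latest-version list.
        mat->getValues();
        auto mdo = mat->getMetaDataObject();
        for (size_t id : mdo->getLatest()) {
            auto *dp = mdo->getDataPlacementByID(id);
            spdlog::debug("placement {}: location={}, allocations={}, type={}", dp->getID(),
                          dp->getLocation(), dp->getNumAllocations(), dp->getAllocation(0)->getTypeName());
        }
    }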