From 2e15f49815505ac2d86d5b8cf5097c6f73813338 Mon Sep 17 00:00:00 2001 From: Mark Dokter Date: Mon, 17 Jun 2024 17:46:21 +0200 Subject: [PATCH] [DAPHNE-#758] MetaDataObject refactoring, CSRMatrix support, Pinning * This commit introduces the meta data object to the CSRMatrix data type. To implement this change, handling of the AllocationDescriptors has been refactored out of DenseMatrix. * Separate handling of ranges: Since tracking of ranges of data is only used in the distributed setting for now, we handle this separately and always assume a full allocation for local computation. This should result in fewer unnecessary "if range not null do this, else do that" checks. * Memory pinning: To prevent excessive allocation ID lookups in the hot path, especially when using --vec, this change "pins" memory by the allocation type of previous accesses. Simply put, as long as there is no different access type (e.g., calling getValues() for host vs. device memory), it is assumed that the data has not changed and the meta data object does not need to be queried. 
Closes #758 --- src/api/cli/DaphneUserConfig.h | 1 + .../coordinator/kernels/Broadcast.h | 25 +- .../coordinator/kernels/Distribute.h | 34 ++- .../coordinator/kernels/DistributedCollect.h | 54 ++-- .../coordinator/kernels/DistributedCompute.h | 31 +-- .../coordinator/kernels/DistributedRead.h | 66 +++-- .../coordinator/kernels/DistributedWrite.h | 70 ++--- .../scheduling/LoadPartitioningDistributed.h | 28 +- .../distributed/worker/MPICoordinator.h | 11 +- src/runtime/distributed/worker/main.cpp | 31 ++- src/runtime/local/context/CUDAContext.cpp | 4 +- src/runtime/local/context/CUDAContext.h | 2 +- .../datastructures/AllocationDescriptorCUDA.h | 9 +- .../datastructures/AllocationDescriptorGRPC.h | 10 +- .../datastructures/AllocationDescriptorHost.h | 31 ++- .../datastructures/AllocationDescriptorMPI.h | 33 +-- .../local/datastructures/CSRMatrix.cpp | 50 ++++ src/runtime/local/datastructures/CSRMatrix.h | 119 ++++---- .../local/datastructures/DataPlacement.h | 22 +- .../local/datastructures/DenseMatrix.cpp | 258 ++++++++---------- .../local/datastructures/DenseMatrix.h | 101 +------ .../datastructures/IAllocationDescriptor.h | 14 +- .../local/datastructures/MetaDataObject.cpp | 135 ++++++--- .../local/datastructures/MetaDataObject.h | 21 +- .../local/datastructures/Structure.cpp | 21 +- src/runtime/local/datastructures/Structure.h | 6 +- src/runtime/local/io/DaphneSerializer.h | 4 +- src/runtime/local/kernels/AggAll.h | 6 +- src/runtime/local/kernels/AggCol.h | 5 +- src/runtime/local/kernels/AggCum.h | 13 +- src/runtime/local/kernels/AggRow.h | 4 +- src/runtime/local/kernels/CMakeLists.txt | 1 + src/runtime/local/kernels/CheckEqApprox.h | 4 +- src/runtime/local/kernels/DiagMatrix.h | 2 +- src/runtime/local/kernels/EwBinaryMat.h | 40 +-- src/runtime/local/kernels/Gemv.h | 4 +- src/runtime/local/kernels/HasSpecialValue.h | 4 +- src/runtime/local/kernels/IsSymmetric.h | 8 +- src/runtime/local/kernels/MatMul.h | 4 +- src/runtime/local/kernels/NumDistinctApprox.h | 2 +- 
src/runtime/local/kernels/Replace.h | 8 +- src/runtime/local/kernels/Reshape.h | 6 +- src/runtime/local/kernels/Tri.h | 4 +- src/runtime/local/kernels/kernels.json | 1 + .../local/datastructures/CSRMatrixTest.cpp | 8 +- .../local/datastructures/DenseMatrixTest.cpp | 5 +- test/runtime/local/kernels/CheckEqTest.cpp | 8 +- 47 files changed, 729 insertions(+), 599 deletions(-) diff --git a/src/api/cli/DaphneUserConfig.h b/src/api/cli/DaphneUserConfig.h index d33a18e30..6738dd755 100644 --- a/src/api/cli/DaphneUserConfig.h +++ b/src/api/cli/DaphneUserConfig.h @@ -39,6 +39,7 @@ struct DaphneUserConfig { bool use_cuda = false; bool use_vectorized_exec = false; bool use_distributed = false; + bool use_grpc_async = false; bool use_obj_ref_mgnt = true; bool use_ipa_const_propa = true; bool use_phy_op_selection = true; diff --git a/src/runtime/distributed/coordinator/kernels/Broadcast.h b/src/runtime/distributed/coordinator/kernels/Broadcast.h index 1d34470c6..80004acab 100644 --- a/src/runtime/distributed/coordinator/kernels/Broadcast.h +++ b/src/runtime/distributed/coordinator/kernels/Broadcast.h @@ -76,9 +76,9 @@ template struct Broadcast { LoadPartitioningDistributed partioner(DistributionSchema::BROADCAST, mat, dctx); while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); - auto rank = dynamic_cast(*(dp->allocation)).getRank(); + auto rank = dynamic_cast(dp->getAllocation(0))->getRank(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; // Minimum chunk size @@ -114,12 +114,13 @@ template struct Broadcast { WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank); std::string address = std::to_string(rank); DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = 
dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = dataAcknowledgement.identifier; data.numRows = dataAcknowledgement.numRows; data.numCols = dataAcknowledgement.numCols; data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + auto alloc = dynamic_cast(dp->getAllocation(0)); + alloc->updateDistributedData(data); } } }; @@ -150,12 +151,12 @@ template struct Broadcast { while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; - auto address = dynamic_cast(*(dp->allocation)).getLocation(); + auto address = dynamic_cast(dp->getAllocation(0))->getLocation(); - StoredInfo storedInfo({dp->dp_id}); + StoredInfo storedInfo({dp->getID()}); caller.asyncStoreCall(address, storedInfo); // Minimum chunk size auto min_chunk_size = @@ -187,7 +188,7 @@ template struct Broadcast { auto dp_id = response.storedInfo.dp_id; auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); auto storedData = response.result; data.identifier = storedData.identifier(); @@ -195,7 +196,7 @@ template struct Broadcast { data.numCols = storedData.num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } }; }; @@ -222,10 +223,10 @@ template struct Broadcast { while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; - auto workerAddr = dynamic_cast(*(dp->allocation)).getLocation(); + auto workerAddr = 
dynamic_cast(dp->getAllocation(0))->getLocation(); std::thread t([=, &mat]() { // TODO Consider saving channels inside DaphneContext grpc::ChannelArguments ch_args; @@ -260,7 +261,7 @@ template struct Broadcast { newData.numRows = storedData.num_rows(); newData.numCols = storedData.num_cols(); newData.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(newData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(newData); }); threads_vector.push_back(move(t)); } diff --git a/src/runtime/distributed/coordinator/kernels/Distribute.h b/src/runtime/distributed/coordinator/kernels/Distribute.h index 0d734395b..cb32266c3 100644 --- a/src/runtime/distributed/coordinator/kernels/Distribute.h +++ b/src/runtime/distributed/coordinator/kernels/Distribute.h @@ -63,13 +63,12 @@ template struct Distribute { while (partioner.HasNextChunk()) { DataPlacement *dp = partioner.GetNextChunk(); - auto rank = dynamic_cast(*(dp->allocation)).getRank(); + auto rank = dynamic_cast(dp->getAllocation(0))->getRank(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; - auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len); - + auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); // Minimum chunk size auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer
::length(slicedMat) @@ -91,12 +90,12 @@ template struct Distribute { WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank); std::string address = std::to_string(rank); DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = dataAcknowledgement.identifier; data.numRows = dataAcknowledgement.numRows; data.numCols = dataAcknowledgement.numCols; data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } }; @@ -121,17 +120,17 @@ template struct Distribute { while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); // Skip if already placed at workers - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; distributed::Data protoMsg; std::vector buffer; - auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len); + auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); - StoredInfo storedInfo({dp->dp_id}); + StoredInfo storedInfo({dp->getID()}); - auto address = dynamic_cast(*(dp->allocation)).getLocation(); + auto address = dynamic_cast(dp->getAllocation(0))->getLocation(); caller.asyncStoreCall(address, storedInfo); // Minimum chunk size @@ -161,12 +160,12 @@ template struct Distribute { auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = storedData.identifier(); data.numRows = storedData.num_rows(); data.numCols = storedData.num_cols(); data.isPlacedAtWorker = true; - 
dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } }; @@ -188,22 +187,21 @@ template struct Distribute { while (partioner.HasNextChunk()) { auto dp = partioner.GetNextChunk(); // Skip if already placed at workers - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; std::vector buffer; - auto workerAddr = dynamic_cast(*(dp->allocation)).getLocation(); + auto workerAddr = dynamic_cast(dp->getAllocation(0))->getLocation(); std::thread t([=, &mat]() { auto stub = ctx->stubs[workerAddr].get(); distributed::StoredData storedData; grpc::ClientContext grpc_ctx; - auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len); + auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); auto serializer = DaphneSerializerChunks
(slicedMat, dctx->config.max_distributed_serialization_chunk_size); - distributed::Data protoMsg; // Send chunks @@ -222,10 +220,10 @@ template struct Distribute { newData.numRows = storedData.num_rows(); newData.numCols = storedData.num_cols(); newData.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(newData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(newData); DataObjectFactory::destroy(slicedMat); }); - threads_vector.push_back(move(t)); + threads_vector.push_back(std::move(t)); } for (auto &thread : threads_vector) thread.join(); diff --git a/src/runtime/distributed/coordinator/kernels/DistributedCollect.h b/src/runtime/distributed/coordinator/kernels/DistributedCollect.h index cee446f17..2432c82c0 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedCollect.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedCollect.h @@ -61,6 +61,7 @@ template void distributedCollect(DT *&mat, const // ---------------------------------------------------------------------------- #ifdef USE_MPI template struct DistributedCollect { + static void apply(DT *&mat, const VectorCombine &combine, DCTX(dctx)) { if (mat == nullptr) throw std::runtime_error("DistributedCollect gRPC: result matrix must be already " @@ -73,7 +74,7 @@ template struct DistributedCollect { std::string address = std::to_string(rank); auto dp = mat->getMetaDataObject()->getDataPlacementByLocation(address); - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); WorkerImpl::StoredInfo info = {distributedData.identifier, distributedData.numRows, distributedData.numCols}; MPIHelper::requestData(rank, info); @@ -98,21 +99,22 @@ template struct DistributedCollect { if (combine == VectorCombine::ADD) { ewBinaryMat(BinaryOpCode::ADD, denseMat, slicedMat, denseMat, nullptr); } else { - auto resValues = denseMat->getValues() + 
(dp->range->r_start * denseMat->getRowSkip()); + auto resValues = denseMat->getValues() + (dp->getRange()->r_start * denseMat->getRowSkip()); auto slicedMatValues = slicedMat->getValues(); - for (size_t r = 0; r < dp->range->r_len; r++) { - memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double)); + for (size_t r = 0; r < dp->getRange()->r_len; r++) { + memcpy(resValues + dp->getRange()->c_start, slicedMatValues, + dp->getRange()->c_len * sizeof(double)); resValues += denseMat->getRowSkip(); slicedMatValues += slicedMat->getRowSkip(); } } DataObjectFactory::destroy(slicedMat); - collectedDataItems += dp->range->r_len * dp->range->c_len; + collectedDataItems += dp->getRange()->r_len * dp->getRange()->c_len; - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributedData.isPlacedAtWorker = false; - dynamic_cast(*(dp->allocation)).updateDistributedData(distributedData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(distributedData); // this is to handle the case when not all workers participate in // the computation, i.e., number of workers is larger than of the // work items @@ -128,6 +130,7 @@ template struct DistributedCollect { // ---------------------------------------------------------------------------- template struct DistributedCollect { + static void apply(DT *&mat, const VectorCombine &combine, DCTX(dctx)) { if (mat == nullptr) throw std::runtime_error("DistributedCollect gRPC: result matrix must be already " @@ -139,12 +142,12 @@ template struct DistributedCollect caller(dctx); - auto dpVector = mat->getMetaDataObject()->getDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); + auto dpVector = mat->getMetaDataObject()->getRangeDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); for (auto &dp : *dpVector) { - auto address = dp->allocation->getLocation(); + auto address = 
dp->getAllocation(0)->getLocation(); - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); - StoredInfo storedInfo({dp->dp_id}); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); + StoredInfo storedInfo({dp->getID()}); distributed::StoredData protoData; protoData.set_identifier(distributedData.identifier); protoData.set_num_rows(distributedData.numRows); @@ -157,7 +160,7 @@ template struct DistributedCollectgetMetaDataObject()->getDataPlacementByID(dp_id); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); auto matProto = response.result; @@ -174,10 +177,11 @@ template struct DistributedCollectgetValues() + (dp->range->r_start * denseMat->getRowSkip()); + auto resValues = denseMat->getValues() + (dp->getRange()->r_start * denseMat->getRowSkip()); auto slicedMatValues = slicedMat->getValues(); - for (size_t r = 0; r < dp->range->r_len; r++) { - memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double)); + for (size_t r = 0; r < dp->getRange()->r_len; r++) { + memcpy(resValues + dp->getRange()->c_start, slicedMatValues, + dp->getRange()->c_len * sizeof(double)); resValues += denseMat->getRowSkip(); slicedMatValues += slicedMat->getRowSkip(); } @@ -185,7 +189,7 @@ template struct DistributedCollect(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } }; }; @@ -195,6 +199,7 @@ template struct DistributedCollect struct DistributedCollect { + static void apply(DT *&mat, const VectorCombine &combine, DCTX(dctx)) { if (mat == nullptr) throw std::runtime_error("DistributedCollect gRPC: result matrix must be already " @@ -205,11 +210,11 @@ template struct DistributedCollect threads_vector; std::mutex lock; - auto dpVector = mat->getMetaDataObject()->getDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); + auto dpVector = 
mat->getMetaDataObject()->getRangeDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); for (auto &dp : *dpVector) { - auto address = dp->allocation->getLocation(); + auto address = dp->getAllocation(0)->getLocation(); - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributed::StoredData protoData; protoData.set_identifier(distributedData.identifier); protoData.set_num_rows(distributedData.numRows); @@ -236,19 +241,20 @@ template struct DistributedCollectgetValues() + (dp->range->r_start * denseMat->getRowSkip()); + auto resValues = denseMat->getValues() + (dp->getRange()->r_start * denseMat->getRowSkip()); auto slicedMatValues = slicedMat->getValues(); - for (size_t r = 0; r < dp->range->r_len; r++) { - memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double)); + for (size_t r = 0; r < dp->getRange()->r_len; r++) { + memcpy(resValues + dp->getRange()->c_start, slicedMatValues, + dp->getRange()->c_len * sizeof(double)); resValues += denseMat->getRowSkip(); slicedMatValues += slicedMat->getRowSkip(); } } DataObjectFactory::destroy(slicedMat); distributedData.isPlacedAtWorker = false; - dynamic_cast(*(dp->allocation)).updateDistributedData(distributedData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(distributedData); }); - threads_vector.push_back(move(t)); + threads_vector.push_back(std::move(t)); } for (auto &thread : threads_vector) thread.join(); diff --git a/src/runtime/distributed/coordinator/kernels/DistributedCompute.h b/src/runtime/distributed/coordinator/kernels/DistributedCompute.h index f909906c2..508303c37 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedCompute.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedCompute.h @@ -14,8 +14,7 @@ * limitations under the License. 
*/ -#ifndef SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDCOMPUTE_H -#define SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDCOMPUTE_H +#pragma once #include #include @@ -75,7 +74,7 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto distrData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distrData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); MPIHelper::StoredInfo storedData({distrData.identifier, distrData.numRows, distrData.numCols}); task.inputs.push_back(storedData); @@ -94,12 +93,12 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(std::to_string(rank)); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = info.identifier; data.numRows = info.numRows; data.numCols = info.numCols; data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } } @@ -126,13 +125,13 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto distrData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distrData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributed::StoredData protoData; protoData.set_identifier(distrData.identifier); @@ -143,7 +142,7 @@ template struct DistributedCompute struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = computeResult.outputs()[o].stored().identifier(); data.numRows = computeResult.outputs()[o].stored().num_rows(); data.numCols = computeResult.outputs()[o].stored().num_cols(); data.isPlacedAtWorker = true; - 
dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } } @@ -192,13 +191,13 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto distrData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distrData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributed::StoredData protoData; protoData.set_identifier(distrData.identifier); @@ -222,19 +221,17 @@ template struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = computeResult.outputs()[o].stored().identifier(); data.numRows = computeResult.outputs()[o].stored().num_rows(); data.numCols = computeResult.outputs()[o].stored().num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } }); - threads_vector.push_back(move(t)); + threads_vector.push_back(std::move(t)); } for (auto &thread : threads_vector) thread.join(); } }; - -#endif // SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDCOMPUTE_H diff --git a/src/runtime/distributed/coordinator/kernels/DistributedRead.h b/src/runtime/distributed/coordinator/kernels/DistributedRead.h index 0ab6f72a5..1a0fa2e33 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedRead.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedRead.h @@ -68,7 +68,9 @@ template void distributedRead(DTRes *&res, const char *filename, D // MPI // ---------------------------------------------------------------------------- template struct DistributedRead { - static void apply(DTRes *&res, const char *filename, DCTX(dctx)) { throw std::runtime_error("not implemented"); } + static void apply(DTRes *&res, const char *filename, 
DCTX(dctx)) { + throw std::runtime_error("DistributedRead not implemented"); + } }; #endif @@ -77,7 +79,9 @@ template struct DistributedRead // ---------------------------------------------------------------------------- template struct DistributedRead { - static void apply(DTRes *&res, const char *filename, DCTX(dctx)) { throw std::runtime_error("not implemented"); } + static void apply(DTRes *&res, const char *filename, DCTX(dctx)) { + throw std::runtime_error("DistributedRead not implemented"); + } }; // ---------------------------------------------------------------------------- @@ -96,39 +100,45 @@ template struct DistributedRead threads_vector; LoadPartitioningDistributed partioner(DistributionSchema::DISTRIBUTE, res, dctx); + auto hdfsFn = std::string(filename); + while (partioner.HasNextChunk()) { - auto hdfsFn = std::string(filename); auto dp = partioner.GetNextChunk(); - auto workerAddr = dynamic_cast(dp->allocation.get())->getLocation(); - std::thread t([=, &res]() { - auto stub = ctx->stubs[workerAddr].get(); - - distributed::HDFSFile fileData; - fileData.set_filename(hdfsFn); - fileData.set_start_row(dp->range->r_start); - fileData.set_num_rows(dp->range->r_len); - fileData.set_num_cols(dp->range->c_len); - - grpc::ClientContext grpc_ctx; - distributed::StoredData response; - - auto status = stub->ReadHDFS(&grpc_ctx, fileData, &response); - if (!status.ok()) - throw std::runtime_error(status.error_message()); - - DistributedData newData; - newData.identifier = response.identifier(); - newData.numRows = response.num_rows(); - newData.numCols = response.num_cols(); - newData.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(newData); - }); - threads_vector.push_back(move(t)); + if (auto grpc_alloc = dynamic_cast(dp->getAllocation(0))) { + auto workerAddr = grpc_alloc->getLocation(); + std::thread t([=, &res]() { + auto stub = ctx->stubs[workerAddr].get(); + + distributed::HDFSFile fileData; + fileData.set_filename(hdfsFn); 
+ fileData.set_start_row(dp->getRange()->r_start); + fileData.set_num_rows(dp->getRange()->r_len); + fileData.set_num_cols(dp->getRange()->c_len); + + grpc::ClientContext grpc_ctx; + distributed::StoredData response; + + auto status = stub->ReadHDFS(&grpc_ctx, fileData, &response); + if (!status.ok()) + throw std::runtime_error(status.error_message()); + + DistributedData newData; + newData.identifier = response.identifier(); + newData.numRows = response.num_rows(); + newData.numCols = response.num_cols(); + newData.isPlacedAtWorker = true; + grpc_alloc->updateDistributedData(newData); + }); + threads_vector.push_back(move(t)); + } else + throw std::runtime_error("dynamic_cast(alloc) failed (returned nullptr)"); } for (auto &thread : threads_vector) thread.join(); +#else + throw std::runtime_error("DistributedRead not implemented"); #endif } }; diff --git a/src/runtime/distributed/coordinator/kernels/DistributedWrite.h b/src/runtime/distributed/coordinator/kernels/DistributedWrite.h index 3e4488520..248cd4d62 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedWrite.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedWrite.h @@ -118,41 +118,45 @@ template struct DistributedWritegetMetaDataObject()->getDataPlacementByLocation(workerAddr))) { - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); - if (data.isPlacedAtWorker) { - std::thread t([=, &mat]() { - auto stub = ctx->stubs[workerAddr].get(); - - distributed::HDFSWriteInfo fileData; - fileData.mutable_matrix()->set_identifier(data.identifier); - fileData.mutable_matrix()->set_num_rows(data.numRows); - fileData.mutable_matrix()->set_num_cols(data.numCols); - - fileData.set_dirname(hdfsfilename.c_str()); - fileData.set_segment(std::to_string(chunkId).c_str()); - - grpc::ClientContext grpc_ctx; - distributed::Empty empty; - - auto status = stub->WriteHDFS(&grpc_ctx, fileData, &empty); - if (!status.ok()) - throw std::runtime_error(status.error_message()); - }); - 
threads_vector.push_back(move(t)); - } else { - auto slicedMat = - mat->sliceRow(dp->range.get()->r_start, dp->range.get()->r_start + dp->range.get()->r_len); - if (extension == ".csv") { - writeHDFSCsv(slicedMat, hdfsfilename.c_str(), dctx); - } else if (extension == ".dbdf") { - writeDaphneHDFS(slicedMat, hdfsfilename.c_str(), dctx); + if (auto grpc_alloc = dynamic_cast(dp->getAllocation(0))) { + auto data = grpc_alloc->getDistributedData(); + + if (data.isPlacedAtWorker) { + std::thread t([=, &mat]() { + auto stub = ctx->stubs[workerAddr].get(); + + distributed::HDFSWriteInfo fileData; + fileData.mutable_matrix()->set_identifier(data.identifier); + fileData.mutable_matrix()->set_num_rows(data.numRows); + fileData.mutable_matrix()->set_num_cols(data.numCols); + + fileData.set_dirname(hdfsfilename.c_str()); + fileData.set_segment(std::to_string(chunkId).c_str()); + + grpc::ClientContext grpc_ctx; + distributed::Empty empty; + + auto status = stub->WriteHDFS(&grpc_ctx, fileData, &empty); + if (!status.ok()) + throw std::runtime_error(status.error_message()); + }); + threads_vector.push_back(move(t)); + } else { + auto slicedMat = + mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); + if (extension == ".csv") { + writeHDFSCsv(slicedMat, hdfsfilename.c_str(), dctx); + } else if (extension == ".dbdf") { + writeDaphneHDFS(slicedMat, hdfsfilename.c_str(), dctx); + } } + } else { + continue; } - } else { - continue; - } - // TODO we should also store ranges that did not have a - // dataplacement associated with them + // TODO we should also store ranges that did not have a + // dataplacement associated with them + } else + throw std::runtime_error("dynamic_cast(alloc) failed (returned nullptr)"); } for (auto &thread : threads_vector) thread.join(); diff --git a/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h b/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h index 
a05a4bc30..362fe9c92 100644 --- a/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h +++ b/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h @@ -16,7 +16,8 @@ #pragma once -#include +#include + #include #include #include @@ -54,8 +55,7 @@ template class LoadPartitioningDistributed { // Here we provide the different implementations. // Another solution would be to make sure that every constructor is similar // so this would not be needed. - static ALLOCATOR CreateAllocatorDescriptor(DaphneContext *ctx, const std::string &addr, - const DistributedData &data) { + static ALLOCATOR CreateAllocatorDescriptor(DaphneContext* ctx, const std::string &addr, DistributedData& data) { if constexpr (std::is_same_v) return AllocationDescriptorMPI(std::stoi(addr), ctx, data); else if constexpr (std::is_same_v) @@ -106,25 +106,25 @@ template class LoadPartitioningDistributed { DataPlacement *dp; if ((dp = mat->getMetaDataObject()->getDataPlacementByLocation(workerAddr))) { - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); // Check if existing placement matches the same ranges we currently // need if (data.isPlacedAtWorker) { - auto existingRange = dp->range.get(); + auto existingRange = dp->getRange(); if (*existingRange == range) data.isPlacedAtWorker = true; else { - mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); + mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); data.isPlacedAtWorker = false; } } else - mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); + mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); // TODO Currently we do not support distributing/splitting // by columns. When we do, this should be changed (e.g. 
Index(0, // taskIndex)) This can be decided based on DistributionSchema data.ix = GetDistributedIndex(); - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } else { // Else, create new object metadata entry DistributedData data; // TODO Currently we do not support distributing/splitting @@ -132,7 +132,9 @@ template class LoadPartitioningDistributed { // taskIndex)) data.ix = GetDistributedIndex(); auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data); - dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range); + std::vector> allocations; + allocations.emplace_back(&allocationDescriptor); + dp = mat->getMetaDataObject()->addDataPlacement(allocations, &range); } taskIndex++; return dp; @@ -192,11 +194,13 @@ template class LoadPartitioningDistributed { // If dp already exists for this worker, update the range and // data if (auto dp = (*outputs[i])->getMetaDataObject()->getDataPlacementByLocation(workerAddr)) { - (*outputs[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + (*outputs[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } else { // else create new dp entry auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data); - ((*outputs[i]))->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range); + std::vector> allocations; + allocations.emplace_back(&allocationDescriptor); + ((*outputs[i]))->getMetaDataObject()->addDataPlacement(allocations, &range); } } } diff --git a/src/runtime/distributed/worker/MPICoordinator.h b/src/runtime/distributed/worker/MPICoordinator.h index 4563af582..5162b6f45 100644 --- a/src/runtime/distributed/worker/MPICoordinator.h +++ b/src/runtime/distributed/worker/MPICoordinator.h @@ -95,11 +95,14 @@ 
class MPICoordinator { std::string addr = std::to_string(COORDINATOR); // If dp already exists for this worker, update the range and data if (auto dp = (*res[i])->getMetaDataObject()->getDataPlacementByLocation(addr)) { - (*res[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + (*res[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); + } else { // else create new dp entry - AllocationDescriptorMPI allocationDescriptor(dctx, COORDINATOR, data); - ((*res[i]))->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range); + AllocationDescriptorMPI allocationDescriptor(COORDINATOR, dctx, data); + std::vector> allocations; + allocations.emplace_back(&allocationDescriptor); + ((*res[i]))->getMetaDataObject()->addDataPlacement(allocations, &range); } } // solver.Compute(&outputsStoredInfo, inputsStoredInfo, mlirCode); diff --git a/src/runtime/distributed/worker/main.cpp b/src/runtime/distributed/worker/main.cpp index ec9eeb6f9..a77cc8193 100644 --- a/src/runtime/distributed/worker/main.cpp +++ b/src/runtime/distributed/worker/main.cpp @@ -21,25 +21,28 @@ #include "WorkerImplGRPCSync.h" #include +// global logger handle for this executable +static std::unique_ptr daphneLogger; + int main(int argc, char *argv[]) { DaphneUserConfig user_config{}; std::string configFile = "WorkerConfig.json"; - if (argc == 3) { + // initialize logging facility + daphneLogger = std::make_unique(user_config); + auto log = daphneLogger->getDefaultLogger(); + + if (argc == 3) configFile = argv[2]; - } - try { - if (ConfigParser::fileExists(configFile)) { + if (ConfigParser::fileExists(configFile)) { + try { ConfigParser::readUserConfig(configFile, user_config); + } catch (std::exception &e) { + log->warn("Parser error while reading worker config from {}:\n{}", configFile, e.what()); } - } catch (std::exception 
&e) { - spdlog::error("Parser error while reading worker config:\n{}", e.what()); - spdlog::error("You can create a WorkerConfig.json to configure the worker.\n"); } - user_config.resolveLibDir(); - auto logger = std::make_unique(user_config); if (argc < 2 || argc > 3) { std::cout << "Usage: " << argv[0] << " [ConfigFile]" << std::endl; @@ -47,11 +50,13 @@ int main(int argc, char *argv[]) { } auto addr = argv[1]; - // TODO choose specific implementation based on arguments or config file - WorkerImpl *service = new WorkerImplGRPCSync(addr, user_config); + std::unique_ptr service; + if (user_config.use_grpc_async) + service = std::make_unique(addr, user_config); + else + service = std::make_unique(addr, user_config); - std::cout << "Started Distributed Worker on `" << addr << "`\n"; + log->info(fmt::format("Started Distributed Worker on {}", addr)); service->Wait(); - return 0; } \ No newline at end of file diff --git a/src/runtime/local/context/CUDAContext.cpp b/src/runtime/local/context/CUDAContext.cpp index dddd530bb..c56012383 100644 --- a/src/runtime/local/context/CUDAContext.cpp +++ b/src/runtime/local/context/CUDAContext.cpp @@ -52,7 +52,6 @@ void CUDAContext::init() { logger->info("Using CUDA device {}: {}\n\tAvailable mem: {} Total mem: {} " "using {}% -> {}", device_id, device_properties.name, available, total, mem_usage * 100, mem_budget); - CHECK_CUBLAS(cublasCreate(&cublas_handle)); CHECK_CUSPARSE(cusparseCreate(&cusparse_handle)); CHECK_CUDNN(cudnnCreate(&cudnn_handle)); @@ -69,7 +68,6 @@ void CUDAContext::init() { CHECK_CUSOLVER(cusolverDnSetStream(cusolver_handle, cusolver_stream)); getCUDNNWorkspace(64 * 1024 * 1024); - // CHECK_CUBLAS(cublasLtCreate(&cublaslt_Handle)); // CHECK_CUDART(cudaMalloc(&cublas_workspace, cublas_workspace_size)); } @@ -134,4 +132,4 @@ void CUDAContext::free(size_t id) { allocations.erase(id); } -int CUDAContext::getMaxNumThreads() { return device_properties.maxThreadsPerBlock; } +int CUDAContext::getMaxNumThreads() const { 
return device_properties.maxThreadsPerBlock; } diff --git a/src/runtime/local/context/CUDAContext.h b/src/runtime/local/context/CUDAContext.h index 15c03d9a2..faf6882d6 100644 --- a/src/runtime/local/context/CUDAContext.h +++ b/src/runtime/local/context/CUDAContext.h @@ -77,7 +77,7 @@ class CUDAContext final : public IContext { void *getCUDNNWorkspace(size_t size); [[nodiscard]] size_t getMemBudget() const { return mem_budget; } - int getMaxNumThreads(); + int getMaxNumThreads() const; static CUDAContext *get(DaphneContext *ctx, size_t id) { return dynamic_cast(ctx->getCUDAContext(id)); } diff --git a/src/runtime/local/datastructures/AllocationDescriptorCUDA.h b/src/runtime/local/datastructures/AllocationDescriptorCUDA.h index 48fb9c099..bd79c175e 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorCUDA.h +++ b/src/runtime/local/datastructures/AllocationDescriptorCUDA.h @@ -28,8 +28,6 @@ class AllocationDescriptorCUDA : public IAllocationDescriptor { size_t alloc_id{}; public: - AllocationDescriptorCUDA() = delete; - AllocationDescriptorCUDA(DaphneContext *ctx, uint32_t device_id) : device_id(device_id), dctx(ctx) {} ~AllocationDescriptorCUDA() override { @@ -45,9 +43,12 @@ class AllocationDescriptorCUDA : public IAllocationDescriptor { // [[nodiscard]] uint32_t getLocation() const { return device_id; } [[nodiscard]] std::string getLocation() const override { return std::to_string(device_id); } - void createAllocation(size_t size, bool zero) override { + [[nodiscard]] std::unique_ptr createAllocation(size_t size, bool zero) const override { + auto new_alloc = std::make_unique(dctx, device_id); auto ctx = CUDAContext::get(dctx, device_id); - data = ctx->malloc(size, zero, alloc_id); + new_alloc->data = ctx->malloc(size, zero, new_alloc->alloc_id); + new_alloc->size = size; + return new_alloc; } std::shared_ptr getData() override { return data; } diff --git a/src/runtime/local/datastructures/AllocationDescriptorGRPC.h 
b/src/runtime/local/datastructures/AllocationDescriptorGRPC.h index 00e0a7658..0eb7aa147 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorGRPC.h +++ b/src/runtime/local/datastructures/AllocationDescriptorGRPC.h @@ -33,15 +33,17 @@ class AllocationDescriptorGRPC : public IAllocationDescriptor { std::shared_ptr data; public: - AllocationDescriptorGRPC(){}; + AllocationDescriptorGRPC() {}; AllocationDescriptorGRPC(DaphneContext *ctx, const std::string &address, const DistributedData &data) - : ctx(ctx), workerAddress(address), distributedData(data){}; + : ctx(ctx), workerAddress(address), distributedData(data) {}; - ~AllocationDescriptorGRPC() override{}; + ~AllocationDescriptorGRPC() override {}; [[nodiscard]] ALLOCATION_TYPE getType() const override { return type; }; std::string getLocation() const override { return workerAddress; }; - void createAllocation(size_t size, bool zero) override {} + std::unique_ptr createAllocation(size_t size, bool zero) const override { + return nullptr; + } // TODO: Implement transferTo and transferFrom functions std::shared_ptr getData() override { throw std::runtime_error("TransferTo/From functions are not implemented yet."); diff --git a/src/runtime/local/datastructures/AllocationDescriptorHost.h b/src/runtime/local/datastructures/AllocationDescriptorHost.h index af7e71e67..96ca3dcaa 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorHost.h +++ b/src/runtime/local/datastructures/AllocationDescriptorHost.h @@ -18,6 +18,7 @@ #include "DataPlacement.h" #include +#include class AllocationDescriptorHost : public IAllocationDescriptor { ALLOCATION_TYPE type = ALLOCATION_TYPE::HOST; @@ -25,14 +26,40 @@ class AllocationDescriptorHost : public IAllocationDescriptor { public: ~AllocationDescriptorHost() override = default; + [[nodiscard]] ALLOCATION_TYPE getType() const override { return type; } - void createAllocation(size_t size, bool zero) override {} - std::string getLocation() const override { return 
"Host"; } + + static std::unique_ptr createHostAllocation(std::shared_ptr data, size_t size, + bool zero) { + auto new_alloc = std::make_unique(); + new_alloc->data = std::move(data); + new_alloc->size = size; + if (zero) + memset(new_alloc->data.get(), 0, size); + return new_alloc; + } + + [[nodiscard]] std::unique_ptr createAllocation(size_t size, bool zero) const override { + auto new_alloc = std::make_unique(); + new_alloc->size = size; + new_alloc->data = std::shared_ptr(new std::byte[size], std::default_delete()); + + if (zero) + memset(new_alloc->data.get(), 0, size); + + return new_alloc; + } + + [[nodiscard]] std::string getLocation() const override { return "Host"; } std::shared_ptr getData() override { return data; } + + void setData(std::shared_ptr &_data) { data = _data; } + void transferTo(std::byte *src, size_t size) override {} void transferFrom(std::byte *dst, size_t size) override {} [[nodiscard]] std::unique_ptr clone() const override { return std::make_unique(*this); } + bool operator==(const IAllocationDescriptor *other) const override { return (getType() == other->getType()); } }; diff --git a/src/runtime/local/datastructures/AllocationDescriptorMPI.h b/src/runtime/local/datastructures/AllocationDescriptorMPI.h index 8f9d0cf72..ff7f26f7a 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorMPI.h +++ b/src/runtime/local/datastructures/AllocationDescriptorMPI.h @@ -14,8 +14,7 @@ * limitations under the License. 
*/ -#ifndef SRC_RUNTIME_LOCAL_DATASTRUCTURE_ALLOCATION_DESCRIPTORMPH_H -#define SRC_RUNTIME_LOCAL_DATASTRUCTURE_ALLOCATION_DESCRIPTORMPH_H +#pragma once #include #include @@ -26,23 +25,24 @@ class AllocationDescriptorMPI : public IAllocationDescriptor { ALLOCATION_TYPE type = ALLOCATION_TYPE::DIST_MPI; - int processRankID; - DaphneContext *ctx; + int processRankID{}; + DaphneContext *ctx{}; DistributedData distributedData; std::shared_ptr data; public: - AllocationDescriptorMPI(){}; - AllocationDescriptorMPI(int id, DaphneContext *ctx, DistributedData data) - : processRankID(id), ctx(ctx), distributedData(data){}; + AllocationDescriptorMPI() = default; + AllocationDescriptorMPI(int id, DaphneContext *ctx, DistributedData &data) + : processRankID(id), ctx(ctx), distributedData(data) {} - ~AllocationDescriptorMPI() override{}; + ~AllocationDescriptorMPI() override = default; [[nodiscard]] ALLOCATION_TYPE getType() const override { return type; }; - std::string getLocation() const override { return std::to_string(processRankID); }; + [[nodiscard]] std::string getLocation() const override { return std::to_string(processRankID); }; + + std::unique_ptr createAllocation(size_t size, bool zero) const override { return nullptr; } - void createAllocation(size_t size, bool zero) override {}; std::shared_ptr getData() override { return nullptr; }; bool operator==(const IAllocationDescriptor *other) const override { @@ -58,10 +58,11 @@ class AllocationDescriptorMPI : public IAllocationDescriptor { void transferTo(std::byte *src, size_t size) override { /* TODO */ }; void transferFrom(std::byte *src, size_t size) override { /* TODO */ }; - const DistributedIndex getDistributedIndex() { return distributedData.ix; } - const DistributedData getDistributedData() { return distributedData; } - void updateDistributedData(DistributedData data_) { distributedData = data_; } - int getRank() { return processRankID; } -}; + DistributedIndex getDistributedIndex() const { return 
distributedData.ix; } + + DistributedData getDistributedData() { return distributedData; } -#endif // SRC_RUNTIME_LOCAL_DATASTRUCTURE_ALLOCATION_DESCRIPTORMPH_H + void updateDistributedData(DistributedData &data_) { distributedData = data_; } + + int getRank() const { return processRankID; } +}; diff --git a/src/runtime/local/datastructures/CSRMatrix.cpp b/src/runtime/local/datastructures/CSRMatrix.cpp index e49f88ff0..41c1a4ec3 100644 --- a/src/runtime/local/datastructures/CSRMatrix.cpp +++ b/src/runtime/local/datastructures/CSRMatrix.cpp @@ -18,6 +18,56 @@ #include "CSRMatrix.h" +template +CSRMatrix::CSRMatrix(size_t maxNumRows, size_t numCols, size_t maxNumNonZeros, bool zero, + IAllocationDescriptor *allocInfo) + : Matrix(maxNumRows, numCols), numRowsAllocated(maxNumRows), isRowAllocatedBefore(false), is_view(false), + maxNumNonZeros(maxNumNonZeros), lastAppendedRowIdx(0) { + auto val_buf_size = maxNumNonZeros * sizeof(ValueType); + auto cidx_buf_size = maxNumNonZeros * sizeof(size_t); + auto rptr_buf_size = (numRows + 1) * sizeof(size_t); + + std::unique_ptr val_alloc; + std::unique_ptr cidx_alloc; + std::unique_ptr rptr_alloc; + + if (!allocInfo) { + values = std::shared_ptr(new ValueType[maxNumNonZeros], std::default_delete()); + auto bytes = std::reinterpret_pointer_cast(values); + val_alloc = AllocationDescriptorHost::createHostAllocation(bytes, val_buf_size, zero); + colIdxs = std::shared_ptr(new size_t[maxNumNonZeros], std::default_delete()); + bytes = std::reinterpret_pointer_cast(colIdxs); + cidx_alloc = AllocationDescriptorHost::createHostAllocation(bytes, cidx_buf_size, zero); + rowOffsets = std::shared_ptr(new size_t[numRows + 1], std::default_delete()); + bytes = std::reinterpret_pointer_cast(rowOffsets); + rptr_alloc = AllocationDescriptorHost::createHostAllocation(bytes, rptr_buf_size, zero); + } else { + val_alloc = allocInfo->createAllocation(val_buf_size, zero); + cidx_alloc = allocInfo->createAllocation(cidx_buf_size, zero); + rptr_alloc = 
allocInfo->createAllocation(rptr_buf_size, zero); + + // ToDo: refactor data storage into memory management + if (allocInfo->getType() == ALLOCATION_TYPE::HOST) { + values = std::reinterpret_pointer_cast(val_alloc->getData()); + colIdxs = std::reinterpret_pointer_cast(cidx_alloc->getData()); + rowOffsets = std::reinterpret_pointer_cast(rptr_alloc->getData()); + } + } + + spdlog::debug( + "Creating {} x {} sparse matrix of type: {}. Required memory: vals={}, cidxs={}, rptrs={}, total={} Mb", + numRows, numCols, val_alloc->getTypeName(), static_cast(val_buf_size) / (1048576), + static_cast(cidx_buf_size) / (1048576), static_cast(rptr_buf_size) / (1048576), + static_cast(val_buf_size + cidx_buf_size + rptr_buf_size) / (1048576)); + + std::vector> allocations; + allocations.emplace_back(std::move(val_alloc)); + allocations.emplace_back(std::move(cidx_alloc)); + allocations.emplace_back(std::move(rptr_alloc)); + auto p = this->mdo->addDataPlacement(allocations); + this->mdo->addLatest(p->getID()); +} + template size_t CSRMatrix::serialize(std::vector &buf) const { return DaphneSerializer>::serialize(this, buf); } diff --git a/src/runtime/local/datastructures/CSRMatrix.h b/src/runtime/local/datastructures/CSRMatrix.h index 08dc4a3e1..9741780b0 100644 --- a/src/runtime/local/datastructures/CSRMatrix.h +++ b/src/runtime/local/datastructures/CSRMatrix.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -60,6 +61,7 @@ template class CSRMatrix : public Matrix { bool isRowAllocatedBefore; + const bool is_view; /** * @brief The maximum number of non-zero values this matrix was allocated * to accommodate. @@ -88,18 +90,8 @@ template class CSRMatrix : public Matrix { * @param zero Whether the allocated memory of the internal arrays shall be * initialized to zeros (`true`), or be left uninitialized (`false`). 
*/ - CSRMatrix(size_t maxNumRows, size_t numCols, size_t maxNumNonZeros, bool zero) - : Matrix(maxNumRows, numCols), numRowsAllocated(maxNumRows), isRowAllocatedBefore(false), - maxNumNonZeros(maxNumNonZeros), values(new ValueType[maxNumNonZeros], std::default_delete()), - colIdxs(new size_t[maxNumNonZeros], std::default_delete()), - rowOffsets(new size_t[numRows + 1], std::default_delete()), lastAppendedRowIdx(0) { - if (zero) { - memset(values.get(), 0, maxNumNonZeros * sizeof(ValueType)); - memset(colIdxs.get(), 0, maxNumNonZeros * sizeof(size_t)); - memset(rowOffsets.get(), 0, (numRows + 1) * sizeof(size_t)); - } - } - + CSRMatrix(size_t maxNumRows, size_t numCols, size_t maxNumNonZeros, bool zero, + IAllocationDescriptor *allocInfo = nullptr); /** * @brief Creates a `CSRMatrix` around a sub-matrix of another `CSRMatrix` * without copying the data. @@ -112,7 +104,7 @@ template class CSRMatrix : public Matrix { */ CSRMatrix(const CSRMatrix *src, size_t rowLowerIncl, size_t rowUpperExcl) : Matrix(rowUpperExcl - rowLowerIncl, src->numCols), - numRowsAllocated(src->numRowsAllocated - rowLowerIncl), isRowAllocatedBefore(rowLowerIncl > 0), + numRowsAllocated(src->numRowsAllocated - rowLowerIncl), isRowAllocatedBefore(rowLowerIncl > 0), is_view(true), lastAppendedRowIdx(0) { if (!src) throw std::runtime_error("CSRMatrix: src must not be null"); @@ -123,16 +115,18 @@ template class CSRMatrix : public Matrix { if (rowLowerIncl >= rowUpperExcl) throw std::runtime_error("CSRMatrix: rowLowerIncl must be lower than rowUpperExcl"); + this->row_offset = rowLowerIncl; maxNumNonZeros = src->maxNumNonZeros; values = src->values; colIdxs = src->colIdxs; rowOffsets = std::shared_ptr(src->rowOffsets, src->rowOffsets.get() + rowLowerIncl); + // ToDo: temporary fix to directly assign mdo + this->mdo = src->mdo; } - virtual ~CSRMatrix() { - // nothing to do - } + [[nodiscard]] size_t offset() const { return this->row_offset; } + virtual ~CSRMatrix() = default; void 
fillNextPosUntil(size_t nextPos, size_t rowIdx) { if (rowIdx > lastAppendedRowIdx) { for (size_t r = lastAppendedRowIdx + 2; r <= rowIdx + 1; r++) @@ -153,13 +147,16 @@ template class CSRMatrix : public Matrix { this->numRows = numRows; } - size_t getMaxNumNonZeros() const { return maxNumNonZeros; } - size_t getNumNonZeros() const { return rowOffsets.get()[numRows] - rowOffsets.get()[0]; } + [[nodiscard]] size_t getMaxNumNonZeros() const { return maxNumNonZeros; } + [[nodiscard]] size_t getNumNonZeros() const { return getRowOffsets()[numRows] - getRowOffsets()[0]; } - size_t getNumNonZeros(size_t rowIdx) const { + [[nodiscard]] size_t getNumNonZeros(size_t rowIdx) const { if (rowIdx >= numRows) throw std::runtime_error("CSRMatrix (getNumNonZeros): rowIdx is out of bounds"); - return rowOffsets.get()[rowIdx + 1] - rowOffsets.get()[rowIdx]; + auto roff = getRowOffsets(); + auto rownext = roff[rowIdx + 1]; + auto row = roff[rowIdx]; + return rownext - row; } void shrinkNumNonZeros(size_t numNonZeros) { @@ -170,39 +167,59 @@ template class CSRMatrix : public Matrix { // colIdxs arrays. } - ValueType *getValues() { return values.get(); } + ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) { + return reinterpret_cast(this->mdo->getData(0, alloc_desc, range)); + } - const ValueType *getValues() const { return values.get(); } + const ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const { + return reinterpret_cast(this->mdo->getData(0, alloc_desc, range)); + } - ValueType *getValues(size_t rowIdx) { + ValueType *getRowValues(size_t rowIdx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) { // We allow equality here to enable retrieving a pointer to the end. 
if (rowIdx > numRows) throw std::runtime_error("CSRMatrix (getValues): rowIdx is out of bounds"); - return values.get() + rowOffsets.get()[rowIdx]; + return &(reinterpret_cast(getValues(alloc_desc, range))[getRowOffsets()[rowIdx]]); } - const ValueType *getValues(size_t rowIdx) const { - return const_cast *>(this)->getValues(rowIdx); + const ValueType *getRowValues(size_t rowIdx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) const { + return &(reinterpret_cast(getValues(alloc_desc, range))[getRowOffsets()[rowIdx]]); } - size_t *getColIdxs() { return colIdxs.get(); } + size_t *getColIdxs(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) { + return reinterpret_cast(this->mdo->getData(1, alloc_desc, range)); + } - const size_t *getColIdxs() const { return colIdxs.get(); } + const size_t *getColIdxs(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const { + return reinterpret_cast(this->mdo->getData(1, alloc_desc, range)); + } - size_t *getColIdxs(size_t rowIdx) { + size_t *getColIdxsOfRow(size_t rowIdx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) { // We allow equality here to enable retrieving a pointer to the end. 
if (rowIdx > numRows) throw std::runtime_error("CSRMatrix (getColIdxs): rowIdx is out of bounds"); - return colIdxs.get() + rowOffsets.get()[rowIdx]; + auto data = this->mdo->getData(1, alloc_desc, range); + return &(reinterpret_cast(data)[getRowOffsets()[rowIdx]]); } - const size_t *getColIdxs(size_t rowIdx) const { - return const_cast *>(this)->getColIdxs(rowIdx); + const size_t *getColIdxsOfRow(size_t rowIdx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) const { + auto data = this->mdo->getData(1, alloc_desc, range); + return &(reinterpret_cast(data)[getRowOffsets()[rowIdx]]); } - size_t *getRowOffsets() { return rowOffsets.get(); } + size_t *getRowOffsets(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) { + auto ptr = reinterpret_cast(this->mdo->getData(2, alloc_desc, range)); + return ptr + offset(); + } - const size_t *getRowOffsets() const { return rowOffsets.get(); } + const size_t *getRowOffsets(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const { + auto ptr = reinterpret_cast(this->mdo->getData(2, alloc_desc, range)); + return ptr + offset(); + } ValueType get(size_t rowIdx, size_t colIdx) const override { if (rowIdx >= numRows) @@ -210,8 +227,8 @@ template class CSRMatrix : public Matrix { if (colIdx >= numCols) throw std::runtime_error("CSRMatrix (get): colIdx is out of bounds"); - const size_t *rowColIdxsBeg = getColIdxs(rowIdx); - const size_t *rowColIdxsEnd = getColIdxs(rowIdx + 1); + const size_t *rowColIdxsBeg = getColIdxsOfRow(rowIdx); + const size_t *rowColIdxsEnd = getColIdxsOfRow(rowIdx + 1); const size_t *ptrExpected = std::lower_bound(rowColIdxsBeg, rowColIdxsEnd, colIdx); if (ptrExpected == rowColIdxsEnd || *ptrExpected != colIdx) @@ -219,7 +236,7 @@ template class CSRMatrix : public Matrix { return ValueType(0); else // Entry for the given coordinates present. 
- return getValues(rowIdx)[ptrExpected - rowColIdxsBeg]; + return getRowValues(rowIdx)[ptrExpected - rowColIdxsBeg]; } void set(size_t rowIdx, size_t colIdx, ValueType value) override { @@ -228,13 +245,13 @@ template class CSRMatrix : public Matrix { if (colIdx >= numCols) throw std::runtime_error("CSRMatrix (set): colIdx is out of bounds"); - size_t *rowColIdxsBeg = getColIdxs(rowIdx); - size_t *rowColIdxsEnd = getColIdxs(rowIdx + 1); + size_t *rowColIdxsBeg = getColIdxsOfRow(rowIdx); + size_t *rowColIdxsEnd = getColIdxsOfRow(rowIdx + 1); const size_t *ptrExpected = std::lower_bound(rowColIdxsBeg, rowColIdxsEnd, colIdx); const size_t posExpected = ptrExpected - rowColIdxsBeg; const size_t posEnd = colIdxs.get() + rowOffsets.get()[numRowsAllocated] - rowColIdxsBeg; - ValueType *rowValuesBeg = getValues(rowIdx); + ValueType *rowValuesBeg = getRowValues(rowIdx); if (ptrExpected == rowColIdxsEnd || *ptrExpected != colIdx) { // No entry for the given coordinates present. @@ -306,7 +323,10 @@ template class CSRMatrix : public Matrix { void finishAppend() override { fillNextPosUntil(rowOffsets.get()[lastAppendedRowIdx + 1], numRows - 1); } - bool isView() const { return (numRowsAllocated > numRows || isRowAllocatedBefore); } + [[nodiscard]] bool isView() const { + // return (numRowsAllocated > numRows || isRowAllocatedBefore); + return is_view; + } void printValue(std::ostream &os, ValueType val) const { switch (ValueTypeUtils::codeFor) { @@ -327,12 +347,12 @@ template class CSRMatrix : public Matrix { << std::endl; // Note that, in general, the values within one row might not be sorted // by column index. Thus, the following is a little complicated. 
- ValueType *oneRow = new ValueType[numCols]; + auto oneRow = new ValueType[numCols]; for (size_t r = 0; r < numRows; r++) { memset(oneRow, 0, numCols * sizeof(ValueType)); const size_t rowNumNonZeros = getNumNonZeros(r); - const size_t *rowColIdxs = getColIdxs(r); - const ValueType *rowValues = getValues(r); + const size_t *rowColIdxs = getColIdxsOfRow(r); + const ValueType *rowValues = getRowValues(r); for (size_t i = 0; i < rowNumNonZeros; i++) oneRow[rowColIdxs[i]] = rowValues[i]; for (size_t c = 0; c < numCols; c++) { @@ -402,11 +422,10 @@ template class CSRMatrix : public Matrix { if (numRows != rhs.getNumRows() || numCols != rhs.getNumCols()) return false; - const ValueType *valuesBegLhs = this->getValues(0); - const ValueType *valuesEndLhs = this->getValues(numRows); - const ValueType *valuesBegRhs = rhs.getValues(0); - const ValueType *valuesEndRhs = rhs.getValues(numRows); - + const ValueType *valuesBegLhs = this->getRowValues(0); + const ValueType *valuesEndLhs = this->getRowValues(numRows); + const ValueType *valuesBegRhs = rhs.getRowValues(0); + const ValueType *valuesEndRhs = rhs.getRowValues(numRows); const size_t nnzLhs = valuesEndLhs - valuesBegLhs; const size_t nnzRhs = valuesEndRhs - valuesBegRhs; @@ -421,7 +440,7 @@ template class CSRMatrix : public Matrix { const size_t *colIdxsBegRhs = rhs.getColIdxs(0); if (colIdxsBegLhs != colIdxsBegRhs) - if (memcmp(colIdxsBegLhs, colIdxsBegRhs, nnzLhs * sizeof(size_t))) + if (memcmp(colIdxsBegLhs, colIdxsBegRhs, nnzLhs * sizeof(size_t)) != 0) return false; return true; diff --git a/src/runtime/local/datastructures/DataPlacement.h b/src/runtime/local/datastructures/DataPlacement.h index d99a96879..8f6352add 100644 --- a/src/runtime/local/datastructures/DataPlacement.h +++ b/src/runtime/local/datastructures/DataPlacement.h @@ -23,20 +23,32 @@ #include /** - * The DataPlacement struct binds an allocation descriptor to a range + * The DataPlacement struct binds allocation descriptors to a range * description 
and stores an ID of an instantiated object. + * + * The number of allocations varies depending on the data type (1 allocation of values of a dense matrix, + * three allocations for values/col_idxs/row_ptrs of a CSR matrix) */ -struct DataPlacement { +class DataPlacement { size_t dp_id; // used to generate object IDs static std::atomic_size_t instance_count; - std::unique_ptr allocation{}; + std::vector> allocations{}; std::unique_ptr range{}; + public: DataPlacement() = delete; - DataPlacement(std::unique_ptr _a, std::unique_ptr _r) - : dp_id(instance_count++), allocation(std::move(_a)), range(std::move(_r)) {} + DataPlacement(std::vector> _a, std::unique_ptr _r) + : dp_id(instance_count++), allocations(std::move(_a)), range(std::move(_r)) {} + + [[nodiscard]] size_t getID() const { return dp_id; } + std::string getLocation() { return allocations.front()->getLocation(); } + Range *getRange() { return range.get(); } + void setRange(Range *r) { range.reset(r); } + void setRange(std::unique_ptr r) { range = std::move(r); } + uint32_t getNumAllocations() { return allocations.size(); } + IAllocationDescriptor *getAllocation(uint32_t num) { return allocations[num].get(); } }; diff --git a/src/runtime/local/datastructures/DenseMatrix.cpp b/src/runtime/local/datastructures/DenseMatrix.cpp index 463a5ca0c..6177a8304 100644 --- a/src/runtime/local/datastructures/DenseMatrix.cpp +++ b/src/runtime/local/datastructures/DenseMatrix.cpp @@ -68,172 +68,74 @@ template DenseMatrix::DenseMatrix(size_t maxNumRows, size_t numCols, bool zero, IAllocationDescriptor *allocInfo) : Matrix(maxNumRows, numCols), is_view(false), rowSkip(numCols), bufferSize(numRows * numCols * sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { - DataPlacement *new_data_placement; - if (allocInfo != nullptr) { - spdlog::debug("Creating {} x {} dense matrix of type: {}.
Required memory: {} Mb", numRows, numCols, - static_cast(allocInfo->getType()), static_cast(getBufferSize()) / (1048576)); - - new_data_placement = this->mdo->addDataPlacement(allocInfo); - new_data_placement->allocation->createAllocation(getBufferSize(), zero); + std::unique_ptr val_alloc; + if (!allocInfo) { + alloc_shared_values(zero); + auto bytes = std::reinterpret_pointer_cast(values); + val_alloc = AllocationDescriptorHost::createHostAllocation(bytes, getBufferSize(), zero); } else { - AllocationDescriptorHost myHostAllocInfo; - alloc_shared_values(); + val_alloc = allocInfo->createAllocation(getBufferSize(), zero); - if (zero) - std::fill(values.get(), values.get() + maxNumRows * numCols, ValueTypeUtils::defaultValue); - new_data_placement = this->mdo->addDataPlacement(&myHostAllocInfo); + // ToDo: refactor data storage into memory management + if (allocInfo->getType() == ALLOCATION_TYPE::HOST) { + alloc_shared_values(zero); + + auto bytes = std::reinterpret_pointer_cast(values); + dynamic_cast(val_alloc.get())->setData(bytes); + } } - this->mdo->addLatest(new_data_placement->dp_id); + spdlog::debug("Creating {} x {} dense matrix of type: {}. 
Required memory: {} Mb", numRows, numCols, + val_alloc->getTypeName(), static_cast(getBufferSize()) / (1048576)); + + std::vector> allocations; + allocations.emplace_back(std::move(val_alloc)); + auto p = this->mdo->addDataPlacement(allocations); + this->mdo->addLatest(p->getID()); } template DenseMatrix::DenseMatrix(size_t numRows, size_t numCols, std::shared_ptr &values) : Matrix(numRows, numCols), is_view(false), rowSkip(numCols), values(values), bufferSize(numRows * numCols * sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { - AllocationDescriptorHost myHostAllocInfo; - DataPlacement *new_data_placement = this->mdo->addDataPlacement(&myHostAllocInfo); - this->mdo->addLatest(new_data_placement->dp_id); + auto bytes = std::reinterpret_pointer_cast(values); + + std::vector> allocations; + auto vec_alloc = AllocationDescriptorHost::createHostAllocation(bytes, getBufferSize(), false); + allocations.emplace_back(std::move(vec_alloc)); + + auto p = this->mdo->addDataPlacement(allocations); + this->mdo->addLatest(p->getID()); } template DenseMatrix::DenseMatrix(const DenseMatrix *src, int64_t rowLowerIncl, int64_t rowUpperExcl, int64_t colLowerIncl, int64_t colUpperExcl) - : Matrix(rowUpperExcl - rowLowerIncl, colUpperExcl - colLowerIncl), is_view(true), + : Matrix(rowUpperExcl - rowLowerIncl, colUpperExcl - colLowerIncl), is_view(true), rowSkip(src->rowSkip), bufferSize(numRows * numCols * sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { validateArgs(src, rowLowerIncl, rowUpperExcl, colLowerIncl, colUpperExcl); - this->row_offset = rowLowerIncl; - this->col_offset = colLowerIncl; - rowSkip = src->rowSkip; + this->row_offset = isView() ? src->row_offset + rowLowerIncl : rowLowerIncl; + this->col_offset = isView() ? 
src->col_offset + colLowerIncl : colLowerIncl; // ToDo: manage host mem (values) in a data placement if (src->values) { - alloc_shared_values(src->values, offset()); bufferSize = numRows * rowSkip * sizeof(ValueType); + this->values = src->values; } - this->clone_mdo(src); + this->mdo = src->mdo; } template DenseMatrix::DenseMatrix(size_t numRows, size_t numCols, const DenseMatrix *src) - : Matrix(numRows, numCols), is_view(false), rowSkip(numCols), + : Matrix(numRows, numCols), is_view(src->is_view), rowSkip(src->getRowSkip()), bufferSize(numRows * numCols * sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { - if (src->values) - values = src->values; - this->clone_mdo(src); -} + if (!is_view) + rowSkip = numCols; -template -auto DenseMatrix::getValuesInternal(const IAllocationDescriptor *alloc_desc, - const Range *range) -> std::tuple { - // If no range information is provided we assume the full range that this - // matrix covers - if (range == nullptr || *range == Range(*this)) { - if (alloc_desc) { - auto ret = this->mdo->findDataPlacementByType(alloc_desc, range); - if (!ret) { - // find other allocation type X (preferably host allocation) to - // transfer from in latest_version - - // tuple content: - std::tuple result = std::make_tuple(false, 0, nullptr); - auto latest = this->mdo->getLatest(); - DataPlacement *placement; - for (auto &placement_id : latest) { - placement = this->mdo->getDataPlacementByID(placement_id); - if (placement->range == nullptr || - *(placement->range) == Range{0, 0, this->getNumRows(), this->getNumCols()}) { - std::get<0>(result) = true; - std::get<1>(result) = placement->dp_id; - // prefer host allocation - if (placement->allocation->getType() == ALLOCATION_TYPE::HOST) { - std::get<2>(result) = reinterpret_cast(values.get()); - break; - } - } - } - - // if we found a data placement that is not in host memory, - // transfer it there before returning - if (std::get<0>(result) == true && std::get<2>(result) == nullptr) 
{ - AllocationDescriptorHost myHostAllocInfo; - if (!values) - this->alloc_shared_values(); - this->mdo->addDataPlacement(&myHostAllocInfo); - placement->allocation->transferFrom(reinterpret_cast(startAddress()), getBufferSize()); - std::get<2>(result) = startAddress(); - } - - // create new data placement - auto new_data_placement = const_cast *>(this)->mdo->addDataPlacement(alloc_desc); - new_data_placement->allocation->createAllocation(getBufferSize(), false); - - // transfer to requested data placement - new_data_placement->allocation->transferTo(reinterpret_cast(startAddress()), - getBufferSize()); - return std::make_tuple(false, new_data_placement->dp_id, - reinterpret_cast(new_data_placement->allocation->getData().get())); - } else { - bool latest = this->mdo->isLatestVersion(ret->dp_id); - if (!latest) { - ret->allocation->transferTo(reinterpret_cast(startAddress()), getBufferSize()); - } - return std::make_tuple(latest, ret->dp_id, - reinterpret_cast(ret->allocation->getData().get())); - } - } else { - // if no alloc info was provided we try to get/create a full host - // allocation and return that - std::tuple result = std::make_tuple(false, 0, nullptr); - auto latest = this->mdo->getLatest(); - DataPlacement *placement; - for (auto &placement_id : latest) { - placement = this->mdo->getDataPlacementByID(placement_id); - - // only consider allocations covering full range of matrix - if (placement->range == nullptr || - *(placement->range) == - Range{this->row_offset, this->col_offset, this->getNumRows(), this->getNumCols()}) { - std::get<0>(result) = true; - std::get<1>(result) = placement->dp_id; - // prefer host allocation - if (placement->allocation->getType() == ALLOCATION_TYPE::HOST) { - std::get<2>(result) = reinterpret_cast(startAddress()); - break; - } - } - } - - // if we found a data placement that is not in host memory, transfer - // it there before returning - if (std::get<0>(result) == true && std::get<2>(result) == nullptr) { - 
AllocationDescriptorHost myHostAllocInfo; - - // ToDo: this needs fixing in the context of matrix views - if (!values) - const_cast *>(this)->alloc_shared_values(); - - this->mdo->addDataPlacement(&myHostAllocInfo); - // std::cout << "bufferSize: " << getBufferSize() - // << " RxRS: " << this->getNumRows() * - // this->getRowSkip() * sizeof(ValueType) << - // std::endl; if(getBufferSize() != - // this->getNumRows() * this->getRowSkip()) { - // placement->allocation->transferFrom(reinterpret_cast(startAddress()), this->getNumRows() * - // this->getRowSkip() * sizeof(ValueType)); - // } - // else - placement->allocation->transferFrom(reinterpret_cast(startAddress()), getBufferSize()); - std::get<2>(result) = startAddress(); - } - if (std::get<2>(result) == nullptr) - throw std::runtime_error("No object meta data in matrix"); - else - return result; - } - } else - throw std::runtime_error("Range support under construction"); + this->row_offset = src->row_offset; + this->col_offset = src->col_offset; + this->values = src->values; + this->mdo = src->mdo; } template void DenseMatrix::printValue(std::ostream &os, ValueType val) const { @@ -251,14 +153,50 @@ template <> [[maybe_unused]] void DenseMatrix::printValue(std::ostream & } template -void DenseMatrix::alloc_shared_values(std::shared_ptr src, size_t offset) { +void DenseMatrix::alloc_shared_values(bool zero, std::shared_ptr src, size_t offset) { // correct since C++17: Calls delete[] instead of simple delete if (src) { values = std::shared_ptr(src, src.get() + offset); - } else - // values = std::shared_ptr(new - // ValueType[numRows*numCols]); - values = std::shared_ptr(new ValueType[numRows * getRowSkip()]); + } else { + values = std::shared_ptr(new ValueType[getBufferSize()]); + if (zero) + std::fill(values.get(), values.get() + this->getNumRows() * this->getNumCols(), + ValueTypeUtils::defaultValue); + } +} + +template bool DenseMatrix::operator==(const DenseMatrix &rhs) const { + // Note that we do not use the 
generic `get` interface to matrices here since + // this operator is meant to be used for writing tests for, besides others, + // those generic interfaces. + + if (this == &rhs) + return true; + + const size_t numRows = this->getNumRows(); + const size_t numCols = this->getNumCols(); + + if (numRows != rhs.getNumRows() || numCols != rhs.getNumCols()) + return false; + + const ValueType *valuesLhs = this->getValues(); + const ValueType *valuesRhs = rhs.getValues(); + + const size_t rowSkipLhs = this->getRowSkip(); + const size_t rowSkipRhs = rhs.getRowSkip(); + + if (valuesLhs == valuesRhs && rowSkipLhs == rowSkipRhs) + return true; + + for (size_t r = 0; r < numRows; ++r) { + for (size_t c = 0; c < numCols; ++c) { + if (*(valuesLhs + c) != *(valuesRhs + c)) + return false; + } + valuesLhs += rowSkipLhs; + valuesRhs += rowSkipRhs; + } + return true; } template size_t DenseMatrix::serialize(std::vector &buf) const { @@ -269,6 +207,40 @@ template <> size_t DenseMatrix::serialize(std::vector &buf) const { throw std::runtime_error("DenseMatrix serialization not implemented"); } +template +const ValueType *DenseMatrix::getValues(const IAllocationDescriptor *alloc_desc, const Range *range) const { + const ValueType *ptr; + if (this->isPinned(alloc_desc)) { + ptr = reinterpret_cast(this->pinned_mem); + } else { + this->pinned_mem = this->mdo->getData(0, alloc_desc, range); + ptr = reinterpret_cast(this->pinned_mem); + const_cast *>(this)->pin(alloc_desc); + } + + if (isView()) + return ptr + offset(); + else + return ptr; +} + +template +ValueType *DenseMatrix::getValues(const IAllocationDescriptor *alloc_desc, const Range *range) { + ValueType *ptr; + if (this->isPinned(alloc_desc)) { + ptr = reinterpret_cast(this->pinned_mem); + } else { + this->pinned_mem = this->mdo->getData(0, alloc_desc, range); + ptr = reinterpret_cast(this->pinned_mem); + this->pin(alloc_desc); + } + + if (isView()) + return ptr + offset(); + else + return ptr; +} + // 
---------------------------------------------------------------------------- // const char* specialization DenseMatrix::DenseMatrix(size_t maxNumRows, size_t numCols, bool zero, size_t strBufferCapacity_, diff --git a/src/runtime/local/datastructures/DenseMatrix.h b/src/runtime/local/datastructures/DenseMatrix.h index ec4c9bd05..19a2f7d08 100644 --- a/src/runtime/local/datastructures/DenseMatrix.h +++ b/src/runtime/local/datastructures/DenseMatrix.h @@ -101,12 +101,13 @@ template class DenseMatrix : public Matrix { : DenseMatrix(src, rowLowerIncl, rowUpperExcl, 0, src->numCols){}; /** - * @brief Creates a `DenseMatrix` around an existing array of values without + * @brief Creates a `DenseMatrix` around an existing `DenseMatrix` without * copying the data. + * This is used in transpose and reshape operations for example; * * @param numRows The exact number of rows. * @param numCols The exact number of columns. - * @param values A `std::shared_ptr` to an existing array of values. + * @param src A source matrix */ DenseMatrix(size_t numRows, size_t numCols, const DenseMatrix *src); @@ -126,12 +127,11 @@ template class DenseMatrix : public Matrix { const size_t endPosExcl = pos(rowIdx, colIdx); if (startPosIncl < endPosExcl) - std::fill(values.get() + startPosIncl, values.get() + endPosExcl, + std::fill(getValues() + startPosIncl, values.get() + endPosExcl, ValueTypeUtils::defaultValue); } else { - auto v = values.get() + lastAppendedRowIdx * rowSkip; + auto v = getValues() + lastAppendedRowIdx * rowSkip; std::fill(v + lastAppendedColIdx + 1, v + numCols, ValueTypeUtils::defaultValue); - v += rowSkip; for (size_t r = lastAppendedRowIdx + 1; r < rowIdx; r++) { @@ -145,42 +145,10 @@ template class DenseMatrix : public Matrix { void printValue(std::ostream &os, ValueType val) const; - void alloc_shared_values(std::shared_ptr src = nullptr, size_t offset = 0); - - /** - * @brief The getValuesInternal method fetches a pointer to an allocation of - * values. 
Optionally a sub range can be specified. - * - * This method is called by the public getValues() methods either in const - * or non-const fashion. The const version returns a data pointer that is - * meant to be read-only. Read only access updates the list of up-to-date - * allocations (the latest_versions "list"). This way several copies of the - * data in various allocations can be kept without invalidating their copy. - * If the read write version of getValues() is used, the latest_versions - * list is cleared because a write to an allocation is assumed, which - * renders the other allocations of the same data out of sync. - * - * @param alloc_desc An instance of an IAllocationDescriptor derived class - * that is used to specify what type of allocation is requested. If no - * allocation descriptor is provided, a host allocation (plain main memory) - * is assumed by default. - * - * @param range An optional range describing which rows and columns of a - * matrix-like structure are requested. By default this is null and means - * all rows and columns. - * @return A tuple of three values is returned: - * 1: bool - is the returend allocation in the latest_versions list - * 2: size_t - the ID of the data placement (a structure relating an - * allocation to a range) 3: ValueType* - the pointer to the actual data - */ - - auto getValuesInternal(const IAllocationDescriptor *alloc_desc = nullptr, - const Range *range = nullptr) -> std::tuple; + void alloc_shared_values(bool zero = false, std::shared_ptr src = nullptr, size_t offset = 0); [[nodiscard]] size_t offset() const { return this->row_offset * rowSkip + this->col_offset; } - ValueType *startAddress() const { return isPartialBuffer() ? values.get() + offset() : values.get(); } - public: template using WithValueType = DenseMatrix; @@ -218,12 +186,7 @@ template class DenseMatrix : public Matrix { * of a data structure. 
* @return A pointer to the data in the requested memory space */ - const ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const { - auto [isLatest, id, ptr] = const_cast *>(this)->getValuesInternal(alloc_desc, range); - if (!isLatest) - this->mdo->addLatest(id); - return ptr; - } + const ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) const; /** * @brief Fetch a pointer to the data held by this structure meant for @@ -237,21 +200,17 @@ template class DenseMatrix : public Matrix { * memory is requested (e.g. main memory in the current system, memory in an * accelerator card or memory in another host) * + * * @param range A Range object describing optionally requesting a sub range * of a data structure. * @return A pointer to the data in the requested memory space */ - ValueType *getValues(IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr) { - auto [isLatest, id, ptr] = const_cast *>(this)->getValuesInternal(alloc_desc, range); - if (!isLatest) - this->mdo->setLatest(id); - return ptr; - } + ValueType *getValues(const IAllocationDescriptor *alloc_desc = nullptr, const Range *range = nullptr); std::shared_ptr getValuesSharedPtr() const { return values; } - ValueType get(size_t rowIdx, size_t colIdx) const override { - return getValues()[pos(rowIdx, colIdx, isPartialBuffer())]; + auto position = pos(rowIdx, colIdx, isPartialBuffer()); + return getValues()[position]; } void set(size_t rowIdx, size_t colIdx, ValueType value) override { @@ -262,7 +221,7 @@ template class DenseMatrix : public Matrix { void prepareAppend() override { // The matrix might be empty. 
if (numRows != 0 && numCols != 0) - values.get()[0] = ValueType(ValueTypeUtils::defaultValue); + getValues()[0] = ValueType(ValueTypeUtils::defaultValue); lastAppendedRowIdx = 0; lastAppendedColIdx = 0; } @@ -271,7 +230,7 @@ template class DenseMatrix : public Matrix { // Set all cells since the last one that was appended to zero. fillZeroUntil(rowIdx, colIdx); // Set the specified cell. - values.get()[pos(rowIdx, colIdx)] = value; + getValues()[pos(rowIdx, colIdx)] = value; // Update append state. lastAppendedRowIdx = rowIdx; lastAppendedColIdx = colIdx; @@ -308,39 +267,7 @@ template class DenseMatrix : public Matrix { [[nodiscard]] size_t getBufferSize() const { return bufferSize; } - bool operator==(const DenseMatrix &rhs) const { - // Note that we do not use the generic `get` interface to matrices here - // since this operator is meant to be used for writing tests for, - // besides others, those generic interfaces. - - if (this == &rhs) - return true; - - const size_t numRows = this->getNumRows(); - const size_t numCols = this->getNumCols(); - - if (numRows != rhs.getNumRows() || numCols != rhs.getNumCols()) - return false; - - const ValueType *valuesLhs = this->getValues(); - const ValueType *valuesRhs = rhs.getValues(); - - const size_t rowSkipLhs = this->getRowSkip(); - const size_t rowSkipRhs = rhs.getRowSkip(); - - if (valuesLhs == valuesRhs && rowSkipLhs == rowSkipRhs) - return true; - - for (size_t r = 0; r < numRows; ++r) { - for (size_t c = 0; c < numCols; ++c) { - if (*(valuesLhs + c) != *(valuesRhs + c)) - return false; - } - valuesLhs += rowSkipLhs; - valuesRhs += rowSkipRhs; - } - return true; - } + bool operator==(const DenseMatrix &rhs) const; size_t serialize(std::vector &buf) const override; }; diff --git a/src/runtime/local/datastructures/IAllocationDescriptor.h b/src/runtime/local/datastructures/IAllocationDescriptor.h index eb17eb8f8..fce62c950 100644 --- a/src/runtime/local/datastructures/IAllocationDescriptor.h +++ 
b/src/runtime/local/datastructures/IAllocationDescriptor.h @@ -17,6 +17,7 @@ #pragma once #include +#include // An alphabetically sorted wishlist of supported allocation types ;-) // Supporting all of that is probably unmaintainable :-/ @@ -36,6 +37,11 @@ enum class ALLOCATION_TYPE { NUM_ALLOC_TYPES }; +// string representation of enum +static std::string_view allocation_types[] = { + "DIST_GRPC", "DIST_GRPC_ASYNC", "DIST_GRPC_SYNC", "DIST_MPI", "DIST_SPARK", "GPU_CUDA", "GPU_HIP", + "HOST", "HOST_PINNED_CUDA", "FPGA_INT", "FPGA_XLX", "ONEAPI", "NUM_ALLOC_TYPES"}; + /** * @brief The IAllocationDescriptor interface class describes an abstract * interface to handle memory allocations @@ -50,14 +56,20 @@ enum class ALLOCATION_TYPE { * by the allocator. */ class IAllocationDescriptor { + protected: + size_t size = 0; + public: virtual ~IAllocationDescriptor() = default; [[nodiscard]] virtual ALLOCATION_TYPE getType() const = 0; - virtual void createAllocation(size_t size, bool zero) = 0; + [[nodiscard]] virtual std::unique_ptr createAllocation(size_t size, bool zero) const = 0; [[nodiscard]] virtual std::string getLocation() const = 0; virtual std::shared_ptr getData() = 0; virtual void transferTo(std::byte *src, size_t size) = 0; virtual void transferFrom(std::byte *dst, size_t size) = 0; [[nodiscard]] virtual std::unique_ptr clone() const = 0; virtual bool operator==(const IAllocationDescriptor *other) const { return (getType() == other->getType()); } + [[nodiscard]] virtual size_t getSize() const { return size; }; + + virtual std::string_view getTypeName() const { return allocation_types[static_cast(getType())]; } }; diff --git a/src/runtime/local/datastructures/MetaDataObject.cpp b/src/runtime/local/datastructures/MetaDataObject.cpp index 5f8b36028..ea8131ba2 100644 --- a/src/runtime/local/datastructures/MetaDataObject.cpp +++ b/src/runtime/local/datastructures/MetaDataObject.cpp @@ -17,21 +17,35 @@ #include "MetaDataObject.h" #include "DataPlacement.h" 
-DataPlacement *MetaDataObject::addDataPlacement(const IAllocationDescriptor *allocInfo, Range *r) { - data_placements[static_cast(allocInfo->getType())].emplace_back( - std::make_unique(allocInfo->clone(), r == nullptr ? nullptr : r->clone())); - return data_placements[static_cast(allocInfo->getType())].back().get(); +#include + +DataPlacement *MetaDataObject::addDataPlacement(std::vector> &allocInfos, + Range *r) { + if (r) { + auto type = static_cast(allocInfos.front()->getType()); + range_data_placements[type].emplace_back(std::make_unique(std::move(allocInfos), r->clone())); + return range_data_placements[type].back().get(); + } else { + auto type = static_cast(allocInfos.front()->getType()); + data_placements[type] = std::make_unique(std::move(allocInfos), nullptr); + return data_placements[type].get(); + } } -auto MetaDataObject::getDataPlacementByType(ALLOCATION_TYPE type) const +auto MetaDataObject::getRangeDataPlacementByType(ALLOCATION_TYPE type) const -> const std::vector> * { - return &(data_placements[static_cast(type)]); + return &(range_data_placements[static_cast(type)]); +} + +auto MetaDataObject::getDataPlacementByType(ALLOCATION_TYPE type) const -> DataPlacement * { + return data_placements[static_cast(type)].get(); } DataPlacement *MetaDataObject::getDataPlacementByLocation(const std::string &location) const { - for (const auto &_omdType : data_placements) { + // ToDo: no range? 
+ for (const auto &_omdType : range_data_placements) { for (auto &_omd : _omdType) { - if (_omd->allocation->getLocation() == location) + if (_omd->getLocation() == location) return const_cast(_omd.get()); } } @@ -39,10 +53,10 @@ DataPlacement *MetaDataObject::getDataPlacementByLocation(const std::string &loc } void MetaDataObject::updateRangeDataPlacementByID(size_t id, Range *r) { - for (auto &_omdType : data_placements) { + for (auto &_omdType : range_data_placements) { for (auto &_omd : _omdType) { - if (_omd->dp_id == id) { - _omd->range = r->clone(); + if (_omd->getID() == id) { + _omd->setRange(r->clone()); return; } } @@ -50,31 +64,19 @@ void MetaDataObject::updateRangeDataPlacementByID(size_t id, Range *r) { } DataPlacement *MetaDataObject::getDataPlacementByID(size_t id) const { - for (const auto &_omdType : data_placements) { - for (auto &_omd : _omdType) { - if (_omd->dp_id == id) - return const_cast(_omd.get()); - } + for (const auto &dp : data_placements) { + if (dp) + if (dp->getID() == id) + return const_cast(dp.get()); } - return nullptr; -} - -const DataPlacement *MetaDataObject::findDataPlacementByType(const IAllocationDescriptor *alloc_desc, - const Range *range) const { - auto res = getDataPlacementByType(alloc_desc->getType()); - if (res->empty()) - return nullptr; - else { - for (size_t i = 0; i < res->size(); ++i) { - if ((*res)[i]->allocation->operator==(alloc_desc)) { - if (((*res)[i]->range == nullptr && range == nullptr) || - ((*res)[i]->range != nullptr && (*res)[i]->range->operator==(range))) { - return (*res)[i].get(); - } - } + for (const auto &rdp_by_type : range_data_placements) { + for (auto &rdp : rdp_by_type) { + if (rdp) + if (rdp->getID() == id) + return const_cast(rdp.get()); } - return nullptr; } + return nullptr; } bool MetaDataObject::isLatestVersion(size_t placement) const { @@ -89,3 +91,70 @@ void MetaDataObject::setLatest(size_t id) { } auto MetaDataObject::getLatest() const -> std::vector { return latest_version; } + 
+DataPlacement *MetaDataObject::getDataPlacement(const IAllocationDescriptor *alloc_desc) { + auto dp = getDataPlacementByType(alloc_desc->getType()); + if (!dp) { + // find other allocation type X (preferably host allocation) to transfer from in latest_version + auto latest = getLatest(); + DataPlacement *placement = getDataPlacementByID(latest.front()); + + std::vector> allocations; + for (uint32_t i = 0; i < placement->getNumAllocations(); ++i) { + auto other_alloc = placement->getAllocation(i); + auto new_alloc = alloc_desc->createAllocation(other_alloc->getSize(), false); + + // transfer to requested data placement + // ToDo: this works for the HOST <-> CUDA use case for now + // ToDo: update *Matrix own shared ptrs when transferring back from other storage + if (new_alloc->getType() == ALLOCATION_TYPE::HOST) + other_alloc->transferFrom(new_alloc->getData().get(), new_alloc->getSize()); + else + new_alloc->transferTo(other_alloc->getData().get(), other_alloc->getSize()); + allocations.emplace_back(std::move(new_alloc)); + } + return addDataPlacement(allocations); + } else { + bool isLatest = isLatestVersion(dp->getID()); + if (!isLatest) { + auto latest = getDataPlacementByID(getLatest().front()); + for (uint32_t i = 0; i < latest->getNumAllocations(); ++i) { + auto other_alloc = latest->getAllocation(i); + dp->getAllocation(i)->transferTo(other_alloc->getData().get(), other_alloc->getSize()); + } + } + return dp; + } +} + +std::pair +MetaDataObject::getDataInternal(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc, const Range *range) { + DataPlacement *dp; + if (!range) { + if (!alloc_desc) { + auto ad = AllocationDescriptorHost(); + dp = getDataPlacement(&ad); + } else + dp = getDataPlacement(alloc_desc); + + return std::make_pair(dp->getID(), dp->getAllocation(alloc_idx)->getData().get()); + } else + throw std::runtime_error("Range support under construction"); +} + +const std::byte *MetaDataObject::getData(uint32_t alloc_idx, const 
IAllocationDescriptor *alloc_desc, + const Range *range) const { + const std::lock_guard lock(mtx); + + auto [id, ptr] = const_cast(this)->getDataInternal(alloc_idx, alloc_desc, range); + const_cast(this)->addLatest(id); + return ptr; +} + +std::byte *MetaDataObject::getData(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc, const Range *range) { + const std::lock_guard lock(mtx); + + auto [id, ptr] = getDataInternal(alloc_idx, alloc_desc, range); + setLatest(id); + return ptr; +} \ No newline at end of file diff --git a/src/runtime/local/datastructures/MetaDataObject.h b/src/runtime/local/datastructures/MetaDataObject.h index 994b0ca73..5a8d3b591 100644 --- a/src/runtime/local/datastructures/MetaDataObject.h +++ b/src/runtime/local/datastructures/MetaDataObject.h @@ -38,20 +38,33 @@ struct DataPlacement; */ class MetaDataObject { std::array>, static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES)> - data_placements; + range_data_placements; + + std::array, static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES)> data_placements; + mutable std::mutex mtx{}; std::vector latest_version; + std::pair getDataInternal(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc, + const Range *range); public: - DataPlacement *addDataPlacement(const IAllocationDescriptor *allocInfo, Range *r = nullptr); - const DataPlacement *findDataPlacementByType(const IAllocationDescriptor *alloc_desc, const Range *range) const; + DataPlacement *addDataPlacement(std::vector> &allocInfos, + Range *r = nullptr); [[nodiscard]] DataPlacement *getDataPlacementByID(size_t id) const; [[nodiscard]] DataPlacement *getDataPlacementByLocation(const std::string &location) const; [[nodiscard]] auto - getDataPlacementByType(ALLOCATION_TYPE type) const -> const std::vector> *; + getRangeDataPlacementByType(ALLOCATION_TYPE type) const -> const std::vector> *; + [[nodiscard]] auto getDataPlacementByType(ALLOCATION_TYPE type) const -> DataPlacement *; void updateRangeDataPlacementByID(size_t id, Range *r); + 
DataPlacement *getDataPlacement(const IAllocationDescriptor *alloc_desc); + [[nodiscard]] bool isLatestVersion(size_t placement) const; void addLatest(size_t id); void setLatest(size_t id); [[nodiscard]] auto getLatest() const -> std::vector; + + const std::byte *getData(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr) const; + std::byte *getData(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc = nullptr, + const Range *range = nullptr); }; diff --git a/src/runtime/local/datastructures/Structure.cpp b/src/runtime/local/datastructures/Structure.cpp index bdef9f240..36f6678f6 100644 --- a/src/runtime/local/datastructures/Structure.cpp +++ b/src/runtime/local/datastructures/Structure.cpp @@ -19,19 +19,12 @@ Structure::Structure(size_t numRows, size_t numCols) : refCounter(1), numRows(numRows), numCols(numCols) { mdo = std::make_shared(); -}; +} -void Structure::clone_mdo(const Structure *src) { - // FIXME: This clones the meta data to avoid locking (thread synchronization - // for data copy) - for (int i = 0; i < static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES); i++) { - auto placements = src->mdo->getDataPlacementByType(static_cast(i)); - for (auto it = placements->begin(); it != placements->end(); it++) { - auto src_alloc = it->get()->allocation.get(); - auto src_range = it->get()->range.get(); - auto new_data_placement = this->mdo->addDataPlacement(src_alloc, src_range); - if (src->mdo->isLatestVersion(it->get()->dp_id)) - this->mdo->addLatest(new_data_placement->dp_id); - } - } +bool Structure::isPinned(const IAllocationDescriptor *alloc_desc) const { + return alloc_desc ? pinned_allocation == alloc_desc->getType() : pinned_allocation == ALLOCATION_TYPE::HOST; +} + +void Structure::pin(const IAllocationDescriptor *alloc_desc) { + alloc_desc ? 
pinned_allocation = alloc_desc->getType() : pinned_allocation = ALLOCATION_TYPE::HOST; } \ No newline at end of file diff --git a/src/runtime/local/datastructures/Structure.h b/src/runtime/local/datastructures/Structure.h index 13f9a7cf5..42f6fc8da 100644 --- a/src/runtime/local/datastructures/Structure.h +++ b/src/runtime/local/datastructures/Structure.h @@ -37,12 +37,16 @@ class Structure { size_t col_offset{}; size_t numRows; size_t numCols; + mutable ALLOCATION_TYPE pinned_allocation{}; + mutable std::byte *pinned_mem{}; Structure(size_t numRows, size_t numCols); mutable std::shared_ptr mdo; - void clone_mdo(const Structure *src); + [[nodiscard]] bool isPinned(const IAllocationDescriptor *alloc_desc) const; + + void pin(const IAllocationDescriptor *alloc_desc); public: virtual ~Structure() = default; diff --git a/src/runtime/local/io/DaphneSerializer.h b/src/runtime/local/io/DaphneSerializer.h index ad43c8700..f7f4da72a 100644 --- a/src/runtime/local/io/DaphneSerializer.h +++ b/src/runtime/local/io/DaphneSerializer.h @@ -608,7 +608,7 @@ template struct DaphneSerializer, false> { if (chunkSize <= bufferIdx) return bufferIdx; - const size_t *colIdxs = arg->getColIdxs(0); + const size_t *colIdxs = arg->getColIdxsOfRow(0); if (serializeFromByte < serializationIdx + nzb * sizeof(size_t)) { size_t startOffset = serializeFromByte > serializationIdx ? serializeFromByte - serializationIdx : 0; size_t bytesToCopy = 0; @@ -630,7 +630,7 @@ template struct DaphneSerializer, false> { if (chunkSize <= bufferIdx) return bufferIdx; - const VT *vals = arg->getValues(0); + const VT *vals = arg->getRowValues(0); if (serializeFromByte < serializationIdx + nzb * sizeof(VT)) { size_t startOffset = serializeFromByte > serializationIdx ? 
serializeFromByte - serializationIdx : 0; size_t bytesToCopy = 0; diff --git a/src/runtime/local/kernels/AggAll.h b/src/runtime/local/kernels/AggAll.h index 14b375057..924066631 100644 --- a/src/runtime/local/kernels/AggAll.h +++ b/src/runtime/local/kernels/AggAll.h @@ -135,14 +135,14 @@ template struct AggAll> EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(opCode)); - return aggArray(arg->getValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), func, + return aggArray(arg->getRowValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), func, AggOpCodeUtils::isSparseSafe(opCode), AggOpCodeUtils::template getNeutral(opCode), ctx); } else { // The op-code is either MEAN or STDDEV or VAR. EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(AggOpCode::SUM)); - auto agg = aggArray(arg->getValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), func, - true, VTRes(0), ctx); + auto agg = aggArray(arg->getRowValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), + func, true, VTRes(0), ctx); agg = agg / (arg->getNumRows() * arg->getNumCols()); if (opCode == AggOpCode::MEAN) return agg; diff --git a/src/runtime/local/kernels/AggCol.h b/src/runtime/local/kernels/AggCol.h index f096c0c94..290181436 100644 --- a/src/runtime/local/kernels/AggCol.h +++ b/src/runtime/local/kernels/AggCol.h @@ -202,9 +202,8 @@ template struct AggCol, CSRM // for MEAN and STDDDEV, we need to sum func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(AggOpCode::SUM)); - const VTArg *valuesArg = arg->getValues(0); - const size_t *colIdxsArg = arg->getColIdxs(0); - + const VTArg *valuesArg = arg->getRowValues(0); + const size_t *colIdxsArg = arg->getColIdxsOfRow(0); const size_t numNonZeros = arg->getNumNonZeros(); if (AggOpCodeUtils::isSparseSafe(opCode)) { diff --git a/src/runtime/local/kernels/AggCum.h b/src/runtime/local/kernels/AggCum.h index 3481511e8..59647befc 
100644 --- a/src/runtime/local/kernels/AggCum.h +++ b/src/runtime/local/kernels/AggCum.h @@ -66,18 +66,21 @@ template struct AggCum, Dens EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(opCode)); + auto arg_inc = arg->isView() ? arg->getNumCols() : arg->getRowSkip(); + auto res_inc = res->isView() ? res->getNumCols() : res->getRowSkip(); + // First row: copy from arg to res. for (size_t c = 0; c < numCols; c++) valuesResCur[c] = valuesArg[c]; - valuesArg += arg->getRowSkip(); - valuesResCur += res->getRowSkip(); + valuesArg += arg_inc; + valuesResCur += res_inc; // Remaining rows: calculate from previous res row and current arg row. for (size_t r = 1; r < numRows; r++) { for (size_t c = 0; c < numCols; c++) valuesResCur[c] = func(valuesResPrv[c], valuesArg[c], ctx); - valuesArg += arg->getRowSkip(); - valuesResPrv += res->getRowSkip(); - valuesResCur += res->getRowSkip(); + valuesArg += arg_inc; + valuesResPrv += res_inc; + valuesResCur += res_inc; } } }; diff --git a/src/runtime/local/kernels/AggRow.h b/src/runtime/local/kernels/AggRow.h index 8b3da82ce..4c2fd28d7 100644 --- a/src/runtime/local/kernels/AggRow.h +++ b/src/runtime/local/kernels/AggRow.h @@ -183,7 +183,7 @@ template struct AggRow, CSRM const VTRes neutral = AggOpCodeUtils::template getNeutral(opCode); for (size_t r = 0; r < numRows; r++) { - *valuesRes = AggAll>::aggArray(arg->getValues(r), arg->getNumNonZeros(r), + *valuesRes = AggAll>::aggArray(arg->getRowValues(r), arg->getNumNonZeros(r), numCols, func, isSparseSafe, neutral, ctx); valuesRes += res->getRowSkip(); } @@ -197,7 +197,7 @@ template struct AggRow, CSRM EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(AggOpCode::SUM)); for (size_t r = 0; r < numRows; r++) { - *valuesRes = AggAll>::aggArray(arg->getValues(r), arg->getNumNonZeros(r), + *valuesRes = AggAll>::aggArray(arg->getRowValues(r), arg->getNumNonZeros(r), numCols, func, isSparseSafe, neutral, ctx); const VTArg *valuesArg 
= arg->getValues(0); const size_t numNonZeros = arg->getNumNonZeros(r); diff --git a/src/runtime/local/kernels/CMakeLists.txt b/src/runtime/local/kernels/CMakeLists.txt index 2fa6a2cc6..50c3a74dc 100644 --- a/src/runtime/local/kernels/CMakeLists.txt +++ b/src/runtime/local/kernels/CMakeLists.txt @@ -108,6 +108,7 @@ set(SOURCES_cpp_kernels ${PROJECT_SOURCE_DIR}/src/runtime/local/vectorized/MTWrapper_sparse.cpp ${PROJECT_SOURCE_DIR}/src/runtime/local/vectorized/Tasks.cpp ${PROJECT_SOURCE_DIR}/src/runtime/local/vectorized/WorkerCPU.h + ) # The library of pre-compiled kernels. Will be linked into the JIT-compiled user program. add_library(KernelObjLib OBJECT ${SOURCES_cpp_kernels} ${HEADERS_cpp_kernels}) diff --git a/src/runtime/local/kernels/CheckEqApprox.h b/src/runtime/local/kernels/CheckEqApprox.h index 72faf166b..4d6d020a7 100644 --- a/src/runtime/local/kernels/CheckEqApprox.h +++ b/src/runtime/local/kernels/CheckEqApprox.h @@ -137,8 +137,8 @@ template struct CheckEqApprox> { return false; for (size_t r = 0; r < numRows; r++) { - const VT *valuesLhs = lhs->getValues(r); - const VT *valuesRhs = rhs->getValues(r); + const VT *valuesLhs = lhs->getRowValues(r); + const VT *valuesRhs = rhs->getRowValues(r); const size_t nnzElementsLhs = lhs->getNumNonZeros(r); const size_t nnzElementsRhs = rhs->getNumNonZeros(r); if (nnzElementsLhs != nnzElementsRhs) diff --git a/src/runtime/local/kernels/DiagMatrix.h b/src/runtime/local/kernels/DiagMatrix.h index 489c4190d..581f35416 100644 --- a/src/runtime/local/kernels/DiagMatrix.h +++ b/src/runtime/local/kernels/DiagMatrix.h @@ -133,7 +133,7 @@ template struct DiagMatrix, CSRMatrix> { for (size_t r = 0, pos = 0; r < numRowsCols; r++) { if (arg->getNumNonZeros(r)) { - valuesRes[pos] = *(arg->getValues(r)); + valuesRes[pos] = *(arg->getRowValues(r)); colIdxsRes[pos++] = r; } rowOffsetsRes[r + 1] = pos; diff --git a/src/runtime/local/kernels/EwBinaryMat.h b/src/runtime/local/kernels/EwBinaryMat.h index 5330c71d7..331791523 100644 --- 
a/src/runtime/local/kernels/EwBinaryMat.h +++ b/src/runtime/local/kernels/EwBinaryMat.h @@ -148,12 +148,12 @@ template struct EwBinaryMat, CSRMatrix, CSRMatri size_t nnzRowRhs = rhs->getNumNonZeros(rowIdx); if (nnzRowLhs && nnzRowRhs) { // merge within row - const VT *valuesRowLhs = lhs->getValues(rowIdx); - const VT *valuesRowRhs = rhs->getValues(rowIdx); - VT *valuesRowRes = res->getValues(rowIdx); - const size_t *colIdxsRowLhs = lhs->getColIdxs(rowIdx); - const size_t *colIdxsRowRhs = rhs->getColIdxs(rowIdx); - size_t *colIdxsRowRes = res->getColIdxs(rowIdx); + const VT *valuesRowLhs = lhs->getRowValues(rowIdx); + const VT *valuesRowRhs = rhs->getRowValues(rowIdx); + VT *valuesRowRes = res->getRowValues(rowIdx); + const size_t *colIdxsRowLhs = lhs->getColIdxsOfRow(rowIdx); + const size_t *colIdxsRowRhs = rhs->getColIdxsOfRow(rowIdx); + size_t *colIdxsRowRes = res->getColIdxsOfRow(rowIdx); size_t posLhs = 0; size_t posRhs = 0; size_t posRes = 0; @@ -191,13 +191,13 @@ template struct EwBinaryMat, CSRMatrix, CSRMatri rowOffsetsRes[rowIdx + 1] = rowOffsetsRes[rowIdx] + posRes + restRowLhs + restRowRhs; } else if (nnzRowLhs) { // copy from left - memcpy(res->getValues(rowIdx), lhs->getValues(rowIdx), nnzRowLhs * sizeof(VT)); - memcpy(res->getColIdxs(rowIdx), lhs->getColIdxs(rowIdx), nnzRowLhs * sizeof(size_t)); + memcpy(res->getRowValues(rowIdx), lhs->getRowValues(rowIdx), nnzRowLhs * sizeof(VT)); + memcpy(res->getColIdxsOfRow(rowIdx), lhs->getColIdxsOfRow(rowIdx), nnzRowLhs * sizeof(size_t)); rowOffsetsRes[rowIdx + 1] = rowOffsetsRes[rowIdx] + nnzRowLhs; } else if (nnzRowRhs) { // copy from right - memcpy(res->getValues(rowIdx), rhs->getValues(rowIdx), nnzRowRhs * sizeof(VT)); - memcpy(res->getColIdxs(rowIdx), rhs->getColIdxs(rowIdx), nnzRowRhs * sizeof(size_t)); + memcpy(res->getRowValues(rowIdx), rhs->getRowValues(rowIdx), nnzRowRhs * sizeof(VT)); + memcpy(res->getColIdxsOfRow(rowIdx), rhs->getColIdxsOfRow(rowIdx), nnzRowRhs * sizeof(size_t)); rowOffsetsRes[rowIdx 
+ 1] = rowOffsetsRes[rowIdx] + nnzRowRhs; } else // empty row in result @@ -211,12 +211,12 @@ template struct EwBinaryMat, CSRMatrix, CSRMatri size_t nnzRowRhs = rhs->getNumNonZeros(rowIdx); if (nnzRowLhs && nnzRowRhs) { // intersect within row - const VT *valuesRowLhs = lhs->getValues(rowIdx); - const VT *valuesRowRhs = rhs->getValues(rowIdx); - VT *valuesRowRes = res->getValues(rowIdx); - const size_t *colIdxsRowLhs = lhs->getColIdxs(rowIdx); - const size_t *colIdxsRowRhs = rhs->getColIdxs(rowIdx); - size_t *colIdxsRowRes = res->getColIdxs(rowIdx); + const VT *valuesRowLhs = lhs->getRowValues(rowIdx); + const VT *valuesRowRhs = rhs->getRowValues(rowIdx); + VT *valuesRowRes = res->getRowValues(rowIdx); + const size_t *colIdxsRowLhs = lhs->getColIdxsOfRow(rowIdx); + const size_t *colIdxsRowRhs = rhs->getColIdxsOfRow(rowIdx); + size_t *colIdxsRowRes = res->getColIdxsOfRow(rowIdx); size_t posLhs = 0; size_t posRhs = 0; size_t posRes = 0; @@ -286,10 +286,10 @@ template struct EwBinaryMat, CSRMatrix, DenseMat size_t nnzRowLhs = lhs->getNumNonZeros(rowIdx); if (nnzRowLhs) { // intersect within row - const VT *valuesRowLhs = lhs->getValues(rowIdx); - VT *valuesRowRes = res->getValues(rowIdx); - const size_t *colIdxsRowLhs = lhs->getColIdxs(rowIdx); - size_t *colIdxsRowRes = res->getColIdxs(rowIdx); + const VT *valuesRowLhs = lhs->getRowValues(rowIdx); + VT *valuesRowRes = res->getRowValues(rowIdx); + const size_t *colIdxsRowLhs = lhs->getColIdxsOfRow(rowIdx); + size_t *colIdxsRowRes = res->getColIdxsOfRow(rowIdx); auto rhsRow = (rhs->getNumRows() == 1 ? 
0 : rowIdx); size_t posRes = 0; for (size_t posLhs = 0; posLhs < nnzRowLhs; ++posLhs) { diff --git a/src/runtime/local/kernels/Gemv.h b/src/runtime/local/kernels/Gemv.h index a46c3afcc..0edc3ded6 100644 --- a/src/runtime/local/kernels/Gemv.h +++ b/src/runtime/local/kernels/Gemv.h @@ -100,8 +100,8 @@ template struct Gemv, CSRMatrix, DenseMatrixgetNumNonZeros(r); - const size_t *rowColIdxs = mat->getColIdxs(r); - const VT *rowValues = mat->getValues(r); + const size_t *rowColIdxs = mat->getColIdxsOfRow(r); + const VT *rowValues = mat->getRowValues(r); const size_t rowIdxRes = r * rowSkipRes; for (size_t i = 0; i < rowNumNonZeros; i++) { diff --git a/src/runtime/local/kernels/HasSpecialValue.h b/src/runtime/local/kernels/HasSpecialValue.h index 274412095..73e0c2314 100644 --- a/src/runtime/local/kernels/HasSpecialValue.h +++ b/src/runtime/local/kernels/HasSpecialValue.h @@ -99,8 +99,8 @@ template struct HasSpecialValue, auto numCols = arg->getNumCols(); auto numNonZeros = arg->getNumNonZeros(); auto numElements = numRows * numCols; - auto vBegin = arg->getValues(0); - auto vEnd = arg->getValues(numRows); + auto vBegin = arg->getRowValues(0); + auto vEnd = arg->getRowValues(numRows); auto hasZeroes = numNonZeros < numElements; auto zero = VT(0); diff --git a/src/runtime/local/kernels/IsSymmetric.h b/src/runtime/local/kernels/IsSymmetric.h index 60e4d7222..a63e1df48 100644 --- a/src/runtime/local/kernels/IsSymmetric.h +++ b/src/runtime/local/kernels/IsSymmetric.h @@ -104,8 +104,8 @@ template struct IsSymmetric> { for (size_t rowIdx = 0; rowIdx < numRows; rowIdx++) { - const VT *rowA = arg->getValues(rowIdx); - const size_t *colIdxsA = arg->getColIdxs(rowIdx); + const VT *rowA = arg->getRowValues(rowIdx); + const size_t *colIdxsA = arg->getColIdxsOfRow(rowIdx); const size_t numNonZerosA = arg->getNumNonZeros(rowIdx); for (size_t idx = 0; idx < numNonZerosA; idx++) { @@ -119,8 +119,8 @@ template struct IsSymmetric> { VT valA = rowA[idx]; // B references the transposed 
element to compare for symmetry. - const VT *rowB = arg->getValues(colIdxA); - const size_t *colIdxsB = arg->getColIdxs(colIdxA); + const VT *rowB = arg->getRowValues(colIdxA); + const size_t *colIdxsB = arg->getColIdxsOfRow(colIdxA); const size_t numNonZerosB = arg->getNumNonZeros(colIdxA); positions[colIdxA]++; // colIdxA is rowIdxB diff --git a/src/runtime/local/kernels/MatMul.h b/src/runtime/local/kernels/MatMul.h index 7e9710125..b5466b853 100644 --- a/src/runtime/local/kernels/MatMul.h +++ b/src/runtime/local/kernels/MatMul.h @@ -77,8 +77,8 @@ template struct MatMul, CSRMatrix, DenseMatrix memset(valuesRes, VT(0), sizeof(VT) * nr1 * nc2); for (size_t r = 0; r < nr1; r++) { const size_t rowNumNonZeros = lhs->getNumNonZeros(r); - const size_t *rowColIdxs = lhs->getColIdxs(r); - const VT *rowValues = lhs->getValues(r); + const size_t *rowColIdxs = lhs->getColIdxsOfRow(r); + const VT *rowValues = lhs->getRowValues(r); const size_t rowIdxRes = r * rowSkipRes; for (size_t i = 0; i < rowNumNonZeros; i++) { diff --git a/src/runtime/local/kernels/NumDistinctApprox.h b/src/runtime/local/kernels/NumDistinctApprox.h index 936aea425..64ff0a841 100644 --- a/src/runtime/local/kernels/NumDistinctApprox.h +++ b/src/runtime/local/kernels/NumDistinctApprox.h @@ -125,7 +125,7 @@ template struct NumDistinctApprox> { } for (size_t rowIdx = 0; rowIdx < numRows; rowIdx++) { - const VT *values = arg->getValues(rowIdx); + const VT *values = arg->getRowValues(rowIdx); const size_t numNonZerosInRow = arg->getNumNonZeros(rowIdx); for (size_t colIdx = 0; colIdx < numNonZerosInRow; colIdx++) { diff --git a/src/runtime/local/kernels/Replace.h b/src/runtime/local/kernels/Replace.h index b642e7b9c..bc8cca861 100644 --- a/src/runtime/local/kernels/Replace.h +++ b/src/runtime/local/kernels/Replace.h @@ -190,8 +190,8 @@ template struct Replace, CSRMatrix, VT> { //--------main logic -------------------------- if (pattern != pattern) { // pattern is NaN for (size_t r = 0; r < numRows; r++) { - 
const VT *allValues = arg->getValues(r); - VT *allUpdatedValues = res->getValues(r); + const VT *allValues = arg->getRowValues(r); + VT *allUpdatedValues = res->getRowValues(r); const size_t nnzElementsRes = arg->getNumNonZeros(r); for (size_t c = 0; c < nnzElementsRes; c++) { if (allValues[c] != allValues[c]) { @@ -201,8 +201,8 @@ template struct Replace, CSRMatrix, VT> { } } else { for (size_t r = 0; r < numRows; r++) { - const VT *allValues = arg->getValues(r); - VT *allUpdatedValues = res->getValues(r); + const VT *allValues = arg->getRowValues(r); + VT *allUpdatedValues = res->getRowValues(r); const size_t nnzElementsRes = arg->getNumNonZeros(r); for (size_t c = 0; c < nnzElementsRes; c++) { if (allValues[c] == pattern) { diff --git a/src/runtime/local/kernels/Reshape.h b/src/runtime/local/kernels/Reshape.h index de7edeb55..17efaf2da 100644 --- a/src/runtime/local/kernels/Reshape.h +++ b/src/runtime/local/kernels/Reshape.h @@ -56,9 +56,9 @@ template struct Reshape, DenseMatrix> { if (numRows * numCols != arg->getNumRows() * arg->getNumCols()) throw std::runtime_error("reshape must retain the number of cells"); - if (arg->getRowSkip() == arg->getNumCols() && res == nullptr) - res = DataObjectFactory::create>(numRows, numCols, arg->getValuesSharedPtr()); - else { + if (arg->getRowSkip() == arg->getNumCols() && arg->isView() == false) { + res = DataObjectFactory::create>(numRows, numCols, arg); + } else { if (res == nullptr) res = DataObjectFactory::create>(numRows, numCols, false); diff --git a/src/runtime/local/kernels/Tri.h b/src/runtime/local/kernels/Tri.h index b57b29bc5..b73d19c79 100644 --- a/src/runtime/local/kernels/Tri.h +++ b/src/runtime/local/kernels/Tri.h @@ -126,8 +126,8 @@ template struct Tri> { rowOffsetsRes[0] = 0; for (size_t r = 0, pos = 0; r < numRows; r++, (*inc)++) { const size_t rowNumNonZeros = arg->getNumNonZeros(r); - const size_t *rowColIdxs = arg->getColIdxs(r); - const VT *rowValues = arg->getValues(r); + const size_t *rowColIdxs = 
arg->getColIdxsOfRow(r); + const VT *rowValues = arg->getRowValues(r); for (size_t i = 0; i < rowNumNonZeros; i++) { const size_t c = rowColIdxs[i]; diff --git a/src/runtime/local/kernels/kernels.json b/src/runtime/local/kernels/kernels.json index 22e285d77..ee3c4bf2b 100644 --- a/src/runtime/local/kernels/kernels.json +++ b/src/runtime/local/kernels/kernels.json @@ -3750,6 +3750,7 @@ [["DenseMatrix", "uint8_t"]], [["CSRMatrix", "double"]], [["CSRMatrix", "float"]], + [["CSRMatrix", "int64_t"]], ["Frame"] ] }, diff --git a/test/runtime/local/datastructures/CSRMatrixTest.cpp b/test/runtime/local/datastructures/CSRMatrixTest.cpp index 827679e89..4cb9a4e70 100644 --- a/test/runtime/local/datastructures/CSRMatrixTest.cpp +++ b/test/runtime/local/datastructures/CSRMatrixTest.cpp @@ -34,7 +34,7 @@ TEMPLATE_TEST_CASE("CSRMatrix allocates enough space", TAG_DATASTRUCTURES, ALL_V const size_t numCols = 2000; const size_t numNonZeros = 500; - CSRMatrix *m = DataObjectFactory::create>(numRows, numCols, numNonZeros, false); + auto m = DataObjectFactory::create>(numRows, numCols, numNonZeros, false); ValueType *values = m->getValues(); size_t *colIdxs = m->getColIdxs(); @@ -60,10 +60,8 @@ TEST_CASE("CSRMatrix sub-matrix works properly", TAG_DATASTRUCTURES) { const size_t numColsOrig = 7; const size_t numNonZeros = 3; - CSRMatrix *mOrig = - DataObjectFactory::create>(numRowsOrig, numColsOrig, numNonZeros, true); - CSRMatrix *mSub = DataObjectFactory::create>(mOrig, 3, 5); - + auto mOrig = DataObjectFactory::create>(numRowsOrig, numColsOrig, numNonZeros, true); + auto mSub = DataObjectFactory::create>(mOrig, 3, 5); // Sub-matrix dimensions are as expected. 
CHECK(mSub->getNumRows() == 2); CHECK(mSub->getNumCols() == numColsOrig); diff --git a/test/runtime/local/datastructures/DenseMatrixTest.cpp b/test/runtime/local/datastructures/DenseMatrixTest.cpp index 8314439f4..e1b3c44de 100644 --- a/test/runtime/local/datastructures/DenseMatrixTest.cpp +++ b/test/runtime/local/datastructures/DenseMatrixTest.cpp @@ -167,9 +167,8 @@ TEST_CASE("DenseMatrix sub-matrix works properly", TAG_DATASTRUCTURES) { const size_t numColsOrig = 7; const size_t numCellsOrig = numRowsOrig * numColsOrig; - DenseMatrix *mOrig = DataObjectFactory::create>(numRowsOrig, numColsOrig, true); - DenseMatrix *mSub = DataObjectFactory::create>(mOrig, 3, 5, 1, 4); - + auto mOrig = DataObjectFactory::create>(numRowsOrig, numColsOrig, true); + auto mSub = DataObjectFactory::create>(mOrig, 3, 5, 1, 4); // Sub-matrix dimensions are as expected. CHECK(mSub->getNumRows() == 2); CHECK(mSub->getNumCols() == 3); diff --git a/test/runtime/local/kernels/CheckEqTest.cpp b/test/runtime/local/kernels/CheckEqTest.cpp index ed550a53f..7a0e940d1 100644 --- a/test/runtime/local/kernels/CheckEqTest.cpp +++ b/test/runtime/local/kernels/CheckEqTest.cpp @@ -296,8 +296,8 @@ TEST_CASE("CheckEq, frames", TAG_KERNELS) { } SECTION("diff inst, same schema, same cont, same labels") { auto c3 = genGivenVals>(numRows, {VT2(8.8), VT2(9.9), VT2(1.0), VT2(2.0)}); - std::string *labels1 = new std::string[3]{"ab", "cde", "fghi"}; - std::string *labels2 = new std::string[3]{"ab", "cde", "fghi"}; + auto labels1 = new std::string[3]{"ab", "cde", "fghi"}; + auto labels2 = new std::string[3]{"ab", "cde", "fghi"}; frame1 = DataObjectFactory::create(cols, labels1); std::vector cols2 = {c0, c1, c3}; auto frame2 = DataObjectFactory::create(cols2, labels2); @@ -307,8 +307,8 @@ TEST_CASE("CheckEq, frames", TAG_KERNELS) { } SECTION("diff inst, same schema, same cont, diff labels") { auto c3 = genGivenVals>(numRows, {VT2(8.8), VT2(9.9), VT2(1.0), VT2(2.0)}); - std::string *labels1 = new 
std::string[3]{"ab", "cde", "fghi"}; - std::string *labels2 = new std::string[3]{"ab", "cde", "fxyz"}; + auto labels1 = new std::string[3]{"ab", "cde", "fghi"}; + auto labels2 = new std::string[3]{"ab", "cde", "fxyz"}; frame1 = DataObjectFactory::create(cols, labels1); std::vector cols2 = {c0, c1, c3}; auto frame2 = DataObjectFactory::create(cols2, labels2);