diff --git a/src/runtime/distributed/coordinator/kernels/Broadcast.h b/src/runtime/distributed/coordinator/kernels/Broadcast.h index d048be423..64fd9b04b 100644 --- a/src/runtime/distributed/coordinator/kernels/Broadcast.h +++ b/src/runtime/distributed/coordinator/kernels/Broadcast.h @@ -82,9 +82,9 @@ struct Broadcast LoadPartitioningDistributed partioner(DistributionSchema::BROADCAST, mat, dctx); while (partioner.HasNextChunk()){ auto dp = partioner.GetNextChunk(); - auto rank = dynamic_cast(*(dp->allocation)).getRank(); + auto rank = dynamic_cast(dp->getAllocation(0))->getRank(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; // Minimum chunk size @@ -120,13 +120,13 @@ struct Broadcast WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank); std::string address=std::to_string(rank); DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = dataAcknowledgement.identifier; data.numRows = dataAcknowledgement.numRows; data.numCols = dataAcknowledgement.numCols; data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); - + auto alloc = dynamic_cast(dp->getAllocation(0)); + alloc->updateDistributedData(data); } } }; @@ -160,12 +160,12 @@ struct Broadcast while(partioner.HasNextChunk()){ auto dp = partioner.GetNextChunk(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; - auto address = dynamic_cast(*(dp->allocation)).getLocation(); + auto address = dynamic_cast(dp->getAllocation(0))->getLocation(); - StoredInfo storedInfo({dp->dp_id}); + StoredInfo storedInfo({dp->getID()}); caller.asyncStoreCall(address, storedInfo); // Minimum chunk size auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer
::length(mat) ? @@ -196,7 +196,7 @@ struct Broadcast auto dp_id = response.storedInfo.dp_id; auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); auto storedData = response.result; data.identifier = storedData.identifier(); @@ -204,7 +204,7 @@ struct Broadcast data.numCols = storedData.num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } }; }; @@ -234,10 +234,10 @@ struct Broadcast while(partioner.HasNextChunk()){ auto dp = partioner.GetNextChunk(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; - auto workerAddr = dynamic_cast(*(dp->allocation)).getLocation(); + auto workerAddr = dynamic_cast(dp->getAllocation(0))->getLocation(); std::thread t([=, &mat]() { // TODO Consider saving channels inside DaphneContext @@ -272,7 +272,7 @@ struct Broadcast newData.numRows = storedData.num_rows(); newData.numCols = storedData.num_cols(); newData.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(newData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(newData); }); threads_vector.push_back(move(t)); } diff --git a/src/runtime/distributed/coordinator/kernels/Distribute.h b/src/runtime/distributed/coordinator/kernels/Distribute.h index 179fda62c..a64932c7e 100644 --- a/src/runtime/distributed/coordinator/kernels/Distribute.h +++ b/src/runtime/distributed/coordinator/kernels/Distribute.h @@ -16,14 +16,14 @@ #pragma once -#include -#include -#include #include - -#include #include #include +#include +#include +#include +#include +#include #ifdef USE_MPI #include @@ -71,12 +71,12 @@ struct Distribute while (partioner.HasNextChunk()){ DataPlacement *dp = partioner.GetNextChunk(); - auto rank = dynamic_cast(*(dp->allocation)).getRank(); + auto rank = dynamic_cast(dp->getAllocation(0))->getRank(); - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; - auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len); + auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); // Minimum chunk size auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer
::length(slicedMat) ? @@ -99,12 +99,12 @@ struct Distribute WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank); std::string address = std::to_string(rank); DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = dataAcknowledgement.identifier ; data.numRows = dataAcknowledgement.numRows; data.numCols = dataAcknowledgement.numCols; data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } @@ -132,17 +132,17 @@ struct Distribute while (partioner.HasNextChunk()){ auto dp = partioner.GetNextChunk(); // Skip if already placed at workers - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; distributed::Data protoMsg; std::vector buffer; - auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len); + auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); - StoredInfo storedInfo({dp->dp_id}); + StoredInfo storedInfo({dp->getID()}); - auto address = dynamic_cast(*(dp->allocation)).getLocation(); + auto address = dynamic_cast(dp->getAllocation(0))->getLocation(); caller.asyncStoreCall(address, storedInfo); // Minimum chunk size @@ -172,12 +172,12 @@ struct Distribute auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = storedData.identifier(); data.numRows = storedData.num_rows(); data.numCols = storedData.num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } @@ -202,14 +202,14 @@ struct Distribute while (partioner.HasNextChunk()){ auto dp = partioner.GetNextChunk(); // Skip if already placed at workers - if (dynamic_cast(*(dp->allocation)).getDistributedData().isPlacedAtWorker) + if (dynamic_cast(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker) continue; std::vector buffer; - auto workerAddr = dynamic_cast(*(dp->allocation)).getLocation(); + auto workerAddr = dynamic_cast(dp->getAllocation(0))->getLocation(); std::thread t([=, &mat]() { auto stub = ctx->stubs[workerAddr].get(); @@ -217,7 +217,7 @@ struct Distribute distributed::StoredData storedData; grpc::ClientContext grpc_ctx; - auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len); + auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len); auto serializer = DaphneSerializerChunks
(slicedMat, dctx->config.max_distributed_serialization_chunk_size); distributed::Data protoMsg; @@ -238,10 +238,10 @@ struct Distribute newData.numRows = storedData.num_rows(); newData.numCols = storedData.num_cols(); newData.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(newData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(newData); DataObjectFactory::destroy(slicedMat); }); - threads_vector.push_back(move(t)); + threads_vector.push_back(std::move(t)); } for (auto &thread : threads_vector) thread.join(); diff --git a/src/runtime/distributed/coordinator/kernels/DistributedCollect.h b/src/runtime/distributed/coordinator/kernels/DistributedCollect.h index 9e9216bf0..5b2393edf 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedCollect.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedCollect.h @@ -68,7 +68,7 @@ void distributedCollect(DT *&mat, const VectorCombine &combine, DCTX(dctx)) template struct DistributedCollect { - static void apply(DT *&mat, const VectorCombine& combine, DCTX(dctx)) + static void apply(DT *&mat, const VectorCombine& combine, DCTX(dctx)) { if (mat == nullptr) throw std::runtime_error("DistributedCollect gRPC: result matrix must be already allocated by wrapper since information regarding size only exists there"); @@ -80,7 +80,7 @@ struct DistributedCollect std::string address = std::to_string(rank); auto dp=mat->getMetaDataObject()->getDataPlacementByLocation(address); - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); WorkerImpl::StoredInfo info = { distributedData.identifier, distributedData.numRows, @@ -106,22 +106,22 @@ struct DistributedCollect auto slicedMat = dynamic_cast*>(DF_deserialize(buffer)); if (combine == VectorCombine::ADD) { ewBinaryMat(BinaryOpCode::ADD, denseMat, slicedMat, denseMat, nullptr); - } else { - auto resValues = denseMat->getValues() + (dp->range->r_start * denseMat->getRowSkip()); + } else { + auto resValues = denseMat->getValues() + (dp->getRange()->r_start * denseMat->getRowSkip()); auto slicedMatValues = slicedMat->getValues(); - for (size_t r = 0; r < dp->range->r_len; r++){ - memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double)); + for (size_t r = 0; r < dp->getRange()->r_len; r++){ + memcpy(resValues + dp->getRange()->c_start, slicedMatValues, dp->getRange()->c_len * sizeof(double)); resValues += denseMat->getRowSkip(); slicedMatValues += slicedMat->getRowSkip(); } } DataObjectFactory::destroy(slicedMat); - collectedDataItems+= dp->range->r_len * dp->range->c_len; + collectedDataItems+= dp->getRange()->r_len * dp->getRange()->c_len; - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributedData.isPlacedAtWorker = false; - dynamic_cast(*(dp->allocation)).updateDistributedData(distributedData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(distributedData); // this is to handle the case when not all workers participate in the computation, i.e., number of workers is larger than of the work items if(collectedDataItems == denseMat->getNumRows() * denseMat->getNumCols()) break; @@ -137,7 +137,7 @@ struct DistributedCollect template struct DistributedCollect { - static void apply(DT *&mat, const VectorCombine& combine, DCTX(dctx)) + static void apply(DT *&mat, const VectorCombine& 
combine, DCTX(dctx)) { if (mat == nullptr) throw std::runtime_error("DistributedCollect gRPC: result matrix must be already allocated by wrapper since information regarding size only exists there"); @@ -148,12 +148,12 @@ struct DistributedCollect DistributedGRPCCaller caller(dctx); - auto dpVector = mat->getMetaDataObject()->getDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); + auto dpVector = mat->getMetaDataObject()->getRangeDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); for (auto &dp : *dpVector) { - auto address = dp->allocation->getLocation(); + auto address = dp->getAllocation(0)->getLocation(); - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); - StoredInfo storedInfo({dp->dp_id}); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); + StoredInfo storedInfo({dp->getID()}); distributed::StoredData protoData; protoData.set_identifier(distributedData.identifier); protoData.set_num_rows(distributedData.numRows); @@ -168,7 +168,7 @@ struct DistributedCollect auto response = caller.getNextResult(); auto dp_id = response.storedInfo.dp_id; auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); auto matProto = response.result; @@ -183,18 +183,18 @@ struct DistributedCollect if (combine == VectorCombine::ADD) { ewBinaryMat(BinaryOpCode::ADD, denseMat, slicedMat, denseMat, nullptr); } else { - auto resValues = denseMat->getValues() + (dp->range->r_start * denseMat->getRowSkip()); + auto resValues = denseMat->getValues() + (dp->getRange()->r_start * denseMat->getRowSkip()); auto slicedMatValues = slicedMat->getValues(); - for (size_t r = 0; r < dp->range->r_len; r++){ - memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double)); - resValues += denseMat->getRowSkip(); + for (size_t r = 0; r < dp->getRange()->r_len; r++){ + memcpy(resValues + dp->getRange()->c_start, slicedMatValues, dp->getRange()->c_len * sizeof(double)); + resValues += denseMat->getRowSkip(); slicedMatValues += slicedMat->getRowSkip(); } } DataObjectFactory::destroy(slicedMat); data.isPlacedAtWorker = false; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } }; }; @@ -207,7 +207,7 @@ struct DistributedCollect template struct DistributedCollect { - static void apply(DT *&mat, const VectorCombine& combine, DCTX(dctx)) + static void apply(DT *&mat, const VectorCombine& combine, DCTX(dctx)) { if (mat == nullptr) throw std::runtime_error("DistributedCollect gRPC: result matrix must be already allocated by wrapper since information regarding size only exists there"); @@ -216,11 +216,11 @@ struct DistributedCollect std::vector threads_vector; std::mutex lock; - auto dpVector = mat->getMetaDataObject()->getDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); + auto dpVector = mat->getMetaDataObject()->getRangeDataPlacementByType(ALLOCATION_TYPE::DIST_GRPC); for (auto &dp : *dpVector) { - auto address = dp->allocation->getLocation(); + auto address = dp->getAllocation(0)->getLocation(); - auto distributedData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distributedData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributed::StoredData protoData; protoData.set_identifier(distributedData.identifier); protoData.set_num_rows(distributedData.numRows); @@ -246,19 +246,19 @@ struct 
DistributedCollect std::lock_guard g(lock); ewBinaryMat(BinaryOpCode::ADD, denseMat, slicedMat, denseMat, nullptr); } else { - auto resValues = denseMat->getValues() + (dp->range->r_start * denseMat->getRowSkip()); + auto resValues = denseMat->getValues() + (dp->getRange()->r_start * denseMat->getRowSkip()); auto slicedMatValues = slicedMat->getValues(); - for (size_t r = 0; r < dp->range->r_len; r++){ - memcpy(resValues + dp->range->c_start, slicedMatValues, dp->range->c_len * sizeof(double)); - resValues += denseMat->getRowSkip(); + for (size_t r = 0; r < dp->getRange()->r_len; r++){ + memcpy(resValues + dp->getRange()->c_start, slicedMatValues, dp->getRange()->c_len * sizeof(double)); + resValues += denseMat->getRowSkip(); slicedMatValues += slicedMat->getRowSkip(); } } DataObjectFactory::destroy(slicedMat); distributedData.isPlacedAtWorker = false; - dynamic_cast(*(dp->allocation)).updateDistributedData(distributedData); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(distributedData); }); - threads_vector.push_back(move(t)); + threads_vector.push_back(std::move(t)); } for (auto &thread : threads_vector) thread.join(); diff --git a/src/runtime/distributed/coordinator/kernels/DistributedCompute.h b/src/runtime/distributed/coordinator/kernels/DistributedCompute.h index d8b700c01..bc72d2d8a 100644 --- a/src/runtime/distributed/coordinator/kernels/DistributedCompute.h +++ b/src/runtime/distributed/coordinator/kernels/DistributedCompute.h @@ -86,7 +86,7 @@ struct DistributedCompute for (size_t i = 0; i < numInputs; i++) { auto dp = args[i]->getMetaDataObject()->getDataPlacementByLocation(addr); - auto distrData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distrData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); MPIHelper::StoredInfo storedData({distrData.identifier, distrData.numRows, distrData.numCols}); task.inputs.push_back(storedData); @@ -105,12 +105,12 @@ struct DistributedCompute auto resMat = *res[idx++]; auto dp = resMat->getMetaDataObject()->getDataPlacementByLocation(std::to_string(rank)); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = info.identifier; data.numRows = info.numRows; data.numCols = info.numCols; data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } } @@ -151,7 +151,7 @@ struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto distrData = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distrData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributed::StoredData protoData; protoData.set_identifier(distrData.identifier); @@ -179,12 +179,12 @@ struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = computeResult.outputs()[o].stored().identifier(); data.numRows = computeResult.outputs()[o].stored().num_rows(); data.numCols = computeResult.outputs()[o].stored().num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } } } @@ -225,7 +225,7 @@ struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto distrData = 
dynamic_cast(*(dp->allocation)).getDistributedData(); + auto distrData = dynamic_cast(dp->getAllocation(0))->getDistributedData(); distributed::StoredData protoData; protoData.set_identifier(distrData.identifier); @@ -250,15 +250,15 @@ struct DistributedComputegetMetaDataObject()->getDataPlacementByLocation(addr); - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); data.identifier = computeResult.outputs()[o].stored().identifier(); data.numRows = computeResult.outputs()[o].stored().num_rows(); data.numCols = computeResult.outputs()[o].stored().num_cols(); data.isPlacedAtWorker = true; - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } }); - threads_vector.push_back(move(t)); + threads_vector.push_back(std::move(t)); } for (auto &thread : threads_vector) thread.join(); diff --git a/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h b/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h index 03134e5cc..9eca147fa 100644 --- a/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h +++ b/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h @@ -16,7 +16,8 @@ #pragma once -#include +#include + #include #include #include @@ -64,7 +65,7 @@ class LoadPartitioningDistributed { // Each allocation descriptor might use a different constructor. // Here we provide the different implementations. // Another solution would be to make sure that every constructor is similar so this would not be needed. - static ALLOCATOR CreateAllocatorDescriptor(DaphneContext* ctx, const std::string &addr, const DistributedData &data) { + static ALLOCATOR CreateAllocatorDescriptor(DaphneContext* ctx, const std::string &addr, DistributedData& data) { if constexpr (std::is_same_v) return AllocationDescriptorMPI(std::stoi(addr), ctx, data); else if constexpr (std::is_same_v) @@ -86,7 +87,6 @@ class LoadPartitioningDistributed { ((taskIndex + 1) * k + std::min(taskIndex + 1, m)) - ((taskIndex * k) + std::min(taskIndex, m)), mat->getNumCols() ); - break; } case DistributionSchema::BROADCAST: return Range( @@ -95,7 +95,6 @@ class LoadPartitioningDistributed { mat->getNumRows(), mat->getNumCols() ); - break; default: throw std::runtime_error("Unknown distribution scheme"); } @@ -122,24 +121,24 @@ class LoadPartitioningDistributed { DataPlacement *dp; if ((dp = mat->getMetaDataObject()->getDataPlacementByLocation(workerAddr))) { - auto data = dynamic_cast(*(dp->allocation)).getDistributedData(); + auto data = dynamic_cast(dp->getAllocation(0))->getDistributedData(); // Check if existing placement matches the same ranges we currently need if (data.isPlacedAtWorker) { - auto existingRange = dp->range.get(); + auto existingRange = dp->getRange(); if (*existingRange == range) data.isPlacedAtWorker = true; else { - mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); + mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); data.isPlacedAtWorker = false; } } else - mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); + mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); // TODO Currently we do not support distributing/splitting // by columns. When we do, this should be changed (e.g. 
Index(0, taskIndex)) // This can be decided based on DistributionSchema data.ix = GetDistributedIndex(); - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } else { // Else, create new object metadata entry DistributedData data; @@ -147,7 +146,9 @@ class LoadPartitioningDistributed { // by columns. When we do, this should be changed (e.g. Index(0, taskIndex)) data.ix = GetDistributedIndex(); auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data); - dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range); + std::vector> allocations; + allocations.emplace_back(&allocationDescriptor); + dp = mat->getMetaDataObject()->addDataPlacement(allocations, &range); } taskIndex++; return dp; @@ -214,13 +215,15 @@ class LoadPartitioningDistributed { // If dp already exists for this worker, update the range and data if (auto dp = (*outputs[i])->getMetaDataObject()->getDataPlacementByLocation(workerAddr)) { - (*outputs[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + (*outputs[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } else { // else create new dp entry auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data); - ((*outputs[i]))->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range); + std::vector> allocations; + allocations.emplace_back(&allocationDescriptor); + ((*outputs[i]))->getMetaDataObject()->addDataPlacement(allocations, &range); } } } diff --git a/src/runtime/distributed/worker/MPICoordinator.h b/src/runtime/distributed/worker/MPICoordinator.h index 320dd8d11..82a14ad05 100644 --- a/src/runtime/distributed/worker/MPICoordinator.h +++ b/src/runtime/distributed/worker/MPICoordinator.h @@ -3,7 +3,6 @@ #include #include -#include #include #include #include @@ -15,7 +14,6 @@ #include #include #include -##include #include #include @@ -100,15 +98,14 @@ class MPICoordinator{ std::string addr= std::to_string(COORDINATOR); // If dp already exists for this worker, update the range and data if (auto dp = (*res[i])->getMetaDataObject()->getDataPlacementByLocation(addr)) { - (*res[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range); - dynamic_cast(*(dp->allocation)).updateDistributedData(data); + (*res[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->getID(), &range); + dynamic_cast(dp->getAllocation(0))->updateDistributedData(data); } else { // else create new dp entry - AllocationDescriptorMPI allocationDescriptor( - dctx, - COORDINATOR, - data); - ((*res[i]))->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range); + AllocationDescriptorMPI allocationDescriptor(COORDINATOR, dctx, data); + std::vector> allocations; + allocations.emplace_back(&allocationDescriptor); + ((*res[i]))->getMetaDataObject()->addDataPlacement(allocations, &range); } } //solver.Compute(&outputsStoredInfo, inputsStoredInfo, mlirCode); diff --git a/src/runtime/local/context/CUDAContext.cpp b/src/runtime/local/context/CUDAContext.cpp index e0bbf513c..491324346 100644 --- a/src/runtime/local/context/CUDAContext.cpp +++ b/src/runtime/local/context/CUDAContext.cpp @@ -50,7 +50,6 @@ void CUDAContext::init() { logger->info("Using CUDA device {}: {}\n\tAvailable mem: {} Total mem: {} using {}% -> {}", device_id, device_properties.name, available, 
total, mem_usage * 100, mem_budget); - CHECK_CUBLAS(cublasCreate(&cublas_handle)); CHECK_CUSPARSE(cusparseCreate(&cusparse_handle)); CHECK_CUDNN(cudnnCreate(&cudnn_handle)); @@ -67,7 +66,6 @@ void CUDAContext::init() { CHECK_CUSOLVER(cusolverDnSetStream(cusolver_handle, cusolver_stream)); getCUDNNWorkspace(64 * 1024 * 1024); - // CHECK_CUBLAS(cublasLtCreate(&cublaslt_Handle)); // CHECK_CUDART(cudaMalloc(&cublas_workspace, cublas_workspace_size)); } @@ -145,6 +143,6 @@ void CUDAContext::free(size_t id) { allocations.erase(id); } -int CUDAContext::getMaxNumThreads() { +int CUDAContext::getMaxNumThreads() const { return device_properties.maxThreadsPerBlock; } diff --git a/src/runtime/local/context/CUDAContext.h b/src/runtime/local/context/CUDAContext.h index 08c3e8e7e..aeeb08d3a 100644 --- a/src/runtime/local/context/CUDAContext.h +++ b/src/runtime/local/context/CUDAContext.h @@ -78,7 +78,7 @@ class CUDAContext final : public IContext { void* getCUDNNWorkspace(size_t size); [[nodiscard]] size_t getMemBudget() const { return mem_budget; } - int getMaxNumThreads(); + int getMaxNumThreads() const; static CUDAContext* get(DaphneContext* ctx, size_t id) { return dynamic_cast(ctx->getCUDAContext(id)); } std::shared_ptr malloc(size_t size, bool zero, size_t& id); diff --git a/src/runtime/local/datastructures/AllocationDescriptorCUDA.h b/src/runtime/local/datastructures/AllocationDescriptorCUDA.h index 4a104e2bf..1ccdab1d3 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorCUDA.h +++ b/src/runtime/local/datastructures/AllocationDescriptorCUDA.h @@ -28,8 +28,6 @@ class AllocationDescriptorCUDA : public IAllocationDescriptor { size_t alloc_id{}; public: - AllocationDescriptorCUDA() = delete; - AllocationDescriptorCUDA(DaphneContext* ctx, uint32_t device_id) : device_id(device_id), dctx(ctx) { } ~AllocationDescriptorCUDA() override { @@ -44,9 +42,12 @@ class AllocationDescriptorCUDA : public IAllocationDescriptor { // [[nodiscard]] uint32_t getLocation() const { return device_id; } [[nodiscard]] std::string getLocation() const override { return std::to_string(device_id); } - void createAllocation(size_t size, bool zero) override { + [[nodiscard]] std::unique_ptr createAllocation(size_t size, bool zero) const override { + auto new_alloc = std::make_unique(dctx, device_id); auto ctx = CUDAContext::get(dctx, device_id); - data = ctx->malloc(size, zero, alloc_id); + new_alloc->data = ctx->malloc(size, zero, new_alloc->alloc_id); + new_alloc->size = size; + return new_alloc; } std::shared_ptr getData() override { return data; } diff --git a/src/runtime/local/datastructures/AllocationDescriptorGRPC.h b/src/runtime/local/datastructures/AllocationDescriptorGRPC.h index bb03310bf..56770a0e1 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorGRPC.h +++ b/src/runtime/local/datastructures/AllocationDescriptorGRPC.h @@ -41,7 +41,7 @@ class AllocationDescriptorGRPC : public IAllocationDescriptor { std::string getLocation() const override {return workerAddress; }; - void createAllocation(size_t size, bool zero) override {} + [[nodiscard]] std::unique_ptr createAllocation(size_t size, bool zero) const override {} // TODO: Implement transferTo and transferFrom functions std::shared_ptr getData() override { throw std::runtime_error("TransferTo/From functions are not implemented yet."); diff --git a/src/runtime/local/datastructures/AllocationDescriptorHost.h b/src/runtime/local/datastructures/AllocationDescriptorHost.h index 22b1639f8..685ff0335 100644 --- 
a/src/runtime/local/datastructures/AllocationDescriptorHost.h +++ b/src/runtime/local/datastructures/AllocationDescriptorHost.h @@ -18,6 +18,7 @@ #include "DataPlacement.h" #include +#include class AllocationDescriptorHost : public IAllocationDescriptor { ALLOCATION_TYPE type = ALLOCATION_TYPE::HOST; @@ -25,15 +26,41 @@ class AllocationDescriptorHost : public IAllocationDescriptor { public: ~AllocationDescriptorHost() override = default; + [[nodiscard]] ALLOCATION_TYPE getType() const override { return type; } - void createAllocation(size_t size, bool zero) override { } - std::string getLocation() const override - { return "Host"; } + + static std::unique_ptr createHostAllocation(std::shared_ptr data, size_t size, + bool zero) { + auto new_alloc = std::make_unique(); + new_alloc->data = std::move(data); + new_alloc->size = size; + if(zero) + memset(new_alloc->data.get(), 0, size); + return new_alloc; + } + + [[nodiscard]] std::unique_ptr createAllocation(size_t size, bool zero) const override { + auto new_alloc = std::make_unique(); + new_alloc->size = size; + new_alloc->data = std::shared_ptr(new std::byte[size], std::default_delete()); + + if(zero) + memset(new_alloc->data.get(), 0, size); + + return new_alloc; + } + + [[nodiscard]] std::string getLocation() const override { return "Host"; } + std::shared_ptr getData() override { return data; } + + void setData(std::shared_ptr& _data) { data = _data; } + void transferTo(std::byte* src, size_t size) override { } void transferFrom(std::byte* dst, size_t size) override {} [[nodiscard]] std::unique_ptr clone() const override { return std::make_unique(*this); } + bool operator==(const IAllocationDescriptor* other) const override { return (getType() == other->getType()); } }; diff --git a/src/runtime/local/datastructures/AllocationDescriptorMPI.h b/src/runtime/local/datastructures/AllocationDescriptorMPI.h index e38923e88..9005d92d7 100644 --- a/src/runtime/local/datastructures/AllocationDescriptorMPI.h +++ b/src/runtime/local/datastructures/AllocationDescriptorMPI.h @@ -26,27 +26,24 @@ class AllocationDescriptorMPI : public IAllocationDescriptor { ALLOCATION_TYPE type = ALLOCATION_TYPE::DIST_MPI; - int processRankID; - DaphneContext* ctx; + int processRankID{}; + DaphneContext* ctx{}; DistributedData distributedData; std::shared_ptr data; public: - AllocationDescriptorMPI() {} ; - AllocationDescriptorMPI(int id, - DaphneContext* ctx, - DistributedData data) : processRankID(id), ctx(ctx), distributedData(data) {} ; + AllocationDescriptorMPI() = default; + AllocationDescriptorMPI(int id, DaphneContext* ctx, DistributedData& data) : processRankID(id), ctx(ctx), + distributedData(data) {} - ~AllocationDescriptorMPI() override {}; + ~AllocationDescriptorMPI() override = default; - [[nodiscard]] ALLOCATION_TYPE getType() const override - { return type; }; + [[nodiscard]] ALLOCATION_TYPE getType() const override { return type; }; - std::string getLocation() const override { - return std::to_string(processRankID); - }; - - void createAllocation(size_t size, bool zero) override {} ; + [[nodiscard]] std::string getLocation() const override { return std::to_string(processRankID); }; + + [[nodiscard]] std::unique_ptr createAllocation(size_t size, bool zero) const override {} + std::shared_ptr getData() override {return nullptr;} ; bool operator==(const IAllocationDescriptor* other) const override { @@ -62,14 +59,13 @@ class AllocationDescriptorMPI : public IAllocationDescriptor { void transferTo(std::byte *src, size_t size) override { /* TODO */ }; void 
transferFrom(std::byte *src, size_t size) override { /* TODO */ }; - const DistributedIndex getDistributedIndex() - { return distributedData.ix; } - const DistributedData getDistributedData() - { return distributedData; } - void updateDistributedData(DistributedData data_) - { distributedData = data_; } - int getRank() - { return processRankID; } + DistributedIndex getDistributedIndex() const { return distributedData.ix; } + + DistributedData getDistributedData() { return distributedData; } + + void updateDistributedData(DistributedData& data_) { distributedData = data_; } + + int getRank() const { return processRankID; } }; #endif //SRC_RUNTIME_LOCAL_DATASTRUCTURE_ALLOCATION_DESCRIPTORMPH_H diff --git a/src/runtime/local/datastructures/CSRMatrix.cpp b/src/runtime/local/datastructures/CSRMatrix.cpp index d23254878..a6a9c0181 100644 --- a/src/runtime/local/datastructures/CSRMatrix.cpp +++ b/src/runtime/local/datastructures/CSRMatrix.cpp @@ -18,6 +18,54 @@ #include "CSRMatrix.h" +template +CSRMatrix::CSRMatrix(size_t maxNumRows, size_t numCols, size_t maxNumNonZeros, bool zero, IAllocationDescriptor* allocInfo) : + Matrix(maxNumRows, numCols), numRowsAllocated(maxNumRows), isRowAllocatedBefore(false), + is_view(false), maxNumNonZeros(maxNumNonZeros), lastAppendedRowIdx(0) { + auto val_buf_size = maxNumNonZeros * sizeof(ValueType); + auto cidx_buf_size = maxNumNonZeros * sizeof(size_t); + auto rptr_buf_size = (numRows + 1) * sizeof(size_t); + + std::unique_ptr val_alloc; + std::unique_ptr cidx_alloc; + std::unique_ptr rptr_alloc; + + if(!allocInfo) { + values = std::shared_ptr(new ValueType[maxNumNonZeros], std::default_delete()); + auto bytes = std::reinterpret_pointer_cast(values); + val_alloc = AllocationDescriptorHost::createHostAllocation(bytes, val_buf_size, zero); + colIdxs = std::shared_ptr(new size_t[maxNumNonZeros], std::default_delete()); + bytes = std::reinterpret_pointer_cast(colIdxs); + cidx_alloc = AllocationDescriptorHost::createHostAllocation(bytes, cidx_buf_size, zero); + rowOffsets = std::shared_ptr(new size_t[numRows + 1], std::default_delete()); + bytes = std::reinterpret_pointer_cast(rowOffsets); + rptr_alloc = AllocationDescriptorHost::createHostAllocation(bytes, rptr_buf_size, zero); + } + else { + val_alloc = allocInfo->createAllocation(val_buf_size, zero); + cidx_alloc = allocInfo->createAllocation(cidx_buf_size, zero); + rptr_alloc = allocInfo->createAllocation(rptr_buf_size, zero); + + // ToDo: refactor data storage into memory management + if(allocInfo->getType() == ALLOCATION_TYPE::HOST) { + values = std::reinterpret_pointer_cast(val_alloc->getData()); + colIdxs = std::reinterpret_pointer_cast(cidx_alloc->getData()); + rowOffsets = std::reinterpret_pointer_cast(rptr_alloc->getData()); + } + } + + spdlog::debug("Creating {} x {} sparse matrix of type: {}. 
Required memory: vals={}, cidxs={}, rptrs={}, total={} Mb", numRows, numCols, + val_alloc->getTypeName(), static_cast(val_buf_size) / (1048576), static_cast(cidx_buf_size) / (1048576), + static_cast(rptr_buf_size) / (1048576), static_cast(val_buf_size + cidx_buf_size + rptr_buf_size) / (1048576)); + + std::vector> allocations; + allocations.emplace_back(std::move(val_alloc)); + allocations.emplace_back(std::move(cidx_alloc)); + allocations.emplace_back(std::move(rptr_alloc)); + auto p = this->mdo->addDataPlacement(allocations); + this->mdo->addLatest(p->getID()); +} + template size_t CSRMatrix::serialize(std::vector &buf) const { return DaphneSerializer>::serialize(this, buf); diff --git a/src/runtime/local/datastructures/CSRMatrix.h b/src/runtime/local/datastructures/CSRMatrix.h index 9f6f310d2..2acbc160d 100644 --- a/src/runtime/local/datastructures/CSRMatrix.h +++ b/src/runtime/local/datastructures/CSRMatrix.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -60,7 +61,9 @@ class CSRMatrix : public Matrix { size_t numRowsAllocated; bool isRowAllocatedBefore; - + + const bool is_view; + /** * @brief The maximum number of non-zero values this matrix was allocated * to accommodate. @@ -91,23 +94,8 @@ class CSRMatrix : public Matrix { * @param zero Whether the allocated memory of the internal arrays shall be * initialized to zeros (`true`), or be left uninitialized (`false`). */ - CSRMatrix(size_t maxNumRows, size_t numCols, size_t maxNumNonZeros, bool zero) : - Matrix(maxNumRows, numCols), - numRowsAllocated(maxNumRows), - isRowAllocatedBefore(false), - maxNumNonZeros(maxNumNonZeros), - values(new ValueType[maxNumNonZeros], std::default_delete()), - colIdxs(new size_t[maxNumNonZeros], std::default_delete()), - rowOffsets(new size_t[numRows + 1], std::default_delete()), - lastAppendedRowIdx(0) - { - if(zero) { - memset(values.get(), 0, maxNumNonZeros * sizeof(ValueType)); - memset(colIdxs.get(), 0, maxNumNonZeros * sizeof(size_t)); - memset(rowOffsets.get(), 0, (numRows + 1) * sizeof(size_t)); - } - } - + CSRMatrix(size_t maxNumRows, size_t numCols, size_t maxNumNonZeros, bool zero, IAllocationDescriptor* allocInfo = nullptr); + /** * @brief Creates a `CSRMatrix` around a sub-matrix of another `CSRMatrix` * without copying the data. 
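Note on the reworked CSRMatrix construction above: the three CSR buffers (values, colIdxs, rowOffsets) are now wrapped into allocation descriptors and registered as a single DataPlacement carrying three allocations, which is then marked as the latest version. A minimal usage sketch, assuming the usual DAPHNE headers and DataObjectFactory; illustrative only, not part of the patch:

    #include <runtime/local/datastructures/CSRMatrix.h>
    #include <runtime/local/datastructures/DataObjectFactory.h>

    int main() {
        // 4x4 CSR matrix with capacity for 8 non-zeros; zero-initialized host buffers.
        // With this patch, the constructor bundles values/colIdxs/rowOffsets into one
        // DataPlacement with three allocations instead of three loose shared_ptrs.
        auto *m = DataObjectFactory::create<CSRMatrix<double>>(4, 4, 8, true);

        m->set(1, 2, 3.0);                      // insert a single non-zero
        const size_t nnz = m->getNumNonZeros(); // expected: 1

        DataObjectFactory::destroy(m);
        return nnz == 1 ? 0 : 1;
    }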
@@ -120,7 +108,7 @@ class CSRMatrix : public Matrix { Matrix(rowUpperExcl - rowLowerIncl, src->numCols), numRowsAllocated(src->numRowsAllocated - rowLowerIncl), isRowAllocatedBefore(rowLowerIncl > 0), - lastAppendedRowIdx(0) + is_view(true), lastAppendedRowIdx(0) { if (!src) throw std::runtime_error("CSRMatrix: src must not be null"); @@ -130,16 +118,19 @@ class CSRMatrix : public Matrix { throw std::runtime_error("CSRMatrix: rowUpperExcl is out of bounds"); if (rowLowerIncl >= rowUpperExcl) throw std::runtime_error("CSRMatrix: rowLowerIncl must be lower than rowUpperExcl"); - + + this->row_offset = rowLowerIncl; maxNumNonZeros = src->maxNumNonZeros; values = src->values; colIdxs = src->colIdxs; rowOffsets = std::shared_ptr(src->rowOffsets, src->rowOffsets.get() + rowLowerIncl); + //ToDo: temporary fix to directly assign mdo + this->mdo = src->mdo; } - - virtual ~CSRMatrix() { - // nothing to do - } + + [[nodiscard]] size_t offset() const { return this->row_offset; } + + virtual ~CSRMatrix() = default; void fillNextPosUntil(size_t nextPos, size_t rowIdx) { if(rowIdx > lastAppendedRowIdx) { @@ -161,19 +152,22 @@ class CSRMatrix : public Matrix { this->numRows = numRows; } - size_t getMaxNumNonZeros() const { + [[nodiscard]] size_t getMaxNumNonZeros() const { return maxNumNonZeros; } - size_t getNumNonZeros() const { - return rowOffsets.get()[numRows] - rowOffsets.get()[0]; + [[nodiscard]] size_t getNumNonZeros() const { + return getRowOffsets()[numRows] - getRowOffsets()[0]; } - size_t getNumNonZeros(size_t rowIdx) const { + [[nodiscard]] size_t getNumNonZeros(size_t rowIdx) const { if (rowIdx >= numRows) throw std::runtime_error("CSRMatrix (getNumNonZeros): rowIdx is out of bounds"); - return rowOffsets.get()[rowIdx + 1] - rowOffsets.get()[rowIdx]; + auto roff = getRowOffsets(); + auto rownext = roff[rowIdx+1]; + auto row = roff[rowIdx]; + return rownext - row; } - + void shrinkNumNonZeros(size_t numNonZeros) { if (numNonZeros > getNumNonZeros()) throw std::runtime_error("CSRMatrix (shrinkNumNonZeros): numNonZeros can only be shrunk"); @@ -181,50 +175,54 @@ class CSRMatrix : public Matrix { // colIdxs arrays. } - ValueType * getValues() { - return values.get(); + ValueType* getValues(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) { + return reinterpret_cast(this->mdo->getData(0, alloc_desc, range)); } - - const ValueType * getValues() const { - return values.get(); + + const ValueType* getValues(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) const { + return reinterpret_cast(this->mdo->getData(0, alloc_desc, range)); } - - ValueType * getValues(size_t rowIdx) { + + ValueType* getRowValues(size_t rowIdx, const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) { // We allow equality here to enable retrieving a pointer to the end. 
if (rowIdx > numRows) throw std::runtime_error("CSRMatrix (getValues): rowIdx is out of bounds"); - return values.get() + rowOffsets.get()[rowIdx]; + return &(reinterpret_cast(getValues(alloc_desc, range))[getRowOffsets()[rowIdx]]); } - - const ValueType * getValues(size_t rowIdx) const { - return const_cast *>(this)->getValues(rowIdx); + + const ValueType* getRowValues(size_t rowIdx, const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) const { + return &(reinterpret_cast(getValues(alloc_desc, range))[getRowOffsets()[rowIdx]]); } - size_t * getColIdxs() { - return colIdxs.get(); + size_t* getColIdxs(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) { + return reinterpret_cast(this->mdo->getData(1, alloc_desc, range)); } - const size_t * getColIdxs() const { - return colIdxs.get(); + const size_t* getColIdxs(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) const { + return reinterpret_cast(this->mdo->getData(1, alloc_desc, range)); } - size_t * getColIdxs(size_t rowIdx) { + size_t* getColIdxsOfRow(size_t rowIdx, const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) { // We allow equality here to enable retrieving a pointer to the end. if (rowIdx > numRows) throw std::runtime_error("CSRMatrix (getColIdxs): rowIdx is out of bounds"); - return colIdxs.get() + rowOffsets.get()[rowIdx]; + auto data = this->mdo->getData(1, alloc_desc, range); + return &(reinterpret_cast(data)[getRowOffsets()[rowIdx]]); } - const size_t * getColIdxs(size_t rowIdx) const { - return const_cast *>(this)->getColIdxs(rowIdx); + const size_t* getColIdxsOfRow(size_t rowIdx, const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) const { + auto data = this->mdo->getData(1, alloc_desc, range); + return &(reinterpret_cast(data)[getRowOffsets()[rowIdx]]); } - size_t * getRowOffsets() { - return rowOffsets.get(); + size_t* getRowOffsets(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) { + auto ptr = reinterpret_cast(this->mdo->getData(2, alloc_desc, range)); + return ptr + offset(); } - const size_t * getRowOffsets() const { - return rowOffsets.get(); + const size_t* getRowOffsets(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) const { + auto ptr = reinterpret_cast(this->mdo->getData(2, alloc_desc, range)); + return ptr + offset(); } ValueType get(size_t rowIdx, size_t colIdx) const override { @@ -233,8 +231,8 @@ class CSRMatrix : public Matrix { if (colIdx >= numCols) throw std::runtime_error("CSRMatrix (get): colIdx is out of bounds"); - const size_t * rowColIdxsBeg = getColIdxs(rowIdx); - const size_t * rowColIdxsEnd = getColIdxs(rowIdx + 1); + const size_t * rowColIdxsBeg = getColIdxsOfRow(rowIdx); + const size_t * rowColIdxsEnd = getColIdxsOfRow(rowIdx + 1); const size_t * ptrExpected = std::lower_bound(rowColIdxsBeg, rowColIdxsEnd, colIdx); if(ptrExpected == rowColIdxsEnd || *ptrExpected != colIdx) @@ -242,7 +240,7 @@ class CSRMatrix : public Matrix { return ValueType(0); else // Entry for the given coordinates present. 
- return getValues(rowIdx)[ptrExpected - rowColIdxsBeg]; + return getRowValues(rowIdx)[ptrExpected - rowColIdxsBeg]; } void set(size_t rowIdx, size_t colIdx, ValueType value) override { @@ -251,13 +249,13 @@ class CSRMatrix : public Matrix { if (colIdx >= numCols) throw std::runtime_error("CSRMatrix (set): colIdx is out of bounds"); - size_t * rowColIdxsBeg = getColIdxs(rowIdx); - size_t * rowColIdxsEnd = getColIdxs(rowIdx + 1); + size_t * rowColIdxsBeg = getColIdxsOfRow(rowIdx); + size_t * rowColIdxsEnd = getColIdxsOfRow(rowIdx + 1); const size_t * ptrExpected = std::lower_bound(rowColIdxsBeg, rowColIdxsEnd, colIdx); const size_t posExpected = ptrExpected - rowColIdxsBeg; const size_t posEnd = colIdxs.get() + rowOffsets.get()[numRowsAllocated] - rowColIdxsBeg; - ValueType * rowValuesBeg = getValues(rowIdx); + ValueType * rowValuesBeg = getRowValues(rowIdx); if(ptrExpected == rowColIdxsEnd || *ptrExpected != colIdx) { // No entry for the given coordinates present. @@ -333,8 +331,9 @@ class CSRMatrix : public Matrix { fillNextPosUntil(rowOffsets.get()[lastAppendedRowIdx + 1], numRows - 1); } - bool isView() const { - return (numRowsAllocated > numRows || isRowAllocatedBefore); + [[nodiscard]] bool isView() const { +// return (numRowsAllocated > numRows || isRowAllocatedBefore); + return is_view; } void printValue(std::ostream & os, ValueType val) const { @@ -350,12 +349,12 @@ class CSRMatrix : public Matrix { << ValueTypeUtils::cppNameFor << ')' << std::endl; // Note that, in general, the values within one row might not be sorted // by column index. Thus, the following is a little complicated. - ValueType * oneRow = new ValueType[numCols]; + auto oneRow = new ValueType[numCols]; for (size_t r = 0; r < numRows; r++) { memset(oneRow, 0, numCols * sizeof(ValueType)); const size_t rowNumNonZeros = getNumNonZeros(r); - const size_t * rowColIdxs = getColIdxs(r); - const ValueType * rowValues = getValues(r); + const size_t* rowColIdxs = getColIdxsOfRow(r); + const ValueType* rowValues = getRowValues(r); for(size_t i = 0; i < rowNumNonZeros; i++) oneRow[rowColIdxs[i]] = rowValues[i]; for(size_t c = 0; c < numCols; c++) { @@ -427,10 +426,10 @@ class CSRMatrix : public Matrix { if(numRows != rhs.getNumRows() || numCols != rhs.getNumCols()) return false; - const ValueType * valuesBegLhs = this->getValues(0); - const ValueType * valuesEndLhs = this->getValues(numRows); - const ValueType * valuesBegRhs = rhs.getValues(0); - const ValueType * valuesEndRhs = rhs.getValues(numRows); + const ValueType * valuesBegLhs = this->getRowValues(0); + const ValueType * valuesEndLhs = this->getRowValues(numRows); + const ValueType * valuesBegRhs = rhs.getRowValues(0); + const ValueType * valuesEndRhs = rhs.getRowValues(numRows); const size_t nnzLhs = valuesEndLhs - valuesBegLhs; const size_t nnzRhs = valuesEndRhs - valuesBegRhs; @@ -446,7 +445,7 @@ class CSRMatrix : public Matrix { const size_t * colIdxsBegRhs = rhs.getColIdxs(0); if(colIdxsBegLhs != colIdxsBegRhs) - if(memcmp(colIdxsBegLhs, colIdxsBegRhs, nnzLhs * sizeof(size_t))) + if(memcmp(colIdxsBegLhs, colIdxsBegRhs, nnzLhs * sizeof(size_t)) != 0) return false; return true; diff --git a/src/runtime/local/datastructures/DataPlacement.h b/src/runtime/local/datastructures/DataPlacement.h index 6410d2a85..218a70e12 100644 --- a/src/runtime/local/datastructures/DataPlacement.h +++ b/src/runtime/local/datastructures/DataPlacement.h @@ -23,20 +23,33 @@ #include /** - * The DataPlacement struct binds an allocation descriptor to a range description and stores an ID of + 
* The DataPlacement struct binds allocation descriptors to a range description and stores an ID of
  * an instantiated object.
+ *
+ * The number of allocations varies depending on the data type (one allocation for the values of a dense matrix,
+ * three allocations for values/col_idxs/row_ptrs of a CSR matrix).
  */
-struct DataPlacement {
+class DataPlacement {
     size_t dp_id;
 
     // used to generate object IDs
     static std::atomic_size_t instance_count;
 
-    std::unique_ptr<IAllocationDescriptor> allocation{};
+    std::vector<std::unique_ptr<IAllocationDescriptor>> allocations{};
 
     std::unique_ptr<Range> range{};
 
+public:
     DataPlacement() = delete;
-    DataPlacement(std::unique_ptr<IAllocationDescriptor> _a, std::unique_ptr<Range> _r) : dp_id(instance_count++),
-            allocation(std::move(_a)), range(std::move(_r)) { }
+    DataPlacement(std::vector<std::unique_ptr<IAllocationDescriptor>> _a, std::unique_ptr<Range> _r) :
+            dp_id(instance_count++), allocations(std::move(_a)), range(std::move(_r)) { }
+
+    [[nodiscard]] size_t getID() const {return dp_id;}
+    std::string getLocation() { return allocations.front()->getLocation(); }
+    Range* getRange() { return range.get(); }
+    void setRange(Range* r) { range.reset(r); }
+    void setRange(std::unique_ptr<Range> r) { range = std::move(r); }
+    uint32_t getNumAllocations() { return allocations.size(); }
+    IAllocationDescriptor* getAllocation(uint32_t num) { return allocations[num].get(); }
+
 };
diff --git a/src/runtime/local/datastructures/DenseMatrix.cpp b/src/runtime/local/datastructures/DenseMatrix.cpp
index 4520478c8..78baf34c5 100644
--- a/src/runtime/local/datastructures/DenseMatrix.cpp
+++ b/src/runtime/local/datastructures/DenseMatrix.cpp
@@ -60,165 +60,75 @@ DenseMatrix<ValueType>::DenseMatrix(size_t maxNumRows, size_t numCols, bool zero
         Matrix<ValueType>(maxNumRows, numCols), is_view(false), rowSkip(numCols),
         bufferSize(numRows*numCols*sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) {
-    DataPlacement* new_data_placement;
-    if(allocInfo != nullptr) {
-        spdlog::debug("Creating {} x {} dense matrix of type: {}. Required memory: {} Mb",
-                numRows, numCols, static_cast(allocInfo->getType()),
-                static_cast(getBufferSize()) / (1048576));
-
-        new_data_placement = this->mdo->addDataPlacement(allocInfo);
-        new_data_placement->allocation->createAllocation(getBufferSize(), zero);
+    std::unique_ptr<IAllocationDescriptor> val_alloc;
+    if(!allocInfo) {
+        alloc_shared_values(zero);
+        auto bytes = std::reinterpret_pointer_cast<std::byte>(values);
+        val_alloc = AllocationDescriptorHost::createHostAllocation(bytes, getBufferSize(), zero);
     }
     else {
-        AllocationDescriptorHost myHostAllocInfo;
-        alloc_shared_values();
-        if(zero)
-            memset(values.get(), 0, maxNumRows * numCols * sizeof(ValueType));
-        new_data_placement = this->mdo->addDataPlacement(&myHostAllocInfo);
+        val_alloc = allocInfo->createAllocation(getBufferSize(), zero);
+
+        // ToDo: refactor data storage into memory management
+        if(allocInfo->getType() == ALLOCATION_TYPE::HOST) {
+            alloc_shared_values(zero);
+
+            auto bytes = std::reinterpret_pointer_cast<std::byte>(values);
+            dynamic_cast<AllocationDescriptorHost*>(val_alloc.get())->setData(bytes);
+        }
     }
-    this->mdo->addLatest(new_data_placement->dp_id);
+    spdlog::debug("Creating {} x {} dense matrix of type: {}. 
Required memory: {} Mb", numRows, numCols, + val_alloc->getTypeName(), static_cast(getBufferSize()) / (1048576)); + + std::vector> allocations; + allocations.emplace_back(std::move(val_alloc)); + auto p = this->mdo->addDataPlacement(allocations); + this->mdo->addLatest(p->getID()); } template DenseMatrix::DenseMatrix(size_t numRows, size_t numCols, std::shared_ptr& values) : Matrix(numRows, numCols), is_view(false), rowSkip(numCols), values(values), bufferSize(numRows*numCols*sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { - AllocationDescriptorHost myHostAllocInfo; - DataPlacement* new_data_placement = this->mdo->addDataPlacement(&myHostAllocInfo); - this->mdo->addLatest(new_data_placement->dp_id); + auto bytes = std::reinterpret_pointer_cast(values); + + std::vector> allocations; + auto vec_alloc = AllocationDescriptorHost::createHostAllocation(bytes, getBufferSize(), false); + allocations.emplace_back(std::move(vec_alloc)); + + auto p = this->mdo->addDataPlacement(allocations); + this->mdo->addLatest(p->getID()); } template DenseMatrix::DenseMatrix(const DenseMatrix * src, int64_t rowLowerIncl, int64_t rowUpperExcl, int64_t colLowerIncl, int64_t colUpperExcl) : Matrix(rowUpperExcl - rowLowerIncl, - colUpperExcl - colLowerIncl), is_view(true), bufferSize(numRows*numCols*sizeof(ValueType)), - lastAppendedRowIdx(0), lastAppendedColIdx(0) -{ + colUpperExcl - colLowerIncl), is_view(true), rowSkip(src->rowSkip), bufferSize(numRows*numCols*sizeof(ValueType)), + lastAppendedRowIdx(0), lastAppendedColIdx(0) { validateArgs(src, rowLowerIncl, rowUpperExcl, colLowerIncl, colUpperExcl); - this->row_offset = rowLowerIncl; - this->col_offset = colLowerIncl; - rowSkip = src->rowSkip; + this->row_offset = isView() ? src->row_offset + rowLowerIncl : rowLowerIncl; + this->col_offset = isView() ? 
src->col_offset + colLowerIncl : colLowerIncl; // ToDo: manage host mem (values) in a data placement if(src->values) { - alloc_shared_values(src->values, offset()); bufferSize = numRows*rowSkip*sizeof(ValueType); + this->values = src->values; } - this->clone_mdo(src); + this->mdo = src->mdo; } template DenseMatrix::DenseMatrix(size_t numRows, size_t numCols, const DenseMatrix *src) : - Matrix(numRows, numCols), is_view(false), rowSkip(numCols), + Matrix(numRows, numCols), is_view(src->is_view), rowSkip(src->getRowSkip()), bufferSize(numRows*numCols*sizeof(ValueType)), lastAppendedRowIdx(0), lastAppendedColIdx(0) { - if(src->values) - values = src->values; - this->clone_mdo(src); -} - -template -auto DenseMatrix::getValuesInternal(const IAllocationDescriptor* alloc_desc, const Range* range) - -> std::tuple { - // If no range information is provided we assume the full range that this matrix covers - if(range == nullptr || *range == Range(*this)) { - if(alloc_desc) { - auto ret = this->mdo->findDataPlacementByType(alloc_desc, range); - if(!ret) { - // find other allocation type X (preferably host allocation) to transfer from in latest_version - - // tuple content: - std::tuple result = std::make_tuple(false, 0, nullptr); - auto latest = this->mdo->getLatest(); - DataPlacement *placement; - for (auto &placement_id: latest) { - placement = this->mdo->getDataPlacementByID(placement_id); - if(placement->range == nullptr || *(placement->range) == Range{0, 0, this->getNumRows(), - this->getNumCols()}) { - std::get<0>(result) = true; - std::get<1>(result) = placement->dp_id; - // prefer host allocation - if(placement->allocation->getType() == ALLOCATION_TYPE::HOST) { - std::get<2>(result) = reinterpret_cast(values.get()); - break; - } - } - } - - // if we found a data placement that is not in host memory, transfer it there before returning - if(std::get<0>(result) == true && std::get<2>(result) == nullptr) { - AllocationDescriptorHost myHostAllocInfo; - if(!values) - this->alloc_shared_values(); - this->mdo->addDataPlacement(&myHostAllocInfo); - placement->allocation->transferFrom(reinterpret_cast(startAddress()), getBufferSize()); - std::get<2>(result) = startAddress(); - } - - // create new data placement - auto new_data_placement = const_cast *>(this)->mdo->addDataPlacement(alloc_desc); - new_data_placement->allocation->createAllocation(getBufferSize(), false); - - // transfer to requested data placement - new_data_placement->allocation->transferTo(reinterpret_cast(startAddress()), getBufferSize()); - return std::make_tuple(false, new_data_placement->dp_id, reinterpret_cast( - new_data_placement->allocation->getData().get())); - } - else { - bool latest = this->mdo->isLatestVersion(ret->dp_id); - if(!latest) { - ret->allocation->transferTo(reinterpret_cast(startAddress()), getBufferSize()); - } - return std::make_tuple(latest, ret->dp_id, reinterpret_cast(ret->allocation->getData() - .get())); - } - } - else { - // if no alloc info was provided we try to get/create a full host allocation and return that - std::tuple result = std::make_tuple(false, 0, nullptr); - auto latest = this->mdo->getLatest(); - DataPlacement *placement; - for (auto &placement_id: latest) { - placement = this->mdo->getDataPlacementByID(placement_id); - - // only consider allocations covering full range of matrix - if(placement->range == nullptr || *(placement->range) == Range{this->row_offset, this->col_offset, - this->getNumRows(), this->getNumCols()}) { - std::get<0>(result) = true; - std::get<1>(result) = 
placement->dp_id; - // prefer host allocation - if(placement->allocation->getType() == ALLOCATION_TYPE::HOST) { - std::get<2>(result) = reinterpret_cast(startAddress()); - break; - } - } - } + if(!is_view) + rowSkip = numCols; - // if we found a data placement that is not in host memory, transfer it there before returning - if(std::get<0>(result) == true && std::get<2>(result) == nullptr) { - AllocationDescriptorHost myHostAllocInfo; - - // ToDo: this needs fixing in the context of matrix views - if(!values) - const_cast*>(this)->alloc_shared_values(); - - this->mdo->addDataPlacement(&myHostAllocInfo); -// std::cout << "bufferSize: " << getBufferSize() << " RxRS: " << this->getNumRows() * this->getRowSkip() * sizeof(ValueType) << std::endl; -// if(getBufferSize() != this->getNumRows() * this->getRowSkip()) { -// placement->allocation->transferFrom(reinterpret_cast(startAddress()), this->getNumRows() * this->getRowSkip() * sizeof(ValueType)); -// } -// else - placement->allocation->transferFrom(reinterpret_cast(startAddress()), getBufferSize()); - std::get<2>(result) = startAddress(); - } - if(std::get<2>(result) == nullptr) - throw std::runtime_error("No object meta data in matrix"); - else - return result; - } - } - else - throw std::runtime_error("Range support under construction"); + this->row_offset = src->row_offset; + this->col_offset = src->col_offset; + this->values = src->values; + this->mdo = src->mdo; } template void DenseMatrix::printValue(std::ostream & os, ValueType val) const { @@ -238,14 +148,53 @@ template <> } template -void DenseMatrix::alloc_shared_values(std::shared_ptr src, size_t offset) { +void DenseMatrix::alloc_shared_values(bool zero, std::shared_ptr src, size_t offset) { // correct since C++17: Calls delete[] instead of simple delete if(src) { values = std::shared_ptr(src, src.get() + offset); } - else -// values = std::shared_ptr(new ValueType[numRows*numCols]); - values = std::shared_ptr(new ValueType[numRows * getRowSkip()]); + else { + values = std::shared_ptr(new ValueType[getBufferSize()]); + if(zero) + memset(values.get(), 0, getBufferSize()); + } +} + +template +bool DenseMatrix::operator==(const DenseMatrix & rhs) const { + // Note that we do not use the generic `get` interface to matrices here since + // this operator is meant to be used for writing tests for, besides others, + // those generic interfaces. 
+ + if(this == &rhs) + return true; + + const size_t numRows = this->getNumRows(); + const size_t numCols = this->getNumCols(); + + if(numRows != rhs.getNumRows() || numCols != rhs.getNumCols()) + return false; + + const ValueType* valuesLhs = this->getValues(); + const ValueType* valuesRhs = rhs.getValues(); + + size_t rowSkipLhs = this->getRowSkip(); + size_t rowSkipRhs = rhs.getRowSkip(); + + if(valuesLhs == valuesRhs && rowSkipLhs == rowSkipRhs) + return true; + + if(rowSkipLhs == numCols && rowSkipRhs == numCols) + return !memcmp(valuesLhs, valuesRhs, numRows * numCols * sizeof(ValueType)); + else { + for(size_t r = 0; r < numRows; r++) { + if(memcmp(valuesLhs, valuesRhs, numCols * sizeof(ValueType)) != 0) + return false; + valuesLhs += rowSkipLhs; + valuesRhs += rowSkipRhs; + } + return true; + } } template @@ -258,6 +207,43 @@ size_t DenseMatrix::serialize(std::vector &buf) const{ throw std::runtime_error("DenseMatrix serialization not implemented"); } +template +const ValueType* DenseMatrix::getValues(const IAllocationDescriptor* alloc_desc, const Range* range) const { + const ValueType* ptr; + if(this->isPinned(alloc_desc)) { + ptr = reinterpret_cast(this->pinned_mem); + } + else { + this->pinned_mem = this->mdo->getData(0, alloc_desc, range); + ptr = reinterpret_cast(this->pinned_mem); + const_cast*>(this)->pin(alloc_desc); + } + + if(isView()) + return ptr + offset(); + else + return ptr; +} + +template +ValueType* DenseMatrix::getValues(const IAllocationDescriptor* alloc_desc, const Range* range) { + ValueType* ptr; + if(this->isPinned(alloc_desc)) { + ptr = reinterpret_cast(this->pinned_mem); + } + else { + this->pinned_mem = this->mdo->getData(0, alloc_desc, range); + ptr = reinterpret_cast(this->pinned_mem); + this->pin(alloc_desc); + } + + if(isView()) + return ptr + offset(); + else + return ptr; +} + + // ---------------------------------------------------------------------------- // const char* specialization DenseMatrix::DenseMatrix(size_t maxNumRows, size_t numCols, bool zero, size_t strBufferCapacity_, ALLOCATION_TYPE type) : diff --git a/src/runtime/local/datastructures/DenseMatrix.h b/src/runtime/local/datastructures/DenseMatrix.h index 4ebdd4bdd..fa4d6202f 100644 --- a/src/runtime/local/datastructures/DenseMatrix.h +++ b/src/runtime/local/datastructures/DenseMatrix.h @@ -96,11 +96,12 @@ class DenseMatrix : public Matrix int64_t colUpperExcl); /** - * @brief Creates a `DenseMatrix` around an existing array of values without copying the data. + * @brief Creates a `DenseMatrix` around an existing `DenseMatrix` without copying the data. + * This is used in transpose and reshape operations for example; * * @param numRows The exact number of rows. * @param numCols The exact number of columns. - * @param values A `std::shared_ptr` to an existing array of values. 
+ * @param src A source matrix */ DenseMatrix(size_t numRows, size_t numCols, const DenseMatrix* src); @@ -119,10 +120,10 @@ class DenseMatrix : public Matrix const size_t startPosIncl = pos(lastAppendedRowIdx, lastAppendedColIdx) + 1; const size_t endPosExcl = pos(rowIdx, colIdx); if(startPosIncl < endPosExcl) - memset(values.get() + startPosIncl, 0, (endPosExcl - startPosIncl) * sizeof(ValueType)); + memset(getValues() + startPosIncl, 0, (endPosExcl - startPosIncl) * sizeof(ValueType)); } else { - auto v = values.get() + lastAppendedRowIdx * rowSkip; + auto v = getValues() + lastAppendedRowIdx * rowSkip; memset(v + lastAppendedColIdx + 1, 0, (numCols - lastAppendedColIdx - 1) * sizeof(ValueType)); v += rowSkip; for(size_t r = lastAppendedRowIdx + 1; r < rowIdx; r++) { @@ -136,39 +137,9 @@ class DenseMatrix : public Matrix void printValue(std::ostream & os, ValueType val) const; - void alloc_shared_values(std::shared_ptr src = nullptr, size_t offset = 0); - + void alloc_shared_values(bool zero = false, std::shared_ptr src = nullptr, size_t offset = 0); - /** - * @brief The getValuesInternal method fetches a pointer to an allocation of values. Optionally a sub range - * can be specified. - * - * This method is called by the public getValues() methods either in const or non-const fashion. The const version - * returns a data pointer that is meant to be read-only. Read only access updates the list of up-to-date - * allocations (the latest_versions "list"). This way several copies of the data in various allocations can be - * kept without invalidating their copy. If the read write version of getValues() is used, the latest_versions - * list is cleared because a write to an allocation is assumed, which renders the other allocations of the - * same data out of sync. - * - * @param alloc_desc An instance of an IAllocationDescriptor derived class that is used to specify what type of - * allocation is requested. If no allocation descriptor is provided, a host allocation (plain main memory) is - * assumed by default. - * - * @param range An optional range describing which rows and columns of a matrix-like structure are requested. - * By default this is null and means all rows and columns. - * @return A tuple of three values is returned: - * 1: bool - is the returend allocation in the latest_versions list - * 2: size_t - the ID of the data placement (a structure relating an allocation to a range) - * 3: ValueType* - the pointer to the actual data - */ - - auto getValuesInternal(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) - -> std::tuple; - [[nodiscard]] size_t offset() const { return this->row_offset * rowSkip + this->col_offset; } - - - ValueType* startAddress() const { return isPartialBuffer() ? values.get() + offset() : values.get(); } public: @@ -200,13 +171,7 @@ class DenseMatrix : public Matrix * @param range A Range object describing optionally requesting a sub range of a data structure. * @return A pointer to the data in the requested memory space */ - const ValueType* getValues(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) const - { - auto[isLatest, id, ptr] = const_cast *>(this)->getValuesInternal(alloc_desc, range); - if(!isLatest) - this->mdo->addLatest(id); - return ptr; - } + const ValueType* getValues(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) const; /** * @brief Fetch a pointer to the data held by this structure meant for read-write access. 
@@ -217,22 +182,19 @@ class DenseMatrix : public Matrix * @param alloc_desc An allocation descriptor describing which type of memory is requested (e.g. main memory in * the current system, memory in an accelerator card or memory in another host) * + * * @param range A Range object describing optionally requesting a sub range of a data structure. * @return A pointer to the data in the requested memory space */ - ValueType* getValues(IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) { - auto [isLatest, id, ptr] = const_cast*>(this)->getValuesInternal(alloc_desc, range); - if(!isLatest) - this->mdo->setLatest(id); - return ptr; - } + ValueType* getValues(const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr); std::shared_ptr getValuesSharedPtr() const { return values; } ValueType get(size_t rowIdx, size_t colIdx) const override { - return getValues()[pos(rowIdx, colIdx, isPartialBuffer())]; + auto position = pos(rowIdx, colIdx, isPartialBuffer()); + return getValues()[position]; } void set(size_t rowIdx, size_t colIdx, ValueType value) override { @@ -243,7 +205,7 @@ class DenseMatrix : public Matrix void prepareAppend() override { // The matrix might be empty. if (numRows != 0 && numCols != 0) - values.get()[0] = ValueType(0); + getValues()[0] = ValueType(0); lastAppendedRowIdx = 0; lastAppendedColIdx = 0; } @@ -252,7 +214,7 @@ class DenseMatrix : public Matrix // Set all cells since the last one that was appended to zero. fillZeroUntil(rowIdx, colIdx); // Set the specified cell. - values.get()[pos(rowIdx, colIdx)] = value; + getValues()[pos(rowIdx, colIdx)] = value; // Update append state. lastAppendedRowIdx = rowIdx; lastAppendedColIdx = colIdx; @@ -298,41 +260,7 @@ class DenseMatrix : public Matrix [[nodiscard]] size_t getBufferSize() const { return bufferSize; } - bool operator==(const DenseMatrix & rhs) const { - // Note that we do not use the generic `get` interface to matrices here since - // this operator is meant to be used for writing tests for, besides others, - // those generic interfaces. 
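// Editor's note: the inline operator== removed below (and re-added out of line
// earlier in DenseMatrix.cpp) compares two row-major buffers whose rows may be
// padded to different strides (rowSkip). A self-contained sketch of that
// comparison strategy, with illustrative names only:
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// True if two numRows x numCols views over row-major buffers hold the same
// bytes; each buffer may pad its rows to an individual stride.
bool equalWithStrides(const double* lhs, std::size_t strideLhs,
                      const double* rhs, std::size_t strideRhs,
                      std::size_t numRows, std::size_t numCols) {
    if (strideLhs == numCols && strideRhs == numCols)  // both contiguous
        return std::memcmp(lhs, rhs, numRows * numCols * sizeof(double)) == 0;
    for (std::size_t r = 0; r < numRows; ++r) {        // otherwise row by row
        if (std::memcmp(lhs, rhs, numCols * sizeof(double)) != 0)
            return false;
        lhs += strideLhs;
        rhs += strideRhs;
    }
    return true;
}

int main() {
    std::vector<double> a{1, 2, 0, 3, 4, 0};  // 2x2 view with stride 3
    std::vector<double> b{1, 2, 3, 4};        // 2x2 view with stride 2
    assert(equalWithStrides(a.data(), 3, b.data(), 2, 2, 2));
}
// Trade-off of memcmp-based equality: it is bitwise, so an identical NaN
// compares equal and -0.0 differs from +0.0.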
- - if(this == &rhs) - return true; - - const size_t numRows = this->getNumRows(); - const size_t numCols = this->getNumCols(); - - if(numRows != rhs.getNumRows() || numCols != rhs.getNumCols()) - return false; - - const ValueType* valuesLhs = this->getValues(); - const ValueType* valuesRhs = rhs.getValues(); - - const size_t rowSkipLhs = this->getRowSkip(); - const size_t rowSkipRhs = rhs.getRowSkip(); - - if(valuesLhs == valuesRhs && rowSkipLhs == rowSkipRhs) - return true; - - if(rowSkipLhs == numCols && rowSkipRhs == numCols) - return !memcmp(valuesLhs, valuesRhs, numRows * numCols * sizeof(ValueType)); - else { - for(size_t r = 0; r < numRows; r++) { - if(memcmp(valuesLhs, valuesRhs, numCols * sizeof(ValueType))) - return false; - valuesLhs += rowSkipLhs; - valuesRhs += rowSkipRhs; - } - return true; - } - } + bool operator==(const DenseMatrix & rhs) const; size_t serialize(std::vector &buf) const override; }; diff --git a/src/runtime/local/datastructures/IAllocationDescriptor.h b/src/runtime/local/datastructures/IAllocationDescriptor.h index 75eb9fa88..34c2689b8 100644 --- a/src/runtime/local/datastructures/IAllocationDescriptor.h +++ b/src/runtime/local/datastructures/IAllocationDescriptor.h @@ -17,6 +17,7 @@ #pragma once #include +#include // An alphabetically sorted wishlist of supported allocation types ;-) // Supporting all of that is probably unmaintainable :-/ @@ -36,6 +37,11 @@ enum class ALLOCATION_TYPE { NUM_ALLOC_TYPES }; +// string representation of enum +static std::string_view allocation_types[] = { "DIST_GRPC", "DIST_GRPC_ASYNC", "DIST_GRPC_SYNC", "DIST_MPI", "DIST_SPARK", + "GPU_CUDA", "GPU_HIP", "HOST", "HOST_PINNED_CUDA", "FPGA_INT", "FPGA_XLX", + "ONEAPI", "NUM_ALLOC_TYPES" }; + /** * @brief The IAllocationDescriptor interface class describes an abstract interface to handle memory allocations * @@ -47,14 +53,19 @@ enum class ALLOCATION_TYPE { * the allocator. */ class IAllocationDescriptor { +protected: + size_t size = 0; public: virtual ~IAllocationDescriptor() = default; [[nodiscard]] virtual ALLOCATION_TYPE getType() const = 0; - virtual void createAllocation(size_t size, bool zero) = 0; - virtual std::string getLocation() const = 0; + [[nodiscard]] virtual std::unique_ptr createAllocation(size_t size, bool zero) const = 0; + [[nodiscard]] virtual std::string getLocation() const = 0; virtual std::shared_ptr getData() = 0; virtual void transferTo(std::byte* src, size_t size) = 0; virtual void transferFrom(std::byte* dst, size_t size) = 0; [[nodiscard]] virtual std::unique_ptr clone() const = 0; virtual bool operator==(const IAllocationDescriptor* other) const { return (getType() == other->getType()); } + [[nodiscard]] virtual size_t getSize() const { return size; }; + + virtual std::string_view getTypeName() const { return allocation_types[static_cast(getType())]; } }; diff --git a/src/runtime/local/datastructures/MetaDataObject.cpp b/src/runtime/local/datastructures/MetaDataObject.cpp index 1005a15b4..4ff7bfe66 100644 --- a/src/runtime/local/datastructures/MetaDataObject.cpp +++ b/src/runtime/local/datastructures/MetaDataObject.cpp @@ -17,21 +17,37 @@ #include "DataPlacement.h" #include "MetaDataObject.h" -DataPlacement* MetaDataObject::addDataPlacement(const IAllocationDescriptor *allocInfo, Range *r) { - data_placements[static_cast(allocInfo->getType())].emplace_back(std::make_unique( - allocInfo->clone(), r == nullptr ? 
nullptr : r->clone())); - return data_placements[static_cast(allocInfo->getType())].back().get(); +#include + +DataPlacement* MetaDataObject::addDataPlacement(std::vector>& allocInfos, Range *r) { + if(r) { + auto type = static_cast(allocInfos.front()->getType()); + range_data_placements[type].emplace_back(std::make_unique(std::move(allocInfos), r->clone())); + return range_data_placements[type].back().get(); + } + else { + auto type = static_cast(allocInfos.front()->getType()); + data_placements[type] = std::make_unique( + std::move(allocInfos), nullptr); + return data_placements[type].get(); + } + } -auto MetaDataObject::getDataPlacementByType(ALLOCATION_TYPE type) const +auto MetaDataObject::getRangeDataPlacementByType(ALLOCATION_TYPE type) const -> const std::vector>* { - return &(data_placements[static_cast(type)]); + return &(range_data_placements[static_cast(type)]); +} + +auto MetaDataObject::getDataPlacementByType(ALLOCATION_TYPE type) const -> DataPlacement* { + return data_placements[static_cast(type)].get(); } DataPlacement* MetaDataObject::getDataPlacementByLocation(const std::string& location) const { - for (const auto &_omdType: data_placements) { + // ToDo: no range? + for (const auto &_omdType: range_data_placements) { for (auto &_omd: _omdType) { - if(_omd->allocation->getLocation() == location) + if(_omd->getLocation() == location) return const_cast(_omd.get()); } } @@ -39,10 +55,10 @@ DataPlacement* MetaDataObject::getDataPlacementByLocation(const std::string& loc } void MetaDataObject::updateRangeDataPlacementByID(size_t id, Range *r) { - for(auto &_omdType : data_placements) { + for(auto &_omdType : range_data_placements) { for(auto& _omd : _omdType) { - if(_omd->dp_id == id){ - _omd->range = r->clone(); + if(_omd->getID() == id){ + _omd->setRange(r->clone()); return; } } @@ -50,30 +66,19 @@ void MetaDataObject::updateRangeDataPlacementByID(size_t id, Range *r) { } DataPlacement *MetaDataObject::getDataPlacementByID(size_t id) const { - for (const auto &_omdType: data_placements) { - for (auto &_omd: _omdType) { - if(_omd->dp_id == id) - return const_cast(_omd.get()); - } + for (const auto &dp: data_placements) { + if(dp) + if(dp->getID() == id) + return const_cast(dp.get()); } - return nullptr; -} - -const DataPlacement* MetaDataObject::findDataPlacementByType(const IAllocationDescriptor *alloc_desc, const Range *range) const { - auto res = getDataPlacementByType(alloc_desc->getType()); - if(res->empty()) - return nullptr; - else { - for (size_t i = 0; i < res->size(); ++i) { - if((*res)[i]->allocation->operator==(alloc_desc)) { - if(((*res)[i]->range == nullptr && range == nullptr) || - ((*res)[i]->range != nullptr && (*res)[i]->range->operator==(range))) { - return (*res)[i].get(); - } - } + for (const auto &rdp_by_type: range_data_placements) { + for (auto &rdp: rdp_by_type) { + if(rdp) + if(rdp->getID() == id) + return const_cast(rdp.get()); } - return nullptr; } + return nullptr; } bool MetaDataObject::isLatestVersion(size_t placement) const { @@ -92,3 +97,69 @@ void MetaDataObject::setLatest(size_t id) { auto MetaDataObject::getLatest() const -> std::vector { return latest_version; } + +DataPlacement *MetaDataObject::getDataPlacement(const IAllocationDescriptor* alloc_desc) { + auto dp = getDataPlacementByType(alloc_desc->getType()); + if(!dp) { + // find other allocation type X (preferably host allocation) to transfer from in latest_version + auto latest = getLatest(); + DataPlacement *placement = getDataPlacementByID(latest.front()); + + std::vector> 
allocations; + for(uint32_t i = 0; i < placement->getNumAllocations(); ++i) { + auto other_alloc = placement->getAllocation(i); + auto new_alloc = alloc_desc->createAllocation(other_alloc->getSize(), false); + + // transfer to requested data placement + // ToDo: this works for the HOST <-> CUDA use case for now + // ToDo: update *Matrix own shared ptrs when transferring back from other storage + if(new_alloc->getType() == ALLOCATION_TYPE::HOST) + other_alloc->transferFrom(new_alloc->getData().get(), new_alloc->getSize()); + else + new_alloc->transferTo(other_alloc->getData().get(), other_alloc->getSize()); + allocations.emplace_back(std::move(new_alloc)); + } + return addDataPlacement(allocations); + } + else { + bool isLatest = isLatestVersion(dp->getID()); + if(!isLatest) { + auto latest = getDataPlacementByID(getLatest().front()); + for(uint32_t i = 0; i < latest->getNumAllocations(); ++i) { + auto other_alloc = latest->getAllocation(i); + dp->getAllocation(i)->transferTo(other_alloc->getData().get(), other_alloc->getSize()); + } + } + return dp; + } +} + +std::pair MetaDataObject::getDataInternal(uint32_t alloc_idx, const IAllocationDescriptor* alloc_desc, const Range* range) { + DataPlacement *dp; + if (!range) { + if (!alloc_desc) { + auto ad = AllocationDescriptorHost(); + dp = getDataPlacement(&ad); + } else + dp = getDataPlacement(alloc_desc); + + return std::make_pair(dp->getID(), dp->getAllocation(alloc_idx)->getData().get()); + } else + throw std::runtime_error("Range support under construction"); +} + +const std::byte* MetaDataObject::getData(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc, const Range *range) const { + const std::lock_guard lock(mtx); + + auto [id, ptr] = const_cast(this)->getDataInternal(alloc_idx, alloc_desc, range); + const_cast(this)->addLatest(id); + return ptr; +} + + std::byte*MetaDataObject::getData(uint32_t alloc_idx, const IAllocationDescriptor *alloc_desc, const Range *range) { + const std::lock_guard lock(mtx); + + auto [id, ptr] = getDataInternal(alloc_idx, alloc_desc, range); + setLatest(id); + return ptr; +} \ No newline at end of file diff --git a/src/runtime/local/datastructures/MetaDataObject.h b/src/runtime/local/datastructures/MetaDataObject.h index 75d1ea3b4..cb339ced9 100644 --- a/src/runtime/local/datastructures/MetaDataObject.h +++ b/src/runtime/local/datastructures/MetaDataObject.h @@ -35,21 +35,29 @@ class DataPlacement; */ class MetaDataObject { std::array>, - static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES)> data_placements; - std::vector latest_version; + static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES)> range_data_placements; + std::array, static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES)> data_placements; + mutable std::mutex mtx{}; + std::vector latest_version; + std::pair getDataInternal(uint32_t alloc_idx, const IAllocationDescriptor* alloc_desc, const Range* range); public: - DataPlacement *addDataPlacement(const IAllocationDescriptor *allocInfo, Range *r = nullptr); - const DataPlacement *findDataPlacementByType(const IAllocationDescriptor *alloc_desc, const Range *range) const; + DataPlacement *addDataPlacement(std::vector>& allocInfos, Range *r = nullptr); [[nodiscard]] DataPlacement *getDataPlacementByID(size_t id) const; [[nodiscard]] DataPlacement *getDataPlacementByLocation(const std::string& location) const; - [[nodiscard]] auto getDataPlacementByType(ALLOCATION_TYPE type) const -> + [[nodiscard]] auto getRangeDataPlacementByType(ALLOCATION_TYPE type) const -> const std::vector>*; + [[nodiscard]] auto 
getDataPlacementByType(ALLOCATION_TYPE type) const -> DataPlacement*; + void updateRangeDataPlacementByID(size_t id, Range *r); + DataPlacement* getDataPlacement(const IAllocationDescriptor* alloc_desc); + [[nodiscard]] bool isLatestVersion(size_t placement) const; void addLatest(size_t id); void setLatest(size_t id); [[nodiscard]] auto getLatest() const -> std::vector; + const std::byte* getData(uint32_t alloc_idx, const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr) const; + std::byte* getData(uint32_t alloc_idx, const IAllocationDescriptor* alloc_desc = nullptr, const Range* range = nullptr); }; diff --git a/src/runtime/local/datastructures/Structure.cpp b/src/runtime/local/datastructures/Structure.cpp index 234b1982a..2c129dfaa 100644 --- a/src/runtime/local/datastructures/Structure.cpp +++ b/src/runtime/local/datastructures/Structure.cpp @@ -19,18 +19,12 @@ Structure::Structure(size_t numRows, size_t numCols) : refCounter(1), numRows(numRows), numCols(numCols) { mdo = std::make_shared(); -}; +} -void Structure::clone_mdo(const Structure* src) { - // FIXME: This clones the meta data to avoid locking (thread synchronization for data copy) - for(int i = 0; i < static_cast(ALLOCATION_TYPE::NUM_ALLOC_TYPES); i++) { - auto placements = src->mdo->getDataPlacementByType(static_cast(i)); - for(auto it = placements->begin(); it != placements->end(); it++) { - auto src_alloc = it->get()->allocation.get(); - auto src_range = it->get()->range.get(); - auto new_data_placement = this->mdo->addDataPlacement(src_alloc, src_range); - if(src->mdo->isLatestVersion(it->get()->dp_id)) - this->mdo->addLatest(new_data_placement->dp_id); - } - } +bool Structure::isPinned(const IAllocationDescriptor* alloc_desc) const { + return alloc_desc ? pinned_allocation == alloc_desc->getType() : pinned_allocation == ALLOCATION_TYPE::HOST; +} + +void Structure::pin(const IAllocationDescriptor *alloc_desc) { + alloc_desc ? pinned_allocation = alloc_desc->getType() : pinned_allocation = ALLOCATION_TYPE::HOST; } \ No newline at end of file diff --git a/src/runtime/local/datastructures/Structure.h b/src/runtime/local/datastructures/Structure.h index 81cee5116..f08ebc0a7 100644 --- a/src/runtime/local/datastructures/Structure.h +++ b/src/runtime/local/datastructures/Structure.h @@ -39,13 +39,17 @@ class Structure size_t col_offset{}; size_t numRows; size_t numCols; + mutable ALLOCATION_TYPE pinned_allocation{}; + mutable std::byte* pinned_mem{}; Structure(size_t numRows, size_t numCols); mutable std::shared_ptr mdo; - void clone_mdo(const Structure* src); - + [[nodiscard]] bool isPinned(const IAllocationDescriptor* alloc_desc) const; + + void pin(const IAllocationDescriptor* alloc_desc); + public: virtual ~Structure() = default; diff --git a/src/runtime/local/io/DaphneSerializer.h b/src/runtime/local/io/DaphneSerializer.h index f81ca667e..566f3f92c 100644 --- a/src/runtime/local/io/DaphneSerializer.h +++ b/src/runtime/local/io/DaphneSerializer.h @@ -561,7 +561,7 @@ struct DaphneSerializer, false> { return bufferIdx; - const size_t * colIdxs = arg->getColIdxs(0); + const size_t * colIdxs = arg->getColIdxsOfRow(0); if (serializeFromByte < serializationIdx + nzb * sizeof(size_t)){ size_t startOffset = serializeFromByte > serializationIdx ? 
serializeFromByte - serializationIdx : 0; size_t bytesToCopy = 0; @@ -584,7 +584,7 @@ struct DaphneSerializer, false> { if (chunkSize <= bufferIdx) return bufferIdx; - const VT * vals = arg->getValues(0); + const VT * vals = arg->getRowValues(0); if (serializeFromByte < serializationIdx + nzb * sizeof(VT)){ size_t startOffset = serializeFromByte > serializationIdx ? serializeFromByte - serializationIdx : 0; size_t bytesToCopy = 0; diff --git a/src/runtime/local/kernels/AggAll.h b/src/runtime/local/kernels/AggAll.h index 95d1ef55e..f2f7a26ca 100644 --- a/src/runtime/local/kernels/AggAll.h +++ b/src/runtime/local/kernels/AggAll.h @@ -141,7 +141,7 @@ struct AggAll> { EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(opCode)); return aggArray( - arg->getValues(0), + arg->getRowValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), func, @@ -153,7 +153,7 @@ struct AggAll> { else { // The op-code is either MEAN or STDDEV or VAR. EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(AggOpCode::SUM)); auto agg = aggArray( - arg->getValues(0), + arg->getRowValues(0), arg->getNumNonZeros(), arg->getNumRows() * arg->getNumCols(), func, diff --git a/src/runtime/local/kernels/AggCol.h b/src/runtime/local/kernels/AggCol.h index ccdff51fc..0730e87f2 100644 --- a/src/runtime/local/kernels/AggCol.h +++ b/src/runtime/local/kernels/AggCol.h @@ -207,8 +207,8 @@ struct AggCol, CSRMatrix> { // for MEAN and STDDDEV, we need to sum func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(AggOpCode::SUM)); - const VTArg * valuesArg = arg->getValues(0); - const size_t * colIdxsArg = arg->getColIdxs(0); + const VTArg * valuesArg = arg->getRowValues(0); + const size_t * colIdxsArg = arg->getColIdxsOfRow(0); const size_t numNonZeros = arg->getNumNonZeros(); diff --git a/src/runtime/local/kernels/AggCum.h b/src/runtime/local/kernels/AggCum.h index 314f5ddb1..ec1b3b623 100644 --- a/src/runtime/local/kernels/AggCum.h +++ b/src/runtime/local/kernels/AggCum.h @@ -69,18 +69,21 @@ struct AggCum, DenseMatrix> { AggOpCodeUtils::getBinaryOpCode(opCode) ); + auto arg_inc = arg->isView() ? arg->getNumCols() : arg->getRowSkip(); + auto res_inc = res->isView() ? res->getNumCols() : res->getRowSkip(); + // First row: copy from arg to res. for(size_t c = 0; c < numCols; c++) valuesResCur[c] = valuesArg[c]; - valuesArg += arg->getRowSkip(); - valuesResCur += res->getRowSkip(); + valuesArg += arg_inc; + valuesResCur += res_inc; // Remaining rows: calculate from previous res row and current arg row. 
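// Editor's note: the AggCum hunk above walks arg and res with separate row
// increments (numCols for a view, rowSkip otherwise). A self-contained sketch
// of that cumulative (column-wise prefix sum) walk over row-major buffers with
// independent strides; names are illustrative, not DAPHNE API:
#include <cstddef>
#include <iostream>
#include <vector>

void cumSumColumns(const double* arg, std::size_t argInc,
                   double* res, std::size_t resInc,
                   std::size_t numRows, std::size_t numCols) {
    const double* prev = res;                  // previous result row
    for (std::size_t c = 0; c < numCols; ++c)  // first row: plain copy
        res[c] = arg[c];
    arg += argInc;
    res += resInc;
    for (std::size_t r = 1; r < numRows; ++r) {
        for (std::size_t c = 0; c < numCols; ++c)
            res[c] = prev[c] + arg[c];         // accumulate down each column
        arg += argInc;
        prev += resInc;
        res += resInc;
    }
}

int main() {
    std::vector<double> arg{1, 2, 3, 4, 5, 6}; // 3x2 matrix, stride 2
    std::vector<double> res(6);
    cumSumColumns(arg.data(), 2, res.data(), 2, 3, 2);
    for (double v : res) std::cout << v << ' '; // 1 2 4 6 9 12
    std::cout << '\n';
}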
for(size_t r = 1; r < numRows; r++) { for(size_t c = 0; c < numCols; c++) valuesResCur[c] = func(valuesResPrv[c], valuesArg[c], ctx); - valuesArg += arg->getRowSkip(); - valuesResPrv += res->getRowSkip(); - valuesResCur += res->getRowSkip(); + valuesArg += arg_inc; + valuesResPrv += res_inc; + valuesResCur += res_inc; } } }; diff --git a/src/runtime/local/kernels/AggRow.h b/src/runtime/local/kernels/AggRow.h index 82a34ae9b..83c7be7f6 100644 --- a/src/runtime/local/kernels/AggRow.h +++ b/src/runtime/local/kernels/AggRow.h @@ -191,7 +191,7 @@ struct AggRow, CSRMatrix> { for(size_t r = 0; r < numRows; r++) { *valuesRes = AggAll>::aggArray( - arg->getValues(r), + arg->getRowValues(r), arg->getNumNonZeros(r), numCols, func, @@ -212,7 +212,7 @@ struct AggRow, CSRMatrix> { EwBinaryScaFuncPtr func = getEwBinaryScaFuncPtr(AggOpCodeUtils::getBinaryOpCode(AggOpCode::SUM)); for (size_t r = 0; r < numRows; r++){ *valuesRes = AggAll>::aggArray( - arg->getValues(r), + arg->getRowValues(r), arg->getNumNonZeros(r), numCols, func, diff --git a/src/runtime/local/kernels/CMakeLists.txt b/src/runtime/local/kernels/CMakeLists.txt index 0996ca58c..4d87da88e 100644 --- a/src/runtime/local/kernels/CMakeLists.txt +++ b/src/runtime/local/kernels/CMakeLists.txt @@ -91,6 +91,8 @@ set(HEADERS_cpp_kernels set(SOURCES_cpp_kernels ${PREFIX}/MatMul.cpp ${PROJECT_SOURCE_DIR}/src/runtime/local/instrumentation/KernelInstrumentation.cpp + ${PROJECT_SOURCE_DIR}/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h + ${PROJECT_SOURCE_DIR}/src/runtime/local/kernels/DistributedPipeline.h ${PROJECT_BINARY_DIR}/src/runtime/local/kernels/kernels.cpp ${PROJECT_SOURCE_DIR}/src/runtime/local/kernels/CreateDaphneContext.cpp ${PROJECT_SOURCE_DIR}/src/runtime/local/kernels/Pooling.cpp @@ -99,6 +101,7 @@ set(SOURCES_cpp_kernels ${PROJECT_SOURCE_DIR}/src/runtime/local/vectorized/MTWrapper_sparse.cpp ${PROJECT_SOURCE_DIR}/src/runtime/local/vectorized/Tasks.cpp ${PROJECT_SOURCE_DIR}/src/runtime/local/vectorized/WorkerCPU.h + ) # The library of pre-compiled kernels. Will be linked into the JIT-compiled user program. 
add_library(AllKernels SHARED ${SOURCES_cpp_kernels} ${HEADERS_cpp_kernels}) diff --git a/src/runtime/local/kernels/CheckEqApprox.h b/src/runtime/local/kernels/CheckEqApprox.h index 370a85313..afda7f7aa 100644 --- a/src/runtime/local/kernels/CheckEqApprox.h +++ b/src/runtime/local/kernels/CheckEqApprox.h @@ -141,8 +141,8 @@ struct CheckEqApprox> { return false; for(size_t r = 0; r < numRows; r++){ - const VT * valuesLhs = lhs->getValues(r); - const VT * valuesRhs = rhs->getValues(r); + const VT * valuesLhs = lhs->getRowValues(r); + const VT * valuesRhs = rhs->getRowValues(r); const size_t nnzElementsLhs= lhs->getNumNonZeros(r); const size_t nnzElementsRhs= rhs->getNumNonZeros(r); if (nnzElementsLhs!=nnzElementsRhs) diff --git a/src/runtime/local/kernels/DiagMatrix.h b/src/runtime/local/kernels/DiagMatrix.h index 17800fdd0..c8ae87970 100644 --- a/src/runtime/local/kernels/DiagMatrix.h +++ b/src/runtime/local/kernels/DiagMatrix.h @@ -138,7 +138,7 @@ struct DiagMatrix, CSRMatrix> { for(size_t r = 0, pos = 0; r < numRowsCols; r++) { if (arg->getNumNonZeros(r)) { - valuesRes[pos] = *(arg->getValues(r)); + valuesRes[pos] = *(arg->getRowValues(r)); colIdxsRes[pos++] = r; } rowOffsetsRes[r + 1] = pos; diff --git a/src/runtime/local/kernels/EwBinaryMat.h b/src/runtime/local/kernels/EwBinaryMat.h index 070fdec75..52047f833 100644 --- a/src/runtime/local/kernels/EwBinaryMat.h +++ b/src/runtime/local/kernels/EwBinaryMat.h @@ -149,12 +149,12 @@ struct EwBinaryMat, CSRMatrix, CSRMatrix> { size_t nnzRowRhs = rhs->getNumNonZeros(rowIdx); if(nnzRowLhs && nnzRowRhs) { // merge within row - const VT * valuesRowLhs = lhs->getValues(rowIdx); - const VT * valuesRowRhs = rhs->getValues(rowIdx); - VT * valuesRowRes = res->getValues(rowIdx); - const size_t * colIdxsRowLhs = lhs->getColIdxs(rowIdx); - const size_t * colIdxsRowRhs = rhs->getColIdxs(rowIdx); - size_t * colIdxsRowRes = res->getColIdxs(rowIdx); + const VT * valuesRowLhs = lhs->getRowValues(rowIdx); + const VT * valuesRowRhs = rhs->getRowValues(rowIdx); + VT * valuesRowRes = res->getRowValues(rowIdx); + const size_t * colIdxsRowLhs = lhs->getColIdxsOfRow(rowIdx); + const size_t * colIdxsRowRhs = rhs->getColIdxsOfRow(rowIdx); + size_t * colIdxsRowRes = res->getColIdxsOfRow(rowIdx); size_t posLhs = 0; size_t posRhs = 0; size_t posRes = 0; @@ -190,14 +190,14 @@ struct EwBinaryMat, CSRMatrix, CSRMatrix> { } else if(nnzRowLhs) { // copy from left - memcpy(res->getValues(rowIdx), lhs->getValues(rowIdx), nnzRowLhs * sizeof(VT)); - memcpy(res->getColIdxs(rowIdx), lhs->getColIdxs(rowIdx), nnzRowLhs * sizeof(size_t)); + memcpy(res->getRowValues(rowIdx), lhs->getRowValues(rowIdx), nnzRowLhs * sizeof(VT)); + memcpy(res->getColIdxsOfRow(rowIdx), lhs->getColIdxsOfRow(rowIdx), nnzRowLhs * sizeof(size_t)); rowOffsetsRes[rowIdx + 1] = rowOffsetsRes[rowIdx] + nnzRowLhs; } else if(nnzRowRhs) { // copy from right - memcpy(res->getValues(rowIdx), rhs->getValues(rowIdx), nnzRowRhs * sizeof(VT)); - memcpy(res->getColIdxs(rowIdx), rhs->getColIdxs(rowIdx), nnzRowRhs * sizeof(size_t)); + memcpy(res->getRowValues(rowIdx), rhs->getRowValues(rowIdx), nnzRowRhs * sizeof(VT)); + memcpy(res->getColIdxsOfRow(rowIdx), rhs->getColIdxsOfRow(rowIdx), nnzRowRhs * sizeof(size_t)); rowOffsetsRes[rowIdx + 1] = rowOffsetsRes[rowIdx] + nnzRowRhs; } else @@ -212,12 +212,12 @@ struct EwBinaryMat, CSRMatrix, CSRMatrix> { size_t nnzRowRhs = rhs->getNumNonZeros(rowIdx); if(nnzRowLhs && nnzRowRhs) { // intersect within row - const VT * valuesRowLhs = lhs->getValues(rowIdx); - const VT * valuesRowRhs = 
rhs->getValues(rowIdx); - VT * valuesRowRes = res->getValues(rowIdx); - const size_t * colIdxsRowLhs = lhs->getColIdxs(rowIdx); - const size_t * colIdxsRowRhs = rhs->getColIdxs(rowIdx); - size_t * colIdxsRowRes = res->getColIdxs(rowIdx); + const VT * valuesRowLhs = lhs->getRowValues(rowIdx); + const VT * valuesRowRhs = rhs->getRowValues(rowIdx); + VT * valuesRowRes = res->getRowValues(rowIdx); + const size_t * colIdxsRowLhs = lhs->getColIdxsOfRow(rowIdx); + const size_t * colIdxsRowRhs = rhs->getColIdxsOfRow(rowIdx); + size_t * colIdxsRowRes = res->getColIdxsOfRow(rowIdx); size_t posLhs = 0; size_t posRhs = 0; size_t posRes = 0; @@ -288,10 +288,10 @@ struct EwBinaryMat, CSRMatrix, DenseMatrix> { size_t nnzRowLhs = lhs->getNumNonZeros(rowIdx); if(nnzRowLhs) { // intersect within row - const VT * valuesRowLhs = lhs->getValues(rowIdx); - VT * valuesRowRes = res->getValues(rowIdx); - const size_t * colIdxsRowLhs = lhs->getColIdxs(rowIdx); - size_t * colIdxsRowRes = res->getColIdxs(rowIdx); + const VT * valuesRowLhs = lhs->getRowValues(rowIdx); + VT * valuesRowRes = res->getRowValues(rowIdx); + const size_t * colIdxsRowLhs = lhs->getColIdxsOfRow(rowIdx); + size_t * colIdxsRowRes = res->getColIdxsOfRow(rowIdx); auto rhsRow = (rhs->getNumRows() == 1 ? 0 : rowIdx); size_t posRes = 0; for (size_t posLhs = 0; posLhs < nnzRowLhs; ++posLhs) { diff --git a/src/runtime/local/kernels/Gemv.h b/src/runtime/local/kernels/Gemv.h index 20a4f6683..601fb305d 100644 --- a/src/runtime/local/kernels/Gemv.h +++ b/src/runtime/local/kernels/Gemv.h @@ -20,6 +20,8 @@ #include #include #include +#include + #include @@ -125,8 +127,8 @@ struct Gemv, CSRMatrix, DenseMatrix> { memset(valuesRes, VT(0), sizeof(VT) * nr1 * nc2); for(size_t r = 0; r < nr1; r++) { const size_t rowNumNonZeros = mat->getNumNonZeros(r); - const size_t * rowColIdxs = mat->getColIdxs(r); - const VT * rowValues = mat->getValues(r); + const size_t * rowColIdxs = mat->getColIdxsOfRow(r); + const VT * rowValues = mat->getRowValues(r); const size_t rowIdxRes = r * rowSkipRes; for(size_t i = 0; i < rowNumNonZeros; i++) { diff --git a/src/runtime/local/kernels/HasSpecialValue.h b/src/runtime/local/kernels/HasSpecialValue.h index 0e666d35f..20fb4bb0c 100644 --- a/src/runtime/local/kernels/HasSpecialValue.h +++ b/src/runtime/local/kernels/HasSpecialValue.h @@ -99,8 +99,8 @@ template struct HasSpecialValue, auto numCols = arg->getNumCols(); auto numNonZeros = arg->getNumNonZeros(); auto numElements = numRows*numCols; - auto vBegin = arg->getValues(0); - auto vEnd = arg->getValues(numRows); + auto vBegin = arg->getRowValues(0); + auto vEnd = arg->getRowValues(numRows); auto hasZeroes = numNonZeros < numElements; auto zero = VT(0); diff --git a/src/runtime/local/kernels/IsSymmetric.h b/src/runtime/local/kernels/IsSymmetric.h index b6e0c2952..8e8b27400 100644 --- a/src/runtime/local/kernels/IsSymmetric.h +++ b/src/runtime/local/kernels/IsSymmetric.h @@ -105,8 +105,8 @@ template struct IsSymmetric> { for (size_t rowIdx = 0; rowIdx < numRows; rowIdx++) { - const VT* rowA = arg->getValues(rowIdx); - const size_t* colIdxsA = arg->getColIdxs(rowIdx); + const VT* rowA = arg->getRowValues(rowIdx); + const size_t* colIdxsA = arg->getColIdxsOfRow(rowIdx); const size_t numNonZerosA = arg->getNumNonZeros(rowIdx); for (size_t idx = 0; idx < numNonZerosA; idx++) { @@ -120,8 +120,8 @@ template struct IsSymmetric> { VT valA = rowA[idx]; // B references the transposed element to compare for symmetry. 
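// Editor's note: the kernels above switch from getValues(r)/getColIdxs(r) to
// the renamed getRowValues(r)/getColIdxsOfRow(r), which (as I read the diff)
// hand out the value and column-index slices of one CSR row. A self-contained
// sketch of the CSR row walk those slices enable, shown here as a sparse
// matrix-vector product; all names are illustrative:
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
    // 3x4 CSR matrix: row 0 = {1@c0, 2@c2}, row 1 = {3@c1}, row 2 = {4@c0, 5@c3}.
    std::vector<std::size_t> rowOffsets{0, 2, 3, 5};
    std::vector<std::size_t> colIdxs{0, 2, 1, 0, 3};
    std::vector<double>      values {1, 2, 3, 4, 5};
    std::vector<double>      x{1, 10, 100, 1000};    // dense vector
    std::vector<double>      y(3, 0.0);              // result of mat * x

    for (std::size_t r = 0; r < 3; ++r) {
        // Per-row slices, analogous to getRowValues(r) / getColIdxsOfRow(r).
        const double*      rowValues  = values.data()  + rowOffsets[r];
        const std::size_t* rowColIdxs = colIdxs.data() + rowOffsets[r];
        const std::size_t  rowNnz     = rowOffsets[r + 1] - rowOffsets[r];
        for (std::size_t i = 0; i < rowNnz; ++i)
            y[r] += rowValues[i] * x[rowColIdxs[i]];
    }
    for (double v : y) std::cout << v << ' ';        // 201 30 5004
    std::cout << '\n';
}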
- const VT* rowB = arg->getValues(colIdxA); - const size_t* colIdxsB = arg->getColIdxs(colIdxA); + const VT* rowB = arg->getRowValues(colIdxA); + const size_t* colIdxsB = arg->getColIdxsOfRow(colIdxA); const size_t numNonZerosB = arg->getNumNonZeros(colIdxA); positions[colIdxA]++; // colIdxA is rowIdxB diff --git a/src/runtime/local/kernels/MatMul.h b/src/runtime/local/kernels/MatMul.h index 19f4129ce..738beb1a5 100644 --- a/src/runtime/local/kernels/MatMul.h +++ b/src/runtime/local/kernels/MatMul.h @@ -80,8 +80,8 @@ struct MatMul, CSRMatrix, DenseMatrix> { memset(valuesRes, VT(0), sizeof(VT) * nr1 * nc2); for(size_t r = 0; r < nr1; r++) { const size_t rowNumNonZeros = lhs->getNumNonZeros(r); - const size_t * rowColIdxs = lhs->getColIdxs(r); - const VT * rowValues = lhs->getValues(r); + const size_t * rowColIdxs = lhs->getColIdxsOfRow(r); + const VT * rowValues = lhs->getRowValues(r); const size_t rowIdxRes = r * rowSkipRes; for(size_t i = 0; i < rowNumNonZeros; i++) { diff --git a/src/runtime/local/kernels/NumDistinctApprox.h b/src/runtime/local/kernels/NumDistinctApprox.h index 6dcf70ab2..39ff79b5b 100644 --- a/src/runtime/local/kernels/NumDistinctApprox.h +++ b/src/runtime/local/kernels/NumDistinctApprox.h @@ -125,7 +125,7 @@ template struct NumDistinctApprox> { } for(size_t rowIdx = 0; rowIdx < numRows; rowIdx++) { - const VT* values = arg->getValues(rowIdx); + const VT* values = arg->getRowValues(rowIdx); const size_t numNonZerosInRow = arg->getNumNonZeros(rowIdx); for(size_t colIdx = 0; colIdx < numNonZerosInRow; colIdx++) { diff --git a/src/runtime/local/kernels/Replace.h b/src/runtime/local/kernels/Replace.h index 743c9928f..d12aa75f0 100644 --- a/src/runtime/local/kernels/Replace.h +++ b/src/runtime/local/kernels/Replace.h @@ -195,8 +195,8 @@ struct Replace, CSRMatrix, VT> { //--------main logic -------------------------- if(pattern!=pattern){ // pattern is NaN for(size_t r = 0; r < numRows; r++){ - const VT * allValues = arg->getValues(r); - VT * allUpdatedValues = res->getValues(r); + const VT * allValues = arg->getRowValues(r); + VT * allUpdatedValues = res->getRowValues(r); const size_t nnzElementsRes= arg->getNumNonZeros(r); for(size_t c = 0; c < nnzElementsRes; c++){ if(allValues[c]!=allValues[c]){ @@ -207,8 +207,8 @@ struct Replace, CSRMatrix, VT> { } else{ for(size_t r = 0; r < numRows; r++){ - const VT * allValues = arg->getValues(r); - VT * allUpdatedValues = res->getValues(r); + const VT * allValues = arg->getRowValues(r); + VT * allUpdatedValues = res->getRowValues(r); const size_t nnzElementsRes= arg->getNumNonZeros(r); for(size_t c = 0; c < nnzElementsRes; c++){ if(allValues[c]==pattern){ diff --git a/src/runtime/local/kernels/Reshape.h b/src/runtime/local/kernels/Reshape.h index d15fe7636..8764a0bbc 100644 --- a/src/runtime/local/kernels/Reshape.h +++ b/src/runtime/local/kernels/Reshape.h @@ -58,8 +58,9 @@ struct Reshape, DenseMatrix> { if(numRows * numCols != arg->getNumRows() * arg->getNumCols()) throw std::runtime_error("reshape must retain the number of cells"); - if(arg->getRowSkip() == arg->getNumCols() && res == nullptr) - res = DataObjectFactory::create>(numRows, numCols, arg->getValuesSharedPtr()); + if(arg->getRowSkip() == arg->getNumCols() && arg->isView() == false) { + res = DataObjectFactory::create>(numRows, numCols, arg); + } else { if(res == nullptr) res = DataObjectFactory::create>(numRows, numCols, false); diff --git a/src/runtime/local/kernels/Tri.h b/src/runtime/local/kernels/Tri.h index 9ff9a08f9..db67976ca 100644 --- 
a/src/runtime/local/kernels/Tri.h +++ b/src/runtime/local/kernels/Tri.h @@ -131,8 +131,8 @@ struct Tri> { rowOffsetsRes[0] = 0; for(size_t r = 0, pos = 0; r < numRows; r++, (*inc)++) { const size_t rowNumNonZeros = arg->getNumNonZeros(r); - const size_t * rowColIdxs = arg->getColIdxs(r); - const VT * rowValues = arg->getValues(r); + const size_t * rowColIdxs = arg->getColIdxsOfRow(r); + const VT * rowValues = arg->getRowValues(r); for(size_t i = 0; i < rowNumNonZeros; i++) { const size_t c = rowColIdxs[i]; diff --git a/src/runtime/local/kernels/kernels.json b/src/runtime/local/kernels/kernels.json index 8ac24bfda..a0adb4cd2 100644 --- a/src/runtime/local/kernels/kernels.json +++ b/src/runtime/local/kernels/kernels.json @@ -2396,6 +2396,7 @@ [["DenseMatrix", "int64_t"], "int64_t"], [["DenseMatrix", "uint8_t"], "uint8_t"], [["CSRMatrix", "double"], "double"], + [["CSRMatrix", "float"], "float"], [["CSRMatrix", "int64_t"], "int64_t"] ] }, @@ -2503,6 +2504,7 @@ [["DenseMatrix", "uint8_t"]], [["CSRMatrix", "double"]], [["CSRMatrix", "float"]], + [["CSRMatrix", "int64_t"]], ["Frame"] ] }, diff --git a/test/runtime/local/datastructures/DenseMatrixTest.cpp b/test/runtime/local/datastructures/DenseMatrixTest.cpp index 6f07176cd..35ab61060 100644 --- a/test/runtime/local/datastructures/DenseMatrixTest.cpp +++ b/test/runtime/local/datastructures/DenseMatrixTest.cpp @@ -175,8 +175,8 @@ TEST_CASE("DenseMatrix sub-matrix works properly", TAG_DATASTRUCTURES) { const size_t numColsOrig = 7; const size_t numCellsOrig = numRowsOrig * numColsOrig; - DenseMatrix * mOrig = DataObjectFactory::create>(numRowsOrig, numColsOrig, true); - DenseMatrix * mSub = DataObjectFactory::create>(mOrig, 3, 5, 1, 4); + auto mOrig = DataObjectFactory::create>(numRowsOrig, numColsOrig, true); + auto mSub = DataObjectFactory::create>(mOrig, 3, 5, 1, 4); // Sub-matrix dimensions are as expected. CHECK(mSub->getNumRows() == 2); @@ -184,8 +184,8 @@ TEST_CASE("DenseMatrix sub-matrix works properly", TAG_DATASTRUCTURES) { CHECK(mSub->getRowSkip() == numColsOrig); // Sub-matrix shares data array with original. - ValueType * valuesOrig = mOrig->getValues(); - ValueType * valuesSub = mSub->getValues(); + ValueType* valuesOrig = mOrig->getValues(); + ValueType* valuesSub = mSub->getValues(); CHECK((valuesSub >= valuesOrig && valuesSub < valuesOrig + numCellsOrig)); valuesSub[0] = 123; CHECK(valuesOrig[3 * numColsOrig + 1] == 123);
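// Editor's note: the sub-matrix test above checks that a view shares its
// parent's buffer and keeps the parent's rowSkip. A self-contained sketch of
// that addressing rule (view data = parent buffer + rowOffset * rowSkip +
// colOffset). The 6x7 parent shape is chosen for illustration; the view
// offsets follow the test, and all names are illustrative:
#include <cassert>
#include <cstddef>
#include <vector>

int main() {
    const std::size_t numRows = 6, numCols = 7;      // parent matrix
    std::vector<double> orig(numRows * numCols, 0.0);

    // View over rows [3, 5) and columns [1, 4): it reuses the parent's buffer,
    // so its row stride stays numCols (7), not its own width (3).
    const std::size_t rowLower = 3, colLower = 1;
    double* view = orig.data() + rowLower * numCols + colLower;

    view[0] = 123.0;                          // element (0,0) of the view ...
    assert(orig[3 * numCols + 1] == 123.0);   // ... is element (3,1) of the parent

    view[1 * numCols + 2] = 7.0;              // element (1,2) of the view ...
    assert(orig[4 * numCols + 3] == 7.0);     // ... is element (4,3) of the parent
}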