From eee66245ceb981ecde31fc8eb14571902eb46700 Mon Sep 17 00:00:00 2001
From: Tamas Bela Feher
Date: Wed, 11 Nov 2020 15:52:00 +0100
Subject: [PATCH 1/4] ENH Allow generic reductions for the map then reduce op

---
 cpp/include/raft/linalg/map_then_reduce.cuh | 70 ++++++++++++++++-----
 cpp/test/linalg/map_then_reduce.cu          | 56 +++++++++++++++++
 2 files changed, 112 insertions(+), 14 deletions(-)

diff --git a/cpp/include/raft/linalg/map_then_reduce.cuh b/cpp/include/raft/linalg/map_then_reduce.cuh
index 1a6513b915..c937bea042 100644
--- a/cpp/include/raft/linalg/map_then_reduce.cuh
+++ b/cpp/include/raft/linalg/map_then_reduce.cuh
@@ -24,8 +24,10 @@
 namespace raft {
 namespace linalg {
 
+struct sum_tag {};
+
 template <typename Type, int TPB>
-__device__ void reduce(Type *out, const Type acc) {
+__device__ void reduce(Type *out, const Type acc, sum_tag) {
   typedef cub::BlockReduce<Type, TPB> BlockReduce;
   __shared__ typename BlockReduce::TempStorage temp_storage;
   Type tmp = BlockReduce(temp_storage).Sum(acc);
@@ -34,10 +36,22 @@
   }
 }
 
-template <typename Type, typename MapOp, int TPB, typename... Args>
-__global__ void mapThenSumReduceKernel(Type *out, size_t len, MapOp map,
-                                       const Type *in, Args... args) {
-  Type acc = (Type)0;
+template <typename Type, int TPB, typename ReduceLambda>
+__device__ void reduce(Type *out, const Type acc, ReduceLambda op) {
+  typedef cub::BlockReduce<Type, TPB> BlockReduce;
+  __shared__ typename BlockReduce::TempStorage temp_storage;
+  Type tmp = BlockReduce(temp_storage).Reduce(acc, op);
+  if (threadIdx.x == 0) {
+    raft::myAtomicReduce(out, tmp, op);
+  }
+}
+
+template <typename Type, typename MapOp, typename ReduceLambda, int TPB,
+          typename... Args>
+__global__ void mapThenReduceKernel(Type *out, size_t len, Type neutral,
+                                    MapOp map, ReduceLambda op, const Type *in,
+                                    Args... args) {
+  Type acc = neutral;
   auto idx = (threadIdx.x + (blockIdx.x * blockDim.x));
 
   if (idx < len) {
@@ -46,16 +60,18 @@ __global__ void mapThenSumReduceKernel(Type *out, size_t len, MapOp map,
 
   __syncthreads();
 
-  reduce<Type, TPB>(out, acc);
+  reduce<Type, TPB>(out, acc, op);
 }
 
-template <typename Type, typename MapOp, int TPB, typename... Args>
-void mapThenSumReduceImpl(Type *out, size_t len, MapOp map, cudaStream_t stream,
-                          const Type *in, Args... args) {
-  CUDA_CHECK(cudaMemsetAsync(out, 0, sizeof(Type), stream));
+template <typename Type, typename MapOp, typename ReduceLambda, int TPB,
+          typename... Args>
+void mapThenReduceImpl(Type *out, size_t len, Type neutral, MapOp map,
+                       ReduceLambda op, cudaStream_t stream, const Type *in,
+                       Args... args) {
+  raft::update_device(out, &neutral, 1, stream);
   const int nblks = raft::ceildiv(len, (size_t)TPB);
-  mapThenSumReduceKernel<Type, MapOp, TPB, Args...>
-    <<<nblks, TPB, 0, stream>>>(out, len, map, in, args...);
+  mapThenReduceKernel<Type, MapOp, ReduceLambda, TPB, Args...>
+    <<<nblks, TPB, 0, stream>>>(out, len, neutral, map, op, in, args...);
   CUDA_CHECK(cudaPeekAtLastError());
 }
 
@@ -76,9 +92,35 @@ void mapThenSumReduceImpl(Type *out, size_t len, MapOp map, cudaStream_t stream,
 template <typename Type, typename MapOp, int TPB = 256, typename... Args>
 void mapThenSumReduce(Type *out, size_t len, MapOp map, cudaStream_t stream,
                       const Type *in, Args... args) {
-  mapThenSumReduceImpl<Type, MapOp, TPB, Args...>(out, len, map, stream, in,
-                                                  args...);
+  mapThenReduceImpl<Type, MapOp, sum_tag, TPB, Args...>(
+    out, len, (Type)0, map, sum_tag(), stream, in, args...);
 }
 
+/**
+ * @brief CUDA version of map and then generic reduction operation
+ * @tparam Type data-type upon which the math operation will be performed
+ * @tparam MapOp the device-lambda performing the actual map operation
+ * @tparam ReduceLambda the device-lambda performing the actual reduction
+ * @tparam TPB threads-per-block in the final kernel launched
+ * @tparam Args additional parameters
+ * @param out the output reduced value (assumed to be a device pointer)
+ * @param len number of elements in the input array
+ * @param neutral The neutral element of the reduction operation. For example:
+ *   0 for sum, 1 for multiply, +Inf for Min, -Inf for Max
+ * @param map the device-lambda
+ * @param op the reduction device lambda
+ * @param stream cuda-stream where to launch this kernel
+ * @param in the input array
+ * @param args additional input arrays
+ */
+
+template <typename Type, typename MapOp, typename ReduceLambda, int TPB = 256,
+          typename... Args>
+void mapThenReduce(Type *out, size_t len, Type neutral, MapOp map,
+                   ReduceLambda op, cudaStream_t stream, const Type *in,
+                   Args... args) {
+  mapThenReduceImpl<Type, MapOp, ReduceLambda, TPB, Args...>(
+    out, len, neutral, map, op, stream, in, args...);
+}
 };  // end namespace linalg
 };  // end namespace raft
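Illustrative usage, not part of the patch: the new generic entry point pairs a map
lambda with any commutative reduction functor and that functor's neutral element.
A minimal sketch (the helper name min_abs_example is made up; assumes compilation
with nvcc and --extended-lambda, and device pointers d_in/d_out):

    #include <cub/cub.cuh>
    #include <limits>
    #include <raft/linalg/map_then_reduce.cuh>

    // Reduce a device array to min(|x|): map each element to its absolute
    // value, then combine with cub::Min(). The neutral element of a min
    // reduction is the largest representable value, so any real input
    // replaces it.
    void min_abs_example(float *d_out, const float *d_in, size_t len,
                         cudaStream_t stream) {
      auto abs_op = [] __device__(float x) { return fabsf(x); };
      raft::linalg::mapThenReduce(d_out, len,
                                  std::numeric_limits<float>::max(), abs_op,
                                  cub::Min(), stream, d_in);
    }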
diff --git a/cpp/test/linalg/map_then_reduce.cu b/cpp/test/linalg/map_then_reduce.cu
index adbb339de2..365c19ceaf 100644
--- a/cpp/test/linalg/map_then_reduce.cu
+++ b/cpp/test/linalg/map_then_reduce.cu
@@ -16,6 +16,7 @@
 
 #include <gtest/gtest.h>
 #include <raft/cudart_utils.h>
+#include <limits>
 #include <raft/linalg/map_then_reduce.cuh>
 #include <raft/random/rng.cuh>
 #include "../test_utils.h"
@@ -114,5 +115,60 @@ TEST_P(MapReduceTestD, Result) {
 INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestD,
                          ::testing::ValuesIn(inputsd));
 
+template <typename math_t>
+class MapGenericReduceTest : public ::testing::Test {
+ protected:
+  MapGenericReduceTest()
+    : allocator(handle.get_device_allocator()),
+      input(allocator, handle.get_stream(), n),
+      output(allocator, handle.get_stream(), 1) {
+    CUDA_CHECK(cudaStreamCreate(&stream));
+    handle.set_stream(stream);
+    initInput(input.data(), input.size(), stream);
+  }
+
+  void TearDown() override { CUDA_CHECK(cudaStreamDestroy(stream)); }
+
+ public:
+  void initInput(math_t *input, int n, cudaStream_t stream) {
+    raft::random::Rng r(137);
+    r.uniform(input, n, math_t(2), math_t(3), stream);
+    math_t val = 1;
+    raft::update_device(input + 42, &val, 1, stream);
+    val = 5;
+    raft::update_device(input + 337, &val, 1, stream);
+  }
+
+  void testMin() {
+    auto op = [] __device__(math_t in) { return in; };
+    const math_t neutral = std::numeric_limits<math_t>::max();
+    mapThenReduce(output.data(), input.size(), neutral, op, cub::Min(), stream,
+                  input.data());
+    EXPECT_TRUE(
+      raft::devArrMatch(math_t(1), output.data(), 1, raft::Compare<math_t>()));
+  }
+  void testMax() {
+    auto op = [] __device__(math_t in) { return in; };
+    const math_t neutral = std::numeric_limits<math_t>::min();
+    mapThenReduce(output.data(), input.size(), neutral, op, cub::Max(), stream,
+                  input.data());
+    EXPECT_TRUE(
+      raft::devArrMatch(math_t(5), output.data(), 1, raft::Compare<math_t>()));
+  }
+
+ protected:
+  int n = 1237;
+  raft::handle_t handle;
+  cudaStream_t stream;
+  std::shared_ptr<raft::mr::device::allocator> allocator;
+  raft::mr::device::buffer<math_t> input;
+  raft::mr::device::buffer<math_t> output;
+};
+
+typedef ::testing::Types<float, double> FloatTypes;
+
+TYPED_TEST_CASE(MapGenericReduceTest, FloatTypes);
+TYPED_TEST(MapGenericReduceTest, min) { this->testMin(); }
+TYPED_TEST(MapGenericReduceTest, max) { this->testMax(); }
 }  // end namespace linalg
 }  // end namespace raft

From ce4b8518f0e536d977582349a25da0a9fbb2c67f Mon Sep 17 00:00:00 2001
From: Tamas Bela Feher
Date: Wed, 11 Nov 2020 16:04:37 +0100
Subject: [PATCH 2/4] DOC Update changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bd6d76720f..278fcfa0ca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## New Features
 - PR #65: Adding cuml prims that break circular dependency between cuml and cumlprims projects
 - PR #93: Incorporate Date/Nagi implementation of Hungarian Algorithm
+- PR #94: Allow generic reductions for the map then reduce op
 
 ## Improvements
 - PR #73: Move DistanceType enum from cuML to RAFT
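The two patches that follow generalize the primitive so that the input and output
element types may differ, first in the interface (patch 3) and then in the
accumulator itself (patch 4). A sketch of the kind of call this enables once both
are applied (illustrative only, assuming the signatures as reconstructed here):
summing float inputs in double precision to limit rounding error.

    #include <raft/linalg/map_then_reduce.cuh>

    // Widen each float to double in the map step; with patch 4 applied, the
    // per-thread accumulator and the block reduction also run in double, so
    // the extra precision is not lost before the final atomic merge.
    void sum_as_double(double *d_out, const float *d_in, size_t len,
                       cudaStream_t stream) {
      auto widen = [] __device__(float x) { return static_cast<double>(x); };
      // OutType (double) is deduced from d_out, InType (float) from d_in.
      raft::linalg::mapThenSumReduce(d_out, len, widen, stream, d_in);
    }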
From 006b03b863f80b973f731c79e0b2724b749a5ae7 Mon Sep 17 00:00:00 2001
From: Tamas Bela Feher
Date: Fri, 20 Nov 2020 19:39:47 +0100
Subject: [PATCH 3/4] Allow different input and output type for MapThenReduce

---
 cpp/include/raft/linalg/map_then_reduce.cuh | 64 ++++++++-------
 cpp/test/linalg/map_then_reduce.cu          | 90 ++++++++++++---------
 2 files changed, 85 insertions(+), 69 deletions(-)

diff --git a/cpp/include/raft/linalg/map_then_reduce.cuh b/cpp/include/raft/linalg/map_then_reduce.cuh
index c937bea042..6e13374112 100644
--- a/cpp/include/raft/linalg/map_then_reduce.cuh
+++ b/cpp/include/raft/linalg/map_then_reduce.cuh
@@ -26,32 +26,32 @@ namespace linalg {
 
 struct sum_tag {};
 
-template <typename Type, int TPB>
-__device__ void reduce(Type *out, const Type acc, sum_tag) {
-  typedef cub::BlockReduce<Type, TPB> BlockReduce;
+template <typename InType, typename OutType, int TPB>
+__device__ void reduce(OutType *out, const InType acc, sum_tag) {
+  typedef cub::BlockReduce<InType, TPB> BlockReduce;
   __shared__ typename BlockReduce::TempStorage temp_storage;
-  Type tmp = BlockReduce(temp_storage).Sum(acc);
+  OutType tmp = BlockReduce(temp_storage).Sum(acc);
   if (threadIdx.x == 0) {
     raft::myAtomicAdd(out, tmp);
   }
 }
 
-template <typename Type, int TPB, typename ReduceLambda>
-__device__ void reduce(Type *out, const Type acc, ReduceLambda op) {
-  typedef cub::BlockReduce<Type, TPB> BlockReduce;
+template <typename InType, typename OutType, int TPB, typename ReduceLambda>
+__device__ void reduce(OutType *out, const InType acc, ReduceLambda op) {
+  typedef cub::BlockReduce<InType, TPB> BlockReduce;
   __shared__ typename BlockReduce::TempStorage temp_storage;
-  Type tmp = BlockReduce(temp_storage).Reduce(acc, op);
+  OutType tmp = BlockReduce(temp_storage).Reduce(acc, op);
   if (threadIdx.x == 0) {
     raft::myAtomicReduce(out, tmp, op);
   }
 }
 
-template <typename Type, typename MapOp, typename ReduceLambda, int TPB,
-          typename... Args>
-__global__ void mapThenReduceKernel(Type *out, size_t len, Type neutral,
-                                    MapOp map, ReduceLambda op, const Type *in,
-                                    Args... args) {
-  Type acc = neutral;
+template <typename InType, typename OutType, typename MapOp,
+          typename ReduceLambda, int TPB, typename... Args>
+__global__ void mapThenReduceKernel(OutType *out, size_t len, InType neutral,
+                                    MapOp map, ReduceLambda op,
+                                    const InType *in, Args... args) {
+  InType acc = neutral;
   auto idx = (threadIdx.x + (blockIdx.x * blockDim.x));
 
   if (idx < len) {
@@ -60,17 +60,18 @@ __global__ void mapThenReduceKernel(Type *out, size_t len, Type neutral,
 
   __syncthreads();
 
-  reduce<Type, TPB>(out, acc, op);
+  reduce<InType, OutType, TPB>(out, acc, op);
 }
 
-template <typename Type, typename MapOp, typename ReduceLambda, int TPB,
-          typename... Args>
-void mapThenReduceImpl(Type *out, size_t len, Type neutral, MapOp map,
-                       ReduceLambda op, cudaStream_t stream, const Type *in,
+template <typename InType, typename OutType, typename MapOp,
+          typename ReduceLambda, int TPB, typename... Args>
+void mapThenReduceImpl(OutType *out, size_t len, InType neutral, MapOp map,
+                       ReduceLambda op, cudaStream_t stream, const InType *in,
                        Args... args) {
-  raft::update_device(out, &neutral, 1, stream);
+  OutType n = neutral;
+  raft::update_device(out, &n, 1, stream);
   const int nblks = raft::ceildiv(len, (size_t)TPB);
-  mapThenReduceKernel<Type, MapOp, ReduceLambda, TPB, Args...>
+  mapThenReduceKernel<InType, OutType, MapOp, ReduceLambda, TPB, Args...>
     <<<nblks, TPB, 0, stream>>>(out, len, neutral, map, op, in, args...);
   CUDA_CHECK(cudaPeekAtLastError());
 }
@@ -89,11 +90,12 @@ void mapThenReduceImpl(Type *out, size_t len, Type neutral, MapOp map,
  * @param args additional input arrays
  */
 
-template <typename Type, typename MapOp, int TPB = 256, typename... Args>
-void mapThenSumReduce(Type *out, size_t len, MapOp map, cudaStream_t stream,
-                      const Type *in, Args... args) {
-  mapThenReduceImpl<Type, MapOp, sum_tag, TPB, Args...>(
-    out, len, (Type)0, map, sum_tag(), stream, in, args...);
+template <typename InType, typename OutType = InType, typename MapOp,
+          int TPB = 256, typename... Args>
+void mapThenSumReduce(OutType *out, size_t len, MapOp map, cudaStream_t stream,
+                      const InType *in, Args... args) {
+  mapThenReduceImpl<InType, OutType, MapOp, sum_tag, TPB, Args...>(
+    out, len, (InType)0, map, sum_tag(), stream, in, args...);
 }
 
 /**
@@ -114,12 +116,12 @@ void mapThenSumReduce(Type *out, size_t len, MapOp map, cudaStream_t stream,
  * @param args additional input arrays
  */
 
-template <typename Type, typename MapOp, typename ReduceLambda, int TPB = 256,
-          typename... Args>
+template <typename InType, typename OutType = InType, typename MapOp,
+          typename ReduceLambda, int TPB = 256, typename... Args>
-void mapThenReduce(Type *out, size_t len, Type neutral, MapOp map,
-                   ReduceLambda op, cudaStream_t stream, const Type *in,
+void mapThenReduce(OutType *out, size_t len, InType neutral, MapOp map,
+                   ReduceLambda op, cudaStream_t stream, const InType *in,
                    Args... args) {
-  mapThenReduceImpl<Type, MapOp, ReduceLambda, TPB, Args...>(
+  mapThenReduceImpl<InType, OutType, MapOp, ReduceLambda, TPB, Args...>(
     out, len, neutral, map, op, stream, in, args...);
 }
 };  // end namespace linalg
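For orientation (commentary, not patch content): the reduction is two-level.
cub::BlockReduce combines the values held by the threads of one block, and thread 0
of each block then folds the block's partial result into *out with an atomic
read-modify-write; this is why *out must be seeded with the neutral element before
the kernel launches. raft::myAtomicReduce implements that last step; for an
arbitrary operator it amounts to the standard atomicCAS retry loop, sketched here
for float only (the real helper covers more types):

    // Illustrative sketch of a CAS-based generic atomic reduction for float.
    template <typename ReduceLambda>
    __device__ void atomic_reduce_float(float *address, float val,
                                        ReduceLambda op) {
      unsigned int *addr = reinterpret_cast<unsigned int *>(address);
      unsigned int old = *addr, assumed;
      do {
        assumed = old;
        // Combine the currently stored value with ours, then publish the
        // result only if no other thread changed *address in the meantime.
        float combined = op(__uint_as_float(assumed), val);
        old = atomicCAS(addr, assumed, __float_as_uint(combined));
      } while (assumed != old);
    }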
diff --git a/cpp/test/linalg/map_then_reduce.cu b/cpp/test/linalg/map_then_reduce.cu
index 365c19ceaf..e282a1d63e 100644
--- a/cpp/test/linalg/map_then_reduce.cu
+++ b/cpp/test/linalg/map_then_reduce.cu
@@ -24,21 +24,21 @@
 namespace raft {
 namespace linalg {
 
-template <typename Type, typename MapOp>
-__global__ void naiveMapReduceKernel(Type *out, const Type *in, size_t len,
+template <typename InType, typename OutType, typename MapOp>
+__global__ void naiveMapReduceKernel(OutType *out, const InType *in, size_t len,
                                      MapOp map) {
   int idx = threadIdx.x + blockIdx.x * blockDim.x;
   if (idx < len) {
-    raft::myAtomicAdd(out, map(in[idx]));
+    raft::myAtomicAdd(out, (OutType)map(in[idx]));
   }
 }
 
-template <typename Type, typename MapOp>
-void naiveMapReduce(Type *out, const Type *in, size_t len, MapOp map,
+template <typename InType, typename OutType, typename MapOp>
+void naiveMapReduce(OutType *out, const InType *in, size_t len, MapOp map,
                     cudaStream_t stream) {
   static const int TPB = 64;
   int nblks = raft::ceildiv(len, (size_t)TPB);
-  naiveMapReduceKernel<Type, MapOp>
+  naiveMapReduceKernel<InType, OutType, MapOp>
     <<<nblks, TPB, 0, stream>>>(out, in, len, map);
   CUDA_CHECK(cudaPeekAtLastError());
 }
@@ -58,19 +58,19 @@ template <typename T>
 // Or else, we get the following compilation error
 // for an extended __device__ lambda cannot have private or protected access
 // within its class
-template <typename T>
-void mapReduceLaunch(T *out_ref, T *out, const T *in, size_t len,
-                     cudaStream_t stream) {
-  auto op = [] __device__(T in) { return in; };
+template <typename InType, typename OutType>
+void mapReduceLaunch(OutType *out_ref, OutType *out, const InType *in,
+                     size_t len, cudaStream_t stream) {
+  auto op = [] __device__(InType in) { return in; };
   naiveMapReduce(out_ref, in, len, op, stream);
   mapThenSumReduce(out, len, op, 0, in);
 }
 
-template <typename T>
-class MapReduceTest : public ::testing::TestWithParam<MapReduceInputs<T>> {
+template <typename InType, typename OutType>
+class MapReduceTest : public ::testing::TestWithParam<MapReduceInputs<InType>> {
  protected:
   void SetUp() override {
-    params = ::testing::TestWithParam<MapReduceInputs<T>>::GetParam();
+    params = ::testing::TestWithParam<MapReduceInputs<InType>>::GetParam();
     raft::random::Rng r(params.seed);
 
     auto len = params.len;
@@ -79,7 +79,7 @@ class MapReduceTest : public ::testing::TestWithParam<MapReduceInputs<T>> {
     allocate(in, len);
     allocate(out_ref, len);
     allocate(out, len);
-    r.uniform(in, len, T(-1.0), T(1.0), stream);
+    r.uniform(in, len, InType(-1.0), InType(1.0), stream);
     mapReduceLaunch(out_ref, out, in, len, stream);
     CUDA_CHECK(cudaStreamDestroy(stream));
   }
@@ -91,32 +91,44 @@ class MapReduceTest : public ::testing::TestWithParam<MapReduceInputs<T>> {
   }
 
  protected:
-  MapReduceInputs<T> params;
-  T *in, *out_ref, *out;
+  MapReduceInputs<InType> params;
+  InType *in;
+  OutType *out_ref, *out;
 };
 
 const std::vector<MapReduceInputs<float>> inputsf = {
   {0.001f, 1024 * 1024, 1234ULL}};
-typedef MapReduceTest<float> MapReduceTestF;
-TEST_P(MapReduceTestF, Result) {
+typedef MapReduceTest<float, float> MapReduceTestFF;
+TEST_P(MapReduceTestFF, Result) {
   ASSERT_TRUE(devArrMatch(out_ref, out, params.len,
                           CompareApprox<float>(params.tolerance)));
 }
-INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestF,
+INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestFF,
+                         ::testing::ValuesIn(inputsf));
+
+typedef MapReduceTest<float, double> MapReduceTestFD;
+TEST_P(MapReduceTestFD, Result) {
+  ASSERT_TRUE(devArrMatch(out_ref, out, params.len,
+                          CompareApprox<double>(params.tolerance)));
+}
+INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestFD,
                          ::testing::ValuesIn(inputsf));
 
 const std::vector<MapReduceInputs<double>> inputsd = {
   {0.000001, 1024 * 1024, 1234ULL}};
-typedef MapReduceTest<double> MapReduceTestD;
-TEST_P(MapReduceTestD, Result) {
+typedef MapReduceTest<double, double> MapReduceTestDD;
+TEST_P(MapReduceTestDD, Result) {
   ASSERT_TRUE(devArrMatch(out_ref, out, params.len,
                           CompareApprox<double>(params.tolerance)));
 }
-INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestD,
+INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestDD,
                          ::testing::ValuesIn(inputsd));
 
-template <typename math_t>
+template <typename T>
 class MapGenericReduceTest : public ::testing::Test {
+  using InType = typename T::first_type;
+  using OutType = typename T::second_type;
+
  protected:
   MapGenericReduceTest()
     : allocator(handle.get_device_allocator()),
@@ -130,30 +142,30 @@ class MapGenericReduceTest : public ::testing::Test {
 
   void TearDown() override { CUDA_CHECK(cudaStreamDestroy(stream)); }
 
 public:
-  void initInput(math_t *input, int n, cudaStream_t stream) {
+  void initInput(InType *input, int n, cudaStream_t stream) {
     raft::random::Rng r(137);
-    r.uniform(input, n, math_t(2), math_t(3), stream);
-    math_t val = 1;
+    r.uniform(input, n, InType(2), InType(3), stream);
+    InType val = 1;
     raft::update_device(input + 42, &val, 1, stream);
     val = 5;
     raft::update_device(input + 337, &val, 1, stream);
   }
 
   void testMin() {
-    auto op = [] __device__(math_t in) { return in; };
-    const math_t neutral = std::numeric_limits<math_t>::max();
+    auto op = [] __device__(InType in) { return in; };
+    const InType neutral = std::numeric_limits<InType>::max();
     mapThenReduce(output.data(), input.size(), neutral, op, cub::Min(), stream,
                   input.data());
-    EXPECT_TRUE(
-      raft::devArrMatch(math_t(1), output.data(), 1, raft::Compare<math_t>()));
+    EXPECT_TRUE(raft::devArrMatch(OutType(1), output.data(), 1,
+                                  raft::Compare<OutType>()));
   }
   void testMax() {
-    auto op = [] __device__(math_t in) { return in; };
-    const math_t neutral = std::numeric_limits<math_t>::min();
+    auto op = [] __device__(InType in) { return in; };
+    const InType neutral = std::numeric_limits<InType>::min();
     mapThenReduce(output.data(), input.size(), neutral, op, cub::Max(), stream,
                   input.data());
-    EXPECT_TRUE(
-      raft::devArrMatch(math_t(5), output.data(), 1, raft::Compare<math_t>()));
+    EXPECT_TRUE(raft::devArrMatch(OutType(5), output.data(), 1,
+                                  raft::Compare<OutType>()));
  }
 
 protected:
@@ -161,13 +173,15 @@
   int n = 1237;
   raft::handle_t handle;
   cudaStream_t stream;
   std::shared_ptr<raft::mr::device::allocator> allocator;
-  raft::mr::device::buffer<math_t> input;
-  raft::mr::device::buffer<math_t> output;
+  raft::mr::device::buffer<InType> input;
+  raft::mr::device::buffer<OutType> output;
 };
 
-typedef ::testing::Types<float, double> FloatTypes;
+using IoTypePair =
+  ::testing::Types<std::pair<float, float>, std::pair<float, double>,
+                   std::pair<double, double>>;
 
-TYPED_TEST_CASE(MapGenericReduceTest, FloatTypes);
+TYPED_TEST_CASE(MapGenericReduceTest, IoTypePair);
 TYPED_TEST(MapGenericReduceTest, min) { this->testMin(); }
 TYPED_TEST(MapGenericReduceTest, max) { this->testMax(); }
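Patch 4 below finishes the job: after patch 3 the pointers are typed OutType, but
the per-thread accumulator and the neutral element are still InType, so a map
lambda that widens (say, float to double) has its result rounded back to InType
before the block reduction ever sees it. A two-line illustration of the loss
(plain host C++, not from the patch):

    double exact = static_cast<double>(0.1f) * static_cast<double>(0.1f);
    float narrowed = static_cast<float>(exact);  // rounds away the low bits,
                                                 // exactly what an InType
                                                 // accumulator would do

Moving the accumulator and the neutral element to OutType also lets
mapThenReduceImpl upload the neutral directly, removing the temporary OutType
copy introduced in patch 3.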
From 0d0f6174276f79e4d7787e1cb5ebb97c16f081b1 Mon Sep 17 00:00:00 2001
From: Tamas Bela Feher
Date: Wed, 25 Nov 2020 11:42:41 +0100
Subject: [PATCH 4/4] Set accumulator and neutral type to OutType

---
 cpp/include/raft/linalg/map_then_reduce.cuh | 13 ++++++-------
 cpp/test/linalg/map_then_reduce.cu          |  4 ++--
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/cpp/include/raft/linalg/map_then_reduce.cuh b/cpp/include/raft/linalg/map_then_reduce.cuh
index 6e13374112..f2f198670a 100644
--- a/cpp/include/raft/linalg/map_then_reduce.cuh
+++ b/cpp/include/raft/linalg/map_then_reduce.cuh
@@ -48,10 +48,10 @@ __device__ void reduce(OutType *out, const InType acc, ReduceLambda op) {
 
 template <typename InType, typename OutType, typename MapOp,
           typename ReduceLambda, int TPB, typename... Args>
-__global__ void mapThenReduceKernel(OutType *out, size_t len, InType neutral,
+__global__ void mapThenReduceKernel(OutType *out, size_t len, OutType neutral,
                                     MapOp map, ReduceLambda op,
                                     const InType *in, Args... args) {
-  InType acc = neutral;
+  OutType acc = neutral;
   auto idx = (threadIdx.x + (blockIdx.x * blockDim.x));
 
   if (idx < len) {
@@ -65,11 +65,10 @@ __global__ void mapThenReduceKernel(OutType *out, size_t len, InType neutral,
 
 template <typename InType, typename OutType, typename MapOp,
           typename ReduceLambda, int TPB, typename... Args>
-void mapThenReduceImpl(OutType *out, size_t len, InType neutral, MapOp map,
+void mapThenReduceImpl(OutType *out, size_t len, OutType neutral, MapOp map,
                        ReduceLambda op, cudaStream_t stream, const InType *in,
                        Args... args) {
-  OutType n = neutral;
-  raft::update_device(out, &n, 1, stream);
+  raft::update_device(out, &neutral, 1, stream);
   const int nblks = raft::ceildiv(len, (size_t)TPB);
   mapThenReduceKernel<InType, OutType, MapOp, ReduceLambda, TPB, Args...>
     <<<nblks, TPB, 0, stream>>>(out, len, neutral, map, op, in, args...);
   CUDA_CHECK(cudaPeekAtLastError());
@@ -95,7 +94,7 @@ template <typename InType, typename OutType = InType, typename MapOp,
 void mapThenSumReduce(OutType *out, size_t len, MapOp map, cudaStream_t stream,
                       const InType *in, Args... args) {
   mapThenReduceImpl<InType, OutType, MapOp, sum_tag, TPB, Args...>(
-    out, len, (InType)0, map, sum_tag(), stream, in, args...);
+    out, len, (OutType)0, map, sum_tag(), stream, in, args...);
 }
 
 /**
@@ -118,7 +117,7 @@ void mapThenSumReduce(OutType *out, size_t len, MapOp map, cudaStream_t stream,
 template <typename InType, typename OutType = InType, typename MapOp,
           typename ReduceLambda, int TPB = 256, typename... Args>
-void mapThenReduce(OutType *out, size_t len, InType neutral, MapOp map,
+void mapThenReduce(OutType *out, size_t len, OutType neutral, MapOp map,
                    ReduceLambda op, cudaStream_t stream, const InType *in,
                    Args... args) {
   mapThenReduceImpl<InType, OutType, MapOp, ReduceLambda, TPB, Args...>(
     out, len, neutral, map, op, stream, in, args...);
diff --git a/cpp/test/linalg/map_then_reduce.cu b/cpp/test/linalg/map_then_reduce.cu
index e282a1d63e..6e146fa4bb 100644
--- a/cpp/test/linalg/map_then_reduce.cu
+++ b/cpp/test/linalg/map_then_reduce.cu
@@ -153,7 +153,7 @@ class MapGenericReduceTest : public ::testing::Test {
 
   void testMin() {
     auto op = [] __device__(InType in) { return in; };
-    const InType neutral = std::numeric_limits<InType>::max();
+    const OutType neutral = std::numeric_limits<OutType>::max();
     mapThenReduce(output.data(), input.size(), neutral, op, cub::Min(), stream,
                   input.data());
     EXPECT_TRUE(raft::devArrMatch(OutType(1), output.data(), 1,
@@ -161,7 +161,7 @@ class MapGenericReduceTest : public ::testing::Test {
   }
   void testMax() {
     auto op = [] __device__(InType in) { return in; };
-    const InType neutral = std::numeric_limits<InType>::min();
+    const OutType neutral = std::numeric_limits<OutType>::min();
     mapThenReduce(output.data(), input.size(), neutral, op, cub::Max(), stream,
                   input.data());
     EXPECT_TRUE(raft::devArrMatch(OutType(5), output.data(), 1,