[REVIEW] Allow generic reductions for the map then reduce op #94

Merged (5 commits) on Nov 26, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## New Features
- PR #65: Adding cuml prims that break circular dependency between cuml and cumlprims projects
- PR #93: Incorporate Date/Nagi implementation of Hungarian Algorithm
- PR #94: Allow generic reductions for the map then reduce op
- PR #95: Cholesky rank one update prim

## Improvements
83 changes: 63 additions & 20 deletions cpp/include/raft/linalg/map_then_reduce.cuh
@@ -24,20 +24,34 @@
namespace raft {
namespace linalg {

template <typename Type, int TPB>
__device__ void reduce(Type *out, const Type acc) {
typedef cub::BlockReduce<Type, TPB> BlockReduce;
struct sum_tag {};
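// Tag type selecting the sum specialization below: cub's BlockReduce
// computes the per-block sum, and thread 0 atomically adds each block's
// partial result into *out.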

template <typename InType, typename OutType, int TPB>
__device__ void reduce(OutType *out, const InType acc, sum_tag) {
typedef cub::BlockReduce<InType, TPB> BlockReduce;
__shared__ typename BlockReduce::TempStorage temp_storage;
Type tmp = BlockReduce(temp_storage).Sum(acc);
OutType tmp = BlockReduce(temp_storage).Sum(acc);
if (threadIdx.x == 0) {
raft::myAtomicAdd(out, tmp);
}
}

template <typename Type, typename MapOp, int TPB, typename... Args>
__global__ void mapThenSumReduceKernel(Type *out, size_t len, MapOp map,
const Type *in, Args... args) {
Type acc = (Type)0;
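// Generic overload: performs the block-wide reduction with an arbitrary
// ReduceLambda (assumed associative and commutative, since per-block
// results are combined in arbitrary order), then merges block results
// through raft::myAtomicReduce.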
template <typename InType, typename OutType, int TPB, typename ReduceLambda>
__device__ void reduce(OutType *out, const InType acc, ReduceLambda op) {
typedef cub::BlockReduce<InType, TPB> BlockReduce;
__shared__ typename BlockReduce::TempStorage temp_storage;
OutType tmp = BlockReduce(temp_storage).Reduce(acc, op);
if (threadIdx.x == 0) {
raft::myAtomicReduce(out, tmp, op);
}
}

template <typename InType, typename OutType, typename MapOp,
typename ReduceLambda, int TPB, typename... Args>
__global__ void mapThenReduceKernel(OutType *out, size_t len, OutType neutral,
MapOp map, ReduceLambda op,
const InType *in, Args... args) {
OutType acc = neutral;
auto idx = (threadIdx.x + (blockIdx.x * blockDim.x));

if (idx < len) {
@@ -46,16 +60,18 @@ __global__ void mapThenSumReduceKernel(Type *out, size_t len, MapOp map,

__syncthreads();

reduce<Type, TPB>(out, acc);
reduce<InType, OutType, TPB>(out, acc, op);
}

template <typename Type, typename MapOp, int TPB, typename... Args>
void mapThenSumReduceImpl(Type *out, size_t len, MapOp map, cudaStream_t stream,
const Type *in, Args... args) {
CUDA_CHECK(cudaMemsetAsync(out, 0, sizeof(Type), stream));
template <typename InType, typename OutType, typename MapOp,
typename ReduceLambda, int TPB, typename... Args>
void mapThenReduceImpl(OutType *out, size_t len, OutType neutral, MapOp map,
ReduceLambda op, cudaStream_t stream, const InType *in,
Args... args) {
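  // Seed *out with the neutral element so the atomic per-block reduction
  // starts from the identity value of the reduction op.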
raft::update_device(out, &neutral, 1, stream);
const int nblks = raft::ceildiv(len, (size_t)TPB);
mapThenSumReduceKernel<Type, MapOp, TPB, Args...>
<<<nblks, TPB, 0, stream>>>(out, len, map, in, args...);
mapThenReduceKernel<InType, OutType, MapOp, ReduceLambda, TPB, Args...>
<<<nblks, TPB, 0, stream>>>(out, len, neutral, map, op, in, args...);
CUDA_CHECK(cudaPeekAtLastError());
}

@@ -73,12 +89,39 @@ void mapThenSumReduceImpl(Type *out, size_t len, MapOp map, cudaStream_t stream,
* @param args additional input arrays
*/

template <typename Type, typename MapOp, int TPB = 256, typename... Args>
void mapThenSumReduce(Type *out, size_t len, MapOp map, cudaStream_t stream,
const Type *in, Args... args) {
mapThenSumReduceImpl<Type, MapOp, TPB, Args...>(out, len, map, stream, in,
args...);
template <typename InType, typename MapOp, int TPB = 256, typename... Args,
typename OutType = InType>
void mapThenSumReduce(OutType *out, size_t len, MapOp map, cudaStream_t stream,
const InType *in, Args... args) {
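  // Forward to the generic implementation; tag dispatch on sum_tag picks
  // the atomic-add reduce specialization above.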
mapThenReduceImpl<InType, OutType, MapOp, sum_tag, TPB, Args...>(
out, len, (OutType)0, map, sum_tag(), stream, in, args...);
}

/**
* @brief CUDA version of map and then generic reduction operation
 * @tparam InType input data-type upon which the map operation is performed
 * @tparam OutType output data-type of the reduced value
* @tparam MapOp the device-lambda performing the actual map operation
* @tparam ReduceLambda the device-lambda performing the actual reduction
* @tparam TPB threads-per-block in the final kernel launched
* @tparam Args additional parameters
* @param out the output reduced value (assumed to be a device pointer)
* @param len number of elements in the input array
* @param neutral The neutral element of the reduction operation. For example:
* 0 for sum, 1 for multiply, +Inf for Min, -Inf for Max
 * @param map the device-lambda performing the map operation
 * @param op the device-lambda performing the reduction operation
 * @param stream cuda-stream on which to launch this kernel
* @param in the input array
* @param args additional input arrays
*/

template <typename InType, typename MapOp, typename ReduceLambda, int TPB = 256,
typename OutType = InType, typename... Args>
void mapThenReduce(OutType *out, size_t len, OutType neutral, MapOp map,
ReduceLambda op, cudaStream_t stream, const InType *in,
Args... args) {
mapThenReduceImpl<InType, OutType, MapOp, ReduceLambda, TPB, Args...>(
out, len, neutral, map, op, stream, in, args...);
}
}; // end namespace linalg
}; // end namespace raft
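
For orientation, here is a minimal usage sketch of the two entry points above. It is not part of the diff; d_in (length len), d_out (length 1), and stream are assumed to be set up by the caller.

// Hypothetical sketch, not part of the PR: sum and min of squared elements.
auto square = [] __device__(float x) { return x * x; };

// Sum reduction via the existing convenience wrapper:
raft::linalg::mapThenSumReduce(d_out, len, square, stream, d_in);

// Min reduction via the new generic entry point; the neutral element for
// min is the largest representable value (see <limits>):
raft::linalg::mapThenReduce(d_out, len, std::numeric_limits<float>::max(),
                            square, cub::Min(), stream, d_in);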
114 changes: 92 additions & 22 deletions cpp/test/linalg/map_then_reduce.cu
@@ -16,28 +16,29 @@

#include <gtest/gtest.h>
#include <raft/cudart_utils.h>
#include <limits>
#include <raft/linalg/map_then_reduce.cuh>
#include <raft/random/rng.cuh>
#include "../test_utils.h"

namespace raft {
namespace linalg {

template <typename Type, typename MapOp>
__global__ void naiveMapReduceKernel(Type *out, const Type *in, size_t len,
template <typename InType, typename OutType, typename MapOp>
__global__ void naiveMapReduceKernel(OutType *out, const InType *in, size_t len,
MapOp map) {
int idx = threadIdx.x + blockIdx.x * blockDim.x;
if (idx < len) {
raft::myAtomicAdd(out, map(in[idx]));
raft::myAtomicAdd(out, (OutType)map(in[idx]));
}
}

template <typename Type, typename MapOp>
void naiveMapReduce(Type *out, const Type *in, size_t len, MapOp map,
template <typename InType, typename OutType, typename MapOp>
void naiveMapReduce(OutType *out, const InType *in, size_t len, MapOp map,
cudaStream_t stream) {
static const int TPB = 64;
int nblks = raft::ceildiv(len, (size_t)TPB);
naiveMapReduceKernel<Type, MapOp>
naiveMapReduceKernel<InType, OutType, MapOp>
<<<nblks, TPB, 0, stream>>>(out, in, len, map);
CUDA_CHECK(cudaPeekAtLastError());
}
@@ -57,19 +58,19 @@ template <typename T>
// Or else, we get the following compilation error
// for an extended __device__ lambda cannot have private or protected access
// within its class
template <typename T>
void mapReduceLaunch(T *out_ref, T *out, const T *in, size_t len,
cudaStream_t stream) {
auto op = [] __device__(T in) { return in; };
template <typename InType, typename OutType>
void mapReduceLaunch(OutType *out_ref, OutType *out, const InType *in,
size_t len, cudaStream_t stream) {
auto op = [] __device__(InType in) { return in; };
naiveMapReduce(out_ref, in, len, op, stream);
mapThenSumReduce(out, len, op, 0, in);
}

template <typename T>
class MapReduceTest : public ::testing::TestWithParam<MapReduceInputs<T>> {
template <typename InType, typename OutType>
class MapReduceTest : public ::testing::TestWithParam<MapReduceInputs<InType>> {
protected:
void SetUp() override {
params = ::testing::TestWithParam<MapReduceInputs<T>>::GetParam();
params = ::testing::TestWithParam<MapReduceInputs<InType>>::GetParam();
raft::random::Rng r(params.seed);
auto len = params.len;

@@ -78,7 +79,7 @@ class MapReduceTest : public ::testing::TestWithParam<MapReduceInputs<T>> {
allocate(in, len);
allocate(out_ref, len);
allocate(out, len);
r.uniform(in, len, T(-1.0), T(1.0), stream);
r.uniform(in, len, InType(-1.0), InType(1.0), stream);
mapReduceLaunch(out_ref, out, in, len, stream);
CUDA_CHECK(cudaStreamDestroy(stream));
}
@@ -90,29 +91,98 @@ class MapReduceTest : public ::testing::TestWithParam<MapReduceInputs<T>> {
}

protected:
MapReduceInputs<T> params;
T *in, *out_ref, *out;
MapReduceInputs<InType> params;
InType *in;
OutType *out_ref, *out;
};

const std::vector<MapReduceInputs<float>> inputsf = {
{0.001f, 1024 * 1024, 1234ULL}};
typedef MapReduceTest<float> MapReduceTestF;
TEST_P(MapReduceTestF, Result) {
typedef MapReduceTest<float, float> MapReduceTestFF;
TEST_P(MapReduceTestFF, Result) {
ASSERT_TRUE(devArrMatch(out_ref, out, params.len,
CompareApprox<float>(params.tolerance)));
}
INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestF,
INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestFF,
::testing::ValuesIn(inputsf));

typedef MapReduceTest<float, double> MapReduceTestFD;
TEST_P(MapReduceTestFD, Result) {
ASSERT_TRUE(devArrMatch(out_ref, out, params.len,
CompareApprox<double>(params.tolerance)));
}
INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestFD,
::testing::ValuesIn(inputsf));

const std::vector<MapReduceInputs<double>> inputsd = {
{0.000001, 1024 * 1024, 1234ULL}};
typedef MapReduceTest<double> MapReduceTestD;
TEST_P(MapReduceTestD, Result) {
typedef MapReduceTest<double, double> MapReduceTestDD;
TEST_P(MapReduceTestDD, Result) {
ASSERT_TRUE(devArrMatch(out_ref, out, params.len,
CompareApprox<double>(params.tolerance)));
}
INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestD,
INSTANTIATE_TEST_SUITE_P(MapReduceTests, MapReduceTestDD,
::testing::ValuesIn(inputsd));

template <typename T>
class MapGenericReduceTest : public ::testing::Test {
using InType = typename T::first_type;
using OutType = typename T::second_type;

protected:
MapGenericReduceTest()
: allocator(handle.get_device_allocator()),
input(allocator, handle.get_stream(), n),
output(allocator, handle.get_stream(), 1) {
CUDA_CHECK(cudaStreamCreate(&stream));
handle.set_stream(stream);
initInput(input.data(), input.size(), stream);
}

void TearDown() override { CUDA_CHECK(cudaStreamDestroy(stream)); }

public:
void initInput(InType *input, int n, cudaStream_t stream) {
raft::random::Rng r(137);
r.uniform(input, n, InType(2), InType(3), stream);
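    // Plant two sentinels outside the uniform(2, 3) range so the expected
    // minimum (1, at index 42) and maximum (5, at index 337) are known
    // exactly.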
InType val = 1;
raft::update_device(input + 42, &val, 1, stream);
val = 5;
raft::update_device(input + 337, &val, 1, stream);
}

void testMin() {
auto op = [] __device__(InType in) { return in; };
const OutType neutral = std::numeric_limits<InType>::max();
mapThenReduce(output.data(), input.size(), neutral, op, cub::Min(), stream,
input.data());
EXPECT_TRUE(raft::devArrMatch(OutType(1), output.data(), 1,
raft::Compare<OutType>()));
}
void testMax() {
auto op = [] __device__(InType in) { return in; };
const OutType neutral = std::numeric_limits<InType>::min();
mapThenReduce(output.data(), input.size(), neutral, op, cub::Max(), stream,
input.data());
EXPECT_TRUE(raft::devArrMatch(OutType(5), output.data(), 1,
raft::Compare<OutType>()));
}

protected:
int n = 1237;
raft::handle_t handle;
cudaStream_t stream;
std::shared_ptr<raft::mr::device::allocator> allocator;
raft::mr::device::buffer<InType> input;
raft::mr::device::buffer<OutType> output;
};

using IoTypePair =
::testing::Types<std::pair<float, float>, std::pair<float, double>,
std::pair<double, double>>;

TYPED_TEST_CASE(MapGenericReduceTest, IoTypePair);
TYPED_TEST(MapGenericReduceTest, min) { this->testMin(); }
TYPED_TEST(MapGenericReduceTest, max) { this->testMax(); }
} // end namespace linalg
} // end namespace raft