Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor distinct using static_map insert_or_apply #16484

Open
wants to merge 10 commits into
base: branch-24.10
Choose a base branch
from
167 changes: 165 additions & 2 deletions cpp/src/stream_compaction/distinct.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/resource_ref.hpp>

#include <cuco/static_map.cuh>
PointKernel marked this conversation as resolved.
Show resolved Hide resolved

#include <functional>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -72,8 +75,146 @@ rmm::device_uvector<cudf::size_type> dipatch_row_equal(
return func(d_equal);
}
}

// Functor applied by `insert_or_apply` when a key already exists: atomically
// accumulates the incoming payload into the stored value. Call sites insert
// pairs whose payload is 1, so this counts the rows in each group.
struct plus_op {
  template <typename T, cuda::thread_scope Scope>
  __device__ void operator()(cuda::atomic_ref<T, Scope> ref, T val)
  {
    // Previously `fetch_add(1, ...)`, which silently ignored `val`; adding
    // `val` is identical for the existing callers (payload == 1) and keeps the
    // functor correct for any increment a future caller supplies.
    ref.fetch_add(val, cuda::memory_order_relaxed);
  }
};

// Functor applied by `insert_or_apply` when a key already exists: atomically
// lowers the stored payload to the incoming value if that value is smaller.
// Used to keep the first (smallest) row index of each group of equal rows.
struct min_op {
  template <typename Value, cuda::thread_scope Scope>
  __device__ void operator()(cuda::atomic_ref<Value, Scope> slot, Value candidate)
  {
    slot.fetch_min(candidate, cuda::memory_order_relaxed);
  }
};

// Functor applied by `insert_or_apply` when a key already exists: atomically
// raises the stored payload to the incoming value if that value is larger.
// Used to keep the last (greatest) row index of each group of equal rows.
struct max_op {
  template <typename Value, cuda::thread_scope Scope>
  __device__ void operator()(cuda::atomic_ref<Value, Scope> slot, Value candidate)
  {
    slot.fetch_max(candidate, cuda::memory_order_relaxed);
  }
};

/**
 * @brief Compute the distinct output row indices from `map` according to `keep`.
 *
 * For `KEEP_FIRST`/`KEEP_LAST`, each row index is inserted with itself as the
 * payload and the payloads of rows that collide (hash/compare equal) are
 * reduced with min/max, so the retrieved payloads are exactly the first/last
 * row indices of each group. For `KEEP_NONE`, the payloads accumulate the
 * group sizes and only rows belonging to groups of size one are emitted.
 *
 * @param map Hash map keyed by row index; rows comparing equal share one slot
 * @param num_rows The number of input rows
 * @param keep Which row of each group to keep; must not be `KEEP_ANY`
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource used to allocate the returned vector
 * @return A device_uvector containing the distinct row indices
 */
template <typename Map>
rmm::device_uvector<size_type> process_keep(Map& map,
                                            size_type num_rows,
                                            duplicate_keep_option keep,
                                            rmm::cuda_stream_view stream,
                                            rmm::device_async_resource_ref mr)
{
  if ((keep == duplicate_keep_option::KEEP_FIRST) or (keep == duplicate_keep_option::KEEP_LAST)) {
    // Each row maps to its own index; insert_or_apply reduces payloads of
    // equal rows with min (KEEP_FIRST) or max (KEEP_LAST).
    auto pairs = thrust::make_transform_iterator(
      thrust::counting_iterator<size_type>(0),
      cuda::proclaim_return_type<cuco::pair<size_type, size_type>>([] __device__(size_type i) {
        return cuco::pair<size_type, size_type>{i, i};
      }));

    if (keep == duplicate_keep_option::KEEP_FIRST) {
      map.insert_or_apply(pairs, pairs + num_rows, min_op{}, stream.value());
    } else {
      map.insert_or_apply(pairs, pairs + num_rows, max_op{}, stream.value());
    }

    auto const map_size = map.size(stream.value());
    auto keys           = rmm::device_uvector<size_type>(map_size, stream, mr);
    auto values         = rmm::device_uvector<size_type>(map_size, stream, mr);

    // Was passing `stream` (a cuda_stream_view) here while every other call
    // passes `stream.value()`; normalized for consistency.
    map.retrieve_all(keys.begin(), values.begin(), stream.value());
    return values;
  }

  // KEEP_NONE: the payload counts how many rows fall into each group.
  auto pairs = thrust::make_transform_iterator(
    thrust::counting_iterator<size_type>(0),
    cuda::proclaim_return_type<cuco::pair<size_type, size_type>>([] __device__(size_type i) {
      return cuco::pair<size_type, size_type>{i, 1};
    }));

  map.insert_or_apply(pairs, pairs + num_rows, plus_op{}, stream.value());

  auto const map_size = map.size(stream.value());
  auto keys           = rmm::device_uvector<size_type>(map_size, stream, mr);
  auto values         = rmm::device_uvector<size_type>(map_size, stream, mr);
  map.retrieve_all(keys.begin(), values.begin(), stream.value());

  // Emit only the representative index of groups containing exactly one row.
  // A single stencil copy_if replaces the previous for_each (marking non-unique
  // slots with -1) followed by copy_if, avoiding two intermediate buffers and
  // one extra kernel launch.
  auto output_indices   = rmm::device_uvector<size_type>(map_size, stream, mr);
  auto const output_end = thrust::copy_if(
    rmm::exec_policy(stream),
    keys.begin(),
    keys.end(),
    values.begin(),  // stencil: per-group row counts
    output_indices.begin(),
    cuda::proclaim_return_type<bool>(
      [] __device__(size_type const group_size) { return group_size == size_type{1}; }));

  output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream);
  return output_indices;
}

} // namespace

/**
 * @brief Return the reduction identity used to initialize results of `hash_reduce_by_row`.
 *
 * The identity is the value any real row index or count "wins" against in the
 * corresponding reduction: the max value for a min-reduction (KEEP_FIRST), the
 * min value for a max-reduction (KEEP_LAST), and zero for a count (KEEP_NONE).
 *
 * @param keep A value of `duplicate_keep_option` type, must not be `KEEP_ANY`.
 * @return The initial reduction value.
 */
auto constexpr reduction_init_value(duplicate_keep_option keep)
{
  if (keep == duplicate_keep_option::KEEP_FIRST) {
    return std::numeric_limits<size_type>::max();
  } else if (keep == duplicate_keep_option::KEEP_LAST) {
    return std::numeric_limits<size_type>::min();
  } else if (keep == duplicate_keep_option::KEEP_NONE) {
    return size_type{0};
  } else {
    CUDF_UNREACHABLE("This function should not be called with KEEP_ANY");
  }
}

// Hash set used for the KEEP_ANY path: stores a single representative row
// index per group of rows that compare equal (via RowHasher / the device row
// hasher). Linear probing with step 1 and one-slot storage windows.
template <typename RowHasher>
using hash_set_type =
  cuco::static_set<size_type,
                   cuco::extent<int64_t>,
                   cuda::thread_scope_device,
                   RowHasher,
                   cuco::linear_probing<1,
                                        cudf::experimental::row::hash::device_row_hasher<
                                          cudf::hashing::detail::default_hash,
                                          cudf::nullate::DYNAMIC>>,
                   cudf::detail::cuco_allocator,
                   cuco::storage<1>>;

// Hash map used for KEEP_FIRST/KEEP_LAST/KEEP_NONE: the key is a row index
// (rows comparing equal collide into one slot) and the payload holds the
// running reduction value — min/max row index or group count, see process_keep.
// Same probing scheme as hash_set_type above.
template <typename RowHasher>
using hash_map_type =
  cuco::static_map<size_type,
                   size_type,
                   cuco::extent<int64_t>,
                   cuda::thread_scope_device,
                   RowHasher,
                   cuco::linear_probing<1,
                                        cudf::experimental::row::hash::device_row_hasher<
                                          cudf::hashing::detail::default_hash,
                                          cudf::nullate::DYNAMIC>>,
                   cudf::detail::cuco_allocator,
                   cuco::storage<1>>;

rmm::device_uvector<size_type> distinct_indices(table_view const& input,
duplicate_keep_option keep,
null_equality nulls_equal,
Expand All @@ -97,16 +238,38 @@ rmm::device_uvector<size_type> distinct_indices(table_view const& input,

auto const helper_func = [&](auto const& d_equal) {
using RowHasher = std::decay_t<decltype(d_equal)>;
auto set = hash_set_type<RowHasher>{num_rows,
// If we don't care about order, just gather indices of distinct keys taken from set.
if (keep == duplicate_keep_option::KEEP_ANY) {
auto set = hash_set_type<RowHasher>{num_rows,
0.5, // desired load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
d_equal,
{row_hash.device_hasher(has_nulls)},
{},
{},
cudf::detail::cuco_allocator{stream},
stream.value()};

auto const iter = thrust::counting_iterator<cudf::size_type>{0};
set.insert_async(iter, iter + num_rows, stream.value());
auto output_indices = rmm::device_uvector<size_type>(num_rows, stream, mr);
auto const output_end = set.retrieve_all(output_indices.begin(), stream.value());
output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream);
return output_indices;
}

auto const init = reduction_init_value(keep);
auto map = hash_map_type<RowHasher>{num_rows,
0.5, // desired load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
cuco::empty_value{init},
d_equal,
{row_hash.device_hasher(has_nulls)},
{},
{},
cudf::detail::cuco_allocator{stream},
stream.value()};
return detail::reduce_by_row(set, num_rows, keep, stream, mr);
return process_keep(map, num_rows, keep, stream, mr);
};

if (cudf::detail::has_nested_columns(input)) {
Expand Down
118 changes: 0 additions & 118 deletions cpp/src/stream_compaction/distinct_helpers.cu
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, this file is now empty and should be removed and deleted from CMakeLists.txt.

Original file line number Diff line number Diff line change
Expand Up @@ -21,122 +21,4 @@

namespace cudf::detail {

/**
 * @brief Reduce each group of hash-equal rows to output row indices according
 * to `keep` (see the declaration in distinct_helpers.hpp for the full contract).
 *
 * KEEP_ANY short-circuits via set retrieval; the other options run an
 * insert_and_find pass that atomically reduces per-group values, then filter
 * the results into `output_indices`.
 *
 * @param set The auxiliary set to perform reduction
 * @param num_rows The number of all input rows
 * @param keep The parameter to determine what type of reduction to perform
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource used to allocate the returned vector
 * @return A device_uvector containing the output indices
 */
template <typename RowHasher>
rmm::device_uvector<size_type> reduce_by_row(hash_set_type<RowHasher>& set,
                                             size_type num_rows,
                                             duplicate_keep_option keep,
                                             rmm::cuda_stream_view stream,
                                             rmm::device_async_resource_ref mr)
{
  auto output_indices = rmm::device_uvector<size_type>(num_rows, stream, mr);

  // If we don't care about order, just gather indices of distinct keys taken from set.
  if (keep == duplicate_keep_option::KEEP_ANY) {
    auto const iter = thrust::counting_iterator<cudf::size_type>{0};
    set.insert_async(iter, iter + num_rows, stream.value());
    auto const output_end = set.retrieve_all(output_indices.begin(), stream.value());
    output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream);
    return output_indices;
  }

  // One reduction slot per input row; only the slot of each group's
  // representative row (the one first inserted into the set) is ever updated.
  auto reduction_results = rmm::device_uvector<size_type>(num_rows, stream, mr);
  thrust::uninitialized_fill(rmm::exec_policy_nosync(stream),
                             reduction_results.begin(),
                             reduction_results.end(),
                             reduction_init_value(keep));

  auto set_ref = set.ref(cuco::op::insert_and_find);

  thrust::for_each(rmm::exec_policy_nosync(stream),
                   thrust::make_counting_iterator(0),
                   thrust::make_counting_iterator(num_rows),
                   [set_ref, keep, reduction_results = reduction_results.begin()] __device__(
                     size_type const idx) mutable {
                     // insert_and_find yields the slot of the row already in the
                     // set that compares equal to `idx` (or `idx` itself if new).
                     auto const [inserted_idx_ptr, _] = set_ref.insert_and_find(idx);

                     auto ref = cuda::atomic_ref<size_type, cuda::thread_scope_device>{
                       reduction_results[*inserted_idx_ptr]};
                     if (keep == duplicate_keep_option::KEEP_FIRST) {
                       // Store the smallest index of all rows that are equal.
                       ref.fetch_min(idx, cuda::memory_order_relaxed);
                     } else if (keep == duplicate_keep_option::KEEP_LAST) {
                       // Store the greatest index of all rows that are equal.
                       ref.fetch_max(idx, cuda::memory_order_relaxed);
                     } else {
                       // Count the number of rows in each group of rows that are compared equal.
                       ref.fetch_add(size_type{1}, cuda::memory_order_relaxed);
                     }
                   });

  auto const map_end = [&] {
    if (keep == duplicate_keep_option::KEEP_NONE) {
      // Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`.
      // Thus, we only output index of the rows in the groups having group size of `1`.
      return thrust::copy_if(
        rmm::exec_policy(stream),
        thrust::make_counting_iterator(0),
        thrust::make_counting_iterator(num_rows),
        output_indices.begin(),
        cuda::proclaim_return_type<bool>(
          [reduction_results = reduction_results.begin()] __device__(auto const idx) {
            return reduction_results[idx] == size_type{1};
          }));
    }

    // Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in
    // each group of equal rows (which are the desired output indices), or the value given by
    // `reduction_init_value()`.
    return thrust::copy_if(
      rmm::exec_policy(stream),
      reduction_results.begin(),
      reduction_results.end(),
      output_indices.begin(),
      cuda::proclaim_return_type<bool>([init_value = reduction_init_value(keep)] __device__(
        auto const idx) { return idx != init_value; }));
  }();

  output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream);
  return output_indices;
}

// Explicit instantiations of reduce_by_row for the four comparator
// configurations used by distinct.cu: {false, true} first template argument
// (presumably has-nested-columns — confirm against device_row_comparator) x
// {nan_equal, physical} equality comparators.
template rmm::device_uvector<size_type> reduce_by_row(
  hash_set_type<cudf::experimental::row::equality::device_row_comparator<
    false,
    cudf::nullate::DYNAMIC,
    cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& set,
  size_type num_rows,
  duplicate_keep_option keep,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);

template rmm::device_uvector<size_type> reduce_by_row(
  hash_set_type<cudf::experimental::row::equality::device_row_comparator<
    true,
    cudf::nullate::DYNAMIC,
    cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& set,
  size_type num_rows,
  duplicate_keep_option keep,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);

template rmm::device_uvector<size_type> reduce_by_row(
  hash_set_type<cudf::experimental::row::equality::device_row_comparator<
    false,
    cudf::nullate::DYNAMIC,
    cudf::experimental::row::equality::physical_equality_comparator>>& set,
  size_type num_rows,
  duplicate_keep_option keep,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);

template rmm::device_uvector<size_type> reduce_by_row(
  hash_set_type<cudf::experimental::row::equality::device_row_comparator<
    true,
    cudf::nullate::DYNAMIC,
    cudf::experimental::row::equality::physical_equality_comparator>>& set,
  size_type num_rows,
  duplicate_keep_option keep,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);

} // namespace cudf::detail
62 changes: 0 additions & 62 deletions cpp/src/stream_compaction/distinct_helpers.hpp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header is now empty. Can we delete it and remove it from #includes?

Original file line number Diff line number Diff line change
Expand Up @@ -31,66 +31,4 @@

namespace cudf::detail {

/**
 * @brief Return the reduction identity used to initialize results of `hash_reduce_by_row`.
 *
 * The identity is the value any real row index or count "wins" against in the
 * corresponding reduction (min for KEEP_FIRST, max for KEEP_LAST, sum for KEEP_NONE).
 *
 * @param keep A value of `duplicate_keep_option` type, must not be `KEEP_ANY`.
 * @return The initial reduction value.
 */
auto constexpr reduction_init_value(duplicate_keep_option keep)
{
  switch (keep) {
    // Identity of min: any row index is <= max().
    case duplicate_keep_option::KEEP_FIRST: return std::numeric_limits<size_type>::max();
    // Identity of max: any row index is >= min().
    case duplicate_keep_option::KEEP_LAST: return std::numeric_limits<size_type>::min();
    // Identity of the group-size count.
    case duplicate_keep_option::KEEP_NONE: return size_type{0};
    default: CUDF_UNREACHABLE("This function should not be called with KEEP_ANY");
  }
}

// Hash set over row indices: rows that compare equal (via RowHasher and the
// device row hasher) collide into one slot, so the set holds one representative
// index per group. Linear probing, step 1, one-slot storage windows.
template <typename RowHasher>
using hash_set_type =
  cuco::static_set<size_type,
                   cuco::extent<int64_t>,
                   cuda::thread_scope_device,
                   RowHasher,
                   cuco::linear_probing<1,
                                        cudf::experimental::row::hash::device_row_hasher<
                                          cudf::hashing::detail::default_hash,
                                          cudf::nullate::DYNAMIC>>,
                   cudf::detail::cuco_allocator,
                   cuco::storage<1>>;

/**
 * @brief Perform a reduction on groups of rows that are compared equal and returns output indices
 * of the occurrences of the distinct elements based on `keep` parameter.
 *
 * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared
 * equal. A hash set is used to find groups of equal rows.
 *
 * Depending on the `keep` parameter, the reduction operation for each row group is:
 * - If `keep == KEEP_ANY` : order does not matter.
 * - If `keep == KEEP_FIRST`: min of row indices in the group.
 * - If `keep == KEEP_LAST`: max of row indices in the group.
 * - If `keep == KEEP_NONE`: count of equivalent rows (group size); only groups
 *   of size one contribute an output index.
 *
 * At the beginning of the operation, the entire output array is filled with a value given by
 * the `reduction_init_value()` function. Then, the reduction result for each row group is written
 * into the output array at the index of an unspecified row in the group.
 *
 * @param set The auxiliary set to perform reduction
 * @param num_rows The number of all input rows
 * @param keep The parameter to determine what type of reduction to perform
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource used to allocate the returned vector
 * @return A device_uvector containing the output indices
 */
template <typename RowHasher>
rmm::device_uvector<size_type> reduce_by_row(hash_set_type<RowHasher>& set,
                                             size_type num_rows,
                                             duplicate_keep_option keep,
                                             rmm::cuda_stream_view stream,
                                             rmm::device_async_resource_ref mr);
} // namespace cudf::detail
Loading