diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index 24e2692cb6f..abd4bb41e8e 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -32,6 +32,9 @@ #include #include +#include + +#include #include #include @@ -96,18 +99,41 @@ rmm::device_uvector distinct_indices(table_view const& input, auto const row_equal = cudf::experimental::row::equality::self_comparator(preprocessed_input); auto const helper_func = [&](auto const& d_equal) { - using RowHasher = std::decay_t; - auto set = hash_set_type{ + using RowHasher = cuda::std::decay_t; + + // If we don't care about order, just gather indices of distinct keys taken from set. + if (keep == duplicate_keep_option::KEEP_ANY) { + auto set = hash_set_type{ + num_rows, + 0.5, // desired load factor + cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL}, + d_equal, + {row_hash.device_hasher(has_nulls)}, + {}, + {}, + cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}, + stream.value()}; + + auto const iter = thrust::counting_iterator{0}; + set.insert_async(iter, iter + num_rows, stream.value()); + auto output_indices = rmm::device_uvector(num_rows, stream, mr); + auto const output_end = set.retrieve_all(output_indices.begin(), stream.value()); + output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream); + return output_indices; + } + + auto map = hash_map_type{ num_rows, 0.5, // desired load factor cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL}, + cuco::empty_value{reduction_init_value(keep)}, d_equal, {row_hash.device_hasher(has_nulls)}, {}, {}, cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}, stream.value()}; - return detail::reduce_by_row(set, num_rows, keep, stream, mr); + return reduce_by_row(map, num_rows, keep, stream, mr); }; if (cudf::detail::has_nested_columns(input)) { diff --git a/cpp/src/stream_compaction/distinct_helpers.cu b/cpp/src/stream_compaction/distinct_helpers.cu index c3a004b7f28..882be4532dc 100644 --- a/cpp/src/stream_compaction/distinct_helpers.cu +++ b/cpp/src/stream_compaction/distinct_helpers.cu @@ -22,118 +22,105 @@ namespace cudf::detail { template -rmm::device_uvector reduce_by_row(hash_set_type& set, +rmm::device_uvector reduce_by_row(hash_map_type& map, size_type num_rows, duplicate_keep_option keep, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto output_indices = rmm::device_uvector(num_rows, stream, mr); + if ((keep == duplicate_keep_option::KEEP_FIRST) or (keep == duplicate_keep_option::KEEP_LAST)) { + auto output_indices = rmm::device_uvector(num_rows, stream, mr); + + auto pairs = + thrust::make_transform_iterator(thrust::counting_iterator(0), + cuda::proclaim_return_type>( + [] __device__(size_type const i) { + return cuco::pair{i, i}; + })); + + if (keep == duplicate_keep_option::KEEP_FIRST) { + map.insert_or_apply_async(pairs, pairs + num_rows, min_op{}, stream.value()); + } else { + map.insert_or_apply_async(pairs, pairs + num_rows, max_op{}, stream.value()); + } - // If we don't care about order, just gather indices of distinct keys taken from set. - if (keep == duplicate_keep_option::KEEP_ANY) { - auto const iter = thrust::counting_iterator{0}; - set.insert_async(iter, iter + num_rows, stream.value()); - auto const output_end = set.retrieve_all(output_indices.begin(), stream.value()); + auto const [_, output_end] = + map.retrieve_all(thrust::make_discard_iterator(), output_indices.begin(), stream.value()); output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream); return output_indices; } - auto reduction_results = rmm::device_uvector(num_rows, stream, mr); - thrust::uninitialized_fill(rmm::exec_policy_nosync(stream), - reduction_results.begin(), - reduction_results.end(), - reduction_init_value(keep)); - - auto set_ref = set.ref(cuco::op::insert_and_find); - - thrust::for_each(rmm::exec_policy_nosync(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(num_rows), - [set_ref, keep, reduction_results = reduction_results.begin()] __device__( - size_type const idx) mutable { - auto const [inserted_idx_ptr, _] = set_ref.insert_and_find(idx); - - auto ref = cuda::atomic_ref{ - reduction_results[*inserted_idx_ptr]}; - if (keep == duplicate_keep_option::KEEP_FIRST) { - // Store the smallest index of all rows that are equal. - ref.fetch_min(idx, cuda::memory_order_relaxed); - } else if (keep == duplicate_keep_option::KEEP_LAST) { - // Store the greatest index of all rows that are equal. - ref.fetch_max(idx, cuda::memory_order_relaxed); - } else { - // Count the number of rows in each group of rows that are compared equal. - ref.fetch_add(size_type{1}, cuda::memory_order_relaxed); - } - }); - - auto const map_end = [&] { - if (keep == duplicate_keep_option::KEEP_NONE) { - // Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`. - // Thus, we only output index of the rows in the groups having group size of `1`. - return thrust::copy_if( - rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(num_rows), - output_indices.begin(), - cuda::proclaim_return_type( - [reduction_results = reduction_results.begin()] __device__(auto const idx) { - return reduction_results[idx] == size_type{1}; - })); - } + auto keys = rmm::device_uvector(num_rows, stream); + auto values = rmm::device_uvector(num_rows, stream); + + auto pairs = thrust::make_transform_iterator( + thrust::counting_iterator(0), + cuda::proclaim_return_type>([] __device__(size_type const i) { + return cuco::pair{i, 1}; + })); + + map.insert_or_apply_async(pairs, pairs + num_rows, plus_op{}, stream.value()); + auto const [keys_end, _] = map.retrieve_all(keys.begin(), values.begin(), stream.value()); + + auto num_distinct_keys = thrust::distance(keys.begin(), keys_end); + keys.resize(num_distinct_keys, stream); + values.resize(num_distinct_keys, stream); + + auto output_indices = rmm::device_uvector(num_distinct_keys, stream, mr); + + auto const output_iter = cudf::detail::make_counting_transform_iterator( + size_type(0), + cuda::proclaim_return_type( + [keys = keys.begin(), values = values.begin()] __device__(auto const idx) { + return values[idx] == size_type{1} ? keys[idx] : -1; + })); - // Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in - // each group of equal rows (which are the desired output indices), or the value given by - // `reduction_init_value()`. - return thrust::copy_if( - rmm::exec_policy(stream), - reduction_results.begin(), - reduction_results.end(), - output_indices.begin(), - cuda::proclaim_return_type([init_value = reduction_init_value(keep)] __device__( - auto const idx) { return idx != init_value; })); - }(); + auto const map_end = thrust::copy_if( + rmm::exec_policy_nosync(stream), + output_iter, + output_iter + num_distinct_keys, + output_indices.begin(), + cuda::proclaim_return_type([] __device__(auto const idx) { return idx != -1; })); output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream); return output_indices; } template rmm::device_uvector reduce_by_row( - hash_set_type>& set, + cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& map, size_type num_rows, duplicate_keep_option keep, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); template rmm::device_uvector reduce_by_row( - hash_set_type>& set, + cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& map, size_type num_rows, duplicate_keep_option keep, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); template rmm::device_uvector reduce_by_row( - hash_set_type>& set, + cudf::experimental::row::equality::physical_equality_comparator>>& map, size_type num_rows, duplicate_keep_option keep, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); template rmm::device_uvector reduce_by_row( - hash_set_type>& set, + cudf::experimental::row::equality::physical_equality_comparator>>& map, size_type num_rows, duplicate_keep_option keep, rmm::cuda_stream_view stream, diff --git a/cpp/src/stream_compaction/distinct_helpers.hpp b/cpp/src/stream_compaction/distinct_helpers.hpp index bea02e3dbe8..de5933f6776 100644 --- a/cpp/src/stream_compaction/distinct_helpers.hpp +++ b/cpp/src/stream_compaction/distinct_helpers.hpp @@ -23,11 +23,13 @@ #include #include +#include #include #include #include #include #include +#include namespace cudf::detail { @@ -47,6 +49,31 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) } } +struct plus_op { + template + __device__ void operator()(cuda::atomic_ref ref, size_type const val) + { + ref.fetch_add(static_cast(1), cuda::memory_order_relaxed); + } +}; + +struct min_op { + template + __device__ void operator()(cuda::atomic_ref ref, size_type const val) + { + ref.fetch_min(val, cuda::memory_order_relaxed); + } +}; + +struct max_op { + template + __device__ void operator()(cuda::atomic_ref ref, size_type const val) + { + ref.fetch_max(val, cuda::memory_order_relaxed); + } +}; + +// The static_set type used to process `keep_any` option template using hash_set_type = cuco::static_set, cuco::storage<1>>; +// The static_map type used to process `keep_first`, `keep_last` and `keep_none` option +template +using hash_map_type = + cuco::static_map, + cuda::thread_scope_device, + RowHasher, + cuco::linear_probing<1, + cudf::experimental::row::hash::device_row_hasher< + cudf::hashing::detail::default_hash, + cudf::nullate::DYNAMIC>>, + cudf::detail::cuco_allocator, + cuco::storage<1>>; + /** * @brief Perform a reduction on groups of rows that are compared equal and returns output indices * of the occurrences of the distinct elements based on `keep` parameter. * * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared - * equal. A hash set is used to find groups of equal rows. + * equal. A hash map is used to find groups of equal rows. * * Depending on the `keep` parameter, the reduction operation for each row group is: * - If `keep == KEEP_ANY` : order does not matter. @@ -79,8 +121,7 @@ using hash_set_type = * the `reduction_init_value()` function. Then, the reduction result for each row group is written * into the output array at the index of an unspecified row in the group. * - * @param set The auxiliary set to perform reduction - * @param set_size The number of elements in set + * @param map The auxiliary map to perform reduction * @param num_rows The number of all input rows * @param keep The parameter to determine what type of reduction to perform * @param stream CUDA stream used for device memory operations and kernel launches @@ -88,7 +129,7 @@ using hash_set_type = * @return A device_uvector containing the output indices */ template -rmm::device_uvector reduce_by_row(hash_set_type& set, +rmm::device_uvector reduce_by_row(hash_map_type& map, size_type num_rows, duplicate_keep_option keep, rmm::cuda_stream_view stream,