Support duplicate_keep_option in cudf::distinct #11052

Merged Jun 22, 2022 (96 commits)

Commits
c22b11c
Test passed
ttnghia Jun 4, 2022
5a6602c
Cleanup
ttnghia Jun 4, 2022
79fa051
Add bench
ttnghia Jun 4, 2022
a7e3463
Add comments
ttnghia Jun 4, 2022
3b7124e
Add unique bench
ttnghia Jun 4, 2022
4f933bf
Rewrite benchs
ttnghia Jun 4, 2022
bf2a717
Add parameter to the API
ttnghia Jun 7, 2022
11f9dd1
Fix compile errors
ttnghia Jun 7, 2022
b8832f7
Switch to use the old atomic ops
ttnghia Jun 7, 2022
f12a509
Add simple tests
ttnghia Jun 7, 2022
edcb612
Revert "Add simple tests"
ttnghia Jun 7, 2022
5b40152
Misc
ttnghia Jun 7, 2022
f2dc1eb
Change doxygen
ttnghia Jun 7, 2022
8a800e9
Cleanup
ttnghia Jun 7, 2022
c0144b3
Revert "Change doxygen"
ttnghia Jun 8, 2022
7d89f4c
Implement `distinct` by `distinct_map`
ttnghia Jun 8, 2022
ae891b3
Rewrite doxygen
ttnghia Jun 8, 2022
1786dbb
Add detail declaration of `distinct_map`
ttnghia Jun 8, 2022
6dfb4f4
Fix bug
ttnghia Jun 8, 2022
3352967
Alias `KEEP_ANY` to `KEEP_FIRST`
ttnghia Jun 8, 2022
52e29b1
Cleanup headers
ttnghia Jun 8, 2022
819a669
Rewrite comments and reorganize code
ttnghia Jun 8, 2022
2e5d41a
Rewrite comments
ttnghia Jun 8, 2022
8393809
Reverse tests
ttnghia Jun 8, 2022
5cdefa3
Fix old tests
ttnghia Jun 8, 2022
3626efb
Add a new overload for `cudf::distinct`
ttnghia Jun 8, 2022
bcc4abe
Reverse back breaking changes
ttnghia Jun 8, 2022
edbcc78
Fix compile error
ttnghia Jun 8, 2022
2f1ce5a
Reverse benchmark
ttnghia Jun 8, 2022
882a67a
Complete `StringKeyColumn` tests
ttnghia Jun 8, 2022
cdae2ac
Fix tests
ttnghia Jun 8, 2022
5b21d88
Fix tests
ttnghia Jun 8, 2022
3f18057
Rename function
ttnghia Jun 9, 2022
21456e7
Add `NonNullTable` tests
ttnghia Jun 9, 2022
8a17581
Add `SlicedNonNullTable` tests
ttnghia Jun 9, 2022
e05ad48
Add `InputWithNulls` tests
ttnghia Jun 9, 2022
03fb093
Change variable
ttnghia Jun 9, 2022
3c12942
Refactor
ttnghia Jun 9, 2022
6ab9673
Add `BasicList` tests
ttnghia Jun 9, 2022
37dfdcb
Add `NullableLists` tests
ttnghia Jun 9, 2022
b78cf5b
Add `ListsOfStructs` tests
ttnghia Jun 9, 2022
8de0948
Add `SlicedStructsOfLists` tests
ttnghia Jun 9, 2022
9e8c4a5
Misc
ttnghia Jun 9, 2022
7fa65ee
Add `ListsOfEmptyStructs` tests
ttnghia Jun 9, 2022
ff6e03e
Modify `EmptyDeepList` tests
ttnghia Jun 9, 2022
374545a
Add `StructsOfLists` tests
ttnghia Jun 9, 2022
9bf540a
Use `distinct` in Cython
ttnghia Jun 9, 2022
e1c3cd5
Merge branch 'branch-22.08' into refactor_stream_compaction
ttnghia Jun 9, 2022
70d3164
Fix Python style
ttnghia Jun 9, 2022
bba15c2
Revert "Fix Python style"
ttnghia Jun 9, 2022
d895f48
Revert "Use `distinct` in Cython"
ttnghia Jun 9, 2022
56e791c
Fix compiling errors due to merging
ttnghia Jun 10, 2022
6ffc9b0
Fix doxygen
ttnghia Jun 10, 2022
dd8c845
Rewrite comment and rename variable
ttnghia Jun 10, 2022
d9c0ab9
Address review comments
ttnghia Jun 15, 2022
7770265
Remove one overload
ttnghia Jun 15, 2022
96a36c4
Fix benchmark
ttnghia Jun 16, 2022
0210228
Rename struct, and use CTAD
ttnghia Jun 16, 2022
65190cc
Add comment
ttnghia Jun 16, 2022
a4db720
Rename variable
ttnghia Jun 16, 2022
df05dc8
Misc
ttnghia Jun 16, 2022
5f7d778
WIP
ttnghia Jun 16, 2022
154645a
Rewrite doxygen
ttnghia Jun 16, 2022
8f04d50
Remove `keys` parameter from `get_distinct_indices`
ttnghia Jun 16, 2022
d806278
Rewrite doxygen
ttnghia Jun 16, 2022
3734344
Use another version of `gather`
ttnghia Jun 16, 2022
f731d35
Fix wrong doxygen
ttnghia Jun 16, 2022
e44c85d
Fix wrong doxygen again
ttnghia Jun 16, 2022
a74f71e
Misc
ttnghia Jun 16, 2022
f9de181
Update doxygen
ttnghia Jun 16, 2022
6cec1eb
Define hash_map and add todo
ttnghia Jun 16, 2022
661400a
Rename variable
ttnghia Jun 17, 2022
700e465
Fix doxygen
ttnghia Jun 17, 2022
1c783e8
Rename tests
ttnghia Jun 17, 2022
47c5eec
Fix a bug when comparing nulls as unequal
ttnghia Jun 18, 2022
4db34db
Add `InputWithNullsUnequal` tests
ttnghia Jun 19, 2022
fab367b
Add `ListsWithNullsUnequal` tests
ttnghia Jun 19, 2022
9ec27af
Rewrite doxygen
ttnghia Jun 19, 2022
1359ee0
Rewrite doxygen for `duplicate_keep_option` and add back performance …
ttnghia Jun 19, 2022
aa0a4ed
Remove redundant docsc
ttnghia Jun 19, 2022
01e03b6
Rename functor
ttnghia Jun 20, 2022
cdc3000
Modify comments
ttnghia Jun 20, 2022
45dec2a
Merge branch 'branch-22.08' into refactor_stream_compaction
ttnghia Jun 20, 2022
cba4759
Merge branch 'branch-22.08' into refactor_stream_compaction
ttnghia Jun 20, 2022
7247101
Attempt to split files, not yet cleanup
ttnghia Jun 20, 2022
120377b
Cleanup
ttnghia Jun 20, 2022
68133d4
Change functor name
ttnghia Jun 20, 2022
aefdadf
Add doxygen
ttnghia Jun 20, 2022
faf6778
Reorganize code
ttnghia Jun 20, 2022
e839323
Fix headers
ttnghia Jun 20, 2022
f5646b3
Fix header
ttnghia Jun 20, 2022
538ff08
Fix `mr` usage, and rewrite some comments
ttnghia Jun 21, 2022
a755bea
Pass `std::shared_ptr` by value
ttnghia Jun 21, 2022
f0ee266
Fix doxygen and change function name
ttnghia Jun 22, 2022
0b35671
Update doxygen
ttnghia Jun 22, 2022
1ac6501
Merge branch 'branch-22.08' into refactor_stream_compaction
ttnghia Jun 22, 2022
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -458,6 +458,7 @@ add_library(
src/stream_compaction/apply_boolean_mask.cu
src/stream_compaction/distinct.cu
src/stream_compaction/distinct_count.cu
src/stream_compaction/distinct_reduce.cu
src/stream_compaction/drop_nans.cu
src/stream_compaction/drop_nulls.cu
src/stream_compaction/unique.cu
9 changes: 7 additions & 2 deletions cpp/benchmarks/stream_compaction/distinct.cpp
@@ -46,7 +46,11 @@ void nvbench_distinct(nvbench::state& state, nvbench::type_list<Type>)

state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
rmm::cuda_stream_view stream_view{launch.get_stream()};
auto result = cudf::detail::distinct(input_table, {0}, cudf::null_equality::EQUAL, stream_view);
auto result = cudf::detail::distinct(input_table,
{0},
cudf::duplicate_keep_option::KEEP_ANY,
cudf::null_equality::EQUAL,
stream_view);
});
}

@@ -86,7 +90,8 @@ void nvbench_distinct_list(nvbench::state& state, nvbench::type_list<Type>)

state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
rmm::cuda_stream_view stream_view{launch.get_stream()};
auto result = cudf::detail::distinct(*table, {0}, cudf::null_equality::EQUAL, stream_view);
auto result = cudf::detail::distinct(
*table, {0}, cudf::duplicate_keep_option::KEEP_ANY, cudf::null_equality::EQUAL, stream_view);
});
}

22 changes: 22 additions & 0 deletions cpp/include/cudf/detail/stream_compaction.hpp
@@ -82,10 +82,32 @@ std::unique_ptr<table> unique(
std::unique_ptr<table> distinct(
table_view const& input,
std::vector<size_type> const& keys,
duplicate_keep_option keep = duplicate_keep_option::KEEP_ANY,
null_equality nulls_equal = null_equality::EQUAL,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create a column of indices of all distinct rows in the input table.
*
* Given an `input` table_view, an output array of row indices is generated by adding as many
* indices as possible, in an unspecified order, such that all rows corresponding to these indices
* are distinct (i.e., no two of these rows compare equal to each other).
*
* @param input The input table
* @param keep Get the index of the first, last, any, or no row among the found duplicate rows
* @param nulls_equal Flag to specify whether null elements should be considered as equal
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned vector
* @return A device_uvector containing the result indices
*/
rmm::device_uvector<size_type> get_distinct_indices(
table_view const& input,
duplicate_keep_option keep,
null_equality nulls_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc cudf::unique_count(column_view const&, null_policy, nan_policy)
*
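For reference, a minimal sketch of how the new detail API composes: compute the distinct indices for the key columns, then gather those rows from the full table. This mirrors the pattern distinct.cu adopts later in this PR; the wrapper name distinct_via_indices is illustrative only, not part of the change.

#include <cudf/detail/gather.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <memory>
#include <vector>

// Sketch: keep one row per group of equal keys, chosen according to `keep`.
std::unique_ptr<cudf::table> distinct_via_indices(cudf::table_view const& input,
                                                  std::vector<cudf::size_type> const& keys,
                                                  cudf::duplicate_keep_option keep,
                                                  rmm::cuda_stream_view stream,
                                                  rmm::mr::device_memory_resource* mr)
{
  // Indices of the surviving rows, in an unspecified order.
  auto const gather_map = cudf::detail::get_distinct_indices(
    input.select(keys), keep, cudf::null_equality::EQUAL, stream);
  // Materialize those rows from the full input table.
  return cudf::detail::gather(input,
                              gather_map,
                              cudf::out_of_bounds_policy::DONT_CHECK,
                              cudf::detail::negative_index_policy::NOT_ALLOWED,
                              stream,
                              mr);
}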
43 changes: 20 additions & 23 deletions cpp/include/cudf/stream_compaction.hpp
@@ -208,35 +208,33 @@ std::unique_ptr<table> apply_boolean_mask(
* @brief Choices for drop_duplicates API for retainment of duplicate rows
*/
enum class duplicate_keep_option {
KEEP_FIRST = 0, ///< Keeps first duplicate element and unique elements
KEEP_LAST, ///< Keeps last duplicate element and unique elements
KEEP_NONE ///< Keeps only unique elements
KEEP_ANY = 0, ///< Keep an unspecified occurrence
KEEP_FIRST, ///< Keep first occurrence
KEEP_LAST, ///< Keep last occurrence
KEEP_NONE ///< Keep no (remove all) occurrences of duplicates
};

/**
* @brief Create a new table with consecutive duplicate rows removed.
*
* Given an `input` table_view, one specific row from a group of equivalent elements is copied to
* output table depending on the value of @p keep:
* - KEEP_FIRST: only the first of a sequence of duplicate rows is copied
* - KEEP_LAST: only the last of a sequence of duplicate rows is copied
* - KEEP_NONE: no duplicate rows are copied
* Given an `input` table_view, each row is copied to the output table to create a set of distinct
* rows. If there are duplicate rows, which row is copied depends on the specified value of
* the `keep` parameter.
*
* The order of rows in the output table remains the same as in the input.
*
* A row is distinct if there are no equivalent rows in the table. A row is unique if there is no
* adjacent equivalent row. That is, keeping distinct rows removes all duplicates in the
* table/column, while keeping unique rows only removes duplicates from consecutive groupings.
*
* Performance hints:
* - Always use `cudf::unique` instead of `cudf::distinct` if the input is pre-sorted
* - If the input is not pre-sorted and the behavior of pandas.DataFrame.drop_duplicates is desired:
* - If `keep` is not relevant, use `cudf::distinct`
* - If `keep` control is required, stable sort the input then `cudf::unique`
* Performance hint: if the input is pre-sorted, `cudf::unique` can produce an equivalent result
* (i.e., same set of output rows) but with less running time than `cudf::distinct`.
*
* @throws cudf::logic_error if the `keys` column indices are out of bounds in the `input` table.
*
* @param[in] input input table_view to copy only unique rows
* @param[in] keys vector of indices representing key columns from `input`
* @param[in] keep keep first row, last row, or no rows of the found duplicates
* @param[in] keep keep the first, last, any, or no rows of the found duplicates
* @param[in] nulls_equal flag to denote nulls are equal if null_equality::EQUAL, nulls are not
* equal if null_equality::UNEQUAL
* @param[in] mr Device memory resource used to allocate the returned table's device
@@ -254,20 +252,18 @@ std::unique_ptr<table> unique(
/**
* @brief Create a new table without duplicate rows.
*
* Given an `input` table_view, each row is copied to output table if the corresponding
* row of `keys` columns is distinct (no other equivalent row exists in the table). If duplicate
* rows are present, it is unspecified which row is copied.
* Given an `input` table_view, each row is copied to the output table to create a set of distinct
* rows. If there are duplicate rows, which row is copied depends on the specified value of
* the `keep` parameter.
*
* The order of elements in the output table is not specified.
* The order of rows in the output table is not specified.
*
* Performance hints:
* - Always use `cudf::unique` instead of `cudf::distinct` if the input is pre-sorted
* - If the input is not pre-sorted and the behavior of pandas.DataFrame.drop_duplicates is desired:
* - If `keep` is not relevant, use `cudf::distinct`
* - If `keep` control is required, stable sort the input then `cudf::unique`
* Performance hint: if the input is pre-sorted, `cudf::unique` can produce an equivalent result
* (i.e., same set of output rows) but with less running time than `cudf::distinct`.
*
* @param[in] input input table_view to copy only distinct rows
* @param[in] keys vector of indices representing key columns from `input`
* @param[in] keep keep the first, last, any, or no rows of the found duplicates
* @param[in] nulls_equal flag to denote nulls are equal if null_equality::EQUAL, nulls are not
* equal if null_equality::UNEQUAL
* @param[in] mr Device memory resource used to allocate the returned table's device
Expand All @@ -278,6 +274,7 @@ std::unique_ptr<table> unique(
std::unique_ptr<table> distinct(
table_view const& input,
std::vector<size_type> const& keys,
duplicate_keep_option keep = duplicate_keep_option::KEEP_ANY,
null_equality nulls_equal = null_equality::EQUAL,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

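For illustration, a hedged sketch of the new public overload and what each duplicate_keep_option value implies. The drop_dups wrapper and the sample key values are invented for this example, and the output row order of cudf::distinct is unspecified.

#include <cudf/stream_compaction.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>

#include <memory>

// Illustrative wrapper: deduplicate on key column 0 with the chosen keep policy.
std::unique_ptr<cudf::table> drop_dups(cudf::table_view const& tv,
                                       cudf::duplicate_keep_option keep)
{
  // For a key column holding {2, 1, 2, 3}:
  //   KEEP_ANY   -> rows with keys 1 and 3, plus one unspecified occurrence of 2
  //   KEEP_FIRST -> rows 0, 1, 3 (the first 2 survives)
  //   KEEP_LAST  -> rows 1, 2, 3 (the last 2 survives)
  //   KEEP_NONE  -> rows 1, 3 only (every duplicated key is dropped)
  // Output row order is unspecified.
  return cudf::distinct(tv, {0}, keep, cudf::null_equality::EQUAL);
}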
1 change: 1 addition & 0 deletions cpp/src/dictionary/add_keys.cu
@@ -63,6 +63,7 @@ std::unique_ptr<column> add_keys(
// sort(distinct([a,b,c,d,f,d,b,e])) = [a,b,c,d,e,f]
auto table_keys = cudf::detail::distinct(table_view{{combined_keys->view()}},
std::vector<size_type>{0}, // only one key column
duplicate_keep_option::KEEP_ANY,
null_equality::EQUAL,
stream,
mr);
8 changes: 6 additions & 2 deletions cpp/src/dictionary/detail/concatenate.cu
@@ -223,8 +223,12 @@ std::unique_ptr<column> concatenate(host_span<column_view const> columns,

// sort keys and remove duplicates;
// this becomes the keys child for the output dictionary column
auto table_keys = cudf::detail::distinct(
table_view{{all_keys->view()}}, std::vector<size_type>{0}, null_equality::EQUAL, stream, mr);
auto table_keys = cudf::detail::distinct(table_view{{all_keys->view()}},
std::vector<size_type>{0},
duplicate_keep_option::KEEP_ANY,
null_equality::EQUAL,
stream,
mr);
auto sorted_keys = cudf::detail::sort(table_keys->view(),
std::vector<order>{order::ASCENDING},
std::vector<null_order>{null_order::BEFORE},
10 changes: 7 additions & 3 deletions cpp/src/dictionary/set_keys.cu
@@ -127,9 +127,13 @@ std::unique_ptr<column> set_keys(

// copy the keys -- use cudf::distinct to make sure there are no duplicates,
// then sort the results.
auto distinct_keys = cudf::detail::distinct(
table_view{{new_keys}}, std::vector<size_type>{0}, null_equality::EQUAL, stream, mr);
auto sorted_keys = cudf::detail::sort(distinct_keys->view(),
auto distinct_keys = cudf::detail::distinct(table_view{{new_keys}},
std::vector<size_type>{0},
duplicate_keep_option::KEEP_ANY,
null_equality::EQUAL,
stream,
mr);
auto sorted_keys = cudf::detail::sort(distinct_keys->view(),
std::vector<order>{order::ASCENDING},
std::vector<null_order>{null_order::BEFORE},
stream,
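The dictionary call sites above all follow the same migration pattern: callers that only need some distinct row pass the new KEEP_ANY value, which preserves the previous behavior. A self-contained sketch of the updated call shape (dedupe_keys and keys_col are placeholders, not part of the change):

#include <cudf/column/column_view.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <memory>
#include <vector>

// Placeholder helper showing the updated call shape for existing call sites.
std::unique_ptr<cudf::table> dedupe_keys(cudf::column_view const& keys_col,
                                         rmm::cuda_stream_view stream,
                                         rmm::mr::device_memory_resource* mr)
{
  // Previously: cudf::detail::distinct(table_view{{keys_col}}, {0}, null_equality::EQUAL, stream, mr);
  // Now the keep policy is explicit; KEEP_ANY preserves the old "keep some row" semantics.
  return cudf::detail::distinct(cudf::table_view{{keys_col}},
                                std::vector<cudf::size_type>{0},
                                cudf::duplicate_keep_option::KEEP_ANY,
                                cudf::null_equality::EQUAL,
                                stream,
                                mr);
}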
135 changes: 86 additions & 49 deletions cpp/src/stream_compaction/distinct.cu
@@ -14,31 +14,19 @@
* limitations under the License.
*/

#include "stream_compaction_common.cuh"
#include "stream_compaction_common.hpp"
#include "distinct_reduce.cuh"

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/sorting.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/stream_compaction.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/type_dispatcher.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/functional.h>
#include <thrust/distance.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>

@@ -47,8 +35,87 @@

namespace cudf {
namespace detail {

rmm::device_uvector<size_type> get_distinct_indices(table_view const& input,
duplicate_keep_option keep,
null_equality nulls_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (input.num_rows() == 0 or input.num_columns() == 0) {
return rmm::device_uvector<size_type>(0, stream, mr);
}

auto map = hash_map_type{compute_hash_table_size(input.num_rows()),
cuco::sentinel::empty_key{COMPACTION_EMPTY_KEY_SENTINEL},
cuco::sentinel::empty_value{COMPACTION_EMPTY_VALUE_SENTINEL},
detail::hash_table_allocator_type{default_allocator<char>{}, stream},
stream.value()};

auto const preprocessed_input =
cudf::experimental::row::hash::preprocessed_table::create(input, stream);
auto const has_nulls = nullate::DYNAMIC{cudf::has_nested_nulls(input)};

auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const key_hasher = experimental::compaction_hash(row_hasher.device_hasher(has_nulls));

auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input);
auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal);

auto const pair_iter = cudf::detail::make_counting_transform_iterator(
size_type{0}, [] __device__(size_type const i) { return cuco::make_pair(i, i); });
map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value());

auto output_indices = rmm::device_uvector<size_type>(map.get_size(), stream, mr);

// If we don't care about order, just gather indices of distinct keys taken from map.
if (keep == duplicate_keep_option::KEEP_ANY) {
map.retrieve_all(output_indices.begin(), thrust::make_discard_iterator(), stream.value());
return output_indices;
}

// For other keep options, perform a (sparse) reduce-by-row on the rows compared equal.
auto const reduction_results = spare_reduce_by_row(map,
preprocessed_input,
input.num_rows(),
has_nulls,
keep,
nulls_equal,
stream,
rmm::mr::get_current_device_resource());

// Extract the desired output indices from reduction results.
auto const map_end = [&] {
if (keep == duplicate_keep_option::KEEP_NONE) {
// Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`.
// Thus, we only output index of the rows in the groups having group size of `1`.
return thrust::copy_if(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(input.num_rows()),
output_indices.begin(),
[reduction_results = reduction_results.begin()] __device__(
auto const idx) { return reduction_results[idx] == size_type{1}; });
}

// Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in
// each group of equal rows (which are the desired output indices), or the value given by
// `reduction_init_value()`.
return thrust::copy_if(rmm::exec_policy(stream),
reduction_results.begin(),
reduction_results.end(),
output_indices.begin(),
[init_value = reduction_init_value(keep)] __device__(auto const idx) {
return idx != init_value;
});
}();

output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream);
return output_indices;
}

std::unique_ptr<table> distinct(table_view const& input,
std::vector<size_type> const& keys,
duplicate_keep_option keep,
null_equality nulls_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
@@ -57,42 +124,11 @@ std::unique_ptr<table> distinct(table_view const& input,
return empty_like(input);
}

auto keys_view = input.select(keys);
auto preprocessed_keys =
cudf::experimental::row::hash::preprocessed_table::create(keys_view, stream);
auto const has_null = nullate::DYNAMIC{cudf::has_nested_nulls(keys_view)};
auto const num_rows{keys_view.num_rows()};

hash_map_type key_map{compute_hash_table_size(num_rows),
cuco::sentinel::empty_key{COMPACTION_EMPTY_KEY_SENTINEL},
cuco::sentinel::empty_value{COMPACTION_EMPTY_VALUE_SENTINEL},
detail::hash_table_allocator_type{default_allocator<char>{}, stream},
stream.value()};

auto row_hash = cudf::experimental::row::hash::row_hasher(preprocessed_keys);
experimental::compaction_hash hash_key(row_hash.device_hasher(has_null));

cudf::experimental::row::equality::self_comparator row_equal(preprocessed_keys);
auto key_equal = row_equal.equal_to(has_null, nulls_equal);

auto iter = cudf::detail::make_counting_transform_iterator(
0, [] __device__(size_type i) { return cuco::make_pair(i, i); });
// insert distinct indices into the map.
key_map.insert(iter, iter + num_rows, hash_key, key_equal, stream.value());

auto const output_size{key_map.get_size()};
auto distinct_indices = cudf::make_numeric_column(
data_type{type_id::INT32}, output_size, mask_state::UNALLOCATED, stream, mr);
// write distinct indices to a numeric column
key_map.retrieve_all(distinct_indices->mutable_view().begin<cudf::size_type>(),
thrust::make_discard_iterator(),
stream.value());

// run gather operation to establish new order
auto const gather_map = get_distinct_indices(input.select(keys), keep, nulls_equal, stream);
return detail::gather(input,
distinct_indices->view(),
gather_map,
out_of_bounds_policy::DONT_CHECK,
detail::negative_index_policy::NOT_ALLOWED,
negative_index_policy::NOT_ALLOWED,
stream,
mr);
}
@@ -101,11 +137,12 @@ std::unique_ptr<table> distinct(table_view const& input,

std::unique_ptr<table> distinct(table_view const& input,
std::vector<size_type> const& keys,
duplicate_keep_option keep,
null_equality nulls_equal,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::distinct(input, keys, nulls_equal, rmm::cuda_stream_default, mr);
return detail::distinct(input, keys, keep, nulls_equal, rmm::cuda_stream_default, mr);
}

} // namespace cudf
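To make the reduce-by-row step concrete: for KEEP_FIRST and KEEP_LAST the per-group reduction keeps the minimum or maximum row index of each group of equal rows, and for KEEP_NONE it keeps the group size; the copy_if calls then extract the qualifying entries. Below is a minimal host-side analog using only the standard library; the names and the std::unordered_map are illustrative and do not correspond to the device implementation.

#include <algorithm>
#include <cstddef>
#include <unordered_map>
#include <vector>

enum class keep_option { KEEP_FIRST, KEEP_LAST, KEEP_NONE };

std::vector<std::size_t> distinct_indices_host(std::vector<int> const& keys, keep_option keep)
{
  // Reduce row indices by key: min index for KEEP_FIRST, max index for KEEP_LAST,
  // group size for KEEP_NONE.
  std::unordered_map<int, std::size_t> reduced;
  for (std::size_t i = 0; i < keys.size(); ++i) {
    auto [it, inserted] = reduced.try_emplace(keys[i], keep == keep_option::KEEP_NONE ? 1 : i);
    if (inserted) continue;
    if (keep == keep_option::KEEP_FIRST) it->second = std::min(it->second, i);
    if (keep == keep_option::KEEP_LAST) it->second = std::max(it->second, i);
    if (keep == keep_option::KEEP_NONE) it->second += 1;  // accumulate group size
  }

  // Extract the output indices, as the device copy_if calls do.
  std::vector<std::size_t> out;
  if (keep == keep_option::KEEP_NONE) {
    for (std::size_t i = 0; i < keys.size(); ++i) {
      if (reduced.at(keys[i]) == 1) out.push_back(i);  // only size-1 groups survive
    }
  } else {
    for (auto const& [key, idx] : reduced) out.push_back(idx);  // unspecified order
  }
  return out;
}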