Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace cudf's concurrent_ordered_map with cuco::static_map in semi/anti joins #9666

Merged
merged 14 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/cmake/thirdparty/get_cucollections.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ function(find_and_configure_cucollections)
cuco 0.0
GLOBAL_TARGETS cuco::cuco
CPM_ARGS GITHUB_REPOSITORY NVIDIA/cuCollections
GIT_TAG 6433e8ad7571f14cc5384051b049029c60dd1ce0
GIT_TAG 193de1aa74f5721717f991ca757dc610c852bb17
OPTIONS "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" "BUILD_EXAMPLES OFF"
)

Expand Down
16 changes: 0 additions & 16 deletions cpp/src/join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,6 @@ namespace detail {

namespace {

/**
* @brief Device functor to determine if a row is valid.
*/
class row_is_valid {
public:
row_is_valid(bitmask_type const* row_bitmask) : _row_bitmask{row_bitmask} {}

__device__ __inline__ bool operator()(const size_type& i) const noexcept
{
return cudf::bit_is_set(_row_bitmask, i);
}

private:
bitmask_type const* _row_bitmask;
};

} // anonymous namespace

std::pair<std::unique_ptr<table>, std::unique_ptr<table>> get_empty_joined_table(
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/join/join_common_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@
namespace cudf {
namespace detail {

/**
* @brief Device functor to determine if a row is valid.
*/
class row_is_valid {
public:
row_is_valid(bitmask_type const* row_bitmask) : _row_bitmask{row_bitmask} {}

__device__ __inline__ bool operator()(const size_type& i) const noexcept
{
return cudf::bit_is_set(_row_bitmask, i);
}

private:
bitmask_type const* _row_bitmask;
};

/**
* @brief Device functor to determine if two pairs are identical.
*/
Expand Down
105 changes: 62 additions & 43 deletions cpp/src/join/semi_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* limitations under the License.
*/

#include <hash/concurrent_unordered_map.cuh>
#include <join/join_common_utils.cuh>
#include <join/join_common_utils.hpp>

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/structs/utilities.hpp>
Expand All @@ -34,10 +35,28 @@
#include <thrust/copy.h>
#include <thrust/distance.h>
#include <thrust/sequence.h>
#include <thrust/tuple.h>

#include <cuco/static_map.cuh>

namespace cudf {
namespace detail {

namespace {
/**
* @brief Device functor to create a pair of hash value and index for a given row.
*/
struct make_pair_function {
__device__ __forceinline__ cudf::detail::pair_type operator()(size_type i) const noexcept
{
// The value is irrelevant since we only ever use the hash map to check for
// membership of a particular row index.
return cuco::make_pair<hash_value_type, size_type>(i, 0);
}
};

} // namespace

std::unique_ptr<rmm::device_uvector<cudf::size_type>> left_semi_anti_join(
join_kind const kind,
cudf::table_view const& left_keys,
Expand Down Expand Up @@ -71,67 +90,67 @@ std::unique_ptr<rmm::device_uvector<cudf::size_type>> left_semi_anti_join(
auto right_flattened_keys = right_flattened_tables.flattened_columns();
auto left_flattened_keys = left_flattened_tables.flattened_columns();

// Only care about existence, so we'll use an unordered map (other joins need a multimap)
using hash_table_type = concurrent_unordered_map<cudf::size_type, bool, row_hash, row_equality>;
// Create hash table.
auto hash_table = cuco::
static_map<hash_value_type, size_type, cuda::thread_scope_device, hash_table_allocator_type>{
compute_hash_table_size(right_num_rows),
std::numeric_limits<hash_value_type>::max(),
cudf::detail::JoinNoneValue,
hash_table_allocator_type{default_allocator<char>{}, stream},
stream.value()};

// Create hash table containing all keys found in right table
auto right_rows_d = table_device_view::create(right_flattened_keys, stream);
size_t const hash_table_size = compute_hash_table_size(right_num_rows);
auto const right_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(right_flattened_keys)};
row_hash hash_build{right_nulls, *right_rows_d};
auto right_rows_d = table_device_view::create(right_flattened_keys, stream);
auto const right_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(right_flattened_keys)};
row_hash const hash_build{right_nulls, *right_rows_d};
row_equality equality_build{right_nulls, *right_rows_d, *right_rows_d, compare_nulls};
make_pair_function pair_func_build{};

auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func_build);

// Going to join it with left table
auto left_rows_d = table_device_view::create(left_flattened_keys, stream);
auto const left_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(left_flattened_keys)};
row_hash hash_probe{left_nulls, *left_rows_d};
row_equality equality_probe{left_nulls, *left_rows_d, *right_rows_d, compare_nulls};

auto hash_table_ptr = hash_table_type::create(hash_table_size,
stream,
std::numeric_limits<bool>::max(),
std::numeric_limits<cudf::size_type>::max(),
hash_build,
equality_build);
auto hash_table = *hash_table_ptr;

// if compare_nulls == UNEQUAL, we can simply ignore any rows that
// contain a NULL in any column as they will never compare to equal.
auto const row_bitmask = (compare_nulls == null_equality::EQUAL)
? rmm::device_buffer{}
: cudf::detail::bitmask_and(right_flattened_keys, stream).first;
// skip rows that are null here.
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
right_num_rows,
[hash_table, row_bitmask = static_cast<bitmask_type const*>(row_bitmask.data())] __device__(
size_type idx) mutable {
if (!row_bitmask || cudf::bit_is_set(row_bitmask, idx)) {
hash_table.insert(thrust::make_pair(idx, true));
}
});
if ((compare_nulls == null_equality::EQUAL) or (not nullable(right_keys))) {
hash_table.insert(iter, iter + right_num_rows, hash_build, equality_build, stream.value());
} else {
thrust::counting_iterator<size_type> stencil(0);
auto const [row_bitmask, _] = cudf::detail::bitmask_and(right_flattened_keys, stream);
row_is_valid pred{static_cast<bitmask_type const*>(row_bitmask.data())};

// insert valid rows
hash_table.insert_if(
iter, iter + right_num_rows, stencil, pred, hash_build, equality_build, stream.value());
}

//
// Now we have a hash table, we need to iterate over the rows of the left table
// and check to see if they are contained in the hash table
//
auto left_rows_d = table_device_view::create(left_flattened_keys, stream);
auto const left_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(left_flattened_keys)};
row_hash hash_probe{left_nulls, *left_rows_d};
// Note: This equality comparator violates symmetry of equality and is
// therefore relying on the implementation detail of the order in which its
// operator is invoked. If cuco makes no promises about the order of
// invocation this seems a bit unsafe.
row_equality equality_probe{left_nulls, *right_rows_d, *left_rows_d, compare_nulls};

// For semi join we want contains to be true, for anti join we want contains to be false
bool const join_type_boolean = (kind == join_kind::LEFT_SEMI_JOIN);

auto hash_table_view = hash_table.get_device_view();

auto gather_map =
std::make_unique<rmm::device_uvector<cudf::size_type>>(left_num_rows, stream, mr);

// gather_map_end will be the end of valid data in gather_map
auto gather_map_end = thrust::copy_if(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(left_num_rows),
thrust::make_counting_iterator(0),
vyasr marked this conversation as resolved.
Show resolved Hide resolved
thrust::make_counting_iterator(left_num_rows),
gather_map->begin(),
[hash_table, join_type_boolean, hash_probe, equality_probe] __device__(size_type idx) {
auto pos = hash_table.find(idx, hash_probe, equality_probe);
return (pos != hash_table.end()) == join_type_boolean;
[hash_table_view, join_type_boolean, hash_probe, equality_probe] __device__(
size_type const idx) {
// Look up this row. The hash function used here needs to map a (left) row index to the hash
// of the row, so it's a row hash. The equality check needs to verify
return hash_table_view.contains(idx, hash_probe, equality_probe) == join_type_boolean;
});

auto join_size = thrust::distance(gather_map->begin(), gather_map_end);
Expand Down
15 changes: 15 additions & 0 deletions cpp/tests/join/semi_anti_join_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ using Table = cudf::table;
struct JoinTest : public cudf::test::BaseFixture {
};

TEST_F(JoinTest, TestSimple)
{
column_wrapper<int32_t> left_col0{0, 1, 2};
column_wrapper<int32_t> right_col0{0, 1, 3};

auto left = cudf::table_view{{left_col0}};
auto right = cudf::table_view{{right_col0}};

auto result = cudf::left_semi_join(left, right);
auto result_cv = cudf::column_view(
cudf::data_type{cudf::type_to_id<cudf::size_type>()}, result->size(), result->data());
column_wrapper<cudf::size_type> expected{0, 1};
CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result_cv);
};

std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::table>> get_saj_tables(
std::vector<bool> const& left_is_human_nulls, std::vector<bool> const& right_is_human_nulls)
{
Expand Down