Skip to content

Commit

Permalink
Exposed stream-ordering to join API (#16793)
Browse files Browse the repository at this point in the history
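Adds stream ordering to the public join APIs. Each function listed below now takes an `rmm::cuda_stream_view stream` parameter (placed immediately before the memory-resource argument) and forwards it to its implementation instead of always using `cudf::get_default_stream()`: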

- `inner_join`
- `left_join`
- `full_join`
- `left_semi_join`
- `left_anti_join`
- `cross_join`
- `conditional_inner_join`
- `conditional_left_join`
- `conditional_full_join`
- `conditional_left_semi_join`
- `conditional_left_anti_join`
- `mixed_inner_join`
- `mixed_left_join`
- `mixed_full_join`
- `mixed_left_semi_join`
- `mixed_left_anti_join`
- `mixed_inner_join_size`
- `mixed_left_join_size`
- `conditional_inner_join_size`
- `conditional_left_join_size`
- `conditional_left_semi_join_size`
- `conditional_left_anti_join_size`

Closes #16792.
Follows up on #13744.

Authors:
  - Basit Ayantunde (https://github.com/lamarrr)

Approvers:
  - Paul Mattione (https://github.com/pmattione-nvidia)
  - Nghia Truong (https://github.com/ttnghia)
  - David Wendt (https://github.com/davidwendt)

URL: #16793
  • Loading branch information
lamarrr authored Sep 20, 2024
1 parent eeb4bae commit 69ab988
Show file tree
Hide file tree
Showing 15 changed files with 349 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ std::unique_ptr<cudf::table> perform_left_join(cudf::table_view const& left_inpu
auto const left_selected = left_input.select(left_on);
auto const right_selected = right_input.select(right_on);
auto const [left_join_indices, right_join_indices] =
cudf::left_join(left_selected, right_selected, cudf::null_equality::EQUAL, mr);
cudf::left_join(left_selected, right_selected, cudf::null_equality::EQUAL, stream, mr);

auto const left_indices_span = cudf::device_span<cudf::size_type const>{*left_join_indices};
auto const right_indices_span = cudf::device_span<cudf::size_type const>{*right_join_indices};
Expand Down
15 changes: 10 additions & 5 deletions cpp/benchmarks/ndsh/utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <cudf/stream_compaction.hpp>
#include <cudf/table/table.hpp>
#include <cudf/transform.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <cstdlib>
#include <ctime>
Expand Down Expand Up @@ -146,11 +147,15 @@ std::unique_ptr<cudf::table> join_and_gather(cudf::table_view const& left_input,
cudf::null_equality compare_nulls)
{
CUDF_FUNC_RANGE();
constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK;
auto const left_selected = left_input.select(left_on);
auto const right_selected = right_input.select(right_on);
auto const [left_join_indices, right_join_indices] = cudf::inner_join(
left_selected, right_selected, compare_nulls, cudf::get_current_device_resource_ref());
constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK;
auto const left_selected = left_input.select(left_on);
auto const right_selected = right_input.select(right_on);
auto const [left_join_indices, right_join_indices] =
cudf::inner_join(left_selected,
right_selected,
compare_nulls,
cudf::get_default_stream(),
cudf::get_current_device_resource_ref());

auto const left_indices_span = cudf::device_span<cudf::size_type const>{*left_join_indices};
auto const right_indices_span = cudf::device_span<cudf::size_type const>{*right_join_indices};
Expand Down
9 changes: 7 additions & 2 deletions cpp/examples/parquet_io/parquet_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include "../utilities/timer.hpp"

#include <cudf/utilities/default_stream.hpp>

/**
* @file parquet_io.cpp
* @brief Demonstrates usage of the libcudf APIs to read and write
Expand Down Expand Up @@ -159,8 +161,11 @@ int main(int argc, char const** argv)
// Left anti-join the original and transcoded tables
// identical tables should not throw an exception and
// return an empty indices vector
auto const indices = cudf::left_anti_join(
input->view(), transcoded_input->view(), cudf::null_equality::EQUAL, resource.get());
auto const indices = cudf::left_anti_join(input->view(),
transcoded_input->view(),
cudf::null_equality::EQUAL,
cudf::get_default_stream(),
resource.get());

// No exception thrown, check indices
auto const valid = indices->size() == 0;
Expand Down
44 changes: 44 additions & 0 deletions cpp/include/cudf/join.hpp

Large diffs are not rendered by default.

75 changes: 25 additions & 50 deletions cpp/src/join/conditional_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <cudf/table/table_device_view.cuh>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -377,16 +376,12 @@ conditional_inner_join(table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
std::optional<std::size_t> output_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::conditional_join(left,
right,
binary_predicate,
detail::join_kind::INNER_JOIN,
output_size,
cudf::get_default_stream(),
mr);
return detail::conditional_join(
left, right, binary_predicate, detail::join_kind::INNER_JOIN, output_size, stream, mr);
}

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
Expand All @@ -395,115 +390,95 @@ conditional_left_join(table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
std::optional<std::size_t> output_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::conditional_join(left,
right,
binary_predicate,
detail::join_kind::LEFT_JOIN,
output_size,
cudf::get_default_stream(),
mr);
return detail::conditional_join(
left, right, binary_predicate, detail::join_kind::LEFT_JOIN, output_size, stream, mr);
}

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
conditional_full_join(table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::conditional_join(left,
right,
binary_predicate,
detail::join_kind::FULL_JOIN,
{},
cudf::get_default_stream(),
mr);
return detail::conditional_join(
left, right, binary_predicate, detail::join_kind::FULL_JOIN, {}, stream, mr);
}

std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_semi_join(
table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
std::optional<std::size_t> output_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::conditional_join_anti_semi(left,
right,
binary_predicate,
detail::join_kind::LEFT_SEMI_JOIN,
output_size,
cudf::get_default_stream(),
mr);
return detail::conditional_join_anti_semi(
left, right, binary_predicate, detail::join_kind::LEFT_SEMI_JOIN, output_size, stream, mr);
}

std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_anti_join(
table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
std::optional<std::size_t> output_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::conditional_join_anti_semi(left,
right,
binary_predicate,
detail::join_kind::LEFT_ANTI_JOIN,
output_size,
cudf::get_default_stream(),
mr);
return detail::conditional_join_anti_semi(
left, right, binary_predicate, detail::join_kind::LEFT_ANTI_JOIN, output_size, stream, mr);
}

std::size_t conditional_inner_join_size(table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::compute_conditional_join_output_size(
left, right, binary_predicate, detail::join_kind::INNER_JOIN, cudf::get_default_stream(), mr);
left, right, binary_predicate, detail::join_kind::INNER_JOIN, stream, mr);
}

std::size_t conditional_left_join_size(table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::compute_conditional_join_output_size(
left, right, binary_predicate, detail::join_kind::LEFT_JOIN, cudf::get_default_stream(), mr);
left, right, binary_predicate, detail::join_kind::LEFT_JOIN, stream, mr);
}

std::size_t conditional_left_semi_join_size(table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::compute_conditional_join_output_size(left,
right,
binary_predicate,
detail::join_kind::LEFT_SEMI_JOIN,
cudf::get_default_stream(),
mr);
return detail::compute_conditional_join_output_size(
left, right, binary_predicate, detail::join_kind::LEFT_SEMI_JOIN, stream, mr);
}

std::size_t conditional_left_anti_join_size(table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::compute_conditional_join_output_size(left,
right,
binary_predicate,
detail::join_kind::LEFT_ANTI_JOIN,
cudf::get_default_stream(),
mr);
return detail::compute_conditional_join_output_size(
left, right, binary_predicate, detail::join_kind::LEFT_ANTI_JOIN, stream, mr);
}

} // namespace cudf
1 change: 0 additions & 1 deletion cpp/src/join/conditional_join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <cudf/ast/expressions.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/join/cross_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>

Expand Down Expand Up @@ -75,10 +74,11 @@ std::unique_ptr<cudf::table> cross_join(cudf::table_view const& left,

std::unique_ptr<cudf::table> cross_join(cudf::table_view const& left,
cudf::table_view const& right,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::cross_join(left, right, cudf::get_default_stream(), mr);
return detail::cross_join(left, right, stream, mr);
}

} // namespace cudf
10 changes: 6 additions & 4 deletions cpp/src/join/join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <cudf/join.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -120,32 +119,35 @@ std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
inner_join(table_view const& left,
table_view const& right,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::inner_join(left, right, compare_nulls, cudf::get_default_stream(), mr);
return detail::inner_join(left, right, compare_nulls, stream, mr);
}

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
left_join(table_view const& left,
table_view const& right,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::left_join(left, right, compare_nulls, cudf::get_default_stream(), mr);
return detail::left_join(left, right, compare_nulls, stream, mr);
}

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
full_join(table_view const& left,
table_view const& right,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::full_join(left, right, compare_nulls, cudf::get_default_stream(), mr);
return detail::full_join(left, right, compare_nulls, stream, mr);
}

} // namespace cudf
16 changes: 10 additions & 6 deletions cpp/src/join/mixed_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include <cudf/table/table_device_view.cuh>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

Expand Down Expand Up @@ -484,6 +483,7 @@ mixed_inner_join(
ast::expression const& binary_predicate,
null_equality compare_nulls,
std::optional<std::pair<std::size_t, device_span<size_type const>>> const output_size_data,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
Expand All @@ -495,7 +495,7 @@ mixed_inner_join(
compare_nulls,
detail::join_kind::INNER_JOIN,
output_size_data,
cudf::get_default_stream(),
stream,
mr);
}

Expand All @@ -506,6 +506,7 @@ std::pair<std::size_t, std::unique_ptr<rmm::device_uvector<size_type>>> mixed_in
table_view const& right_conditional,
ast::expression const& binary_predicate,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
Expand All @@ -516,7 +517,7 @@ std::pair<std::size_t, std::unique_ptr<rmm::device_uvector<size_type>>> mixed_in
binary_predicate,
compare_nulls,
detail::join_kind::INNER_JOIN,
cudf::get_default_stream(),
stream,
mr);
}

Expand All @@ -530,6 +531,7 @@ mixed_left_join(
ast::expression const& binary_predicate,
null_equality compare_nulls,
std::optional<std::pair<std::size_t, device_span<size_type const>>> const output_size_data,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
Expand All @@ -541,7 +543,7 @@ mixed_left_join(
compare_nulls,
detail::join_kind::LEFT_JOIN,
output_size_data,
cudf::get_default_stream(),
stream,
mr);
}

Expand All @@ -552,6 +554,7 @@ std::pair<std::size_t, std::unique_ptr<rmm::device_uvector<size_type>>> mixed_le
table_view const& right_conditional,
ast::expression const& binary_predicate,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
Expand All @@ -562,7 +565,7 @@ std::pair<std::size_t, std::unique_ptr<rmm::device_uvector<size_type>>> mixed_le
binary_predicate,
compare_nulls,
detail::join_kind::LEFT_JOIN,
cudf::get_default_stream(),
stream,
mr);
}

Expand All @@ -576,6 +579,7 @@ mixed_full_join(
ast::expression const& binary_predicate,
null_equality compare_nulls,
std::optional<std::pair<std::size_t, device_span<size_type const>>> const output_size_data,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
Expand All @@ -587,7 +591,7 @@ mixed_full_join(
compare_nulls,
detail::join_kind::FULL_JOIN,
output_size_data,
cudf::get_default_stream(),
stream,
mr);
}

Expand Down
Loading

0 comments on commit 69ab988

Please sign in to comment.