Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement concat_json to join JSON strings given by strings column #2457

Merged
merged 46 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
cd235e2
add concat_jsons
karthikeyann Aug 30, 2024
c1c022e
Fix compile error
ttnghia Sep 5, 2024
033f07e
Optimize stream sync
ttnghia Sep 5, 2024
6d86005
Fix interface
ttnghia Sep 5, 2024
02c7636
Add JNI binding
ttnghia Sep 5, 2024
fbe9620
Merge branch 'branch-24.10' into concat_json
ttnghia Sep 6, 2024
ec2e877
Change delimiter Java type
ttnghia Sep 6, 2024
2768577
Fix null mask
ttnghia Sep 6, 2024
081a14e
Return `is_valid` column
ttnghia Sep 6, 2024
c41b947
Separate input and output validity
ttnghia Sep 9, 2024
ccb6b4c
Merge branch 'branch-24.10' into concat_json
ttnghia Sep 19, 2024
ed1b87e
Make changes in cudf's Java code
ttnghia Sep 19, 2024
0cbb8a4
Merge branch 'branch-24.10' into concat_json
ttnghia Sep 25, 2024
13278f0
Merge branch 'branch-24.10' into concat_json
ttnghia Sep 30, 2024
c67a195
Change `delimiter` type from `byte` to `char`, and rewrite docs
ttnghia Sep 30, 2024
0e919d2
Restore source file
ttnghia Sep 30, 2024
652e0ea
Rename file
ttnghia Sep 30, 2024
8470ea1
Add new source file
ttnghia Sep 30, 2024
c19a3df
Add file to cmake
ttnghia Sep 30, 2024
91769c7
Optimize implementation
ttnghia Sep 30, 2024
45a929f
Fix start character
ttnghia Sep 30, 2024
dae0081
Check for white space characters that are not just space character
ttnghia Oct 1, 2024
cab47c1
Check for delimiter if the character is acceptable
ttnghia Oct 1, 2024
e6921f9
Change `not_whitespace`
ttnghia Oct 1, 2024
ed06891
Optimize searching for delimiter in just one kernel call
ttnghia Oct 1, 2024
f5a9ab7
Use existence map instead of histogram
ttnghia Oct 1, 2024
99cf06f
Remove utf8 processing code
ttnghia Oct 1, 2024
f5d4dc2
Fix `num_values`
ttnghia Oct 1, 2024
1c17ef0
Search only for 128 characters
ttnghia Oct 1, 2024
e885863
Fix `is_null_or_empty`
ttnghia Oct 1, 2024
86a01c0
Merge branch 'branch-24.10' into concat_json
ttnghia Oct 1, 2024
5e9e81f
Change back to use `cub::DeviceHistogram::HistogramEven`
ttnghia Oct 1, 2024
a2c4e64
Implement `JSONUtils.makeStructs`
ttnghia Oct 2, 2024
fe1071c
Rename variables and update docs
ttnghia Oct 3, 2024
b0db299
Merge branch 'branch-24.10' into concat_json
ttnghia Oct 3, 2024
2b726f0
Misc
ttnghia Oct 3, 2024
a048801
Add stream sync and extract code into separate functions for profiling
ttnghia Oct 3, 2024
143afc4
Revert "Add stream sync and extract code into separate functions for …
ttnghia Oct 3, 2024
f10e73a
Reorganize code
ttnghia Oct 3, 2024
ddc383e
Revert "Reorganize code"
ttnghia Oct 3, 2024
aab5d51
Misc
ttnghia Oct 3, 2024
bb6fceb
Use one warp per row to improve performance
ttnghia Oct 3, 2024
9af88ca
Optimize write
ttnghia Oct 3, 2024
e982290
Revert "Optimize write"
ttnghia Oct 3, 2024
1417301
Reorganize code
ttnghia Oct 4, 2024
3500afa
Merge branch 'branch-24.12' into concat_json
ttnghia Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ add_library(
src/from_json_to_raw_map.cu
src/get_json_object.cu
src/histogram.cu
src/json_utils.cu
src/murmur_hash.cu
src/parse_uri.cu
src/regex_rewrite_utils.cu
Expand Down
46 changes: 45 additions & 1 deletion src/main/cpp/src/JSONUtilsJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

#include "cudf_jni_apis.hpp"
#include "from_json.hpp"
#include "get_json_object.hpp"
#include "json_utils.hpp"

#include <cudf/strings/strings_column_view.hpp>

Expand Down Expand Up @@ -154,4 +154,48 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_extractRawMap
}
CATCH_STD(env, 0);
}

JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_concatenateJsonStrings(
JNIEnv* env, jclass, jlong j_input)
{
JNI_NULL_CHECK(env, j_input, "j_input is null", 0);

try {
cudf::jni::auto_set_device(env);
auto const input_cv = reinterpret_cast<cudf::column_view const*>(j_input);
auto [is_valid, joined_strings, delimiter] =
spark_rapids_jni::concat_json(cudf::strings_column_view{*input_cv});

// The output array contains 5 elements:
// [0]: address of the cudf::column object `is_valid` in host memory
// [1]: address of data buffer of the concatenated strings in device memory
// [2]: data length
// [3]: address of the rmm::device_buffer object (of the concatenated strings) in host memory
// [4]: delimiter char
auto out_handles = cudf::jni::native_jlongArray(env, 5);
out_handles[0] = reinterpret_cast<jlong>(is_valid.release());
out_handles[1] = reinterpret_cast<jlong>(joined_strings->data());
out_handles[2] = static_cast<jlong>(joined_strings->size());
out_handles[3] = reinterpret_cast<jlong>(joined_strings.release());
out_handles[4] = static_cast<jlong>(delimiter);
return out_handles.get_jArray();
}
CATCH_STD(env, 0);
}

JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_makeStructs(
JNIEnv* env, jclass, jlongArray j_children, jlong j_is_null)
{
JNI_NULL_CHECK(env, j_children, "j_children is null", 0);
JNI_NULL_CHECK(env, j_is_null, "j_is_null is null", 0);

try {
cudf::jni::auto_set_device(env);
auto const children =
cudf::jni::native_jpointerArray<cudf::column_view>{env, j_children}.get_dereferenced();
auto const is_null = *reinterpret_cast<cudf::column_view const*>(j_is_null);
return cudf::jni::ptr_as_jlong(spark_rapids_jni::make_structs(children, is_null).release());
}
CATCH_STD(env, 0);
}
}
275 changes: 275 additions & 0 deletions src/main/cpp/src/json_utils.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/strings/detail/combine.hpp>
#include <cudf/strings/string_view.cuh>
#include <cudf/strings/strings_column_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cub/device/device_histogram.cuh>
#include <thrust/find.h>
#include <thrust/functional.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/transform.h>
#include <thrust/tuple.h>
#include <thrust/uninitialized_fill.h>

namespace spark_rapids_jni {

namespace detail {

namespace {

constexpr bool not_whitespace(cudf::char_utf8 ch)
{
return ch != ' ' && ch != '\r' && ch != '\n' && ch != '\t';
}

constexpr bool can_be_delimiter(char c)
{
// The character list below is from `json_reader_options.set_delimiter`.
switch (c) {
case '{':
case '[':
case '}':
case ']':
case ',':
case ':':
case '"':
case '\'':
case '\\':
case ' ':
case '\t':
case '\r': return false;
default: return true;
}
}

} // namespace

std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, char> concat_json(
cudf::strings_column_view const& input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
auto const default_mr = rmm::mr::get_current_device_resource();

// Check if the input rows are either null, equal to `null` string literal, or empty.
// This will be used for masking out the input when doing string concatenation.
rmm::device_uvector<bool> is_valid_input(input.size(), stream, default_mr);

// Check if the input rows are either null or empty.
// This will be returned to the caller.
rmm::device_uvector<bool> is_null_or_empty(input.size(), stream, mr);

thrust::for_each(
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0L),
thrust::make_counting_iterator(input.size() * static_cast<int64_t>(cudf::detail::warp_size)),
[input = *d_input_ptr,
output = thrust::make_zip_iterator(thrust::make_tuple(
is_valid_input.begin(), is_null_or_empty.begin()))] __device__(int64_t tidx) {
// Execute one warp per row to minimize thread divergence.
if ((tidx % cudf::detail::warp_size) != 0) { return; }
auto const idx = tidx / cudf::detail::warp_size;

if (input.is_null(idx)) {
output[idx] = thrust::make_tuple(false, true);
return;
}

auto const d_str = input.element<cudf::string_view>(idx);
auto const size = d_str.size_bytes();
int i = 0;
char ch;

// Skip the very first whitespace characters.
for (; i < size; ++i) {
ch = d_str[i];
if (not_whitespace(ch)) { break; }
}

if (i + 3 < size &&
(d_str[i] == 'n' && d_str[i + 1] == 'u' && d_str[i + 2] == 'l' && d_str[i + 3] == 'l')) {
i += 4;

// Skip the very last whitespace characters.
bool is_null_literal{true};
for (; i < size; ++i) {
ch = d_str[i];
if (not_whitespace(ch)) {
is_null_literal = false;
break;
}
}

// The current row contains only `null` string literal and not any other non-whitespace
// characters. Such rows need to be masked out as null when doing concatenation.
if (is_null_literal) {
output[idx] = thrust::make_tuple(false, false);
return;
}
}

auto const not_eol = i < size;

// If the current row is not null or empty, it should start with `{`. Otherwise, we need to
// replace it by a null. This is necessary for libcudf's JSON reader to work.
// Note that if we want to support ARRAY schema, we need to check for `[` instead.
auto constexpr start_character = '{';
if (not_eol && ch != start_character) {
output[idx] = thrust::make_tuple(false, false);
return;
}

output[idx] = thrust::make_tuple(not_eol, !not_eol);
});

auto constexpr num_levels = 256;
auto constexpr lower_level = std::numeric_limits<char>::min();
auto constexpr upper_level = std::numeric_limits<char>::max();
auto const num_chars = input.chars_size(stream);

rmm::device_uvector<uint32_t> histogram(num_levels, stream, default_mr);
thrust::uninitialized_fill(
rmm::exec_policy_nosync(stream), histogram.begin(), histogram.end(), 0);

size_t temp_storage_bytes = 0;
cub::DeviceHistogram::HistogramEven(nullptr,
temp_storage_bytes,
input.chars_begin(stream),
histogram.begin(),
num_levels,
lower_level,
upper_level,
num_chars,
stream.value());
rmm::device_buffer d_temp(temp_storage_bytes, stream);
cub::DeviceHistogram::HistogramEven(d_temp.data(),
temp_storage_bytes,
input.chars_begin(stream),
histogram.begin(),
num_levels,
lower_level,
upper_level,
num_chars,
stream.value());

auto const it = thrust::make_counting_iterator(0);
auto const zero_level_idx = -lower_level; // the bin storing count for character `\0`
auto const zero_level_it = it + zero_level_idx;
auto const end = it + num_levels;

auto const first_zero_count_pos =
thrust::find_if(rmm::exec_policy_nosync(stream),
zero_level_it, // ignore the negative characters
end,
[zero_level_idx, counts = histogram.begin()] __device__(auto idx) -> bool {
auto const count = counts[idx];
if (count > 0) { return false; }
auto const first_non_existing_char = static_cast<char>(idx - zero_level_idx);
return can_be_delimiter(first_non_existing_char);
});

// This should never happen since the input should never cover the entire char range.
if (first_zero_count_pos == end) {
throw std::logic_error(
"Cannot find any character suitable as delimiter during joining json strings.");
}
auto const delimiter = static_cast<char>(thrust::distance(zero_level_it, first_zero_count_pos));

auto [null_mask, null_count] = cudf::detail::valid_if(
is_valid_input.begin(), is_valid_input.end(), thrust::identity{}, stream, default_mr);
// If the null count doesn't change, that mean we do not have any rows containing `null` string
// literal or empty rows. In such cases, just use the input column for concatenation.
auto const input_applied_null =
null_count == input.null_count()
? cudf::column_view{}
: cudf::column_view{cudf::data_type{cudf::type_id::STRING},
input.size(),
input.chars_begin(stream),
reinterpret_cast<cudf::bitmask_type const*>(null_mask.data()),
null_count,
0,
std::vector<cudf::column_view>{input.offsets()}};

auto concat_strings = cudf::strings::detail::join_strings(
null_count == input.null_count() ? input : cudf::strings_column_view{input_applied_null},
cudf::string_scalar(std::string(1, delimiter), true, stream, default_mr),
cudf::string_scalar("{}", true, stream, default_mr),
stream,
mr);

return {std::make_unique<cudf::column>(std::move(is_null_or_empty), rmm::device_buffer{}, 0),
std::move(concat_strings->release().data),
delimiter};
}

std::unique_ptr<cudf::column> make_structs(std::vector<cudf::column_view> const& children,
cudf::column_view const& is_null,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (children.size() == 0) { return nullptr; }

auto const row_count = children.front().size();
for (auto const& col : children) {
CUDF_EXPECTS(col.size() == row_count, "All columns must have the same number of rows.");
}

auto const [null_mask, null_count] = cudf::detail::valid_if(
is_null.begin<bool>(), is_null.end<bool>(), thrust::logical_not{}, stream, mr);

auto const structs =
cudf::column_view(cudf::data_type{cudf::type_id::STRUCT},
row_count,
nullptr,
reinterpret_cast<cudf::bitmask_type const*>(null_mask.data()),
null_count,
0,
children);
return std::make_unique<cudf::column>(structs, stream, mr);
}

} // namespace detail

std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, char> concat_json(
cudf::strings_column_view const& input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::concat_json(input, stream, mr);
}

std::unique_ptr<cudf::column> make_structs(std::vector<cudf::column_view> const& children,
cudf::column_view const& is_null,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::make_structs(children, is_null, stream, mr);
}

} // namespace spark_rapids_jni
12 changes: 12 additions & 0 deletions src/main/cpp/src/from_json.hpp → src/main/cpp/src/json_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cudf/utilities/default_stream.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/resource_ref.hpp>

#include <memory>
Expand All @@ -31,4 +32,15 @@ std::unique_ptr<cudf::column> from_json_to_raw_map(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, char> concat_json(
cudf::strings_column_view const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

std::unique_ptr<cudf::column> make_structs(
std::vector<cudf::column_view> const& input,
cudf::column_view const& is_null,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

} // namespace spark_rapids_jni
Loading
Loading