Added batch memset to memset data and validity buffers in parquet reader #16281

Merged
42 commits merged on Aug 5, 2024
Changes shown are from 32 of the 42 commits.

Commits
f0e8558  Initital Commit (sdrp713, Jun 25, 2024)
812ca51  Added Null Memsest (sdrp713, Jun 27, 2024)
078a451  Update branch (sdrp713, Jul 8, 2024)
5c2aa8b  Update tests (sdrp713, Jul 9, 2024)
36a0670  Added template (sdrp713, Jul 12, 2024)
b449207  Fix template (sdrp713, Jul 12, 2024)
810e38e  Added rebase (sdrp713, Jul 12, 2024)
c5c94ea  PR Push (sdrp713, Jul 15, 2024)
48d19c1  Added cub Batch Function (sdrp713, Jul 17, 2024)
34c0024  Merge branch 'branch-24.08' into multibuffermemset (mhaseeb123, Jul 23, 2024)
980ca9b  Added PR Fixes (sdrp713, Jul 23, 2024)
eef5473  Merge branch 'multibuffermemset' of github.com:sdrp713/cudf into mult… (sdrp713, Jul 23, 2024)
3df0cd8  Fix PR tests (sdrp713, Jul 24, 2024)
59333a1  Formatting and Assertion Fix (sdrp713, Jul 24, 2024)
163400f  Initital Commit (sdrp713, Jun 25, 2024)
23ef866  Added Null Memsest (sdrp713, Jun 27, 2024)
1b9a47d  Update branch (sdrp713, Jul 8, 2024)
d9aded0  Update tests (sdrp713, Jul 9, 2024)
8c058ab  Added template (sdrp713, Jul 12, 2024)
07062bc  Fix template (sdrp713, Jul 12, 2024)
a1992be  Added rebase (sdrp713, Jul 12, 2024)
82b530f  PR Push (sdrp713, Jul 15, 2024)
10f62b7  Added cub Batch Function (sdrp713, Jul 17, 2024)
f345e61  Added PR Fixes (sdrp713, Jul 23, 2024)
4e6768a  Fix PR tests (sdrp713, Jul 24, 2024)
692b61d  Formatting and Assertion Fix (sdrp713, Jul 24, 2024)
a02b5fb  Merge branch 'multibuffermemset' of github.com:sdrp713/cudf into mult… (sdrp713, Jul 24, 2024)
2869b5e  Formatting Fixes (sdrp713, Jul 24, 2024)
f68779d  Merge branch 'branch-24.10' into multibuffermemset (sdrp713, Jul 24, 2024)
dad2cdf  Mem Resource Fix (sdrp713, Jul 25, 2024)
a09442c  Iterator Fix (sdrp713, Jul 25, 2024)
bd877a5  Final PR Fixes (sdrp713, Jul 29, 2024)
7c70358  Merge branch 'branch-24.10' into multibuffermemset (mhaseeb123, Jul 29, 2024)
4da4578  Namespace Fix (sdrp713, Jul 29, 2024)
9229a1a  Merge branch 'branch-24.10' into multibuffermemset (mhaseeb123, Jul 30, 2024)
5d292ef  Style Fix (sdrp713, Jul 30, 2024)
478b088  Argument Fixes (sdrp713, Jul 31, 2024)
81e94ce  Format Fixes (sdrp713, Jul 31, 2024)
a64481f  Added Template (sdrp713, Aug 2, 2024)
c7af590  Type Fix (sdrp713, Aug 5, 2024)
4d0497b  Small Type Fixes (sdrp713, Aug 5, 2024)
65dda74  Merge branch 'branch-24.10' into multibuffermemset (mhaseeb123, Aug 5, 2024)
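
Overview of the change, with a minimal sketch of the idea. The loop below only illustrates the previous per-buffer behavior (the old code initialized each buffer during allocation via mask_state::ALL_VALID); it is not an excerpt of cudf code, and the names bufs and stream are placeholders. Before this PR, every data buffer and validity buffer allocated by the Parquet reader was initialized individually, costing one stream operation per buffer, which adds up when reading thousands of columns. This PR instead collects spans for all of those buffers and fills them in one batched launch through a new batched_memset utility built on cub::DeviceCopy::Batched.

// Illustrative sketch only: previously, initialization was effectively one operation per buffer ...
for (auto const& buf : bufs) {  // bufs: std::vector<cudf::device_span<uint64_t>>
  cudaMemsetAsync(buf.data(), 0, buf.size() * sizeof(uint64_t), stream.value());
}

// ... whereas this PR describes every buffer once and fills them all in a single
// batched launch (see cpp/src/io/utilities/batched_memset.cu below).
batched_memset(bufs, uint64_t{0}, stream);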
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -442,6 +442,7 @@ add_library(
src/io/utilities/base64_utilities.cpp
src/io/utilities/column_buffer.cpp
src/io/utilities/column_buffer_strings.cu
src/io/utilities/batched_memset.cu
src/io/utilities/config_utils.cpp
src/io/utilities/data_casting.cu
src/io/utilities/data_sink.cpp
5 changes: 5 additions & 0 deletions cpp/benchmarks/CMakeLists.txt
@@ -353,6 +353,11 @@ ConfigureNVBench(JSON_READER_NVBENCH io/json/nested_json.cpp io/json/json_reader
ConfigureNVBench(JSON_READER_OPTION_NVBENCH io/json/json_reader_option.cpp)
ConfigureNVBench(JSON_WRITER_NVBENCH io/json/json_writer.cpp)

# ##################################################################################################
# * multi buffer memset benchmark ----------------------------------------------------------------
ConfigureNVBench(BATCHED_MEMSET_BENCH io/utilities/batched_memset_bench.cpp)

# ##################################################################################################
# * io benchmark ---------------------------------------------------------------------
ConfigureNVBench(MULTIBYTE_SPLIT_NVBENCH io/text/multibyte_split.cpp)
102 changes: 102 additions & 0 deletions cpp/benchmarks/io/utilities/batched_memset_bench.cpp
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/io/cuio_common.hpp>
#include <benchmarks/io/nvbench_helpers.hpp>

#include <cudf/io/parquet.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <nvbench/nvbench.cuh>
#include <src/io/utilities/batched_memset.hpp>

// Size of the data in the benchmark dataframe; chosen to be low enough to allow benchmarks to
// run on most GPUs, but large enough to allow highest throughput
constexpr size_t data_size = 512 << 20;

void parquet_read_common(cudf::size_type num_rows_to_read,
cudf::size_type num_cols_to_read,
cuio_source_sink_pair& source_sink,
nvbench::state& state)
{
cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(source_sink.make_source_info());

auto mem_stats_logger = cudf::memory_stats_logger();
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
state.exec(
nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
try_drop_l3_cache();

timer.start();
auto const result = cudf::io::read_parquet(read_opts);
timer.stop();

CUDF_EXPECTS(result.tbl->num_columns() == num_cols_to_read, "Unexpected number of columns");
CUDF_EXPECTS(result.tbl->num_rows() == num_rows_to_read, "Unexpected number of rows");
});

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
}

template <data_type DataType>
void bench_batched_memset(nvbench::state& state, nvbench::type_list<nvbench::enum_type<DataType>>)
{
auto const d_type = get_type_or_group(static_cast<int32_t>(DataType));
auto const num_cols = static_cast<cudf::size_type>(state.get_int64("num_cols"));
auto const cardinality = static_cast<cudf::size_type>(state.get_int64("cardinality"));
auto const run_length = static_cast<cudf::size_type>(state.get_int64("run_length"));
auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));
auto const compression = cudf::io::compression_type::NONE;
cuio_source_sink_pair source_sink(source_type);
auto const tbl =
create_random_table(cycle_dtypes(d_type, num_cols),
table_size_bytes{data_size},
data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
auto const view = tbl->view();

cudf::io::parquet_writer_options write_opts =
cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
.compression(compression);
cudf::io::write_parquet(write_opts);
auto const num_rows = view.num_rows();

parquet_read_common(num_rows, num_cols, source_sink, state);
}

using d_type_list = nvbench::enum_type_list<data_type::INTEGRAL,
data_type::FLOAT,
data_type::DECIMAL,
data_type::TIMESTAMP,
data_type::DURATION,
data_type::STRING,
data_type::LIST,
data_type::STRUCT>;

NVBENCH_BENCH_TYPES(bench_batched_memset, NVBENCH_TYPE_AXES(d_type_list))
.set_name("batched_memset")
.set_type_axes_names({"data_type"})
.add_int64_axis("num_cols", {1000})
.add_string_axis("io_type", {"DEVICE_BUFFER"})
.set_min_samples(4)
.add_int64_axis("cardinality", {0, 1000})
.add_int64_axis("run_length", {1, 32});
36 changes: 32 additions & 4 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -41,6 +41,8 @@
#include <thrust/transform_scan.h>
#include <thrust/unique.h>

#include <io/utilities/batched_memset.hpp>

#include <bitset>
#include <numeric>

@@ -1494,6 +1496,11 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
// buffers if they are not part of a list hierarchy. mark down
// if we have any list columns that need further processing.
bool has_lists = false;
// Using uint64_t as we are relying on the allocation padding for the buffers to be large enough
// that we can always write a multiple of 8 byte words
std::vector<cudf::device_span<uint64_t>> memset_bufs;
std::vector<cudf::device_span<uint64_t>> nullmask_bufs;

for (size_t idx = 0; idx < _input_columns.size(); idx++) {
auto const& input_col = _input_columns[idx];
size_t const max_depth = input_col.nesting_depth();
@@ -1514,13 +1521,22 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
// we're going to start null mask as all valid and then turn bits off if necessary
out_buf.create_with_mask(
out_buf.type.id() == type_id::LIST && l_idx < max_depth ? num_rows + 1 : num_rows,
cudf::mask_state::ALL_VALID,
cudf::mask_state::UNINITIALIZED,
_stream,
_mr);
_mr,
false);
// Using uint64_t is safe as we are relying on the allocation padding for the buffers to be
// large enough that we can always write a multiple of 8 byte words
memset_bufs.push_back(cudf::device_span<uint64_t>(
static_cast<uint64_t*>(out_buf.data()),
cudf::util::round_up_safe(out_buf.data_size(), sizeof(uint64_t)) / sizeof(uint64_t)));
nullmask_bufs.push_back(cudf::device_span<uint64_t>(
reinterpret_cast<uint64_t*>(out_buf.null_mask()),
cudf::util::round_up_safe(out_buf.null_mask_size(), sizeof(uint64_t)) /
sizeof(uint64_t)));
}
}
}

// compute output column sizes by examining the pages of the -input- columns
if (has_lists) {
auto h_cols_info =
@@ -1593,11 +1609,23 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num

// allocate
// we're going to start null mask as all valid and then turn bits off if necessary
out_buf.create_with_mask(size, cudf::mask_state::ALL_VALID, _stream, _mr);
out_buf.create_with_mask(size, cudf::mask_state::UNINITIALIZED, _stream, _mr, false);
// Using uint64_t is safe as we are relying on the allocation padding for the buffers to be
// large enough that we can always write a multiple of 8 byte words
memset_bufs.push_back(cudf::device_span<uint64_t>(
static_cast<uint64_t*>(out_buf.data()),
cudf::util::round_up_safe(out_buf.data_size(), sizeof(uint64_t)) / sizeof(uint64_t)));
nullmask_bufs.push_back(cudf::device_span<uint64_t>(
reinterpret_cast<uint64_t*>(out_buf.null_mask()),
cudf::util::round_up_safe(out_buf.null_mask_size(), sizeof(uint64_t)) /
sizeof(uint64_t)));
}
}
}
}
batched_memset(memset_bufs, 0UL, _stream);
// Need to set null mask bufs to all high bits
batched_memset(nullmask_bufs, ULLONG_MAX, _stream);
}

std::vector<size_t> reader::impl::calculate_page_string_offsets()
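
Why the masks are filled with ULLONG_MAX: cudf validity masks pack one bit per row into 32-bit bitmask_type words, with a set bit meaning "valid", so writing all-ones 64-bit words over the mask allocations reproduces what mask_state::ALL_VALID used to do at allocation time. Rounding the byte sizes up to whole 8-byte words is safe because, as the comments above note, the allocations are padded. A small illustration of the equivalence (not code from the PR):

#include <cudf/types.hpp>

#include <climits>
#include <cstdint>

// One uint64_t write covers two 32-bit bitmask words; all bits set marks every row valid,
// which is the same bit pattern that mask_state::ALL_VALID produces.
static_assert(sizeof(cudf::bitmask_type) == 4, "cudf bitmask words are 32-bit");
constexpr uint64_t all_valid_words = ULLONG_MAX;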
66 changes: 66 additions & 0 deletions cpp/src/io/utilities/batched_memset.cu
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/detail/iterator.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>

#include <rmm/device_buffer.hpp>

#include <cub/device/device_copy.cuh>
#include <cuda/functional>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/transform.h>

#include <io/utilities/batched_memset.hpp>

void batched_memset(std::vector<cudf::device_span<uint64_t>>& bufs,
uint64_t const value,
rmm::cuda_stream_view stream)
{
// number of buffers to fill in this batch
auto const num_bufs = bufs.size();

// copy the spans into GPU memory so that buffer sizes and destination pointers can be
// read from device code (via cudf::detail::make_device_uvector_async)
auto gpu_bufs =
cudf::detail::make_device_uvector_async(bufs, stream, rmm::mr::get_current_device_resource());

// iterator producing the size of each buffer
auto sizes = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<std::size_t>(
[gpu_bufs = gpu_bufs.data()] __device__(cudf::size_type i) { return gpu_bufs[i].size(); }));

// get an iterator with a constant value to memset
auto iter_in = thrust::make_constant_iterator(thrust::make_constant_iterator(value));

// get an iterator pointing to each device span
auto iter_out = thrust::make_transform_iterator(
thrust::counting_iterator<uint64_t>(0),
cuda::proclaim_return_type<uint64_t*>(
[gpu_bufs = gpu_bufs.data()] __device__(cudf::size_type i) { return gpu_bufs[i].data(); }));

size_t temp_storage_bytes = 0;

// first call only queries the amount of temporary storage required for the batched copy
cub::DeviceCopy::Batched(nullptr, temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream);

rmm::device_buffer d_temp_storage(
temp_storage_bytes, stream, rmm::mr::get_current_device_resource());

// second call performs the actual batched fill of all buffers
cub::DeviceCopy::Batched(
d_temp_storage.data(), temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream);
}
33 changes: 33 additions & 0 deletions cpp/src/io/utilities/batched_memset.hpp
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/resource_ref.hpp>

#include <cstdint>
#include <vector>

/**
* @brief A helper function that takes in a vector of device spans and memsets them to the
* value provided using batches sent to the GPU.
*
* @param bufs Vector with device spans of data
* @param value Value to memset all device spans to
 * @param stream Stream used for device memory operations and kernel launches
 */
void batched_memset(std::vector<cudf::device_span<uint64_t>>& bufs,
uint64_t const value,
rmm::cuda_stream_view stream);
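
A hedged usage sketch of the new helper, based only on the declaration above; the buffer names and sizes below are invented for illustration, and the sketch assumes batched_memset.hpp and the listed headers are included.

#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <cstdint>
#include <vector>

void zero_buffers_example(rmm::cuda_stream_view stream)
{
  // Three device buffers of arbitrary example sizes.
  rmm::device_uvector<uint64_t> a(1024, stream);
  rmm::device_uvector<uint64_t> b(2048, stream);
  rmm::device_uvector<uint64_t> c(512, stream);

  std::vector<cudf::device_span<uint64_t>> bufs;
  bufs.emplace_back(a.data(), a.size());
  bufs.emplace_back(b.data(), b.size());
  bufs.emplace_back(c.data(), c.size());

  // A single call fills all three buffers with zeros on the given stream.
  batched_memset(bufs, uint64_t{0}, stream);
}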
25 changes: 16 additions & 9 deletions cpp/src/io/utilities/column_buffer.cpp
@@ -33,7 +33,7 @@

namespace cudf::io::detail {

void gather_column_buffer::allocate_strings_data(rmm::cuda_stream_view stream)
void gather_column_buffer::allocate_strings_data(rmm::cuda_stream_view stream, bool memset_data)
{
CUDF_EXPECTS(type.id() == type_id::STRING, "allocate_strings_data called for non-string column");
// The contents of _strings will never be directly returned to the user.
@@ -56,11 +56,12 @@ std::unique_ptr<column> gather_column_buffer::make_string_column_impl(rmm::cuda_
return make_strings_column(*_strings, stream, _mr);
}

void cudf::io::detail::inline_column_buffer::allocate_strings_data(rmm::cuda_stream_view stream)
void cudf::io::detail::inline_column_buffer::allocate_strings_data(rmm::cuda_stream_view stream,
bool memset_data)
{
CUDF_EXPECTS(type.id() == type_id::STRING, "allocate_strings_data called for non-string column");
// size + 1 for final offset. _string_data will be initialized later.
_data = create_data(data_type{type_id::INT32}, size + 1, stream, _mr);
_data = create_data(data_type{type_id::INT32}, size + 1, stream, _mr, memset_data);
}

void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes,
@@ -94,22 +95,27 @@ template <class string_policy>
void column_buffer_base<string_policy>::create_with_mask(size_type _size,
cudf::mask_state null_mask_state,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
rmm::device_async_resource_ref mr,
bool memset_data)
{
size = _size;
_mr = mr;

switch (type.id()) {
case type_id::STRING: static_cast<string_policy*>(this)->allocate_strings_data(stream); break;
case type_id::STRING:
static_cast<string_policy*>(this)->allocate_strings_data(stream, memset_data);
break;

// list columns store a buffer of int32's as offsets to represent
// their individual rows
case type_id::LIST: _data = create_data(data_type{type_id::INT32}, size, stream, _mr); break;
case type_id::LIST:
_data = create_data(data_type{type_id::INT32}, size, stream, _mr, memset_data);
break;

// struct columns store no data themselves. just validity and children.
case type_id::STRUCT: break;

default: _data = create_data(type, size, stream, _mr); break;
default: _data = create_data(type, size, stream, _mr, memset_data); break;
}
if (is_nullable) {
_null_mask =
@@ -120,9 +126,10 @@ void column_buffer_base<string_policy>::create_with_mask(size_type _size,
template <class string_policy>
void column_buffer_base<string_policy>::create(size_type _size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
rmm::device_async_resource_ref mr,
bool memset_data)
{
create_with_mask(_size, mask_state::ALL_NULL, stream, mr);
create_with_mask(_size, mask_state::ALL_NULL, stream, mr, memset_data);
}

template <class string_policy>
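
Taken together with the reader changes above, the calling pattern is: allocate buffers without touching their memory, then initialize all of them at once. A short recap of the call-site difference, copied in spirit from reader_impl_preprocess.cu above:

// Before: allocation also zeroed the data buffer and set the validity mask to all-valid.
out_buf.create_with_mask(size, cudf::mask_state::ALL_VALID, _stream, _mr);

// After (this PR): allocate only; the batched_memset calls at the end of allocate_columns()
// later zero the data buffers and set the null-mask buffers to all-valid in two launches.
out_buf.create_with_mask(size, cudf::mask_state::UNINITIALIZED, _stream, _mr, false);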