Global stream pool #13922

Merged
146 commits merged on Sep 13, 2023
Changes from 139 commits

Commits (146)
5e9cf26
add DELTA_BINARY_PACKED decoder
etseidl Jun 26, 2023
9326321
start merging in changes from #13622
etseidl Jun 27, 2023
ee7511d
get reduce working on device
etseidl Jun 27, 2023
2cafe62
use functor for transform iterator
etseidl Jun 27, 2023
219ff0b
change filter functors to use kernel_mask
etseidl Jun 27, 2023
0e181a8
pull in changes from #13622
etseidl Jun 28, 2023
129d9ab
Merge branch 'rapidsai:branch-23.08' into feature/delta_binary
etseidl Jun 28, 2023
2b5a25d
Merge branch 'rapidsai:branch-23.08' into feature/delta_binary
etseidl Jun 28, 2023
b58e55c
use less shared memory for delta binary decoder
etseidl Jun 28, 2023
c63e503
Merge branch 'feature/delta_binary' of github.com:etseidl/cudf into f…
etseidl Jun 28, 2023
2c5e087
spelling
etseidl Jun 28, 2023
996893e
change encoding to unsupported type
etseidl Jun 29, 2023
f1f74dc
add python test of delta parser
etseidl Jun 29, 2023
639b8ab
test delta with nulls
etseidl Jun 29, 2023
02cd2be
Merge branch 'rapidsai:branch-23.08' into feature/delta_binary
etseidl Jun 29, 2023
9871d66
Merge branch 'rapidsai:branch-23.08' into feature/delta_binary
etseidl Jun 29, 2023
7debd29
Merge branch 'rapidsai:branch-23.08' into feature/delta_binary
etseidl Jun 29, 2023
c1bbb84
add comments to skip_values and decode_batch
etseidl Jun 30, 2023
8e66a08
revert east volatile changes
etseidl Jun 30, 2023
7b09c4f
update doc string
etseidl Jul 7, 2023
5e05872
Merge branch 'feature/delta_binary' of github.com:etseidl/cudf into f…
etseidl Jul 7, 2023
6576aa3
Merge branch 'rapidsai:branch-23.08' into feature/delta_binary
etseidl Jul 7, 2023
1d6efbc
Merge branch 'branch-23.08' into feature/delta_binary
etseidl Jul 12, 2023
334fb53
Merge branch 'branch-23.08' into feature/delta_binary
vuule Jul 13, 2023
d18dabf
Merge branch 'rapidsai:branch-23.08' into feature/delta_binary
etseidl Jul 17, 2023
05eb40f
Merge branch 'rapidsai:branch-23.08' into feature/delta_binary
etseidl Jul 18, 2023
a957ecc
Merge branch 'branch-23.08' into feature/delta_binary
etseidl Jul 19, 2023
9b636c7
fix for header location
etseidl Jul 19, 2023
6a53d43
Merge branch 'rapidsai:branch-23.08' into feature/delta_binary
etseidl Jul 19, 2023
ceb22ab
fix some short-circuit logic
etseidl Jul 19, 2023
c00be0b
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Jul 25, 2023
e30aa11
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Jul 26, 2023
7ef4be2
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Jul 26, 2023
2410e47
Merge remote-tracking branch 'origin/branch-23.10' into feature/delta…
etseidl Jul 26, 2023
4aa783d
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Jul 28, 2023
94afb8d
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Jul 28, 2023
12b9bab
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Jul 31, 2023
a6f7957
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 1, 2023
b7dbf47
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Aug 2, 2023
ea49a23
rename function
etseidl Aug 2, 2023
3a9f186
clean up kernel_mask_for_page()
etseidl Aug 2, 2023
d7671d7
remove TODO
etseidl Aug 2, 2023
7084d89
add some documentation to kernel_mask_bits
etseidl Aug 2, 2023
f60bc1c
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Aug 2, 2023
9ebeca9
use rand_dataframe() to produce test data
etseidl Aug 3, 2023
07e73ac
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 3, 2023
410ab67
Merge remote-tracking branch 'origin/branch-23.10' into feature/delta…
etseidl Aug 3, 2023
f779455
Merge branch 'branch-23.10' into feature/delta_binary
vuule Aug 3, 2023
761393f
formatting
etseidl Aug 4, 2023
ced688d
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 4, 2023
4846032
implement suggestion from review
etseidl Aug 4, 2023
d615625
more suggestions from review
etseidl Aug 4, 2023
c1ce34c
Merge remote-tracking branch 'origin/branch-23.10' into feature/delta…
etseidl Aug 4, 2023
835e866
restore old unrolled loop for testing
etseidl Aug 5, 2023
6353a4a
add note to revisit bit unpacker with delta_byte_array
etseidl Aug 7, 2023
9ffea01
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 7, 2023
0904418
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Aug 7, 2023
616d3fc
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Aug 8, 2023
e924a97
fix and test int8 and int16 handling
etseidl Aug 8, 2023
d0bf0cd
fix for single row files
etseidl Aug 8, 2023
d3b0c09
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 9, 2023
3c83b89
Merge branch 'branch-23.10' into feature/delta_binary
vuule Aug 9, 2023
5ac20a0
clean up some docstrings
etseidl Aug 9, 2023
9914e1f
Merge remote-tracking branch 'origin/branch-23.10' into feature/delta…
etseidl Aug 14, 2023
7d077f7
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 14, 2023
62e0493
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 15, 2023
5725c61
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Aug 16, 2023
0ab8c15
Merge branch 'branch-23.10' into feature/delta_binary
etseidl Aug 17, 2023
b404de7
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 17, 2023
1d137ec
Apply suggestions from code review
etseidl Aug 17, 2023
6cd3e00
fix docstring
etseidl Aug 17, 2023
a774ac1
need to pass num_threads as template param to make constexpr
etseidl Aug 17, 2023
60b45b3
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 17, 2023
fb44e80
Merge branch 'rapidsai:branch-23.10' into feature/delta_binary
etseidl Aug 18, 2023
16532e4
Merge branch 'rapidsai:branch-23.10' into draft_stream_pool
etseidl Aug 18, 2023
d7112a5
refactor stream pool
etseidl Aug 18, 2023
637019e
Merge branch 'draft_stream_pool' of github.com:etseidl/cudf into draf…
etseidl Aug 18, 2023
6b48f80
Merge remote-tracking branch 'origin/branch-23.10' into draft_stream_…
etseidl Aug 23, 2023
6384f89
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Aug 23, 2023
71ece73
checkpoint
etseidl Aug 24, 2023
db1d08d
remove comment
etseidl Aug 24, 2023
f233870
compiles now
etseidl Aug 24, 2023
384c7ee
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Aug 24, 2023
301e596
update some comments
etseidl Aug 24, 2023
e3cfa89
implement Vukasin's idea for making the pool extensible
etseidl Aug 24, 2023
e74149f
more api jiggering
etseidl Aug 24, 2023
0d946e8
clean up some
etseidl Aug 24, 2023
0c1faed
stub in docstring
etseidl Aug 24, 2023
a31056c
move get_stream_pool_size into object
etseidl Aug 24, 2023
576230a
forgot some overrides
etseidl Aug 24, 2023
21b4443
pass host_span to fork/join_streams
etseidl Aug 24, 2023
3f3c5b5
add to TODO
etseidl Aug 24, 2023
40d53e7
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Aug 25, 2023
10cb2b4
Merge branch 'rapidsai:branch-23.10' into draft_stream_pool
etseidl Aug 25, 2023
e0ba2a9
Merge branch 'rapidsai:branch-23.10' into draft_stream_pool
etseidl Aug 26, 2023
83d8710
Merge branch 'rapidsai:branch-23.10' into draft_stream_pool
etseidl Aug 26, 2023
b8fddcc
Merge branch 'rapidsai:branch-23.10' into draft_stream_pool
etseidl Aug 27, 2023
3059d94
use static events and disable timing per suggestion from review
etseidl Aug 29, 2023
1ad44db
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Aug 29, 2023
bc62b92
add Vukasin's per-thread-default-event implementation
etseidl Aug 29, 2023
4c2c17b
remove static from fork/join events
etseidl Aug 29, 2023
06d2a75
replace event map with thread_local event struct
etseidl Aug 29, 2023
ddfc118
move stream pool to cudf::detail
etseidl Aug 29, 2023
4b00031
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Aug 29, 2023
ac55b7e
start cleaning up docstrings
etseidl Aug 30, 2023
10643c4
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Aug 30, 2023
4455230
use new stream pool in multibyte_split
etseidl Aug 30, 2023
274d4f3
Merge remote-tracking branch 'origin/branch-23.10' into draft_stream_…
etseidl Aug 30, 2023
13cd266
Merge branch 'rapidsai:branch-23.10' into draft_stream_pool
etseidl Aug 30, 2023
b5c55bb
forgot to get rid of rmm stream pool
etseidl Aug 30, 2023
f5ff4d0
add more documentation
etseidl Aug 31, 2023
1dc75d6
fix formatting
etseidl Aug 31, 2023
9774635
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Sep 1, 2023
1ceaedf
remove mutex from get_streams()
etseidl Sep 1, 2023
8b316ba
change fork_streams as suggested in review
etseidl Sep 1, 2023
d538be9
hide the actual stream pool
etseidl Sep 2, 2023
f8af2b6
add TODO
etseidl Sep 2, 2023
05edba5
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Sep 4, 2023
2346cb1
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Sep 5, 2023
d7d30f9
rename fork_stream
etseidl Sep 5, 2023
64b4e42
add some documentation
etseidl Sep 5, 2023
9826366
can use std::for_each again
etseidl Sep 5, 2023
2a20067
fix typo
etseidl Sep 6, 2023
4ed8082
add alias stream_id_t
etseidl Sep 6, 2023
272d883
change stream count to size_t
etseidl Sep 6, 2023
8b4f15d
wrap cudaEventDestroy in a debug-only assert
etseidl Sep 6, 2023
457c6fb
modify event_for_thread() to take into account multiple devices
etseidl Sep 6, 2023
04dbba5
per-device stream pools
etseidl Sep 6, 2023
ded5900
use size_t for fork_stream too
etseidl Sep 6, 2023
1c5ae32
rename stream_id_t to stream_id_type
etseidl Sep 6, 2023
5ffc75a
add some more docstrings
etseidl Sep 6, 2023
0a7035f
implement suggestion from review
etseidl Sep 6, 2023
9fb0958
more docstring cleanup
etseidl Sep 6, 2023
6644b48
Merge branch 'rapidsai:branch-23.10' into draft_stream_pool
etseidl Sep 7, 2023
113e66f
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Sep 7, 2023
2629a79
fork_streams is back on the menu
etseidl Sep 7, 2023
87626b6
add nodiscard to fork_streams
etseidl Sep 7, 2023
f81d13e
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Sep 8, 2023
614f352
Merge branch 'rapidsai:branch-23.10' into draft_stream_pool
etseidl Sep 8, 2023
560c03c
Apply suggestions from code review
etseidl Sep 8, 2023
80ab1f5
Merge remote-tracking branch 'origin/branch-23.10' into draft_stream_…
etseidl Sep 8, 2023
46c98f4
Merge branch 'rapidsai:branch-23.10' into draft_stream_pool
etseidl Sep 9, 2023
57bd1b3
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Sep 11, 2023
afd71f9
add consts per review comments
etseidl Sep 12, 2023
a449ab5
Merge branch 'branch-23.10' into draft_stream_pool
etseidl Sep 12, 2023
d107e32
Merge branch 'branch-23.10' into draft_stream_pool
harrism Sep 12, 2023
1 change: 1 addition & 0 deletions conda/recipes/libcudf/meta.yaml
@@ -175,6 +175,7 @@ outputs:
- test -f $PREFIX/include/cudf/detail/utilities/logger.hpp
- test -f $PREFIX/include/cudf/detail/utilities/pinned_host_vector.hpp
- test -f $PREFIX/include/cudf/detail/utilities/stacktrace.hpp
- test -f $PREFIX/include/cudf/detail/utilities/stream_pool.hpp
- test -f $PREFIX/include/cudf/detail/utilities/vector_factories.hpp
- test -f $PREFIX/include/cudf/detail/utilities/visitor_overload.hpp
- test -f $PREFIX/include/cudf/dictionary/detail/concatenate.hpp
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -633,6 +633,7 @@ add_library(
src/utilities/linked_column.cpp
src/utilities/logger.cpp
src/utilities/stacktrace.cpp
src/utilities/stream_pool.cpp
src/utilities/traits.cpp
src/utilities/type_checks.cpp
src/utilities/type_dispatcher.cpp
61 changes: 61 additions & 0 deletions cpp/include/cudf/detail/utilities/stream_pool.hpp
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

namespace cudf::detail {

/**
* @brief Acquire a set of `cuda_stream_view` objects and synchronize them to an event on another
* stream.
*
* By default an underlying `rmm::cuda_stream_pool` is used to obtain the streams. The only other
* implementation at present is a debugging version that always returns the stream returned by
* `cudf::get_default_stream()`. To use this debugging version, set the environment variable
* `LIBCUDF_USE_DEBUG_STREAM_POOL`.
*
* Example usage:
* @code{.cpp}
* auto stream = cudf::get_default_stream();
* auto const num_streams = 2;
* // do work on stream
* // allocate streams and wait for an event on stream before executing on any of streams
* auto streams = cudf::detail::fork_streams(stream, num_streams);
* // do work on streams[0] and streams[1]
* // wait for event on streams before continuing to do work on stream
* cudf::detail::join_streams(streams, stream);
* @endcode
*
* @param stream Stream that the returned streams will wait on.
* @param count The number of `cuda_stream_view` objects to return.
* @return Vector containing `count` stream views.
*/
[[nodiscard]] std::vector<rmm::cuda_stream_view> fork_streams(rmm::cuda_stream_view stream,
std::size_t count);

/**
* @brief Synchronize a stream to an event on a set of streams.
*
* @param streams Streams to wait on.
* @param stream Joined stream that synchronizes with the waited-on streams.
*/
void join_streams(host_span<rmm::cuda_stream_view> streams, rmm::cuda_stream_view stream);

} // namespace cudf::detail
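
A minimal caller sketch of the fork/join pattern documented above; the enclosing function and the commented-out kernel launches are illustrative placeholders, not part of this PR:

// Sketch only: decode_two_ways, kernel_a and kernel_b are hypothetical.
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/utilities/default_stream.hpp>

void decode_two_ways()
{
  auto stream = cudf::get_default_stream();

  // ... enqueue setup work (H2D copies, prep kernels) on `stream` ...

  // Fork: each returned stream waits on an event recorded on `stream`,
  // so the setup work above is ordered before anything launched on them.
  auto streams = cudf::detail::fork_streams(stream, 2);

  // kernel_a<<<grid, block, 0, streams[0].value()>>>(...);
  // kernel_b<<<grid, block, 0, streams[1].value()>>>(...);

  // Join: `stream` waits on events recorded on each forked stream, so
  // later work enqueued on `stream` sees the results of both kernels.
  cudf::detail::join_streams(streams, stream);
}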
43 changes: 13 additions & 30 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -18,31 +18,15 @@

#include <cudf/detail/stream_compaction.hpp>
#include <cudf/detail/transform.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <rmm/cuda_stream_pool.hpp>

#include <bitset>
#include <numeric>

namespace cudf::io::detail::parquet {

namespace {

int constexpr NUM_DECODERS = 3; // how many decode kernels are there to run
int constexpr APPROX_NUM_THREADS = 4; // guestimate from DaveB
int constexpr STREAM_POOL_SIZE = NUM_DECODERS * APPROX_NUM_THREADS;

auto& get_stream_pool()
{
// TODO: creating this on the heap because there were issues with trying to call the
// stream pool destructor during cuda shutdown that lead to a segmentation fault in
// nvbench. this allocation is being deliberately leaked to avoid the above, but still
// results in non-fatal warnings when running nvbench in cuda-gdb.
static auto pool = new rmm::cuda_stream_pool{STREAM_POOL_SIZE};
return *pool;
}

} // namespace

void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
{
auto& chunks = _file_itm_data.chunks;
@@ -178,34 +162,33 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
chunks.host_to_device_async(_stream);
chunk_nested_valids.host_to_device_async(_stream);
chunk_nested_data.host_to_device_async(_stream);
_stream.synchronize();

auto const level_type_size = _file_itm_data.level_type_size;
// get the number of streams we need from the pool and tell them to wait on the H2D copies
int const nkernels = std::bitset<32>(kernel_mask).count();
auto streams = cudf::detail::fork_streams(_stream, nkernels);

// vector of launched streams
std::vector<rmm::cuda_stream_view> streams;
auto const level_type_size = _file_itm_data.level_type_size;

// launch string decoder
int s_idx = 0;
if (has_strings) {
streams.push_back(get_stream_pool().get_stream());
chunk_nested_str_data.host_to_device_async(streams.back());
gpu::DecodeStringPageData(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
auto& stream = streams[s_idx++];
chunk_nested_str_data.host_to_device_async(stream);
gpu::DecodeStringPageData(pages, chunks, num_rows, skip_rows, level_type_size, stream);
}

// launch delta binary decoder
if ((kernel_mask & gpu::KERNEL_MASK_DELTA_BINARY) != 0) {
streams.push_back(get_stream_pool().get_stream());
gpu::DecodeDeltaBinary(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
gpu::DecodeDeltaBinary(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]);
}

// launch the catch-all page decoder
if ((kernel_mask & gpu::KERNEL_MASK_GENERAL) != 0) {
streams.push_back(get_stream_pool().get_stream());
gpu::DecodePageData(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
gpu::DecodePageData(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]);
}

// synchronize the streams
std::for_each(streams.begin(), streams.end(), [](auto& stream) { stream.synchronize(); });
cudf::detail::join_streams(streams, _stream);

pages.device_to_host_async(_stream);
page_nesting.device_to_host_async(_stream);
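
The reader now forks one stream per decode kernel that will actually run, counting set bits in the page kernel mask instead of drawing from the old fixed-size local pool. A small standalone sketch of that counting step, with illustrative mask constants standing in for the real gpu::KERNEL_MASK_* values:

#include <bitset>
#include <cstdint>
#include <iostream>

int main()
{
  // Illustrative stand-ins for the decoder mask bits; the real values live in
  // the parquet GPU decode headers and may differ.
  constexpr std::uint32_t MASK_GENERAL      = 1u << 0;
  constexpr std::uint32_t MASK_STRING       = 1u << 1;
  constexpr std::uint32_t MASK_DELTA_BINARY = 1u << 2;

  std::uint32_t const kernel_mask = MASK_GENERAL | MASK_DELTA_BINARY;

  // One forked stream per decode kernel that will actually be launched.
  auto const nkernels = std::bitset<32>(kernel_mask).count();
  std::cout << "forking " << nkernels << " streams\n";  // prints: forking 2 streams
  return 0;
}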
48 changes: 7 additions & 41 deletions cpp/src/io/text/multibyte_split.cu
@@ -22,6 +22,7 @@
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/io/text/byte_range_info.hpp>
#include <cudf/io/text/data_chunk_source.hpp>
#include <cudf/io/text/detail/multistate.hpp>
@@ -32,7 +33,6 @@
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_pool.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
@@ -301,44 +301,12 @@ namespace io {
namespace text {
namespace detail {

void fork_stream(std::vector<rmm::cuda_stream_view> streams, rmm::cuda_stream_view stream)
{
cudaEvent_t event;
CUDF_CUDA_TRY(cudaEventCreate(&event));
CUDF_CUDA_TRY(cudaEventRecord(event, stream));
for (uint32_t i = 0; i < streams.size(); i++) {
CUDF_CUDA_TRY(cudaStreamWaitEvent(streams[i], event, 0));
}
CUDF_CUDA_TRY(cudaEventDestroy(event));
}

void join_stream(std::vector<rmm::cuda_stream_view> streams, rmm::cuda_stream_view stream)
{
cudaEvent_t event;
CUDF_CUDA_TRY(cudaEventCreate(&event));
for (uint32_t i = 0; i < streams.size(); i++) {
CUDF_CUDA_TRY(cudaEventRecord(event, streams[i]));
CUDF_CUDA_TRY(cudaStreamWaitEvent(stream, event, 0));
}
CUDF_CUDA_TRY(cudaEventDestroy(event));
}

std::vector<rmm::cuda_stream_view> get_streams(int32_t count, rmm::cuda_stream_pool& stream_pool)
{
auto streams = std::vector<rmm::cuda_stream_view>();
for (int32_t i = 0; i < count; i++) {
streams.emplace_back(stream_pool.get_stream());
}
return streams;
}

std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source const& source,
std::string const& delimiter,
byte_range_info byte_range,
bool strip_delimiters,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr,
rmm::cuda_stream_pool& stream_pool)
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

@@ -365,8 +333,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
CUDF_EXPECTS(delimiter.size() < multistate::max_segment_value,
"delimiter contains too many total tokens to produce a deterministic result.");

auto concurrency = 2;
auto streams = get_streams(concurrency, stream_pool);
auto const concurrency = 2;

// must be at least 32 when using warp-reduce on partials
// must be at least 1 more than max possible concurrent tiles
@@ -411,7 +378,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
output_builder<byte_offset> row_offset_storage(ITEMS_PER_CHUNK, max_growth, stream);
output_builder<char> char_storage(ITEMS_PER_CHUNK, max_growth, stream);

fork_stream(streams, stream);
auto streams = cudf::detail::fork_streams(stream, concurrency);

cudaEvent_t last_launch_event;
CUDF_CUDA_TRY(cudaEventCreate(&last_launch_event));
@@ -532,7 +499,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source

CUDF_CUDA_TRY(cudaEventDestroy(last_launch_event));

join_stream(streams, stream);
cudf::detail::join_streams(streams, stream);

// if the input was empty, we didn't find a delimiter at all,
// or the first delimiter was also the last: empty output
Expand Down Expand Up @@ -602,11 +569,10 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
parse_options options,
rmm::mr::device_memory_resource* mr)
{
auto stream = cudf::get_default_stream();
auto stream_pool = rmm::cuda_stream_pool(2);
auto stream = cudf::get_default_stream();

auto result = detail::multibyte_split(
source, delimiter, options.byte_range, options.strip_delimiters, stream, mr, stream_pool);
source, delimiter, options.byte_range, options.strip_delimiters, stream, mr);

return result;
}
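
The removed fork_stream/join_stream helpers above show the event mechanics that the new pool centralizes. A minimal sketch of that event-based fork/join using raw CUDA runtime calls and hypothetical helper names; the actual cpp/src/utilities/stream_pool.cpp also adds per-device pools, cached per-thread events, a debug fallback, and CUDF error checking:

#include <cuda_runtime.h>

#include <cstdlib>
#include <vector>

// Bare-bones error check standing in for CUDF_CUDA_TRY.
inline void check(cudaError_t err) { if (err != cudaSuccess) { std::abort(); } }

// Make every stream in `streams` wait for work already enqueued on `stream`.
void fork_from(cudaStream_t stream, std::vector<cudaStream_t> const& streams)
{
  cudaEvent_t event;
  check(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
  check(cudaEventRecord(event, stream));
  for (auto s : streams) { check(cudaStreamWaitEvent(s, event, 0)); }
  check(cudaEventDestroy(event));
}

// Make `stream` wait for work already enqueued on every stream in `streams`.
void join_into(std::vector<cudaStream_t> const& streams, cudaStream_t stream)
{
  cudaEvent_t event;
  check(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
  for (auto s : streams) {
    check(cudaEventRecord(event, s));
    check(cudaStreamWaitEvent(stream, event, 0));
  }
  check(cudaEventDestroy(event));
}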