Skip to content

Commit

Permalink
Expose streams in ORC reader and writer APIs (#14350)
Browse files Browse the repository at this point in the history
This PR contributes to #13744.
-Added stream parameters to public APIs
```
cudf::io::read_orc
cudf::io::write_orc
cudf::io::read_orc_metadata
cudf::io::read_parsed_orc_statistics
```
-Added stream gtests

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)

URL: #14350
  • Loading branch information
shrshi authored Jan 17, 2024
1 parent 56a7b95 commit 1bff508
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 21 deletions.
10 changes: 8 additions & 2 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,15 @@ class orc_reader_options_builder {
* @endcode
*
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate device memory of the table in the returned
* table_with_metadata.
*
* @return The set of columns
*/
table_with_metadata read_orc(
orc_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down Expand Up @@ -864,8 +866,10 @@ class orc_writer_options_builder {
* @endcode
*
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void write_orc(orc_writer_options const& options);
void write_orc(orc_writer_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Builds settings to use for `write_orc_chunked()`.
Expand Down Expand Up @@ -1287,8 +1291,10 @@ class orc_chunked_writer {
* @brief Constructor with chunked writer options
*
* @param[in] options options used to write table
* @param[in] stream CUDA stream used for device memory operations and kernel launches
*/
orc_chunked_writer(chunked_orc_writer_options const& options);
orc_chunked_writer(chunked_orc_writer_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Writes table to output.
Expand Down
14 changes: 10 additions & 4 deletions cpp/include/cudf/io/orc_metadata.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,10 +63,12 @@ struct raw_orc_statistics {
* @endcode
*
* @param src_info Dataset source
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return Column names and encoded ORC statistics
*/
raw_orc_statistics read_raw_orc_statistics(source_info const& src_info);
raw_orc_statistics read_raw_orc_statistics(
source_info const& src_info, rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Monostate type alias for the statistics variant.
Expand Down Expand Up @@ -207,10 +209,12 @@ struct parsed_orc_statistics {
* @ingroup io_readers
*
* @param src_info Dataset source
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return Column names and decoded ORC statistics
*/
parsed_orc_statistics read_parsed_orc_statistics(source_info const& src_info);
parsed_orc_statistics read_parsed_orc_statistics(
source_info const& src_info, rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Schema of an ORC column, including the nested columns.
Expand Down Expand Up @@ -368,10 +372,12 @@ class orc_metadata {
* @ingroup io_readers
*
* @param src_info Dataset source
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return orc_metadata with ORC schema, number of rows and number of stripes.
*/
orc_metadata read_orc_metadata(source_info const& src_info);
orc_metadata read_orc_metadata(source_info const& src_info,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/** @} */ // end of group
} // namespace io
Expand Down
31 changes: 16 additions & 15 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ void write_csv(csv_writer_options const& options,
mr);
}

raw_orc_statistics read_raw_orc_statistics(source_info const& src_info)
raw_orc_statistics read_raw_orc_statistics(source_info const& src_info,
rmm::cuda_stream_view stream)
{
auto stream = cudf::get_default_stream();
// Get source to read statistics from
std::unique_ptr<datasource> source;
if (src_info.type() == io_type::FILEPATH) {
Expand Down Expand Up @@ -342,9 +342,10 @@ column_statistics::column_statistics(orc::column_statistics&& cs)
}
}

parsed_orc_statistics read_parsed_orc_statistics(source_info const& src_info)
parsed_orc_statistics read_parsed_orc_statistics(source_info const& src_info,
rmm::cuda_stream_view stream)
{
auto const raw_stats = read_raw_orc_statistics(src_info);
auto const raw_stats = read_raw_orc_statistics(src_info, stream);

parsed_orc_statistics result;
result.column_names = raw_stats.column_names;
Expand Down Expand Up @@ -395,12 +396,12 @@ orc_column_schema make_orc_column_schema(host_span<orc::SchemaType const> orc_sc
}
}; // namespace

orc_metadata read_orc_metadata(source_info const& src_info)
orc_metadata read_orc_metadata(source_info const& src_info, rmm::cuda_stream_view stream)
{
auto sources = make_datasources(src_info);

CUDF_EXPECTS(sources.size() == 1, "Only a single source is currently supported.");
auto const footer = orc::metadata(sources.front().get(), cudf::detail::default_stream_value).ff;
auto const footer = orc::metadata(sources.front().get(), stream).ff;

return {{make_orc_column_schema(footer.types, 0, "")},
static_cast<size_type>(footer.numberOfRows),
Expand All @@ -410,21 +411,21 @@ orc_metadata read_orc_metadata(source_info const& src_info)
/**
* @copydoc cudf::io::read_orc
*/
table_with_metadata read_orc(orc_reader_options const& options, rmm::mr::device_memory_resource* mr)
table_with_metadata read_orc(orc_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

auto datasources = make_datasources(options.get_source());
auto reader = std::make_unique<orc::detail::reader>(
std::move(datasources), options, cudf::get_default_stream(), mr);

auto reader = std::make_unique<orc::detail::reader>(std::move(datasources), options, stream, mr);
return reader->read(options);
}

/**
* @copydoc cudf::io::write_orc
*/
void write_orc(orc_writer_options const& options)
void write_orc(orc_writer_options const& options, rmm::cuda_stream_view stream)
{
namespace io_detail = cudf::io::detail;

Expand All @@ -434,8 +435,7 @@ void write_orc(orc_writer_options const& options)
CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for ORC writing");

auto writer = std::make_unique<orc::detail::writer>(
std::move(sinks[0]), options, io_detail::single_write_mode::YES, cudf::get_default_stream());

std::move(sinks[0]), options, io_detail::single_write_mode::YES, stream);
try {
writer->write(options.get_table());
} catch (...) {
Expand All @@ -451,15 +451,16 @@ void write_orc(orc_writer_options const& options)
/**
* @copydoc cudf::io::orc_chunked_writer::orc_chunked_writer
*/
orc_chunked_writer::orc_chunked_writer(chunked_orc_writer_options const& options)
orc_chunked_writer::orc_chunked_writer(chunked_orc_writer_options const& options,
rmm::cuda_stream_view stream)
{
namespace io_detail = cudf::io::detail;

auto sinks = make_datasinks(options.get_sink());
CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for ORC writing");

writer = std::make_unique<orc::detail::writer>(
std::move(sinks[0]), options, io_detail::single_write_mode::NO, cudf::get_default_stream());
std::move(sinks[0]), options, io_detail::single_write_mode::NO, stream);
}

/**
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_ORCIO_TEST streams/io/orc_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_PARQUETIO_TEST streams/io/parquet_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
Expand Down
137 changes: 137 additions & 0 deletions cpp/tests/streams/io/orc_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/io/detail/orc.hpp>
#include <cudf/io/orc.hpp>
#include <cudf/io/orc_metadata.hpp>
#include <cudf/io/orc_types.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/iterator_utilities.hpp>

#include <iostream>
#include <random>
#include <sstream>
#include <string>
#include <vector>

auto const temp_env = static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

class ORCTest : public cudf::test::BaseFixture {};

template <typename... UniqPtrs>
std::vector<std::unique_ptr<cudf::column>> make_uniqueptrs_vector(UniqPtrs&&... uniqptrs)
{
std::vector<std::unique_ptr<cudf::column>> ptrsvec;
(ptrsvec.push_back(std::forward<UniqPtrs>(uniqptrs)), ...);
return ptrsvec;
}

cudf::table construct_table()
{
constexpr auto num_rows = 10;

auto const zeros_iterator = thrust::make_constant_iterator(0);
auto const ones_iterator = thrust::make_constant_iterator(1);

cudf::test::fixed_width_column_wrapper<bool> col0(zeros_iterator, zeros_iterator + num_rows);
cudf::test::fixed_width_column_wrapper<int8_t> col1(zeros_iterator, zeros_iterator + num_rows);
cudf::test::fixed_width_column_wrapper<int16_t> col2(zeros_iterator, zeros_iterator + num_rows);
cudf::test::fixed_width_column_wrapper<int32_t> col3(zeros_iterator, zeros_iterator + num_rows);
cudf::test::fixed_width_column_wrapper<float> col4(zeros_iterator, zeros_iterator + num_rows);
cudf::test::fixed_width_column_wrapper<double> col5(zeros_iterator, zeros_iterator + num_rows);

cudf::test::fixed_width_column_wrapper<numeric::decimal128> col6 = [&ones_iterator, num_rows] {
auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones_iterator[i], numeric::scale_type{12}};
});
return cudf::test::fixed_width_column_wrapper<numeric::decimal128>(col6_data,
col6_data + num_rows);
}();

cudf::test::fixed_width_column_wrapper<numeric::decimal128> col7 = [&ones_iterator, num_rows] {
auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones_iterator[i], numeric::scale_type{-12}};
});
return cudf::test::fixed_width_column_wrapper<numeric::decimal128>(col7_data,
col7_data + num_rows);
}();

cudf::test::lists_column_wrapper<int64_t> col8 = [] {
auto col8_mask =
cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i % 2); });
return cudf::test::lists_column_wrapper<int64_t>(
{{1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}},
col8_mask);
}();

cudf::test::structs_column_wrapper col9 = [&ones_iterator] {
auto child_col_mask =
cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i % 2); });
cudf::test::fixed_width_column_wrapper<int32_t> child_col(
ones_iterator, ones_iterator + num_rows, child_col_mask);
return cudf::test::structs_column_wrapper{child_col};
}();

cudf::test::strings_column_wrapper col10 = [] {
std::vector<std::string> col10_data(num_rows, "rapids");
return cudf::test::strings_column_wrapper(col10_data.begin(), col10_data.end());
}();

auto colsptr = make_uniqueptrs_vector(col0.release(),
col1.release(),
col2.release(),
col3.release(),
col4.release(),
col5.release(),
col6.release(),
col7.release(),
col8.release(),
col9.release(),
col10.release());
return cudf::table(std::move(colsptr));
}

TEST_F(ORCTest, ORCWriter)
{
auto tab = construct_table();
auto filepath = temp_env->get_temp_filepath("OrcMultiColumn.orc");
cudf::io::orc_writer_options out_opts =
cudf::io::orc_writer_options::builder(cudf::io::sink_info{filepath}, tab);
cudf::io::write_orc(out_opts, cudf::test::get_default_stream());
}

TEST_F(ORCTest, ORCReader)
{
auto tab = construct_table();
auto filepath = temp_env->get_temp_filepath("OrcMultiColumn.orc");
cudf::io::orc_writer_options out_opts =
cudf::io::orc_writer_options::builder(cudf::io::sink_info{filepath}, tab);
cudf::io::write_orc(out_opts, cudf::test::get_default_stream());

cudf::io::orc_reader_options read_opts =
cudf::io::orc_reader_options::builder(cudf::io::source_info{{filepath}});
auto result = cudf::io::read_orc(read_opts, cudf::test::get_default_stream());

auto meta = read_orc_metadata(cudf::io::source_info{filepath});
auto const stats = cudf::io::read_parsed_orc_statistics(cudf::io::source_info{filepath});
}

0 comments on commit 1bff508

Please sign in to comment.