From 9bb6c6a6496355cabc33dbab24022e1b47cdef3a Mon Sep 17 00:00:00 2001
From: seidl
Date: Thu, 25 Apr 2024 10:40:05 -0700
Subject: [PATCH 01/20] round trip fixed_len_byte_array data properly

---
 cpp/include/cudf/io/types.hpp              | 59 ++++++++++++++-
 cpp/src/io/functions.cpp                   |  2 +
 cpp/src/io/parquet/page_enc.cu             | 32 +++++---
 cpp/src/io/parquet/parquet_gpu.hpp         |  1 +
 cpp/src/io/parquet/reader_impl.cpp         | 16 ++--
 cpp/src/io/parquet/reader_impl_helpers.cpp |  7 +-
 cpp/src/io/parquet/writer_impl.cu          | 13 +++-
 cpp/src/io/utilities/column_buffer.cpp     |  5 ++
 cpp/tests/io/parquet_writer_test.cpp       | 86 ++++++++++++++++++++++
 9 files changed, 198 insertions(+), 23 deletions(-)

diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp
index b3dea0ab280..150e997f533 100644
--- a/cpp/include/cudf/io/types.hpp
+++ b/cpp/include/cudf/io/types.hpp
@@ -236,6 +236,8 @@ enum dictionary_policy {
 struct column_name_info {
   std::string name;                        ///< Column name
   std::optional<bool> is_nullable;         ///< Column nullability
+  std::optional<bool> is_binary;           ///< Column is binary (i.e. not a list)
+  std::optional<int32_t> type_length;      ///< Byte width of data (for fixed length data)
   std::vector<column_name_info> children;  ///< Child column names
 
   /**
    *
    * @param _name Column name
    * @param _is_nullable True if column is nullable
+   * @param _is_binary True if column is binary data
    */
-  column_name_info(std::string const& _name, std::optional<bool> _is_nullable = std::nullopt)
-    : name(_name), is_nullable(_is_nullable)
+  column_name_info(std::string const& _name,
+                   std::optional<bool> _is_nullable = std::nullopt,
+                   std::optional<bool> _is_binary   = std::nullopt)
+    : name(_name), is_nullable(_is_nullable), is_binary(_is_binary)
   {
   }
 
@@ -606,6 +611,7 @@ class column_in_metadata {
   bool _skip_compression = false;
   std::optional<uint8_t> _decimal_precision;
   std::optional<int32_t> _parquet_field_id;
+  std::optional<int32_t> _type_length;
   std::vector<column_in_metadata> children;
   column_encoding _encoding = column_encoding::USE_DEFAULT;
 
@@ -693,6 +699,19 @@ class column_in_metadata {
     return *this;
   }
 
+  /**
+   * @brief Set the data length of the column. Only valid if this column is a
+   * fixed-length byte array.
+   *
+   * @param length The data length to set for this column
+   * @return this for chaining
+   */
+  column_in_metadata& set_type_length(int32_t length) noexcept
+  {
+    _type_length = length;
+    return *this;
+  }
+
   /**
    * @brief Set the parquet field id of this column.
    *
@@ -826,6 +845,22 @@ class column_in_metadata {
    */
   [[nodiscard]] uint8_t get_decimal_precision() const { return _decimal_precision.value(); }
 
+  /**
+   * @brief Get whether type length has been set for this column
+   *
+   * @return Boolean indicating whether type length has been set for this column
+   */
+  [[nodiscard]] bool is_type_length_set() const noexcept { return _type_length.has_value(); }
+
+  /**
+   * @brief Get the type length that was set for this column.
+   *
+   * @throws std::bad_optional_access If type length was not set for this
+   * column. Check using `is_type_length_set()` first.
+   * @return The type length that was set for this column
+   */
+  [[nodiscard]] int32_t get_type_length() const { return _type_length.value(); }
+
   /**
    * @brief Get whether parquet field id has been set for this column.
    *
@@ -932,6 +967,7 @@ class reader_column_schema {
   // Whether to read binary data as a string column
   bool _convert_binary_to_strings{true};
+  int32_t _type_length{0};
 
   std::vector<reader_column_schema> children;
 
@@ -997,6 +1033,18 @@ class reader_column_schema {
     return *this;
   }
 
+  /**
+   * @brief Sets the length of fixed length data.
+   *
+   * @param type_length Size of the data type in bytes
+   * @return this for chaining
+   */
+  reader_column_schema& set_type_length(int32_t type_length)
+  {
+    _type_length = type_length;
+    return *this;
+  }
+
   /**
    * @brief Get whether to encode this column as binary or string data
    *
@@ -1007,6 +1055,13 @@ class reader_column_schema {
     return _convert_binary_to_strings;
   }
 
+  /**
+   * @brief Get the length in bytes of this fixed length data.
+   *
+   * @return The length in bytes of the data type
+   */
+  [[nodiscard]] int32_t get_type_length() const { return _type_length; }
+
   /**
    * @brief Get the number of child objects
    *
diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp
index 12059dffa4e..7287d6ba786 100644
--- a/cpp/src/io/functions.cpp
+++ b/cpp/src/io/functions.cpp
@@ -534,6 +534,8 @@ table_input_metadata::table_input_metadata(table_metadata const& metadata)
                  [&](column_name_info const& name) {
                    auto col_meta = column_in_metadata{name.name};
                    if (name.is_nullable.has_value()) { col_meta.set_nullability(name.is_nullable.value()); }
+                   if (name.is_binary.value_or(false)) { col_meta.set_output_as_binary(true); }
+                   if (name.type_length.has_value()) { col_meta.set_type_length(name.type_length.value()); }
                    std::transform(name.children.begin(),
                                   name.children.end(),
                                   std::back_inserter(col_meta.children),
diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index 227f13db60e..1af2166351d 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -109,10 +109,10 @@ using rle_page_enc_state_s = page_enc_state_s<rle_buffer_size>;
 
 /**
  * @brief Returns the size of the type in the Parquet file.
  */
-constexpr uint32_t physical_type_len(Type physical_type, type_id id)
+constexpr uint32_t physical_type_len(Type physical_type, type_id id, int type_length)
 {
-  if (physical_type == FIXED_LEN_BYTE_ARRAY and id == type_id::DECIMAL128) {
-    return sizeof(__int128_t);
+  if (physical_type == FIXED_LEN_BYTE_ARRAY) {
+    return id == type_id::DECIMAL128 ? sizeof(__int128_t) : type_length;
   }
   switch (physical_type) {
     case INT96: return 12u;
@@ -183,7 +183,7 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t)
   auto const physical_type = s->col.physical_type;
   auto const leaf_type     = s->col.leaf_column->type().id();
-  auto const dtype_len     = physical_type_len(physical_type, leaf_type);
+  auto const dtype_len     = physical_type_len(physical_type, leaf_type, s->col.type_length);
 
   auto const nvals           = s->frag.num_leaf_values;
   auto const start_value_idx = s->frag.start_value_idx;
@@ -541,7 +541,8 @@ __device__ size_t delta_data_len(Type physical_type,
                                  size_t page_size,
                                  encode_kernel_mask encoding)
 {
-  auto const dtype_len_out = physical_type_len(physical_type, type_id);
+  // dtype_len_out is for the lengths, rather than the char data, so pass sizeof(int32_t)
+  auto const dtype_len_out = physical_type_len(physical_type, type_id, sizeof(int32_t));
   auto const dtype_len     = [&]() -> uint32_t {
     if (physical_type == INT32) { return int32_logical_len(type_id); }
     if (physical_type == INT96) { return sizeof(int64_t); }
@@ -1662,7 +1663,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
   __syncthreads();
 
   auto const physical_type = s->col.physical_type;
   auto const type_id       = s->col.leaf_column->type().id();
-  auto const dtype_len_out = physical_type_len(physical_type, type_id);
+  auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length);
   auto const dtype_len_in  = [&]() -> uint32_t {
     if (physical_type == INT32) { return int32_logical_len(type_id); }
     if (physical_type == INT96) { return sizeof(int64_t); }
@@ -1837,6 +1838,19 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
                          thrust::make_reverse_iterator(v_char_ptr),
                          dst + pos);
             }
+          } else {
+            auto const elem =
+              get_element<statistics::byte_array_view>(*(s->col.leaf_column), val_idx);
+            if (len != 0 and elem.data() != nullptr) {
+              if (is_split_stream) {
+                auto const v_char_ptr = reinterpret_cast<uint8_t const*>(elem.data());
+                for (int i = 0; i < dtype_len_out; i++, pos += stride) {
+                  dst[pos] = v_char_ptr[i];
+                }
+              } else {
+                memcpy(dst + pos, elem.data(), len);
+              }
+            }
           }
         } break;
       }
@@ -1884,7 +1898,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
   // Encode data values
   auto const physical_type = s->col.physical_type;
   auto const type_id       = s->col.leaf_column->type().id();
-  auto const dtype_len_out = physical_type_len(physical_type, type_id);
+  auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length);
   auto const dtype_len_in  = [&]() -> uint32_t {
     if (physical_type == INT32) { return int32_logical_len(type_id); }
     if (physical_type == INT96) { return sizeof(int64_t); }
@@ -2016,7 +2030,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
   // Encode data values
   auto const physical_type = s->col.physical_type;
   auto const type_id       = s->col.leaf_column->type().id();
-  auto const dtype_len_out = physical_type_len(physical_type, type_id);
+  auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length);
   auto const dtype_len_in  = [&]() -> uint32_t {
     if (physical_type == INT32) { return int32_logical_len(type_id); }
     if (physical_type == INT96) { return sizeof(int64_t); }
@@ -3215,7 +3229,7 @@ __device__ int32_t calculate_boundary_order(statistics_chunk const* s,
 }
 
 // align ptr to an 8-byte boundary. address returned will be <= ptr.
-constexpr __device__ void* align8(void* ptr)
+inline __device__ void* align8(void* ptr)
 {
   // it's ok to round down because we have an extra 7 bytes in the buffer
   auto algn = 3 & reinterpret_cast<std::uintptr_t>(ptr);
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index c06fb63acda..cc33fa24043 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -470,6 +470,7 @@ struct chunk_page_info {
 struct parquet_column_device_view : stats_column_desc {
   Type physical_type;            //!< physical data type
   ConvertedType converted_type;  //!< logical data type
+  int32_t type_length;           //!< length of fixed_length_byte_array data
   uint8_t level_bits;  //!< bits to encode max definition (lower nibble) & repetition (upper nibble)
                        //!< levels
   constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; }
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index b7172f5ba67..e358da26563 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -497,14 +497,18 @@ table_with_metadata reader::impl::read_chunk_internal(
 
   // Create the final output cudf columns.
   for (size_t i = 0; i < _output_buffers.size(); ++i) {
-    auto metadata = _reader_column_schema.has_value()
-                      ? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
-                      : std::nullopt;
-    auto const& schema = _metadata->get_schema(_output_column_schemas[i]);
-    // FIXED_LEN_BYTE_ARRAY never read as string
-    if (schema.type == FIXED_LEN_BYTE_ARRAY and schema.converted_type != DECIMAL) {
+    auto metadata = _reader_column_schema.has_value()
+                      ? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
+                      : std::nullopt;
+    auto const& schema      = _metadata->get_schema(_output_column_schemas[i]);
+    auto const logical_type = schema.logical_type.value_or(LogicalType{});
+    // FIXED_LEN_BYTE_ARRAY never read as string.
+    // TODO: if we ever decide that the default reader behavior is to treat unannotated BINARY as
+    // binary and not strings, this test needs to change.
+    if (schema.type == FIXED_LEN_BYTE_ARRAY and logical_type.type != LogicalType::DECIMAL) {
       metadata = std::make_optional<reader_column_schema>();
       metadata->set_convert_binary_to_strings(false);
+      metadata->set_type_length(schema.type_length);
     }
     // Only construct `out_metadata` if `_output_metadata` has not been cached.
     if (!_output_metadata) {
diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp
index 402ccef7a15..3ef89f1b378 100644
--- a/cpp/src/io/parquet/reader_impl_helpers.cpp
+++ b/cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -165,9 +165,10 @@ type_id to_type_id(SchemaElement const& schema,
     case FLOAT: return type_id::FLOAT32;
     case DOUBLE: return type_id::FLOAT64;
     case BYTE_ARRAY:
-    case FIXED_LEN_BYTE_ARRAY:
-      // Can be mapped to INT32 (32-bit hash) or STRING
-      return strings_to_categorical ? type_id::INT32 : type_id::STRING;
+      // strings can be mapped to a 32-bit hash
+      if (strings_to_categorical) { return type_id::INT32; }
+      [[fallthrough]];
+    case FIXED_LEN_BYTE_ARRAY: return type_id::STRING;
     case INT96:
       return (timestamp_type_id != type_id::EMPTY) ? timestamp_type_id
                                                    : type_id::TIMESTAMP_NANOSECONDS;
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 5509a33f9f0..98bc37f689e 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -704,7 +704,14 @@ std::vector<schema_tree_node> construct_schema_tree(
         }
 
         schema_tree_node col_schema{};
-        col_schema.type            = Type::BYTE_ARRAY;
+        // test if this should this be output as FIXED_LEN_BYTE_ARRAY
+        if (col_meta.is_type_length_set()) {
+          col_schema.type        = Type::FIXED_LEN_BYTE_ARRAY;
+          col_schema.type_length = col_meta.get_type_length();
+        } else {
+          col_schema.type = Type::BYTE_ARRAY;
+        }
+
         col_schema.converted_type  = thrust::nullopt;
         col_schema.stats_dtype     = statistics_dtype::dtype_byte_array;
         col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED;
@@ -1024,6 +1031,7 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream
   auto desc        = parquet_column_device_view{};  // Zero out all fields
   desc.stats_dtype = schema_node.stats_dtype;
   desc.ts_scale    = schema_node.ts_scale;
+  desc.type_length = schema_node.type_length;
 
   if (is_list()) {
     desc.level_offsets = _dremel_offsets.data();
@@ -1266,8 +1274,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
       chunk_col_desc.requested_encoding != column_encoding::USE_DEFAULT &&
       chunk_col_desc.requested_encoding != column_encoding::DICTIONARY;
     auto const is_type_non_dict =
-      chunk_col_desc.physical_type == Type::BOOLEAN ||
-      (chunk_col_desc.output_as_byte_array && chunk_col_desc.physical_type == Type::BYTE_ARRAY);
+      chunk_col_desc.physical_type == Type::BOOLEAN || chunk_col_desc.output_as_byte_array;
 
     if (is_type_non_dict || is_requested_non_dict) {
       chunk.use_dictionary = false;
diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp
index 5dc2291abdc..c01311ad0c7 100644
--- a/cpp/src/io/utilities/column_buffer.cpp
+++ b/cpp/src/io/utilities/column_buffer.cpp
@@ -198,6 +198,11 @@ std::unique_ptr<column> make_column(column_buffer_base<string_policy>& buffer,
       if (schema_info != nullptr) {
         schema_info->children.push_back(column_name_info{"offsets"});
         schema_info->children.push_back(column_name_info{"binary"});
+        // cuDF type will be list, but remember it was originally binary data
+        schema_info->is_binary = true;
+        if (schema.has_value() and schema->get_type_length() > 0) {
+          schema_info->type_length = schema->get_type_length();
+        }
       }
 
       return make_lists_column(
diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp
index a16b3d63177..2a3bb40a49f 100644
--- a/cpp/tests/io/parquet_writer_test.cpp
+++ b/cpp/tests/io/parquet_writer_test.cpp
@@ -1857,6 +1857,92 @@ TEST_F(ParquetWriterTest, DurationByteStreamSplit)
   test_durations([](auto i) { return false; }, true);
 }
 
+TEST_F(ParquetWriterTest, WriteFixedLenByteArray)
+{
+  using cudf::io::parquet::detail::Encoding;
+  constexpr int fixed_width          = 16;
+  constexpr cudf::size_type num_rows = fixed_width * fixed_width;
+  std::vector<uint8_t> data(num_rows * fixed_width, 0);
+  std::vector<cudf::size_type> offsets(num_rows + 1);
+
+  for (int i = 0; i < fixed_width; i++) {
+    for (int j = 0; j < fixed_width; j++) {
+      auto const rowid            = i * fixed_width + j;
+      auto const off              = rowid * fixed_width;
+      offsets[rowid]              = off;
+      data[off + fixed_width - 2] = i;
+      data[off + fixed_width - 1] = j;
+    }
+  }
+  offsets[num_rows] = num_rows * fixed_width;
+
+  auto data_child = cudf::test::fixed_width_column_wrapper<uint8_t>(data.begin(), data.end());
+  auto off_child  = cudf::test::fixed_width_column_wrapper<cudf::size_type>(offsets.begin(), offsets.end());
+  auto col =
+    cudf::make_lists_column(num_rows, off_child.release(), data_child.release(), 0, {});
+
+  auto expected = table_view{{*col, *col, *col, *col}};
+  cudf::io::table_input_metadata expected_metadata(expected);
+
+  expected_metadata.column_metadata[0]
+    .set_name("flba_plain")
+    .set_type_length(fixed_width)
+    .set_encoding(cudf::io::column_encoding::PLAIN)
+    .set_output_as_binary(true);
+  expected_metadata.column_metadata[1]
+    .set_name("flba_split")
+    .set_type_length(fixed_width)
+    .set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT)
+    .set_output_as_binary(true);
+  expected_metadata.column_metadata[2]
+    .set_name("flba_delta")
+    .set_type_length(fixed_width)
+    .set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY)
+    .set_output_as_binary(true);
+  expected_metadata.column_metadata[3]
+    .set_name("flba_dict")
+    .set_type_length(fixed_width)
+    .set_encoding(cudf::io::column_encoding::DICTIONARY)
+    .set_output_as_binary(true);
+
+  auto filepath = temp_env->get_temp_filepath("WriteFixedLenByteArray.parquet");
+  cudf::io::parquet_writer_options out_opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
+      .metadata(expected_metadata);
+  cudf::io::write_parquet(out_opts);
+
+  cudf::io::parquet_reader_options in_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+  auto result = cudf::io::read_parquet(in_opts);
+
+  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
+
+  // check page headers to make sure each column is encoded with the appropriate encoder
+  auto const source = cudf::io::datasource::create(filepath);
+  cudf::io::parquet::detail::FileMetaData fmd;
+  read_footer(source, &fmd);
+
+  // check that the schema retains the FIXED_LEN_BYTE_ARRAY type
+  for (int i = 1; i <= 4; i++) {
+    EXPECT_EQ(fmd.schema[i].type, cudf::io::parquet::detail::Type::FIXED_LEN_BYTE_ARRAY);
+    EXPECT_EQ(fmd.schema[i].type_length, fixed_width);
+  }
+
+  // no nulls and no repetition, so the only encoding used should be for the data.
+  auto const expect_enc = [&fmd](int idx, cudf::io::parquet::detail::Encoding enc) {
+    EXPECT_EQ(fmd.row_groups[0].columns[idx].meta_data.encodings[0], enc);
+  };
+
+  // requested plain
+  expect_enc(0, Encoding::PLAIN);
+  // requested byte_stream_split
+  expect_enc(1, Encoding::BYTE_STREAM_SPLIT);
+  // requested delta_byte_array
+  expect_enc(2, Encoding::DELTA_BYTE_ARRAY);
+  // requested dictionary, but should fall back to plain
+  // TODO: update if we get FLBA working with dictionary encoding
+  expect_enc(3, Encoding::PLAIN);
+}
+
 /////////////////////////////////////////////////////////////
 // custom mem mapped data sink that supports device writes
 template <typename T>

From b4750842e6cc98f492245a60631e5edfb370c95c Mon Sep 17 00:00:00 2001
From: seidl
Date: Thu, 25 Apr 2024 21:00:32 +0000
Subject: [PATCH 02/20] address review comments

---
 cpp/src/io/parquet/writer_impl.cu    |  2 +-
 cpp/tests/io/parquet_writer_test.cpp | 20 ++++++++++----------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 98bc37f689e..cf0d7c18d95 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -704,7 +704,7 @@ std::vector<schema_tree_node> construct_schema_tree(
         }
 
         schema_tree_node col_schema{};
-        // test if this should this be output as FIXED_LEN_BYTE_ARRAY
+        // test if this should be output as FIXED_LEN_BYTE_ARRAY
         if (col_meta.is_type_length_set()) {
           col_schema.type        = Type::FIXED_LEN_BYTE_ARRAY;
           col_schema.type_length = col_meta.get_type_length();
diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp
index 2a3bb40a49f..3996321a066 100644
--- a/cpp/tests/io/parquet_writer_test.cpp
+++ b/cpp/tests/io/parquet_writer_test.cpp
@@ -1859,22 +1859,22 @@ TEST_F(ParquetWriterTest, DurationByteStreamSplit)
 
 TEST_F(ParquetWriterTest, WriteFixedLenByteArray)
 {
+  srand(31337);
   using cudf::io::parquet::detail::Encoding;
   constexpr int fixed_width = 16;
-  constexpr cudf::size_type num_rows = fixed_width * fixed_width;
-  std::vector<uint8_t> data(num_rows * fixed_width, 0);
+  constexpr cudf::size_type num_rows = 200;
+  std::vector<uint8_t> data(num_rows * fixed_width);
   std::vector<cudf::size_type> offsets(num_rows + 1);
 
-  for (int i = 0; i < fixed_width; i++) {
-    for (int j = 0; j < fixed_width; j++) {
-      auto const rowid            = i * fixed_width + j;
-      auto const off              = rowid * fixed_width;
-      offsets[rowid]              = off;
-      data[off + fixed_width - 2] = i;
-      data[off + fixed_width - 1] = j;
+  // fill a num_rows X fixed_width array with random numbers and populate offsets array
+  int cur_offset = 0;
+  for (int i = 0; i < num_rows; i++) {
+    offsets[i] = cur_offset;
+    for (int j = 0; j < fixed_width; j++, cur_offset++) {
+      data[cur_offset] = rand() & 0xff;
     }
   }
-  offsets[num_rows] = num_rows * fixed_width;
+  offsets[num_rows] = cur_offset;
 
   auto data_child = cudf::test::fixed_width_column_wrapper<uint8_t>(data.begin(), data.end());

From ad45ee1e08823dc3479e1fcf6776a2c4b4a2b7a4 Mon Sep 17 00:00:00 2001
From: seidl
Date: Fri, 26 Apr 2024 19:20:40 +0000
Subject: [PATCH 03/20] checkpoint metadata paths

---
 python/cudf/cudf/_lib/parquet.pyx | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 9ce9aad18f7..7fa396f5912 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -788,7 +788,11 @@ cdef _set_col_metadata(
     Column col,
    column_in_metadata&
col_meta, bool force_nullable_schema=False, + str path=None, ): + name = col_meta.get_name().decode('UTF-8') + full_path = path + "." + name if path is not None else name + print(full_path) if force_nullable_schema: # Only set nullability if `force_nullable_schema` # is true. @@ -802,13 +806,17 @@ cdef _set_col_metadata( _set_col_metadata( child_col, col_meta.child(i), - force_nullable_schema + force_nullable_schema, + full_path ) elif isinstance(col.dtype, cudf.ListDtype): + full_path = full_path + ".list" + col_meta.child(1).set_name("element".encode()) _set_col_metadata( col.children[1], col_meta.child(1), - force_nullable_schema + force_nullable_schema, + full_path ) elif isinstance(col.dtype, cudf.core.dtypes.DecimalDtype): col_meta.set_decimal_precision(col.dtype.precision) From 101ea2d0eaad1b53fe1d83f6895187f193ec745c Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 26 Apr 2024 20:29:25 +0000 Subject: [PATCH 04/20] add some column_in_metadata methods --- python/cudf/cudf/_lib/cpp/io/types.pxd | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/cpp/io/types.pxd b/python/cudf/cudf/_lib/cpp/io/types.pxd index d8cc329b0a0..260f94e94c9 100644 --- a/python/cudf/cudf/_lib/cpp/io/types.pxd +++ b/python/cudf/cudf/_lib/cpp/io/types.pxd @@ -1,6 +1,6 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. -from libc.stdint cimport uint8_t +from libc.stdint cimport int32_t, uint8_t from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport shared_ptr, unique_ptr @@ -57,6 +57,18 @@ cdef extern from "cudf/io/types.hpp" \ ADAPTIVE = 1, ALWAYS = 2, + ctypedef enum column_encoding: + USE_DEFAULT = -1, + DICTIONARY = 0, + PLAIN = 1, + DELTA_BINARY_PACKED = 2, + DELTA_LENGTH_BYTE_ARRAY = 3, + DELTA_BYTE_ARRAY = 4, + BYTE_STREAM_SPLIT = 5, + DIRECT = 6, + DIRECT_V2 = 7, + DICTIONARY_V2 = 8, + cdef cppclass column_name_info: string name vector[column_name_info] children @@ -81,6 +93,9 @@ cdef extern from "cudf/io/types.hpp" \ column_in_metadata& set_decimal_precision(uint8_t precision) column_in_metadata& child(size_type i) column_in_metadata& set_output_as_binary(bool binary) + column_in_metadata& set_type_length(int32_t type_length) + column_in_metadata& set_skip_compression(bool skip) + column_in_metadata& set_encoding(column_encoding enc) string get_name() cdef cppclass table_input_metadata: From a7745535e3947679e45f22bbfe1d4817eccb630b Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 26 Apr 2024 21:41:50 +0000 Subject: [PATCH 05/20] finish first cut at new options --- python/cudf/cudf/_lib/cpp/io/types.pxd | 20 ++++----- python/cudf/cudf/_lib/parquet.pyx | 61 ++++++++++++++++++++++++-- python/cudf/cudf/core/dataframe.py | 6 +++ python/cudf/cudf/io/parquet.py | 32 ++++++++++++++ python/cudf/cudf/utils/ioutils.py | 10 +++++ 5 files changed, 115 insertions(+), 14 deletions(-) diff --git a/python/cudf/cudf/_lib/cpp/io/types.pxd b/python/cudf/cudf/_lib/cpp/io/types.pxd index 260f94e94c9..4af848f9c1c 100644 --- a/python/cudf/cudf/_lib/cpp/io/types.pxd +++ b/python/cudf/cudf/_lib/cpp/io/types.pxd @@ -58,16 +58,16 @@ cdef extern from "cudf/io/types.hpp" \ ALWAYS = 2, ctypedef enum column_encoding: - USE_DEFAULT = -1, - DICTIONARY = 0, - PLAIN = 1, - DELTA_BINARY_PACKED = 2, - DELTA_LENGTH_BYTE_ARRAY = 3, - DELTA_BYTE_ARRAY = 4, - BYTE_STREAM_SPLIT = 5, - DIRECT = 6, - DIRECT_V2 = 7, - DICTIONARY_V2 = 8, + USE_DEFAULT "cudf::io::column_encoding::USE_DEFAULT" + DICTIONARY 
"cudf::io::column_encoding::DICTIONARY" + PLAIN "cudf::io::column_encoding::PLAIN" + DELTA_BINARY_PACKED "cudf::io::column_encoding::DELTA_BINARY_PACKED" + DELTA_LENGTH_BYTE_ARRAY "cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY" + DELTA_BYTE_ARRAY "cudf::io::column_encoding::DELTA_BYTE_ARRAY" + BYTE_STREAM_SPLIT "cudf::io::column_encoding::BYTE_STREAM_SPLIT" + DIRECT "cudf::io::column_encoding::DIRECT" + DIRECT_V2 "cudf::io::column_encoding::DIRECT_V2" + DICTIONARY_V2 "cudf::io::column_encoding::DICTIONARY_V2" cdef cppclass column_name_info: string name diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 7fa396f5912..13c887c4bc7 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -403,6 +403,9 @@ def write_parquet( object force_nullable_schema=False, header_version="1.0", use_dictionary=True, + object skip_compression=None, + object column_encoding=None, + object column_type_length=None, ): """ Cython function to call into libcudf API, see `write_parquet`. @@ -453,7 +456,11 @@ def write_parquet( _set_col_metadata( table[name]._column, tbl_meta.column_metadata[i], - force_nullable_schema + force_nullable_schema, + None, + skip_compression, + column_encoding, + column_type_length ) cdef map[string, string] tmp_user_data @@ -784,20 +791,60 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression): raise ValueError("Unsupported `compression` type") +cdef cudf_io_types.column_encoding _get_encoding_type(object encoding): + if encoding is None: + return cudf_io_types.column_encoding.USE_DEFAULT + + enc = str(encoding).upper() + if enc == "PLAIN": + return cudf_io_types.column_encoding.PLAIN + elif enc == "DICTIONARY": + return cudf_io_types.column_encoding.DICTIONARY + elif enc == "DELTA_BINARY_PACKED": + return cudf_io_types.column_encoding.DELTA_BINARY_PACKED + elif enc == "DELTA_LENGTH_BYTE_ARRAY": + return cudf_io_types.column_encoding.DELTA_LENGTH_BYTE_ARRAY + elif enc == "DELTA_BYTE_ARRAY": + return cudf_io_types.column_encoding.DELTA_BYTE_ARRAY + elif enc == "BYTE_STREAM_SPLIT": + return cudf_io_types.column_encoding.BYTE_STREAM_SPLIT + elif enc == "DIRECT": + return cudf_io_types.column_encoding.DIRECT + elif enc == "DIRECT_V2": + return cudf_io_types.column_encoding.DIRECT_V2 + elif enc == "DICTIONARY_V2": + return cudf_io_types.column_encoding.DICTIONARY_V2 + else: + raise ValueError("Unsupported `column_encoding` type") + + cdef _set_col_metadata( Column col, column_in_metadata& col_meta, bool force_nullable_schema=False, str path=None, + object skip_compression=None, + object column_encoding=None, + object column_type_length=None, ): name = col_meta.get_name().decode('UTF-8') full_path = path + "." + name if path is not None else name - print(full_path) + # print(full_path) if force_nullable_schema: # Only set nullability if `force_nullable_schema` # is true. 
col_meta.set_nullability(True) + if skip_compression is not None and full_path in skip_compression: + col_meta.set_skip_compression(skip_compression[full_path]) + + if column_encoding is not None and full_path in column_encoding: + col_meta.set_encoding(_get_encoding_type(column_encoding[full_path])) + + if column_type_length is not None and full_path in column_type_length: + col_meta.set_output_as_binary(True) + col_meta.set_type_length(column_type_length[full_path]) + if isinstance(col.dtype, cudf.StructDtype): for i, (child_col, name) in enumerate( zip(col.children, list(col.dtype.fields)) @@ -807,7 +854,10 @@ cdef _set_col_metadata( child_col, col_meta.child(i), force_nullable_schema, - full_path + full_path, + skip_compression, + column_encoding, + column_type_length ) elif isinstance(col.dtype, cudf.ListDtype): full_path = full_path + ".list" @@ -816,7 +866,10 @@ cdef _set_col_metadata( col.children[1], col_meta.child(1), force_nullable_schema, - full_path + full_path, + skip_compression, + column_encoding, + column_type_length ) elif isinstance(col.dtype, cudf.core.dtypes.DecimalDtype): col_meta.set_decimal_precision(col.dtype.precision) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 45bb66d5d4b..d0e73e362c2 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -6686,6 +6686,9 @@ def to_parquet( return_metadata=False, use_dictionary=True, header_version="1.0", + skip_compression=None, + column_encoding=None, + column_type_length=None, *args, **kwargs, ): @@ -6712,6 +6715,9 @@ def to_parquet( return_metadata=return_metadata, use_dictionary=use_dictionary, header_version=header_version, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, *args, **kwargs, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index e7f1ad0751f..ed1b0a0610f 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -68,6 +68,9 @@ def _write_parquet( force_nullable_schema=False, header_version="1.0", use_dictionary=True, + skip_compression=None, + column_encoding=None, + column_type_length=None, ): if is_list_like(paths) and len(paths) > 1: if partitions_info is None: @@ -100,6 +103,9 @@ def _write_parquet( "force_nullable_schema": force_nullable_schema, "header_version": header_version, "use_dictionary": use_dictionary, + "skip_compression": skip_compression, + "column_encoding": column_encoding, + "column_type_length": column_type_length, } if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs): with ExitStack() as stack: @@ -138,6 +144,11 @@ def write_to_dataset( max_page_size_rows=None, storage_options=None, force_nullable_schema=False, + header_version="1.0", + use_dictionary=True, + skip_compression=None, + column_encoding=None, + column_type_length=None, ): """Wraps `to_parquet` to write partitioned Parquet datasets. 
For each combination of partition group and value, @@ -239,6 +250,11 @@ def write_to_dataset( max_page_size_bytes=max_page_size_bytes, max_page_size_rows=max_page_size_rows, force_nullable_schema=force_nullable_schema, + header_version=header_version, + use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, ) else: @@ -260,6 +276,11 @@ def write_to_dataset( max_page_size_bytes=max_page_size_bytes, max_page_size_rows=max_page_size_rows, force_nullable_schema=force_nullable_schema, + header_version=header_version, + use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, ) return metadata @@ -903,6 +924,9 @@ def to_parquet( force_nullable_schema=False, header_version="1.0", use_dictionary=True, + skip_compression=None, + column_encoding=None, + column_type_length=None, *args, **kwargs, ): @@ -952,6 +976,11 @@ def to_parquet( return_metadata=return_metadata, storage_options=storage_options, force_nullable_schema=force_nullable_schema, + header_version=header_version, + use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, ) partition_info = ( @@ -979,6 +1008,9 @@ def to_parquet( force_nullable_schema=force_nullable_schema, header_version=header_version, use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, ) else: diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 66e14f4b9de..3db6abc8416 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -301,6 +301,16 @@ If True, writes all columns as `null` in schema. If False, columns are written as `null` if they contain null values, otherwise as `not null`. +skip_compression : dict, default None + Dictionary of `column_name: bool` pairs indicating whether the named + column is to be compressed. +column_encoding : dict, default None + Dictionary of `column_name: str` pairs specifying that a particular + encoding be used for the named column. Valid values include:... +column_type_length : dict, default None + Dictionary of `column_name: int` pairs. If the named column is to + be written as FIXED_LEN_BYTE_ARRAY, the value specifies the size of + the data in bytes. **kwargs Additional parameters will be passed to execution engines other than ``cudf``. From 92a1fe0c10272352e71a31594cb4692f2300da5c Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 26 Apr 2024 23:09:30 +0000 Subject: [PATCH 06/20] add doc stubs --- python/cudf/cudf/io/parquet.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index ed1b0a0610f..a11234b35c6 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -213,6 +213,11 @@ def write_to_dataset( If True, writes all columns as `null` in schema. If False, columns are written as `null` if they contain null values, otherwise as `not null`. 
+ header_version: + use_dictionary: + skip_compression: + column_encoding: + column_type_length: """ fs = ioutils._ensure_filesystem(fs, root_path, storage_options) From 1974f124d6ee386ddad72e08e8b4123777fd183f Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Sat, 27 Apr 2024 11:57:02 -0700 Subject: [PATCH 07/20] clean up docs and change skip_compression to a set --- python/cudf/cudf/_lib/parquet.pyx | 2 +- python/cudf/cudf/utils/ioutils.py | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 13c887c4bc7..d5d1babc7a7 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -836,7 +836,7 @@ cdef _set_col_metadata( col_meta.set_nullability(True) if skip_compression is not None and full_path in skip_compression: - col_meta.set_skip_compression(skip_compression[full_path]) + col_meta.set_skip_compression(True) if column_encoding is not None and full_path in column_encoding: col_meta.set_encoding(_get_encoding_type(column_encoding[full_path])) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 3db6abc8416..a237f0f5a9b 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -301,16 +301,16 @@ If True, writes all columns as `null` in schema. If False, columns are written as `null` if they contain null values, otherwise as `not null`. -skip_compression : dict, default None - Dictionary of `column_name: bool` pairs indicating whether the named - column is to be compressed. -column_encoding : dict, default None - Dictionary of `column_name: str` pairs specifying that a particular - encoding be used for the named column. Valid values include:... -column_type_length : dict, default None - Dictionary of `column_name: int` pairs. If the named column is to - be written as FIXED_LEN_BYTE_ARRAY, the value specifies the size of - the data in bytes. +skip_compression : set, optional, default None + If a column name is present in the set, that column will not be compressed, + regardless of the ``compression`` setting. +column_encoding : dict, optional, default None + Sets the page encoding to use on a per-column basis. The key is a column + name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED', + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', or 'BYTE_STREAM_SPLIT'. +column_type_length : dict, optional, default None + Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. + The key is a column name and the value is an integer. **kwargs Additional parameters will be passed to execution engines other than ``cudf``. From f43dae28d45f90333cb84175f536e62717d9c78f Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 29 Apr 2024 09:56:27 -0700 Subject: [PATCH 08/20] do not build full_path if it is not needed --- python/cudf/cudf/_lib/parquet.pyx | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index d5d1babc7a7..fa7cbfc59b6 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -827,9 +827,10 @@ cdef _set_col_metadata( object column_encoding=None, object column_type_length=None, ): - name = col_meta.get_name().decode('UTF-8') + need_path = skip_compression is not None or column_encoding is not None or column_type_length is not None + name = col_meta.get_name().decode('UTF-8') if need_path else None full_path = path + "." 
+ name if path is not None else name - # print(full_path) + if force_nullable_schema: # Only set nullability if `force_nullable_schema` # is true. @@ -860,7 +861,7 @@ cdef _set_col_metadata( column_type_length ) elif isinstance(col.dtype, cudf.ListDtype): - full_path = full_path + ".list" + full_path = full_path + ".list" if full_path is not None else None col_meta.child(1).set_name("element".encode()) _set_col_metadata( col.children[1], From 1a4494452d5ca7ab3209b0b7be6376f1d29c9d02 Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 29 Apr 2024 10:50:22 -0700 Subject: [PATCH 09/20] formatting --- python/cudf/cudf/_lib/parquet.pyx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index fa7cbfc59b6..9603efc2591 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -827,7 +827,8 @@ cdef _set_col_metadata( object column_encoding=None, object column_type_length=None, ): - need_path = skip_compression is not None or column_encoding is not None or column_type_length is not None + need_path = (skip_compression is not None or column_encoding is not None or + column_type_length is not None) name = col_meta.get_name().decode('UTF-8') if need_path else None full_path = path + "." + name if path is not None else name From 4c4b22414bd0a718abadbabdc8a8b83542eaec99 Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 29 Apr 2024 10:59:53 -0700 Subject: [PATCH 10/20] skip setting element names too --- python/cudf/cudf/_lib/parquet.pyx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 9603efc2591..0c012fd1f0a 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -862,8 +862,9 @@ cdef _set_col_metadata( column_type_length ) elif isinstance(col.dtype, cudf.ListDtype): - full_path = full_path + ".list" if full_path is not None else None - col_meta.child(1).set_name("element".encode()) + if full_path is not None: + full_path = full_path + ".list" + col_meta.child(1).set_name("element".encode()) _set_col_metadata( col.children[1], col_meta.child(1), From cee8c9c93f0d0a99a8646571acc0a8592f0fe9ed Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 29 Apr 2024 11:27:30 -0700 Subject: [PATCH 11/20] add test that uses new option --- python/cudf/cudf/tests/test_parquet.py | 28 ++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 6fb1d3d8ba5..d855be38462 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2,6 +2,7 @@ import datetime import glob +import hashlib import math import os import pathlib @@ -2807,6 +2808,33 @@ def test_parquet_reader_fixed_bin(datadir): assert_eq(expect, got) +def test_parquet_flba_round_trip(tmpdir): + def flba(i): + hasher = hashlib.sha256() + hasher.update(i.to_bytes(4, "little")) + return hasher.digest() + + # use pyarrow to write table of fixed_len_byte_array + num_rows = 200 + data = pa.array([flba(i) for i in range(num_rows)], type=pa.binary(32)) + padf = pa.Table.from_arrays([data], names=["flba"]) + padf_fname = tmpdir.join("padf.parquet") + # TODO: cudf cannot read fixed_len_byte_array that is dictionary encoded + # remove after merge of #15601 + pq.write_table(padf, padf_fname, use_dictionary=False) + + # round trip data with cudf + cdf = cudf.read_parquet(padf_fname) + cdf_fname = 
tmpdir.join("cdf.parquet") + cdf.to_parquet(cdf_fname, column_type_length={"flba": 32}) + + # now read back in with pyarrow to test it was written properly by cudf + padf2 = pq.read_table(padf_fname) + padf3 = pq.read_table(cdf_fname) + assert_eq(padf2, padf3) + assert_eq(padf2.schema[0].type, padf3.schema[0].type) + + def test_parquet_reader_rle_boolean(datadir): fname = datadir / "rle_boolean_encoding.parquet" From 4ad48cd289669bc35c069335205812273b005e6d Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 29 Apr 2024 11:41:06 -0700 Subject: [PATCH 12/20] add output_as_binary as separate option --- python/cudf/cudf/_lib/parquet.pyx | 16 ++++++++++++---- python/cudf/cudf/core/dataframe.py | 2 ++ python/cudf/cudf/io/parquet.py | 9 +++++++++ python/cudf/cudf/utils/ioutils.py | 3 +++ 4 files changed, 26 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 0c012fd1f0a..85975869a15 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -406,6 +406,7 @@ def write_parquet( object skip_compression=None, object column_encoding=None, object column_type_length=None, + object output_as_binary=None, ): """ Cython function to call into libcudf API, see `write_parquet`. @@ -460,7 +461,8 @@ def write_parquet( None, skip_compression, column_encoding, - column_type_length + column_type_length, + output_as_binary ) cdef map[string, string] tmp_user_data @@ -826,9 +828,10 @@ cdef _set_col_metadata( object skip_compression=None, object column_encoding=None, object column_type_length=None, + object output_as_binary=None, ): need_path = (skip_compression is not None or column_encoding is not None or - column_type_length is not None) + column_type_length is not None or output_as_binary is not None) name = col_meta.get_name().decode('UTF-8') if need_path else None full_path = path + "." 
+ name if path is not None else name @@ -847,6 +850,9 @@ cdef _set_col_metadata( col_meta.set_output_as_binary(True) col_meta.set_type_length(column_type_length[full_path]) + if output_as_binary is not None and full_path in output_as_binary: + col_meta.set_output_as_binary(True) + if isinstance(col.dtype, cudf.StructDtype): for i, (child_col, name) in enumerate( zip(col.children, list(col.dtype.fields)) @@ -859,7 +865,8 @@ cdef _set_col_metadata( full_path, skip_compression, column_encoding, - column_type_length + column_type_length, + output_as_binary ) elif isinstance(col.dtype, cudf.ListDtype): if full_path is not None: @@ -872,7 +879,8 @@ cdef _set_col_metadata( full_path, skip_compression, column_encoding, - column_type_length + column_type_length, + output_as_binary ) elif isinstance(col.dtype, cudf.core.dtypes.DecimalDtype): col_meta.set_decimal_precision(col.dtype.precision) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index d0e73e362c2..6576489f36d 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -6689,6 +6689,7 @@ def to_parquet( skip_compression=None, column_encoding=None, column_type_length=None, + output_as_binary=None, *args, **kwargs, ): @@ -6718,6 +6719,7 @@ def to_parquet( skip_compression=skip_compression, column_encoding=column_encoding, column_type_length=column_type_length, + output_as_binary=output_as_binary, *args, **kwargs, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index a11234b35c6..682bb985fce 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -71,6 +71,7 @@ def _write_parquet( skip_compression=None, column_encoding=None, column_type_length=None, + output_as_binary=None, ): if is_list_like(paths) and len(paths) > 1: if partitions_info is None: @@ -106,6 +107,7 @@ def _write_parquet( "skip_compression": skip_compression, "column_encoding": column_encoding, "column_type_length": column_type_length, + "output_as_binary": output_as_binary, } if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs): with ExitStack() as stack: @@ -149,6 +151,7 @@ def write_to_dataset( skip_compression=None, column_encoding=None, column_type_length=None, + output_as_binary=None, ): """Wraps `to_parquet` to write partitioned Parquet datasets. 
For each combination of partition group and value, @@ -218,6 +221,7 @@ def write_to_dataset( skip_compression: column_encoding: column_type_length: + output_as_binary: """ fs = ioutils._ensure_filesystem(fs, root_path, storage_options) @@ -260,6 +264,7 @@ def write_to_dataset( skip_compression=skip_compression, column_encoding=column_encoding, column_type_length=column_type_length, + output_as_binary=output_as_binary, ) else: @@ -286,6 +291,7 @@ def write_to_dataset( skip_compression=skip_compression, column_encoding=column_encoding, column_type_length=column_type_length, + output_as_binary=output_as_binary, ) return metadata @@ -932,6 +938,7 @@ def to_parquet( skip_compression=None, column_encoding=None, column_type_length=None, + output_as_binary=None, *args, **kwargs, ): @@ -986,6 +993,7 @@ def to_parquet( skip_compression=skip_compression, column_encoding=column_encoding, column_type_length=column_type_length, + output_as_binary=output_as_binary, ) partition_info = ( @@ -1016,6 +1024,7 @@ def to_parquet( skip_compression=skip_compression, column_encoding=column_encoding, column_type_length=column_type_length, + output_as_binary=output_as_binary, ) else: diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index a237f0f5a9b..4f0dcc13c10 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -311,6 +311,9 @@ column_type_length : dict, optional, default None Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. The key is a column name and the value is an integer. +output_as_binary : set, optional, default None + If a column name is present in the set, that column will be output as + unannotated binary, rather than the default 'UTF-8'. **kwargs Additional parameters will be passed to execution engines other than ``cudf``. From ee9f414325cfa1fdcbbd832bc085a4d73865df7b Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 29 Apr 2024 11:43:50 -0700 Subject: [PATCH 13/20] add to documentation of column_type_length --- python/cudf/cudf/utils/ioutils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 4f0dcc13c10..d74ebd9b776 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -310,7 +310,9 @@ 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', or 'BYTE_STREAM_SPLIT'. column_type_length : dict, optional, default None Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. - The key is a column name and the value is an integer. + The key is a column name and the value is an integer. The named column + will be output as unannotated binary (i.e. the column will behave as if + ``output_as_binary`` was set). output_as_binary : set, optional, default None If a column name is present in the set, that column will be output as unannotated binary, rather than the default 'UTF-8'. From c75b301a7595046a5f7a2f1e667001636aac341c Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 29 Apr 2024 13:28:10 -0700 Subject: [PATCH 14/20] flesh out documentation --- python/cudf/cudf/io/parquet.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 682bb985fce..351fdd42efb 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -216,12 +216,29 @@ def write_to_dataset( If True, writes all columns as `null` in schema. 
If False, columns are written as `null` if they contain null values, otherwise as `not null`. - header_version: - use_dictionary: - skip_compression: - column_encoding: - column_type_length: - output_as_binary: + header_version : {{'1.0', '2.0'}}, default "1.0" + Controls whether to use version 1.0 or version 2.0 page headers when + encoding. Version 1.0 is more portable, but version 2.0 enables the + use of newer encoding schemes. + force_nullable_schema : bool, default False. + If True, writes all columns as `null` in schema. + If False, columns are written as `null` if they contain null values, + otherwise as `not null`. + skip_compression : set, optional, default None + If a column name is present in the set, that column will not be compressed, + regardless of the ``compression`` setting. + column_encoding : dict, optional, default None + Sets the page encoding to use on a per-column basis. The key is a column + name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED', + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', or 'BYTE_STREAM_SPLIT'. + column_type_length : dict, optional, default None + Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. + The key is a column name and the value is an integer. The named column + will be output as unannotated binary (i.e. the column will behave as if + ``output_as_binary`` was set). + output_as_binary : set, optional, default None + If a column name is present in the set, that column will be output as + unannotated binary, rather than the default 'UTF-8'. """ fs = ioutils._ensure_filesystem(fs, root_path, storage_options) From 4c9b8a4bc3283fb3e2522e2944e3e22e902ab841 Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 30 Apr 2024 20:06:44 +0000 Subject: [PATCH 15/20] add test for encoding and compression override --- python/cudf/cudf/tests/test_parquet.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index d855be38462..a25c1804c7b 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2835,6 +2835,22 @@ def flba(i): assert_eq(padf2.schema[0].type, padf3.schema[0].type) +def test_per_column_options(tmpdir): + pdf = pd.DataFrame({"ilist": [[1, 2, 3]], "i1": [1]}) + cdf = cudf.from_pandas(pdf) + fname = tmpdir.join("ilist.parquet") + cdf.to_parquet( + fname, + column_encoding={"ilist.list.element": "DELTA_BINARY_PACKED"}, + compression="SNAPPY", + skip_compression={"ilist.list.element"}, + ) + pf = pq.ParquetFile(fname) + fmd = pf.metadata + assert "DELTA_BINARY_PACKED" in fmd.row_group(0).column(0).encodings + assert fmd.row_group(0).column(0).compression == "UNCOMPRESSED" + + def test_parquet_reader_rle_boolean(datadir): fname = datadir / "rle_boolean_encoding.parquet" From bed11f2d3c84467a2101564191942d2d9260d61c Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 7 May 2024 20:45:43 -0700 Subject: [PATCH 16/20] remove todo --- python/cudf/cudf/tests/test_parquet.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 5d008bae79f..da784861ffd 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2846,9 +2846,7 @@ def flba(i): data = pa.array([flba(i) for i in range(num_rows)], type=pa.binary(32)) padf = pa.Table.from_arrays([data], names=["flba"]) padf_fname = tmpdir.join("padf.parquet") - # TODO: cudf cannot read 
fixed_len_byte_array that is dictionary encoded - # remove after merge of #15601 - pq.write_table(padf, padf_fname, use_dictionary=False) + pq.write_table(padf, padf_fname) # round trip data with cudf cdf = cudf.read_parquet(padf_fname) From df4a593d0284d9d8f7c5dd7f5f385509cf2d7f54 Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 8 May 2024 17:37:23 +0000 Subject: [PATCH 17/20] add USE_DEFAULT and remove non-parquet encodings from _get_encoding_type() --- python/cudf/cudf/_lib/parquet.pyx | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 85975869a15..748c755c5f7 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -810,12 +810,8 @@ cdef cudf_io_types.column_encoding _get_encoding_type(object encoding): return cudf_io_types.column_encoding.DELTA_BYTE_ARRAY elif enc == "BYTE_STREAM_SPLIT": return cudf_io_types.column_encoding.BYTE_STREAM_SPLIT - elif enc == "DIRECT": - return cudf_io_types.column_encoding.DIRECT - elif enc == "DIRECT_V2": - return cudf_io_types.column_encoding.DIRECT_V2 - elif enc == "DICTIONARY_V2": - return cudf_io_types.column_encoding.DICTIONARY_V2 + elif enc == "USE_DEFAULT": + return cudf_io_types.column_encoding.USE_DEFAULT else: raise ValueError("Unsupported `column_encoding` type") From 0a316dfc8c2c7d4742a16fcddc9f3a8a1c92e24c Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 10 May 2024 22:39:20 +0000 Subject: [PATCH 18/20] check that the other column is compressed --- python/cudf/cudf/tests/test_parquet.py | 43 +++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index adfd44c15a8..8702d721d8c 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2858,20 +2858,55 @@ def flba(i): assert_eq(padf2.schema[0].type, padf3.schema[0].type) -def test_per_column_options(tmpdir): - pdf = pd.DataFrame({"ilist": [[1, 2, 3]], "i1": [1]}) +@pytest.mark.parametrize( + "encoding", + [ + "PLAIN", + "DICTIONARY", + "DELTA_BINARY_PACKED", + "BYTE_STREAM_SPLIT", + "USE_DEFAULT", + ], +) +def test_per_column_options(tmpdir, encoding): + pdf = pd.DataFrame({"ilist": [[1, 2, 3, 1, 2, 3]], "i1": [1]}) cdf = cudf.from_pandas(pdf) fname = tmpdir.join("ilist.parquet") cdf.to_parquet( fname, - column_encoding={"ilist.list.element": "DELTA_BINARY_PACKED"}, + column_encoding={"ilist.list.element": encoding}, compression="SNAPPY", skip_compression={"ilist.list.element"}, ) + # DICTIONARY and USE_DEFAULT should both result in a PLAIN_DICTIONARY encoding in parquet + encoding_name = ( + "PLAIN_DICTIONARY" + if encoding == "DICTIONARY" or encoding == "USE_DEFAULT" + else encoding + ) pf = pq.ParquetFile(fname) fmd = pf.metadata - assert "DELTA_BINARY_PACKED" in fmd.row_group(0).column(0).encodings + assert encoding_name in fmd.row_group(0).column(0).encodings assert fmd.row_group(0).column(0).compression == "UNCOMPRESSED" + assert fmd.row_group(0).column(1).compression == "SNAPPY" + + +@pytest.mark.parametrize( + "encoding", + ["DELTA_LENGTH_BYTE_ARRAY", "DELTA_BYTE_ARRAY"], +) +def test_per_column_options_string_col(tmpdir, encoding): + pdf = pd.DataFrame({"s": ["a string"], "i1": [1]}) + cdf = cudf.from_pandas(pdf) + fname = tmpdir.join("strcol.parquet") + cdf.to_parquet( + fname, + column_encoding={"s": encoding}, + compression="SNAPPY", + ) + pf = pq.ParquetFile(fname) + fmd = pf.metadata + assert 
encoding in fmd.row_group(0).column(0).encodings def test_parquet_reader_rle_boolean(datadir): From a273c43eb7286952f88ee59f4f166b65df94fafd Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 10 May 2024 22:39:49 +0000 Subject: [PATCH 19/20] update docs to include USE_DEFAULT --- python/cudf/cudf/io/parquet.py | 3 ++- python/cudf/cudf/utils/ioutils.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 2e6e4e4e648..b573f709262 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -230,7 +230,8 @@ def write_to_dataset( column_encoding : dict, optional, default None Sets the page encoding to use on a per-column basis. The key is a column name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED', - 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', or 'BYTE_STREAM_SPLIT'. + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', 'BYTE_STREAM_SPLIT', or + 'USE_DEFAULT'. column_type_length : dict, optional, default None Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. The key is a column name and the value is an integer. The named column diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 76a20e9ad3d..16b8b46a8a3 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -308,7 +308,8 @@ column_encoding : dict, optional, default None Sets the page encoding to use on a per-column basis. The key is a column name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED', - 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', or 'BYTE_STREAM_SPLIT'. + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', 'BYTE_STREAM_SPLIT', or + 'USE_DEFAULT'. column_type_length : dict, optional, default None Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. The key is a column name and the value is an integer. 
The named column
    will be output as unannotated binary (i.e. the column will behave as if
    ``output_as_binary`` was set).

From f0183ffc536b5f83b09bdc8fc6016ac945f0e937 Mon Sep 17 00:00:00 2001
From: seidl
Date: Tue, 14 May 2024 22:43:02 +0000
Subject: [PATCH 20/20] change enum to PEP 435 style

---
 .../cudf/_lib/pylibcudf/libcudf/io/types.pxd  | 23 ++++++++++---------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd
index cdc0c44ebb5..38fae1df1e5 100644
--- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd
+++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd
@@ -57,17 +57,18 @@ cdef extern from "cudf/io/types.hpp" \
         ADAPTIVE = 1,
         ALWAYS = 2,
 
-    ctypedef enum column_encoding:
-        USE_DEFAULT "cudf::io::column_encoding::USE_DEFAULT"
-        DICTIONARY "cudf::io::column_encoding::DICTIONARY"
-        PLAIN "cudf::io::column_encoding::PLAIN"
-        DELTA_BINARY_PACKED "cudf::io::column_encoding::DELTA_BINARY_PACKED"
-        DELTA_LENGTH_BYTE_ARRAY "cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY"
-        DELTA_BYTE_ARRAY "cudf::io::column_encoding::DELTA_BYTE_ARRAY"
-        BYTE_STREAM_SPLIT "cudf::io::column_encoding::BYTE_STREAM_SPLIT"
-        DIRECT "cudf::io::column_encoding::DIRECT"
-        DIRECT_V2 "cudf::io::column_encoding::DIRECT_V2"
-        DICTIONARY_V2 "cudf::io::column_encoding::DICTIONARY_V2"
+    cdef extern from "cudf/io/types.hpp" namespace "cudf::io" nogil:
+        cpdef enum class column_encoding:
+            USE_DEFAULT = -1
+            DICTIONARY = 0
+            PLAIN = 1
+            DELTA_BINARY_PACKED = 2
+            DELTA_LENGTH_BYTE_ARRAY = 3
+            DELTA_BYTE_ARRAY = 4
+            BYTE_STREAM_SPLIT = 5
+            DIRECT = 6
+            DIRECT_V2 = 7
+            DICTIONARY_V2 = 8
 
     cdef cppclass column_name_info:
         string name
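Taken together, the series enables the following end-to-end usage from Python. The sketch below is illustrative only and not part of the patches; the file names and the 32-byte width are assumptions, but the keyword arguments and accepted values are the ones documented and tested in the commits above (see test_parquet_flba_round_trip and test_per_column_options).

```python
import cudf

# Reading a file containing a FIXED_LEN_BYTE_ARRAY(32) column named "flba"
# now yields a list<uint8> column whose metadata remembers that it was
# binary data and records its 32-byte width.
df = cudf.read_parquet("flba_input.parquet")

# Round-trip it: the named column is written back as FIXED_LEN_BYTE_ARRAY
# (column_type_length implies output_as_binary), with a per-column page
# encoding, and with compression skipped for just that column.
df.to_parquet(
    "flba_output.parquet",
    compression="SNAPPY",
    column_type_length={"flba": 32},
    column_encoding={"flba": "BYTE_STREAM_SPLIT"},
    skip_compression={"flba"},
)
```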
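At the libcudf layer the same round trip rests on `column_in_metadata::set_type_length()` together with `set_output_as_binary()`, as exercised by the `WriteFixedLenByteArray` test in the first commit. A condensed sketch, assuming `tbl` holds a single `list<uint8>` column with `fixed_width` bytes per row; the column name and output path are illustrative:

```cpp
#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

void write_flba(cudf::table_view const& tbl, int32_t fixed_width)
{
  cudf::io::table_input_metadata metadata(tbl);
  metadata.column_metadata[0]
    .set_name("flba")
    .set_type_length(fixed_width)  // byte width of each element
    .set_output_as_binary(true);   // FLBA data must be written as binary

  auto out_opts = cudf::io::parquet_writer_options::builder(
                    cudf::io::sink_info{"flba.parquet"}, tbl)
                    .metadata(std::move(metadata))
                    .build();
  cudf::io::write_parquet(out_opts);
}
```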