Expose some Parquet per-column configuration options via the python API #15613

Merged · 47 commits · May 22, 2024
Changes shown from 40 commits

Commits
9bb6c6a
round trip fixed_len_byte_array data properly
etseidl Apr 25, 2024
3a3ea7b
Merge remote-tracking branch 'origin/branch-24.06' into fixed_len_rou…
etseidl Apr 25, 2024
b475084
address review comments
etseidl Apr 25, 2024
ad45ee1
checkpoint metadata paths
etseidl Apr 26, 2024
101ea2d
add some column_in_metadata methods
etseidl Apr 26, 2024
a774553
finish first cut at new options
etseidl Apr 26, 2024
92a1fe0
add doc stubs
etseidl Apr 26, 2024
79f5008
Merge branch 'rapidsai:branch-24.06' into python_metadata
etseidl Apr 27, 2024
1974f12
clean up docs and change skip_compression to a set
etseidl Apr 27, 2024
f43dae2
do not build full_path if it is not needed
etseidl Apr 29, 2024
1a44944
formatting
etseidl Apr 29, 2024
4c4b224
skip setting element names too
etseidl Apr 29, 2024
cee8c9c
add test that uses new option
etseidl Apr 29, 2024
4ad48cd
add output_as_binary as separate option
etseidl Apr 29, 2024
ee9f414
add to documentation of column_type_length
etseidl Apr 29, 2024
c75b301
flesh out documentation
etseidl Apr 29, 2024
eae6b04
Merge branch 'rapidsai:branch-24.06' into fixed_len_roundtrip
etseidl Apr 29, 2024
4c9b8a4
add test for encoding and compression override
etseidl Apr 30, 2024
aebd86f
Merge remote-tracking branch 'origin/branch-24.06' into python_metadata
etseidl Apr 30, 2024
79a93c3
Merge branch 'branch-24.06' into fixed_len_roundtrip
etseidl Apr 30, 2024
d7b23ab
Merge branch 'rapidsai:branch-24.06' into python_metadata
etseidl May 2, 2024
e41cd04
Merge branch 'rapidsai:branch-24.06' into fixed_len_roundtrip
etseidl May 2, 2024
7934e31
Merge branch 'branch-24.06' into fixed_len_roundtrip
etseidl May 3, 2024
48d5de6
Merge branch 'branch-24.06' into python_metadata
etseidl May 3, 2024
64dc418
Merge branch 'branch-24.06' into fixed_len_roundtrip
vuule May 3, 2024
60e4e0a
Merge remote-tracking branch 'github/fixed_len_roundtrip' into python…
etseidl May 3, 2024
2a79298
Merge remote-tracking branch 'origin/branch-24.06' into python_metadata
etseidl May 7, 2024
fba79f5
Merge remote-tracking branch 'origin/branch-24.06' into python_metadata
etseidl May 8, 2024
bed11f2
remove todo
etseidl May 8, 2024
6e36578
Merge branch 'branch-24.06' into python_metadata
etseidl May 8, 2024
df4a593
add USE_DEFAULT and remove non-parquet encodings from _get_encoding_t…
etseidl May 8, 2024
9cc159e
Merge branch 'branch-24.06' into python_metadata
vuule May 8, 2024
05d8194
Merge branch 'rapidsai:branch-24.06' into python_metadata
etseidl May 9, 2024
782e4ff
Merge remote-tracking branch 'origin/branch-24.06' into python_metadata
etseidl May 10, 2024
0a316df
check that the other column is compressed
etseidl May 10, 2024
a273c43
update docs to include USE_DEFAULT
etseidl May 10, 2024
30d9eb3
Merge remote-tracking branch 'origin/branch-24.06' into python_metadata
etseidl May 11, 2024
e188af2
Merge remote-tracking branch 'origin/branch-24.06' into python_metadata
etseidl May 13, 2024
3d30b4d
Merge branch 'branch-24.06' into python_metadata
etseidl May 14, 2024
b53933b
Merge remote-tracking branch 'origin/branch-24.06' into python_metadata
etseidl May 14, 2024
f0183ff
change enum to PEP 435 style
etseidl May 14, 2024
78100f4
Merge branch 'branch-24.06' into python_metadata
etseidl May 15, 2024
bc06472
Merge branch 'branch-24.06' into python_metadata
vuule May 16, 2024
1348779
Merge branch 'rapidsai:branch-24.06' into python_metadata
etseidl May 20, 2024
c797b56
Merge branch 'branch-24.06' into python_metadata
etseidl May 20, 2024
39023c5
Merge remote-tracking branch 'origin/branch-24.06' into python_metadata
etseidl May 21, 2024
bd99301
Merge branch 'branch-24.06' into python_metadata
galipremsagar May 22, 2024
74 changes: 71 additions & 3 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -407,6 +407,10 @@ def write_parquet(
object force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
object skip_compression=None,
object column_encoding=None,
object column_type_length=None,
object output_as_binary=None,
):
"""
Cython function to call into libcudf API, see `write_parquet`.
@@ -457,7 +461,12 @@ def write_parquet(
_set_col_metadata(
table[name]._column,
tbl_meta.column_metadata[i],
force_nullable_schema
force_nullable_schema,
None,
skip_compression,
column_encoding,
column_type_length,
output_as_binary
)

cdef map[string, string] tmp_user_data
@@ -809,16 +818,62 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression):
raise ValueError("Unsupported `compression` type")


cdef cudf_io_types.column_encoding _get_encoding_type(object encoding):
if encoding is None:
return cudf_io_types.column_encoding.USE_DEFAULT

enc = str(encoding).upper()
if enc == "PLAIN":
return cudf_io_types.column_encoding.PLAIN
elif enc == "DICTIONARY":
return cudf_io_types.column_encoding.DICTIONARY
elif enc == "DELTA_BINARY_PACKED":
return cudf_io_types.column_encoding.DELTA_BINARY_PACKED
elif enc == "DELTA_LENGTH_BYTE_ARRAY":
return cudf_io_types.column_encoding.DELTA_LENGTH_BYTE_ARRAY
elif enc == "DELTA_BYTE_ARRAY":
return cudf_io_types.column_encoding.DELTA_BYTE_ARRAY
elif enc == "BYTE_STREAM_SPLIT":
return cudf_io_types.column_encoding.BYTE_STREAM_SPLIT
elif enc == "USE_DEFAULT":
return cudf_io_types.column_encoding.USE_DEFAULT
else:
raise ValueError("Unsupported `column_encoding` type")


cdef _set_col_metadata(
Column col,
column_in_metadata& col_meta,
bool force_nullable_schema=False,
str path=None,
object skip_compression=None,
object column_encoding=None,
object column_type_length=None,
object output_as_binary=None,
):
need_path = (skip_compression is not None or column_encoding is not None or
column_type_length is not None or output_as_binary is not None)
name = col_meta.get_name().decode('UTF-8') if need_path else None
full_path = path + "." + name if path is not None else name

if force_nullable_schema:
# Only set nullability if `force_nullable_schema`
# is true.
col_meta.set_nullability(True)

if skip_compression is not None and full_path in skip_compression:
col_meta.set_skip_compression(True)

if column_encoding is not None and full_path in column_encoding:
col_meta.set_encoding(_get_encoding_type(column_encoding[full_path]))

if column_type_length is not None and full_path in column_type_length:
col_meta.set_output_as_binary(True)
col_meta.set_type_length(column_type_length[full_path])

if output_as_binary is not None and full_path in output_as_binary:
col_meta.set_output_as_binary(True)

if isinstance(col.dtype, cudf.StructDtype):
for i, (child_col, name) in enumerate(
zip(col.children, list(col.dtype.fields))
@@ -827,13 +882,26 @@ cdef _set_col_metadata(
_set_col_metadata(
child_col,
col_meta.child(i),
force_nullable_schema
force_nullable_schema,
full_path,
skip_compression,
column_encoding,
column_type_length,
output_as_binary
)
elif isinstance(col.dtype, cudf.ListDtype):
if full_path is not None:
full_path = full_path + ".list"
col_meta.child(1).set_name("element".encode())
_set_col_metadata(
col.children[1],
col_meta.child(1),
force_nullable_schema
force_nullable_schema,
full_path,
skip_compression,
column_encoding,
column_type_length,
output_as_binary
)
elif isinstance(col.dtype, cudf.core.dtypes.DecimalDtype):
col_meta.set_decimal_precision(col.dtype.precision)
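
The keys accepted by skip_compression, column_encoding, column_type_length, and output_as_binary are full dotted column paths, built the same way _set_col_metadata walks the column tree above: struct children append ".<child name>", and the value child of a list column is addressed as ".list.element". The following is an illustrative sketch of that path scheme only, not code from this PR; the column names are made up.

import cudf

# Illustrative sketch: derive the dotted paths that _set_col_metadata matches
# against the per-column option dicts/sets. Not part of this PR.
def column_paths(name, dtype):
    """Yield the path of a column and of every nested child column."""
    yield name
    if isinstance(dtype, cudf.StructDtype):
        for child_name, child_dtype in dtype.fields.items():
            yield from column_paths(name + "." + child_name, child_dtype)
    elif isinstance(dtype, cudf.ListDtype):
        # List children are always named "element", so the leaf path of a
        # list column "ilist" is "ilist.list.element".
        yield from column_paths(name + ".list.element", dtype.element_type)

# Example: list(column_paths("ilist", cudf.ListDtype("int64")))
# -> ["ilist", "ilist.list.element"]
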
17 changes: 16 additions & 1 deletion python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd
@@ -1,6 +1,6 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from libc.stdint cimport uint8_t
from libc.stdint cimport int32_t, uint8_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport shared_ptr, unique_ptr
@@ -57,6 +57,18 @@ cdef extern from "cudf/io/types.hpp" \
ADAPTIVE = 1,
ALWAYS = 2,

ctypedef enum column_encoding:
USE_DEFAULT "cudf::io::column_encoding::USE_DEFAULT"
DICTIONARY "cudf::io::column_encoding::DICTIONARY"
PLAIN "cudf::io::column_encoding::PLAIN"
DELTA_BINARY_PACKED "cudf::io::column_encoding::DELTA_BINARY_PACKED"
DELTA_LENGTH_BYTE_ARRAY "cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY"
DELTA_BYTE_ARRAY "cudf::io::column_encoding::DELTA_BYTE_ARRAY"
BYTE_STREAM_SPLIT "cudf::io::column_encoding::BYTE_STREAM_SPLIT"
DIRECT "cudf::io::column_encoding::DIRECT"
DIRECT_V2 "cudf::io::column_encoding::DIRECT_V2"
DICTIONARY_V2 "cudf::io::column_encoding::DICTIONARY_V2"

cdef cppclass column_name_info:
string name
vector[column_name_info] children
@@ -81,6 +93,9 @@ cdef extern from "cudf/io/types.hpp" \
column_in_metadata& set_decimal_precision(uint8_t precision)
column_in_metadata& child(size_type i)
column_in_metadata& set_output_as_binary(bool binary)
column_in_metadata& set_type_length(int32_t type_length)
column_in_metadata& set_skip_compression(bool skip)
column_in_metadata& set_encoding(column_encoding enc)
string get_name()

cdef cppclass table_input_metadata:
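
Note that the column_encoding enum declared here also carries ORC-specific values (DIRECT, DIRECT_V2, DICTIONARY_V2); _get_encoding_type in parquet.pyx maps only the Parquet-relevant names and raises on anything else. Below is a small assumption-level sketch of pre-validating user input against that subset; it is not part of the PR.

# Hypothetical helper mirroring the names _get_encoding_type accepts.
PARQUET_COLUMN_ENCODINGS = {
    "PLAIN",
    "DICTIONARY",
    "DELTA_BINARY_PACKED",
    "DELTA_LENGTH_BYTE_ARRAY",
    "DELTA_BYTE_ARRAY",
    "BYTE_STREAM_SPLIT",
    "USE_DEFAULT",
}

def validate_column_encoding(column_encoding):
    """Raise early if any requested encoding is unsupported by the Parquet writer."""
    for path, enc in (column_encoding or {}).items():
        if str(enc).upper() not in PARQUET_COLUMN_ENCODINGS:
            raise ValueError(
                f"Unsupported `column_encoding` {enc!r} for column {path!r}"
            )
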
8 changes: 8 additions & 0 deletions python/cudf/cudf/core/dataframe.py
@@ -6704,6 +6704,10 @@ def to_parquet(
return_metadata=False,
use_dictionary=True,
header_version="1.0",
skip_compression=None,
column_encoding=None,
column_type_length=None,
output_as_binary=None,
*args,
**kwargs,
):
@@ -6730,6 +6734,10 @@
return_metadata=return_metadata,
use_dictionary=use_dictionary,
header_version=header_version,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
*args,
**kwargs,
)
64 changes: 64 additions & 0 deletions python/cudf/cudf/io/parquet.py
@@ -69,6 +69,10 @@ def _write_parquet(
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
skip_compression=None,
column_encoding=None,
column_type_length=None,
output_as_binary=None,
):
if is_list_like(paths) and len(paths) > 1:
if partitions_info is None:
@@ -102,6 +106,10 @@
"force_nullable_schema": force_nullable_schema,
"header_version": header_version,
"use_dictionary": use_dictionary,
"skip_compression": skip_compression,
"column_encoding": column_encoding,
"column_type_length": column_type_length,
"output_as_binary": output_as_binary,
}
if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs):
with ExitStack() as stack:
@@ -140,6 +148,12 @@ def write_to_dataset(
max_page_size_rows=None,
storage_options=None,
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
skip_compression=None,
column_encoding=None,
column_type_length=None,
output_as_binary=None,
):
"""Wraps `to_parquet` to write partitioned Parquet datasets.
For each combination of partition group and value,
@@ -204,6 +218,30 @@ def write_to_dataset(
If True, writes all columns as `null` in schema.
If False, columns are written as `null` if they contain null values,
otherwise as `not null`.
header_version : {{'1.0', '2.0'}}, default "1.0"
Controls whether to use version 1.0 or version 2.0 page headers when
encoding. Version 1.0 is more portable, but version 2.0 enables the
use of newer encoding schemes.
force_nullable_schema : bool, default False.
If True, writes all columns as `null` in schema.
If False, columns are written as `null` if they contain null values,
otherwise as `not null`.
skip_compression : set, optional, default None
If a column name is present in the set, that column will not be compressed,
regardless of the ``compression`` setting.
column_encoding : dict, optional, default None
Sets the page encoding to use on a per-column basis. The key is a column
name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED',
'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', 'BYTE_STREAM_SPLIT', or
'USE_DEFAULT'.
column_type_length : dict, optional, default None
Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements.
The key is a column name and the value is an integer. The named column
will be output as unannotated binary (i.e. the column will behave as if
``output_as_binary`` was set).
output_as_binary : set, optional, default None
If a column name is present in the set, that column will be output as
unannotated binary, rather than the default 'UTF-8'.
"""

fs = ioutils._ensure_filesystem(fs, root_path, storage_options)
@@ -241,6 +279,12 @@
max_page_size_bytes=max_page_size_bytes,
max_page_size_rows=max_page_size_rows,
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
)

else:
@@ -262,6 +306,12 @@
max_page_size_bytes=max_page_size_bytes,
max_page_size_rows=max_page_size_rows,
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
)

return metadata
@@ -906,6 +956,10 @@ def to_parquet(
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
skip_compression=None,
column_encoding=None,
column_type_length=None,
output_as_binary=None,
*args,
**kwargs,
):
@@ -955,6 +1009,12 @@
return_metadata=return_metadata,
storage_options=storage_options,
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
)

partition_info = (
@@ -983,6 +1043,10 @@
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
)

else:
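
Taken together, the user-facing change is four new keyword arguments accepted by to_parquet (and forwarded by write_to_dataset). A minimal usage sketch follows, assuming a local output path and made-up column names.

import cudf

df = cudf.DataFrame(
    {
        "id": [1, 2, 3],
        "name": ["a", "b", "c"],
        "digest": ["0" * 32, "1" * 32, "2" * 32],  # fixed-width 32-byte values
    }
)

df.to_parquet(
    "example.parquet",
    compression="SNAPPY",
    # Leave "digest" uncompressed even though SNAPPY is requested globally.
    skip_compression={"digest"},
    # Request a specific page encoding for "id"; other columns keep the default.
    column_encoding={"id": "DELTA_BINARY_PACKED"},
    # Write "digest" as FIXED_LEN_BYTE_ARRAY with 32-byte elements
    # (this also implies unannotated binary output for that column).
    column_type_length={"digest": 32},
    # Write "name" as unannotated binary rather than UTF-8 strings.
    output_as_binary={"name"},
)
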
76 changes: 76 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -2870,6 +2870,82 @@ def flba(i):
assert_eq(expect, got)


def test_parquet_flba_round_trip(tmpdir):
def flba(i):
hasher = hashlib.sha256()
hasher.update(i.to_bytes(4, "little"))
return hasher.digest()

# use pyarrow to write table of fixed_len_byte_array
num_rows = 200
data = pa.array([flba(i) for i in range(num_rows)], type=pa.binary(32))
padf = pa.Table.from_arrays([data], names=["flba"])
padf_fname = tmpdir.join("padf.parquet")
pq.write_table(padf, padf_fname)

# round trip data with cudf
cdf = cudf.read_parquet(padf_fname)
cdf_fname = tmpdir.join("cdf.parquet")
cdf.to_parquet(cdf_fname, column_type_length={"flba": 32})

# now read back in with pyarrow to test it was written properly by cudf
padf2 = pq.read_table(padf_fname)
padf3 = pq.read_table(cdf_fname)
assert_eq(padf2, padf3)
assert_eq(padf2.schema[0].type, padf3.schema[0].type)


@pytest.mark.parametrize(
"encoding",
[
"PLAIN",
"DICTIONARY",
"DELTA_BINARY_PACKED",
"BYTE_STREAM_SPLIT",
"USE_DEFAULT",
],
)
def test_per_column_options(tmpdir, encoding):
pdf = pd.DataFrame({"ilist": [[1, 2, 3, 1, 2, 3]], "i1": [1]})
cdf = cudf.from_pandas(pdf)
fname = tmpdir.join("ilist.parquet")
cdf.to_parquet(
fname,
column_encoding={"ilist.list.element": encoding},
compression="SNAPPY",
skip_compression={"ilist.list.element"},
)
# DICTIONARY and USE_DEFAULT should both result in a PLAIN_DICTIONARY encoding in parquet
encoding_name = (
"PLAIN_DICTIONARY"
if encoding == "DICTIONARY" or encoding == "USE_DEFAULT"
else encoding
)
pf = pq.ParquetFile(fname)
fmd = pf.metadata
assert encoding_name in fmd.row_group(0).column(0).encodings
assert fmd.row_group(0).column(0).compression == "UNCOMPRESSED"
assert fmd.row_group(0).column(1).compression == "SNAPPY"


@pytest.mark.parametrize(
"encoding",
["DELTA_LENGTH_BYTE_ARRAY", "DELTA_BYTE_ARRAY"],
)
def test_per_column_options_string_col(tmpdir, encoding):
pdf = pd.DataFrame({"s": ["a string"], "i1": [1]})
cdf = cudf.from_pandas(pdf)
fname = tmpdir.join("strcol.parquet")
cdf.to_parquet(
fname,
column_encoding={"s": encoding},
compression="SNAPPY",
)
pf = pq.ParquetFile(fname)
fmd = pf.metadata
assert encoding in fmd.row_group(0).column(0).encodings


def test_parquet_reader_rle_boolean(datadir):
fname = datadir / "rle_boolean_encoding.parquet"

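
The new tests verify the written files through pyarrow's metadata; the same pattern is handy for spot-checking any file produced with these options. A short illustrative snippet, reusing the hypothetical file name from the usage sketch above rather than anything in this PR:

import pyarrow.parquet as pq

pf = pq.ParquetFile("example.parquet")

# Per-column-chunk encodings and compression live in the row-group metadata.
for i in range(pf.metadata.num_columns):
    chunk = pf.metadata.row_group(0).column(i)
    print(chunk.path_in_schema, chunk.encodings, chunk.compression)

# column_type_length / output_as_binary show up in the Parquet schema as
# FIXED_LEN_BYTE_ARRAY / BYTE_ARRAY physical types without a string annotation.
for i in range(pf.metadata.num_columns):
    col = pf.schema.column(i)
    print(col.name, col.physical_type)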