From 062389dbad961b62552c88425acac0f017ce7c29 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Sat, 15 Jun 2024 00:10:15 +0000 Subject: [PATCH 1/9] Migrate ORC reader to pylibcudf --- python/cudf/cudf/_lib/io/utils.pxd | 4 + python/cudf/cudf/_lib/orc.pyx | 142 +++++++------------ python/cudf/cudf/_lib/utils.pxd | 2 +- python/cudf/cudf/_lib/utils.pyx | 6 +- python/pylibcudf/pylibcudf/io/CMakeLists.txt | 2 +- python/pylibcudf/pylibcudf/io/__init__.pxd | 2 +- python/pylibcudf/pylibcudf/io/__init__.py | 2 +- python/pylibcudf/pylibcudf/io/orc.pxd | 20 +++ python/pylibcudf/pylibcudf/io/orc.pyx | 62 ++++++++ 9 files changed, 145 insertions(+), 97 deletions(-) create mode 100644 python/pylibcudf/pylibcudf/io/orc.pxd create mode 100644 python/pylibcudf/pylibcudf/io/orc.pyx diff --git a/python/cudf/cudf/_lib/io/utils.pxd b/python/cudf/cudf/_lib/io/utils.pxd index 1938f00c179..76a6e32fde0 100644 --- a/python/cudf/cudf/_lib/io/utils.pxd +++ b/python/cudf/cudf/_lib/io/utils.pxd @@ -21,6 +21,10 @@ cdef add_df_col_struct_names( df, child_names_dict ) +cdef update_col_struct_field_names( + Column col, + child_names +) cdef update_struct_field_names( table, vector[column_name_info]& schema_info) diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index adeba6fffb1..b5c4aeac29d 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -14,21 +14,18 @@ from libcpp.vector cimport vector import datetime from collections import OrderedDict -cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view - try: import ujson as json except ImportError: import json +cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view cimport pylibcudf.libcudf.io.types as cudf_io_types from pylibcudf.libcudf.io.data_sink cimport data_sink from pylibcudf.libcudf.io.orc cimport ( chunked_orc_writer_options, orc_chunked_writer, - orc_reader_options, orc_writer_options, - read_orc as libcudf_read_orc, write_orc as libcudf_write_orc, ) from pylibcudf.libcudf.io.orc_metadata cimport ( @@ -50,9 +47,7 @@ from pylibcudf.libcudf.io.types cimport ( column_in_metadata, compression_type, sink_info, - source_info, table_input_metadata, - table_with_metadata, ) from pylibcudf.libcudf.table.table_view cimport table_view from pylibcudf.libcudf.types cimport data_type, size_type, type_id @@ -62,13 +57,12 @@ from cudf._lib.column cimport Column from cudf._lib.io.utils cimport ( make_sink_info, make_source_info, - update_column_struct_field_names, + update_col_struct_field_names, ) +from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES +from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table -from cudf._lib.types import SUPPORTED_NUMPY_TO_LIBCUDF_TYPES - -from cudf._lib.types cimport underlying_type_t_type_id -from cudf._lib.utils cimport data_from_unique_ptr, table_view_from_table +import pylibcudf as plc from cudf._lib.utils import _index_level_name, generate_pandas_metadata @@ -236,35 +230,28 @@ cpdef read_orc(object filepaths_or_buffers, -------- cudf.read_orc """ - cdef orc_reader_options c_orc_reader_options = make_orc_reader_options( - filepaths_or_buffers, + + if columns is not None: + columns = [str(col) for col in columns] + + tbl_w_meta = plc.io.orc.read_orc( + plc.io.SourceInfo(filepaths_or_buffers), columns, - stripes or [], + stripes, get_skiprows_arg(skip_rows), get_num_rows_arg(num_rows), - ( - type_id.EMPTY - if timestamp_type is None else - ( - ( - SUPPORTED_NUMPY_TO_LIBCUDF_TYPES[ - cudf.dtype(timestamp_type) - ] - ) - ) - ), 
use_index, + plc.types.DataType( + SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES[ + cudf.dtype(timestamp_type) + ] + ) ) - cdef table_with_metadata c_result - cdef size_type nrows + names = tbl_w_meta.column_names - with nogil: - c_result = move(libcudf_read_orc(c_orc_reader_options)) - - names = [info.name.decode() for info in c_result.metadata.schema_info] actual_index_names, col_names, is_range_index, reset_index_name, \ - range_idx = _get_index_from_metadata(c_result.metadata.user_data, + range_idx = _get_index_from_metadata(tbl_w_meta.per_file_user_data, names, skip_rows, num_rows) @@ -272,11 +259,11 @@ cpdef read_orc(object filepaths_or_buffers, if columns is not None and (isinstance(columns, list) and len(columns) == 0): # When `columns=[]`, index needs to be # established, but not the columns. - nrows = c_result.tbl.get()[0].view().num_rows() + nrows = tbl_w_meta.tbl.num_rows() return {}, cudf.RangeIndex(nrows) - data, index = data_from_unique_ptr( - move(c_result.tbl), + data, index = data_from_pylibcudf_io( + tbl_w_meta, col_names if columns is None else names, actual_index_names ) @@ -286,11 +273,13 @@ cpdef read_orc(object filepaths_or_buffers, elif reset_index_name: index.names = [None] * len(index.names) + child_name_values = tbl_w_meta.child_names.values() + data = { - name: update_column_struct_field_names( - col, c_result.metadata.schema_info[i] + name: update_col_struct_field_names( + col, child_names ) - for i, (name, col) in enumerate(data.items()) + for (name, col), child_names in zip(data.items(), child_name_values) } return data, index @@ -313,32 +302,38 @@ cdef compression_type _get_comp_type(object compression): raise ValueError(f"Unsupported `compression` type {compression}") cdef tuple _get_index_from_metadata( - map[string, string] user_data, + vector[map[string, string]] user_data, object names, object skip_rows, object num_rows): - json_str = user_data[b'pandas'].decode('utf-8') + + # TODO: consider metadata from more than the first file? 
+ # Note: This code used to use the deprecated user_data member on + # table_metadata (which only considers the first file) meta = None index_col = None is_range_index = False reset_index_name = False range_idx = None - if json_str != "": - meta = json.loads(json_str) - if 'index_columns' in meta and len(meta['index_columns']) > 0: - index_col = meta['index_columns'] - if isinstance(index_col[0], dict) and \ - index_col[0]['kind'] == 'range': - is_range_index = True - else: - index_col_names = OrderedDict() - for idx_col in index_col: - for c in meta['columns']: - if c['field_name'] == idx_col: - index_col_names[idx_col] = \ - c['name'] or c['field_name'] - if c['name'] is None: - reset_index_name = True + + if user_data.size() > 0: + json_str = user_data[0][b'pandas'].decode('utf-8') + if json_str != "": + meta = json.loads(json_str) + if 'index_columns' in meta and len(meta['index_columns']) > 0: + index_col = meta['index_columns'] + if isinstance(index_col[0], dict) and \ + index_col[0]['kind'] == 'range': + is_range_index = True + else: + index_col_names = OrderedDict() + for idx_col in index_col: + for c in meta['columns']: + if c['field_name'] == idx_col: + index_col_names[idx_col] = \ + c['name'] or c['field_name'] + if c['name'] is None: + reset_index_name = True actual_index_names = None if index_col is not None and len(index_col) > 0: @@ -473,41 +468,6 @@ cdef int64_t get_num_rows_arg(object arg) except*: return arg -cdef orc_reader_options make_orc_reader_options( - object filepaths_or_buffers, - object column_names, - object stripes, - int64_t skip_rows, - int64_t num_rows, - type_id timestamp_type, - bool use_index -) except*: - - cdef vector[vector[size_type]] strps = stripes - cdef orc_reader_options opts - cdef source_info src = make_source_info(filepaths_or_buffers) - opts = move( - orc_reader_options.builder(src) - .stripes(strps) - .skip_rows(skip_rows) - .timestamp_type(data_type(timestamp_type)) - .use_index(use_index) - .build() - ) - if num_rows >= 0: - opts.set_num_rows(num_rows) - - cdef vector[string] c_column_names - if column_names is not None: - c_column_names.reserve(len(column_names)) - for col in column_names: - c_column_names.push_back(str(col).encode()) - if len(column_names) > 0: - opts.set_columns(c_column_names) - - return opts - - cdef class ORCWriter: """ ORCWriter lets you you incrementally write out a ORC file from a series diff --git a/python/cudf/cudf/_lib/utils.pxd b/python/cudf/cudf/_lib/utils.pxd index ff97fe80310..7254db5c43d 100644 --- a/python/cudf/cudf/_lib/utils.pxd +++ b/python/cudf/cudf/_lib/utils.pxd @@ -11,7 +11,7 @@ from pylibcudf.libcudf.table.table cimport table, table_view cdef data_from_unique_ptr( unique_ptr[table] c_tbl, column_names, index_names=*) cdef data_from_pylibcudf_table(tbl, column_names, index_names=*) -cdef data_from_pylibcudf_io(tbl_with_meta) +cdef data_from_pylibcudf_io(tbl_with_meta, column_names = *, index_names = *) cdef data_from_table_view( table_view tv, object owner, object column_names, object index_names=*) cdef table_view table_view_from_columns(columns) except * diff --git a/python/cudf/cudf/_lib/utils.pyx b/python/cudf/cudf/_lib/utils.pyx index cae28d02ef4..3dbe47da76d 100644 --- a/python/cudf/cudf/_lib/utils.pyx +++ b/python/cudf/cudf/_lib/utils.pyx @@ -316,15 +316,17 @@ cdef data_from_pylibcudf_table(tbl, column_names, index_names=None): index_names ) -cdef data_from_pylibcudf_io(tbl_with_meta): +cdef data_from_pylibcudf_io(tbl_with_meta, column_names=None, index_names=None): """ Unpacks the 
TableWithMetadata from libcudf I/O into a dict of columns and an Index (cuDF format) """ + if column_names is None: + column_names = tbl_with_meta.column_names return _data_from_columns( columns=[Column.from_pylibcudf(plc) for plc in tbl_with_meta.columns], column_names=tbl_with_meta.column_names(include_children=False), - index_names=None + index_names=index_names ) cdef columns_from_table_view( diff --git a/python/pylibcudf/pylibcudf/io/CMakeLists.txt b/python/pylibcudf/pylibcudf/io/CMakeLists.txt index bcc2151f5b6..529a71a48ce 100644 --- a/python/pylibcudf/pylibcudf/io/CMakeLists.txt +++ b/python/pylibcudf/pylibcudf/io/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. # ============================================================================= -set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx parquet.pyx types.pyx) +set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx orc.pyx parquet.pyx types.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( diff --git a/python/pylibcudf/pylibcudf/io/__init__.pxd b/python/pylibcudf/pylibcudf/io/__init__.pxd index 62820048584..5927a19dc69 100644 --- a/python/pylibcudf/pylibcudf/io/__init__.pxd +++ b/python/pylibcudf/pylibcudf/io/__init__.pxd @@ -1,5 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. # CSV is removed since it is def not cpdef (to force kw-only arguments) -from . cimport avro, datasource, json, parquet, types +from . cimport avro, datasource, json, orc, parquet, types from .types cimport SourceInfo, TableWithMetadata diff --git a/python/pylibcudf/pylibcudf/io/__init__.py b/python/pylibcudf/pylibcudf/io/__init__.py index 27640f7d955..5d899ee0808 100644 --- a/python/pylibcudf/pylibcudf/io/__init__.py +++ b/python/pylibcudf/pylibcudf/io/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . import avro, csv, datasource, json, parquet, types +from . import avro, csv, datasource, json, orc, parquet, types from .types import SinkInfo, SourceInfo, TableWithMetadata diff --git a/python/pylibcudf/pylibcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/io/orc.pxd new file mode 100644 index 00000000000..a19f89c57c3 --- /dev/null +++ b/python/pylibcudf/pylibcudf/io/orc.pxd @@ -0,0 +1,20 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +from libcpp cimport bool +from libcpp.vector cimport vector + +from pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from pylibcudf.libcudf.types cimport size_type +from pylibcudf.types cimport DataType + + +cpdef TableWithMetadata read_orc( + SourceInfo source_info, + list columns = *, + list stripes = *, + size_type skip_rows = *, + size_type num_rows = *, + bool use_index = *, + bool use_np_dtypes = *, + DataType timestamp_type = *, + list decimal128_columns = * +) diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx new file mode 100644 index 00000000000..a8581814db7 --- /dev/null +++ b/python/pylibcudf/pylibcudf/io/orc.pyx @@ -0,0 +1,62 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. 
+from libcpp cimport bool +from libcpp.string cimport string +from libcpp.utility cimport move +from libcpp.vector cimport vector + +from pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from pylibcudf.libcudf.io.orc cimport ( + orc_reader_options, + read_orc as cpp_read_orc, +) +from pylibcudf.libcudf.io.types cimport table_with_metadata +from pylibcudf.libcudf.types cimport size_type, type_id +from pylibcudf.types cimport DataType + + +cpdef TableWithMetadata read_orc( + SourceInfo source_info, + list columns = None, + list stripes = None, + size_type skip_rows = 0, + size_type num_rows = -1, + bool use_index = True, + bool use_np_dtypes = True, + DataType timestamp_type = DataType(type_id.EMPTY), + list decimal128_columns = None, +): + """ + """ + cdef orc_reader_options opts + cdef vector[vector[size_type]] c_stripes + opts = move( + orc_reader_options.builder(source_info.c_obj) + .use_index(use_index) + .build() + ) + if num_rows >= 0: + opts.set_num_rows(num_rows) + if skip_rows >= 0: + opts.set_skip_rows(skip_rows) + if stripes is not None: + c_stripes = stripes + opts.set_stripes(c_stripes) + if timestamp_type.id() != type_id.EMPTY: + opts.set_timestamp_type(timestamp_type.c_obj) + + cdef vector[string] c_column_names + if columns is not None: + c_column_names.reserve(len(columns)) + for col in columns: + if not isinstance(col, str): + raise TypeError("Column names must be strings!") + c_column_names.push_back(str(col).encode()) + if len(columns) > 0: + opts.set_columns(c_column_names) + + cdef table_with_metadata c_result + + with nogil: + c_result = move(cpp_read_orc(opts)) + + return TableWithMetadata.from_libcudf(c_result) From c02bdf92d75175d2ffc5a4776d6fb1c045db8c28 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 17 Jun 2024 21:31:29 +0000 Subject: [PATCH 2/9] fix tests --- python/cudf/cudf/_lib/orc.pyx | 150 ++----------- python/cudf/cudf/_lib/pylibcudf/io/types.pyx | 150 +++++++++++++ .../cudf/_lib/pylibcudf/utils/__init__.pxd | 3 + python/cudf/cudf/io/orc.py | 11 +- python/cudf/cudf/tests/test_orc.py | 36 ++-- python/cudf/cudf/utils/ioutils.py | 5 +- python/pylibcudf/pylibcudf/io/datasource.pxd | 2 + python/pylibcudf/pylibcudf/io/datasource.pyx | 3 +- python/pylibcudf/pylibcudf/io/orc.pxd | 31 +++ python/pylibcudf/pylibcudf/io/orc.pyx | 203 ++++++++++++++++++ .../pylibcudf/libcudf/io/orc_metadata.pxd | 2 + 11 files changed, 434 insertions(+), 162 deletions(-) create mode 100644 python/cudf/cudf/_lib/pylibcudf/io/types.pyx create mode 100644 python/cudf/cudf/_lib/pylibcudf/utils/__init__.pxd diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index b5c4aeac29d..69631e09e46 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -1,8 +1,5 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. 
-import cudf -from cudf.core.buffer import acquire_spill_lock - from libc.stdint cimport int64_t from libcpp cimport bool, int from libcpp.map cimport map @@ -11,7 +8,6 @@ from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector -import datetime from collections import OrderedDict try: @@ -59,131 +55,15 @@ from cudf._lib.io.utils cimport ( make_source_info, update_col_struct_field_names, ) -from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table -import pylibcudf as plc - +import cudf +from cudf.core.buffer import acquire_spill_lock +from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES from cudf._lib.utils import _index_level_name, generate_pandas_metadata +import pylibcudf as plc -cdef _parse_column_type_statistics(column_statistics stats): - # Initialize stats to return and parse stats blob - column_stats = {} - - if stats.number_of_values.has_value(): - column_stats["number_of_values"] = stats.number_of_values.value() - - if stats.has_null.has_value(): - column_stats["has_null"] = stats.has_null.value() - - cdef statistics_type type_specific_stats = stats.type_specific_stats - - cdef integer_statistics* int_stats - cdef double_statistics* dbl_stats - cdef string_statistics* str_stats - cdef bucket_statistics* bucket_stats - cdef decimal_statistics* dec_stats - cdef date_statistics* date_stats - cdef binary_statistics* bin_stats - cdef timestamp_statistics* ts_stats - - if holds_alternative[no_statistics](type_specific_stats): - return column_stats - elif int_stats := std_get_if[integer_statistics](&type_specific_stats): - if int_stats.minimum.has_value(): - column_stats["minimum"] = int_stats.minimum.value() - else: - column_stats["minimum"] = None - if int_stats.maximum.has_value(): - column_stats["maximum"] = int_stats.maximum.value() - else: - column_stats["maximum"] = None - if int_stats.sum.has_value(): - column_stats["sum"] = int_stats.sum.value() - else: - column_stats["sum"] = None - elif dbl_stats := std_get_if[double_statistics](&type_specific_stats): - if dbl_stats.minimum.has_value(): - column_stats["minimum"] = dbl_stats.minimum.value() - else: - column_stats["minimum"] = None - if dbl_stats.maximum.has_value(): - column_stats["maximum"] = dbl_stats.maximum.value() - else: - column_stats["maximum"] = None - if dbl_stats.sum.has_value(): - column_stats["sum"] = dbl_stats.sum.value() - else: - column_stats["sum"] = None - elif str_stats := std_get_if[string_statistics](&type_specific_stats): - if str_stats.minimum.has_value(): - column_stats["minimum"] = str_stats.minimum.value().decode("utf-8") - else: - column_stats["minimum"] = None - if str_stats.maximum.has_value(): - column_stats["maximum"] = str_stats.maximum.value().decode("utf-8") - else: - column_stats["maximum"] = None - if str_stats.sum.has_value(): - column_stats["sum"] = str_stats.sum.value() - else: - column_stats["sum"] = None - elif bucket_stats := std_get_if[bucket_statistics](&type_specific_stats): - column_stats["true_count"] = bucket_stats.count[0] - column_stats["false_count"] = ( - column_stats["number_of_values"] - - column_stats["true_count"] - ) - elif dec_stats := std_get_if[decimal_statistics](&type_specific_stats): - if dec_stats.minimum.has_value(): - column_stats["minimum"] = dec_stats.minimum.value().decode("utf-8") - else: - column_stats["minimum"] = None - if dec_stats.maximum.has_value(): - column_stats["maximum"] = dec_stats.maximum.value().decode("utf-8") 
- else: - column_stats["maximum"] = None - if dec_stats.sum.has_value(): - column_stats["sum"] = dec_stats.sum.value().decode("utf-8") - else: - column_stats["sum"] = None - elif date_stats := std_get_if[date_statistics](&type_specific_stats): - if date_stats.minimum.has_value(): - column_stats["minimum"] = datetime.datetime.fromtimestamp( - datetime.timedelta(date_stats.minimum.value()).total_seconds(), - datetime.timezone.utc, - ) - else: - column_stats["minimum"] = None - if date_stats.maximum.has_value(): - column_stats["maximum"] = datetime.datetime.fromtimestamp( - datetime.timedelta(date_stats.maximum.value()).total_seconds(), - datetime.timezone.utc, - ) - else: - column_stats["maximum"] = None - elif bin_stats := std_get_if[binary_statistics](&type_specific_stats): - if bin_stats.sum.has_value(): - column_stats["sum"] = bin_stats.sum.value() - else: - column_stats["sum"] = None - elif ts_stats := std_get_if[timestamp_statistics](&type_specific_stats): - # Before ORC-135, the local timezone offset was included and they were - # stored as minimum and maximum. After ORC-135, the timestamp is - # adjusted to UTC before being converted to milliseconds and stored - # in minimumUtc and maximumUtc. - # TODO: Support minimum and maximum by reading writer's local timezone - if ts_stats.minimum_utc.has_value() and ts_stats.maximum_utc.has_value(): - column_stats["minimum"] = datetime.datetime.fromtimestamp( - ts_stats.minimum_utc.value() / 1000, datetime.timezone.utc - ) - column_stats["maximum"] = datetime.datetime.fromtimestamp( - ts_stats.maximum_utc.value() / 1000, datetime.timezone.utc - ) - else: - raise ValueError("Unsupported statistics type") - return column_stats cpdef read_parsed_orc_statistics(filepath_or_buffer): @@ -195,22 +75,24 @@ cpdef read_parsed_orc_statistics(filepath_or_buffer): cudf.io.orc.read_orc_statistics """ - cdef parsed_orc_statistics parsed = ( - libcudf_read_parsed_orc_statistics(make_source_info([filepath_or_buffer])) + parsed = ( + plc.io.orc.read_parsed_orc_statistics( + plc.io.SourceInfo([filepath_or_buffer]) + ) ) - cdef vector[column_statistics] file_stats = parsed.file_stats - cdef vector[vector[column_statistics]] stripes_stats = parsed.stripes_stats + file_stats = parsed.file_stats + stripes_stats = parsed.stripes_stats parsed_file_stats = [ - _parse_column_type_statistics(file_stats[column_index]) - for column_index in range(file_stats.size()) + file_stat + for file_stat in file_stats ] parsed_stripes_stats = [ - [_parse_column_type_statistics(stripes_stats[stripe_index][column_index]) - for column_index in range(stripes_stats[stripe_index].size())] - for stripe_index in range(stripes_stats.size()) + [col_stat + for col_stat in stripes_stat] + for stripes_stat in stripes_stats ] return parsed.column_names, parsed_file_stats, parsed_stripes_stats @@ -248,7 +130,7 @@ cpdef read_orc(object filepaths_or_buffers, ) ) - names = tbl_w_meta.column_names + names = tbl_w_meta.column_names(include_children=False) actual_index_names, col_names, is_range_index, reset_index_name, \ range_idx = _get_index_from_metadata(tbl_w_meta.per_file_user_data, diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx new file mode 100644 index 00000000000..a11dbdade08 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx @@ -0,0 +1,150 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. 
+ +from libcpp.string cimport string +from libcpp.utility cimport move +from libcpp.vector cimport vector + +from cudf._lib.pylibcudf.io.datasource cimport Datasource +from cudf._lib.pylibcudf.libcudf.io.datasource cimport datasource +from cudf._lib.pylibcudf.libcudf.io.types cimport ( + host_buffer, + source_info, + table_with_metadata, +) + +import errno +import io +import os + + +cdef class TableWithMetadata: + """A container holding a table and its associated metadata + (e.g. column names) + + For details, see :cpp:class:`cudf::io::table_with_metadata`. + """ + + @property + def columns(self): + """ + Return a list containing the columns of the table + """ + return self.tbl.columns() + + @property + def column_names(self): + """ + Return a list containing the column names of the table + """ + cdef list names = [] + for col_info in self.metadata.schema_info: + names.append(col_info.name.decode()) + return names + + @property + def per_file_user_data(self): + """ + Returns a list containing a dict + containing file-format specific metadata, + for each file being read in. + """ + return self.metadata.per_file_user_data + + @property + def child_names(self): + """ + Return a dictionary mapping the names of columns with children + to the names of their child columns + """ + return TableWithMetadata._parse_col_names(self.metadata.schema_info) + + @staticmethod + cdef dict _parse_col_names(vector[column_name_info] infos): + cdef dict child_names = dict() + cdef dict names = dict() + for col_info in infos: + child_names = TableWithMetadata._parse_col_names(col_info.children) + names[col_info.name.decode()] = child_names + return names + + @staticmethod + cdef TableWithMetadata from_libcudf(table_with_metadata& tbl_with_meta): + """Create a Python TableWithMetadata from a libcudf table_with_metadata""" + cdef TableWithMetadata out = TableWithMetadata.__new__(TableWithMetadata) + out.tbl = Table.from_libcudf(move(tbl_with_meta.tbl)) + out.metadata = tbl_with_meta.metadata + return out + +cdef class SourceInfo: + """A class containing details on a source to read from. + + For details, see :cpp:class:`cudf::io::source_info`. + + Parameters + ---------- + sources : List[Union[str, os.PathLike, bytes, io.BytesIO]] + A homogeneous list of sources (this can be a string filename, + an os.PathLike, bytes, or an io.BytesIO) to read from. + + Mixing different types of sources will raise a `ValueError`. 
+ """ + + def __init__(self, list sources): + if not sources: + raise ValueError("Need to pass at least one source") + + cdef vector[string] c_files + cdef vector[datasource*] c_datasources + + if isinstance(sources[0], (os.PathLike, str)): + c_files.reserve(len(sources)) + + for src in sources: + if not isinstance(src, (os.PathLike, str)): + raise ValueError("All sources must be of the same type!") + if not os.path.isfile(src): + raise FileNotFoundError(errno.ENOENT, + os.strerror(errno.ENOENT), + src) + + c_files.push_back( str(src).encode()) + + self.c_obj = move(source_info(c_files)) + return + elif isinstance(sources[0], Datasource): + for csrc in sources: + if not isinstance(csrc, Datasource): + raise ValueError("All sources must be of the same type!") + c_datasources.push_back((csrc).get_datasource()) + self.c_obj = move(source_info(c_datasources)) + return + + # TODO: host_buffer is deprecated API, use host_span instead + cdef vector[host_buffer] c_host_buffers + cdef const unsigned char[::1] c_buffer + cdef bint empty_buffer = False + if isinstance(sources[0], bytes): + empty_buffer = True + for buffer in sources: + if not isinstance(buffer, bytes): + raise ValueError("All sources must be of the same type!") + if (len(buffer) > 0): + c_buffer = buffer + c_host_buffers.push_back(host_buffer(&c_buffer[0], + c_buffer.shape[0])) + empty_buffer = False + elif isinstance(sources[0], io.BytesIO): + for bio in sources: + if not isinstance(bio, io.BytesIO): + raise ValueError("All sources must be of the same type!") + c_buffer = bio.getbuffer() # check if empty? + c_host_buffers.push_back(host_buffer(&c_buffer[0], + c_buffer.shape[0])) + else: + raise ValueError("Sources must be a list of str/paths, " + "bytes, or io.BytesIO") + + if empty_buffer is True: + c_host_buffers.push_back(host_buffer(NULL, 0)) + + self.c_obj = source_info(c_host_buffers) diff --git a/python/cudf/cudf/_lib/pylibcudf/utils/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/utils/__init__.pxd new file mode 100644 index 00000000000..1205cd74690 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/utils/__init__.pxd @@ -0,0 +1,3 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from . 
cimport variant diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index fd246c6215f..969a18dafd5 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -181,11 +181,6 @@ def read_orc_statistics( parsed_stripes_statistics, ) = liborc.read_parsed_orc_statistics(path_or_buf) - # Parse column names - column_names = [ - column_name.decode("utf-8") for column_name in column_names - ] - # Parse file statistics file_statistics = { column_name: column_stats @@ -248,9 +243,9 @@ def _filter_stripes( num_rows_scanned = 0 for i, stripe_statistics in enumerate(stripes_statistics): num_rows_before_stripe = num_rows_scanned - num_rows_scanned += next(iter(stripe_statistics.values()))[ - "number_of_values" - ] + num_rows_scanned += next( + iter(stripe_statistics.values()) + ).number_of_values if stripes is not None and i not in stripes: continue if skip_rows is not None and num_rows_scanned <= skip_rows: diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index e0884a5819a..6aa1e4001fa 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -184,25 +184,27 @@ def test_orc_read_statistics(datadir): pytest.skip(".orc file is not found: %s" % e) # Check numberOfValues - assert_eq(file_statistics[0]["int1"]["number_of_values"], 11_000) + # assert_eq(file_statistics[0]["int1"]["number_of_values"], 11_000) + print(file_statistics[0]) + assert_eq(file_statistics[0]["int1"].number_of_values, 11_000) assert_eq( - file_statistics[0]["int1"]["number_of_values"], + file_statistics[0]["int1"].number_of_values, sum( [ - stripes_statistics[0]["int1"]["number_of_values"], - stripes_statistics[1]["int1"]["number_of_values"], - stripes_statistics[2]["int1"]["number_of_values"], + stripes_statistics[0]["int1"].number_of_values, + stripes_statistics[1]["int1"].number_of_values, + stripes_statistics[2]["int1"].number_of_values, ] ), ) assert_eq( - stripes_statistics[1]["int1"]["number_of_values"], - stripes_statistics[1]["string1"]["number_of_values"], + stripes_statistics[1]["int1"].number_of_values, + stripes_statistics[1]["string1"].number_of_values, ) - assert_eq(stripes_statistics[2]["string1"]["number_of_values"], 1_000) + assert_eq(stripes_statistics[2]["string1"].number_of_values, 1_000) # Check other statistics - assert_eq(stripes_statistics[2]["string1"]["has_null"], False) + assert_eq(stripes_statistics[2]["string1"].has_null, False) assert_eq( file_statistics[0]["int1"]["minimum"], min( @@ -1538,8 +1540,8 @@ def test_empty_statistics(): for stats in got: # Similar expected stats for the first 6 columns in this case for col_name in ascii_lowercase[:6]: - assert stats[0][col_name].get("number_of_values") == 0 - assert stats[0][col_name].get("has_null") is True + assert stats[0][col_name].number_of_values == 0 + assert stats[0][col_name].has_null is True assert stats[0][col_name].get("minimum") is None assert stats[0][col_name].get("maximum") is None for col_name in ascii_lowercase[:3]: @@ -1547,17 +1549,17 @@ def test_empty_statistics(): # Sum for decimal column is a string assert stats[0]["d"].get("sum") == "0" - assert stats[0]["g"].get("number_of_values") == 0 - assert stats[0]["g"].get("has_null") is True + assert stats[0]["g"].number_of_values == 0 + assert stats[0]["g"].has_null is True assert stats[0]["g"].get("true_count") == 0 assert stats[0]["g"].get("false_count") == 0 - assert stats[0]["h"].get("number_of_values") == 0 - assert stats[0]["h"].get("has_null") is True + assert 
stats[0]["h"].number_of_values == 0 + assert stats[0]["h"].has_null is True assert stats[0]["h"].get("sum") == 0 - assert stats[0]["i"].get("number_of_values") == 1 - assert stats[0]["i"].get("has_null") is False + assert stats[0]["i"].number_of_values == 1 + assert stats[0]["i"].has_null is False assert stats[0]["i"].get("minimum") == 1 assert stats[0]["i"].get("maximum") == 1 assert stats[0]["i"].get("sum") == 1 diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 6b146be0fa3..645871c1c21 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1866,13 +1866,14 @@ def _apply_filter_bool_eq(val, col_stats): return False elif val is False: if (col_stats["false_count"] == 0) or ( - col_stats["true_count"] == col_stats["number_of_values"] + col_stats["true_count"] == col_stats.number_of_values ): return False return True def _apply_filter_not_eq(val, col_stats): + print(col_stats) return ("minimum" in col_stats and val < col_stats["minimum"]) or ( "maximum" in col_stats and val > col_stats["maximum"] ) @@ -1893,7 +1894,7 @@ def _apply_predicate(op, val, col_stats): return False # TODO: Replace pd.isnull with # cudf.isnull once it is implemented - if pd.isnull(val) and not col_stats["has_null"]: + if pd.isnull(val) and not col_stats.has_null: return False if not _apply_filter_bool_eq(val, col_stats): return False diff --git a/python/pylibcudf/pylibcudf/io/datasource.pxd b/python/pylibcudf/pylibcudf/io/datasource.pxd index c08f36693c7..97e1bba8ef2 100644 --- a/python/pylibcudf/pylibcudf/io/datasource.pxd +++ b/python/pylibcudf/pylibcudf/io/datasource.pxd @@ -1,5 +1,7 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. +from libcpp.memory cimport shared_ptr + from pylibcudf.libcudf.io.datasource cimport datasource diff --git a/python/pylibcudf/pylibcudf/io/datasource.pyx b/python/pylibcudf/pylibcudf/io/datasource.pyx index 02418444caa..0db85ad2353 100644 --- a/python/pylibcudf/pylibcudf/io/datasource.pyx +++ b/python/pylibcudf/pylibcudf/io/datasource.pyx @@ -1,7 +1,8 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. -from pylibcudf.libcudf.io.datasource cimport datasource +from libcpp.memory cimport shared_ptr +from pylibcudf.libcudf.io.datasource cimport datasource cdef class Datasource: cdef datasource* get_datasource(self) except * nogil: diff --git a/python/pylibcudf/pylibcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/io/orc.pxd index a19f89c57c3..af4768f937b 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/io/orc.pxd @@ -1,10 +1,18 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
+from libc.stdint cimport uint64_t from libcpp cimport bool +from libcpp.optional cimport optional +from libcpp.string cimport string from libcpp.vector cimport vector from pylibcudf.io.types cimport SourceInfo, TableWithMetadata from pylibcudf.libcudf.types cimport size_type from pylibcudf.types cimport DataType +from pylibcudf.libcudf.io.orc_metadata cimport ( + column_statistics, + parsed_orc_statistics, + statistics_type, +) cpdef TableWithMetadata read_orc( @@ -18,3 +26,26 @@ cpdef TableWithMetadata read_orc( DataType timestamp_type = *, list decimal128_columns = * ) + +cdef class OrcColumnStatistics: + cdef optional[uint64_t] number_of_values_c + cdef optional[bool] has_null_c + cdef statistics_type type_specific_stats_c + cdef dict column_stats + + cdef void _init_stats_dict(self) + + @staticmethod + cdef OrcColumnStatistics from_libcudf(column_statistics& col_stats) + + +cdef class ParsedOrcStatistics: + cdef parsed_orc_statistics c_obj + + @staticmethod + cdef ParsedOrcStatistics from_libcudf(parsed_orc_statistics& orc_stats) + + +cpdef ParsedOrcStatistics read_parsed_orc_statistics( + SourceInfo source_info +) diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx index a8581814db7..a303bfeb019 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyx +++ b/python/pylibcudf/pylibcudf/io/orc.pyx @@ -4,6 +4,8 @@ from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector +import datetime + from pylibcudf.io.types cimport SourceInfo, TableWithMetadata from pylibcudf.libcudf.io.orc cimport ( orc_reader_options, @@ -12,6 +14,198 @@ from pylibcudf.libcudf.io.orc cimport ( from pylibcudf.libcudf.io.types cimport table_with_metadata from pylibcudf.libcudf.types cimport size_type, type_id from pylibcudf.types cimport DataType +from pylibcudf.libcudf.io.orc cimport ( + orc_reader_options, + read_orc as cpp_read_orc, +) +from pylibcudf.libcudf.io.orc_metadata cimport ( + binary_statistics, + bucket_statistics, + column_statistics, + date_statistics, + decimal_statistics, + double_statistics, + integer_statistics, + no_statistics, + read_parsed_orc_statistics as cpp_read_parsed_orc_statistics, + statistics_type, + string_statistics, + timestamp_statistics, +) +from pylibcudf.variant cimport get_if, holds_alternative + + +cdef class OrcColumnStatistics: + @property + def number_of_values(self): + if self.number_of_values_c.has_value(): + return self.number_of_values_c.value() + return None + + @property + def has_null(self): + if self.has_null_c.has_value(): + return self.has_null_c.value() + return None + + cdef void _init_stats_dict(self): + # Initialize stats to return and parse stats blob + self.column_stats = {} + + cdef statistics_type type_specific_stats = self.type_specific_stats_c + + cdef integer_statistics* int_stats + cdef double_statistics* dbl_stats + cdef string_statistics* str_stats + cdef bucket_statistics* bucket_stats + cdef decimal_statistics* dec_stats + cdef date_statistics* date_stats + cdef binary_statistics* bin_stats + cdef timestamp_statistics* ts_stats + + if holds_alternative[no_statistics](type_specific_stats): + pass + elif int_stats := get_if[integer_statistics](&type_specific_stats): + if int_stats.minimum.has_value(): + self.column_stats["minimum"] = int_stats.minimum.value() + else: + self.column_stats["minimum"] = None + if int_stats.maximum.has_value(): + self.column_stats["maximum"] = int_stats.maximum.value() + else: + self.column_stats["maximum"] = None + if int_stats.sum.has_value(): 
+ self.column_stats["sum"] = int_stats.sum.value() + else: + self.column_stats["sum"] = None + elif dbl_stats := get_if[double_statistics](&type_specific_stats): + if dbl_stats.minimum.has_value(): + self.column_stats["minimum"] = dbl_stats.minimum.value() + else: + self.column_stats["minimum"] = None + if dbl_stats.maximum.has_value(): + self.column_stats["maximum"] = dbl_stats.maximum.value() + else: + self.column_stats["maximum"] = None + if dbl_stats.sum.has_value(): + self.column_stats["sum"] = dbl_stats.sum.value() + else: + self.column_stats["sum"] = None + elif str_stats := get_if[string_statistics](&type_specific_stats): + if str_stats.minimum.has_value(): + self.column_stats["minimum"] = str_stats.minimum.value().decode("utf-8") + else: + self.column_stats["minimum"] = None + if str_stats.maximum.has_value(): + self.column_stats["maximum"] = str_stats.maximum.value().decode("utf-8") + else: + self.column_stats["maximum"] = None + if str_stats.sum.has_value(): + self.column_stats["sum"] = str_stats.sum.value() + else: + self.column_stats["sum"] = None + elif bucket_stats := get_if[bucket_statistics](&type_specific_stats): + self.column_stats["true_count"] = bucket_stats.count[0] + self.column_stats["false_count"] = ( + self.number_of_values + - self.column_stats["true_count"] + ) + elif dec_stats := get_if[decimal_statistics](&type_specific_stats): + if dec_stats.minimum.has_value(): + self.column_stats["minimum"] = dec_stats.minimum.value().decode("utf-8") + else: + self.column_stats["minimum"] = None + if dec_stats.maximum.has_value(): + self.column_stats["maximum"] = dec_stats.maximum.value().decode("utf-8") + else: + self.column_stats["maximum"] = None + if dec_stats.sum.has_value(): + self.column_stats["sum"] = dec_stats.sum.value().decode("utf-8") + else: + self.column_stats["sum"] = None + elif date_stats := get_if[date_statistics](&type_specific_stats): + if date_stats.minimum.has_value(): + self.column_stats["minimum"] = datetime.datetime.fromtimestamp( + datetime.timedelta(date_stats.minimum.value()).total_seconds(), + datetime.timezone.utc, + ) + else: + self.column_stats["minimum"] = None + if date_stats.maximum.has_value(): + self.column_stats["maximum"] = datetime.datetime.fromtimestamp( + datetime.timedelta(date_stats.maximum.value()).total_seconds(), + datetime.timezone.utc, + ) + else: + self.column_stats["maximum"] = None + elif bin_stats := get_if[binary_statistics](&type_specific_stats): + if bin_stats.sum.has_value(): + self.column_stats["sum"] = bin_stats.sum.value() + else: + self.column_stats["sum"] = None + elif ts_stats := get_if[timestamp_statistics](&type_specific_stats): + # Before ORC-135, the local timezone offset was included and they were + # stored as minimum and maximum. After ORC-135, the timestamp is + # adjusted to UTC before being converted to milliseconds and stored + # in minimumUtc and maximumUtc. 
+ # TODO: Support minimum and maximum by reading writer's local timezone + if ts_stats.minimum_utc.has_value() and ts_stats.maximum_utc.has_value(): + self.column_stats["minimum"] = datetime.datetime.fromtimestamp( + ts_stats.minimum_utc.value() / 1000, datetime.timezone.utc + ) + self.column_stats["maximum"] = datetime.datetime.fromtimestamp( + ts_stats.maximum_utc.value() / 1000, datetime.timezone.utc + ) + else: + raise ValueError("Unsupported statistics type") + + def __getitem__(self, item): + return self.column_stats[item] + + def __contains__(self, item): + return item in self.column_stats + + def get(self, item, alt=None): + return self.column_stats.get(item, alt) + + @staticmethod + cdef OrcColumnStatistics from_libcudf(column_statistics& col_stats): + cdef OrcColumnStatistics out = OrcColumnStatistics.__new__(OrcColumnStatistics) + out.number_of_values_c = col_stats.number_of_values + out.has_null_c = col_stats.has_null + out.type_specific_stats_c = col_stats.type_specific_stats + out._init_stats_dict() + return out + + +cdef class ParsedOrcStatistics: + + @property + def column_names(self): + return [name.decode() for name in self.c_obj.column_names] + + @property + def file_stats(self): + stats_lst = [] + for i in range(self.c_obj.file_stats.size()): + stats_lst.append(OrcColumnStatistics.from_libcudf(self.c_obj.file_stats[i])) + return stats_lst + + @property + def stripes_stats(self): + stats_lst = [] + for stripe_stats_c in self.c_obj.stripes_stats: + stripe_stats = [] + for i in range(stripe_stats_c.size()): + stripe_stats.append(OrcColumnStatistics.from_libcudf(stripe_stats_c[i])) + stats_lst.append(stripe_stats) + return stats_lst + + @staticmethod + cdef ParsedOrcStatistics from_libcudf(parsed_orc_statistics& orc_stats): + cdef ParsedOrcStatistics out = ParsedOrcStatistics.__new__(ParsedOrcStatistics) + out.c_obj = move(orc_stats) + return out cpdef TableWithMetadata read_orc( @@ -60,3 +254,12 @@ cpdef TableWithMetadata read_orc( c_result = move(cpp_read_orc(opts)) return TableWithMetadata.from_libcudf(c_result) + + +cpdef ParsedOrcStatistics read_parsed_orc_statistics( + SourceInfo source_info +): + cdef parsed_orc_statistics parsed = ( + cpp_read_parsed_orc_statistics(source_info.c_obj) + ) + return ParsedOrcStatistics.from_libcudf(parsed) diff --git a/python/pylibcudf/pylibcudf/libcudf/io/orc_metadata.pxd b/python/pylibcudf/pylibcudf/libcudf/io/orc_metadata.pxd index db6cb0cdfa5..ab6d6f18ee1 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/orc_metadata.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/orc_metadata.pxd @@ -6,6 +6,8 @@ from libcpp cimport bool from libcpp.optional cimport optional from libcpp.string cimport string from libcpp.vector cimport vector + +from pylibcudf.libcudf.io cimport types as cudf_io_types from pylibcudf.variant cimport monostate, variant From 3467bcdd60acdb383f5ec880826785550749fab3 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Thu, 1 Aug 2024 20:27:35 +0000 Subject: [PATCH 3/9] add test --- python/pylibcudf/pylibcudf/io/orc.pxd | 2 +- python/pylibcudf/pylibcudf/io/orc.pyx | 30 +++++++++-- .../pylibcudf/pylibcudf/tests/common/utils.py | 28 +++++++--- .../pylibcudf/pylibcudf/tests/io/test_csv.py | 9 +++- .../pylibcudf/pylibcudf/tests/io/test_orc.py | 54 +++++++++++++++++++ 5 files changed, 110 insertions(+), 13 deletions(-) create mode 100644 python/pylibcudf/pylibcudf/tests/io/test_orc.py diff --git a/python/pylibcudf/pylibcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/io/orc.pxd index af4768f937b..6920999e418 100644 --- 
a/python/pylibcudf/pylibcudf/io/orc.pxd
+++ b/python/pylibcudf/pylibcudf/io/orc.pxd
@@ -20,7 +20,7 @@ cpdef TableWithMetadata read_orc(
     list columns = *,
     list stripes = *,
     size_type skip_rows = *,
-    size_type num_rows = *,
+    size_type nrows = *,
     bool use_index = *,
     bool use_np_dtypes = *,
     DataType timestamp_type = *,
diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx
index a303bfeb019..0c035e39ebb 100644
--- a/python/pylibcudf/pylibcudf/io/orc.pyx
+++ b/python/pylibcudf/pylibcudf/io/orc.pyx
@@ -213,13 +213,35 @@ cpdef TableWithMetadata read_orc(
     list columns = None,
     list stripes = None,
     size_type skip_rows = 0,
-    size_type num_rows = -1,
+    size_type nrows = -1,
     bool use_index = True,
     bool use_np_dtypes = True,
     DataType timestamp_type = DataType(type_id.EMPTY),
     list decimal128_columns = None,
 ):
-    """
+    """Reads an ORC file into a :py:class:`~.types.TableWithMetadata`.
+
+    Parameters
+    ----------
+    source_info : SourceInfo
+        The SourceInfo object to read the ORC file from.
+    columns : list, default None
+        The string names of the columns to be read.
+    stripes : list[list[size_type]], default None
+        List of stripes to be read.
+    skip_rows : int64_t, default 0
+        The number of rows to skip from the start of the file.
+    nrows : size_type, default -1
+        The number of rows to read. By default, read the entire file.
+    use_index : bool, default True
+        Whether to use the row index to speed up reading.
+    use_np_dtypes : bool, default True
+        Whether to use numpy-compatible dtypes.
+
+    Returns
+    -------
+    TableWithMetadata
+        The Table and its corresponding metadata (column names) that were read in.
     """
     cdef orc_reader_options opts
     cdef vector[vector[size_type]] c_stripes
@@ -228,8 +250,8 @@ cpdef TableWithMetadata read_orc(
         .use_index(use_index)
         .build()
     )
-    if num_rows >= 0:
-        opts.set_num_rows(num_rows)
+    if nrows >= 0:
+        opts.set_num_rows(nrows)
     if skip_rows >= 0:
         opts.set_skip_rows(skip_rows)
     if stripes is not None:
diff --git a/python/pylibcudf/pylibcudf/tests/common/utils.py b/python/pylibcudf/pylibcudf/tests/common/utils.py
index babe6634318..caaa192cd3b 100644
--- a/python/pylibcudf/pylibcudf/tests/common/utils.py
+++ b/python/pylibcudf/pylibcudf/tests/common/utils.py
@@ -9,6 +9,7 @@ import pyarrow.compute as pc
 import pylibcudf as plc
 import pytest
+from pyarrow.orc import write_table as orc_write_table
 from pyarrow.parquet import write_table as pq_write_table
 from pylibcudf.io.types import CompressionType
@@ -242,13 +243,20 @@ def is_nested_list(typ):
     return nesting_level(typ)[0] > 1

-def _convert_numeric_types_to_floating(pa_table):
+def _convert_types(pa_table, input_pred, result_type):
     """
     Useful little helper for testing the dtypes option in I/O readers.
Returns a tuple containing the pylibcudf dtypes and the new pyarrow schema + + Parameters + ---------- + input_pred : function + Predicate that evaluates to true for types to replace + result_type : pa.DataType + The type to cast to """ dtypes = [] new_fields = [] @@ -257,11 +265,9 @@ def _convert_numeric_types_to_floating(pa_table): child_types = [] plc_type = plc.interop.from_arrow(field.type) - if pa.types.is_integer(field.type) or pa.types.is_unsigned_integer( - field.type - ): - plc_type = plc.interop.from_arrow(pa.float64()) - field = field.with_type(pa.float64()) + if input_pred(field.type): + plc_type = plc.interop.from_arrow(result_type) + field = field.with_type(result_type) dtypes.append((field.name, plc_type, child_types)) @@ -332,6 +338,16 @@ def make_source(path_or_buf, pa_table, format, **kwargs): if isinstance(path_or_buf, io.IOBase) else path_or_buf, ) + elif format == "orc": + # The conversion to pandas is lossy (doesn't preserve + # nested types) so we + # will just use pyarrow directly to write this + orc_write_table( + pa_table, + pa.PythonFile(path_or_buf) + if isinstance(path_or_buf, io.IOBase) + else path_or_buf, + ) if isinstance(path_or_buf, io.IOBase): path_or_buf.seek(0) return path_or_buf diff --git a/python/pylibcudf/pylibcudf/tests/io/test_csv.py b/python/pylibcudf/pylibcudf/tests/io/test_csv.py index ccd7eef54f3..fc661128d8f 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_csv.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_csv.py @@ -9,7 +9,7 @@ import pytest from pylibcudf.io.types import CompressionType from utils import ( - _convert_numeric_types_to_floating, + _convert_types, assert_table_and_meta_eq, make_source, write_source_str, @@ -148,7 +148,12 @@ def test_read_csv_dtypes(csv_table_data, source_or_sink, usecols): if usecols is not None: pa_table = pa_table.select(usecols) - dtypes, new_fields = _convert_numeric_types_to_floating(pa_table) + dtypes, new_fields = _convert_types( + pa_table, + lambda typ: pa.types.is_unsigned_integer(typ) + or pa.types.is_integer(typ), + pa.float64(), + ) # Extract the dtype out of the (name, type, child_types) tuple # (read_csv doesn't support this format since it doesn't support nested columns) dtypes = {name: dtype for name, dtype, _ in dtypes} diff --git a/python/pylibcudf/pylibcudf/tests/io/test_orc.py b/python/pylibcudf/pylibcudf/tests/io/test_orc.py new file mode 100644 index 00000000000..d7d93fc313a --- /dev/null +++ b/python/pylibcudf/pylibcudf/tests/io/test_orc.py @@ -0,0 +1,54 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. 
+import pyarrow as pa +import pytest +from utils import _convert_types, assert_table_and_meta_eq, make_source + +import cudf._lib.pylibcudf as plc + +# Shared kwargs to pass to make_source +_COMMON_ORC_SOURCE_KWARGS = {"format": "orc"} + + +@pytest.mark.parametrize("columns", [None, ["col_int64", "col_bool"]]) +def test_read_orc_basic( + table_data, binary_source_or_sink, nrows_skiprows, columns +): + _, pa_table = table_data + nrows, skiprows = nrows_skiprows + + # ORC reader doesn't support skip_rows for nested columns + if skiprows > 0: + colnames_to_drop = [] + for i in range(len(pa_table.schema)): + field = pa_table.schema.field(i) + + if pa.types.is_nested(field.type): + colnames_to_drop.append(field.name) + pa_table = pa_table.drop(colnames_to_drop) + # ORC doesn't support unsigned ints + # let's cast to int64 + _, new_fields = _convert_types( + pa_table, pa.types.is_unsigned_integer, pa.int64() + ) + pa_table = pa_table.cast(pa.schema(new_fields)) + + source = make_source( + binary_source_or_sink, pa_table, **_COMMON_ORC_SOURCE_KWARGS + ) + + res = plc.io.orc.read_orc( + plc.io.SourceInfo([source]), + nrows=nrows, + skip_rows=skiprows, + columns=columns, + ) + + if columns is not None: + pa_table = pa_table.select(columns) + + # Adapt to nrows/skiprows + pa_table = pa_table.slice( + offset=skiprows, length=nrows if nrows != -1 else None + ) + + assert_table_and_meta_eq(pa_table, res, check_field_nullability=False) From a93b23c85ad7d9346f4837432174ebf28f44d630 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 28 Aug 2024 23:54:37 +0000 Subject: [PATCH 4/9] Fix some issues with the rebase --- python/cudf/cudf/_lib/pylibcudf/io/types.pyx | 150 ------------------ .../cudf/_lib/pylibcudf/utils/__init__.pxd | 3 - python/cudf/cudf/_lib/utils.pyx | 4 +- python/pylibcudf/pylibcudf/io/datasource.pxd | 2 - python/pylibcudf/pylibcudf/io/datasource.pyx | 3 +- python/pylibcudf/pylibcudf/io/types.pyx | 1 + .../pylibcudf/libcudf/io/orc_metadata.pxd | 2 - 7 files changed, 4 insertions(+), 161 deletions(-) delete mode 100644 python/cudf/cudf/_lib/pylibcudf/io/types.pyx delete mode 100644 python/cudf/cudf/_lib/pylibcudf/utils/__init__.pxd diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx deleted file mode 100644 index a11dbdade08..00000000000 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx +++ /dev/null @@ -1,150 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. - -from libcpp.string cimport string -from libcpp.utility cimport move -from libcpp.vector cimport vector - -from cudf._lib.pylibcudf.io.datasource cimport Datasource -from cudf._lib.pylibcudf.libcudf.io.datasource cimport datasource -from cudf._lib.pylibcudf.libcudf.io.types cimport ( - host_buffer, - source_info, - table_with_metadata, -) - -import errno -import io -import os - - -cdef class TableWithMetadata: - """A container holding a table and its associated metadata - (e.g. column names) - - For details, see :cpp:class:`cudf::io::table_with_metadata`. - """ - - @property - def columns(self): - """ - Return a list containing the columns of the table - """ - return self.tbl.columns() - - @property - def column_names(self): - """ - Return a list containing the column names of the table - """ - cdef list names = [] - for col_info in self.metadata.schema_info: - names.append(col_info.name.decode()) - return names - - @property - def per_file_user_data(self): - """ - Returns a list containing a dict - containing file-format specific metadata, - for each file being read in. 
- """ - return self.metadata.per_file_user_data - - @property - def child_names(self): - """ - Return a dictionary mapping the names of columns with children - to the names of their child columns - """ - return TableWithMetadata._parse_col_names(self.metadata.schema_info) - - @staticmethod - cdef dict _parse_col_names(vector[column_name_info] infos): - cdef dict child_names = dict() - cdef dict names = dict() - for col_info in infos: - child_names = TableWithMetadata._parse_col_names(col_info.children) - names[col_info.name.decode()] = child_names - return names - - @staticmethod - cdef TableWithMetadata from_libcudf(table_with_metadata& tbl_with_meta): - """Create a Python TableWithMetadata from a libcudf table_with_metadata""" - cdef TableWithMetadata out = TableWithMetadata.__new__(TableWithMetadata) - out.tbl = Table.from_libcudf(move(tbl_with_meta.tbl)) - out.metadata = tbl_with_meta.metadata - return out - -cdef class SourceInfo: - """A class containing details on a source to read from. - - For details, see :cpp:class:`cudf::io::source_info`. - - Parameters - ---------- - sources : List[Union[str, os.PathLike, bytes, io.BytesIO]] - A homogeneous list of sources (this can be a string filename, - an os.PathLike, bytes, or an io.BytesIO) to read from. - - Mixing different types of sources will raise a `ValueError`. - """ - - def __init__(self, list sources): - if not sources: - raise ValueError("Need to pass at least one source") - - cdef vector[string] c_files - cdef vector[datasource*] c_datasources - - if isinstance(sources[0], (os.PathLike, str)): - c_files.reserve(len(sources)) - - for src in sources: - if not isinstance(src, (os.PathLike, str)): - raise ValueError("All sources must be of the same type!") - if not os.path.isfile(src): - raise FileNotFoundError(errno.ENOENT, - os.strerror(errno.ENOENT), - src) - - c_files.push_back( str(src).encode()) - - self.c_obj = move(source_info(c_files)) - return - elif isinstance(sources[0], Datasource): - for csrc in sources: - if not isinstance(csrc, Datasource): - raise ValueError("All sources must be of the same type!") - c_datasources.push_back((csrc).get_datasource()) - self.c_obj = move(source_info(c_datasources)) - return - - # TODO: host_buffer is deprecated API, use host_span instead - cdef vector[host_buffer] c_host_buffers - cdef const unsigned char[::1] c_buffer - cdef bint empty_buffer = False - if isinstance(sources[0], bytes): - empty_buffer = True - for buffer in sources: - if not isinstance(buffer, bytes): - raise ValueError("All sources must be of the same type!") - if (len(buffer) > 0): - c_buffer = buffer - c_host_buffers.push_back(host_buffer(&c_buffer[0], - c_buffer.shape[0])) - empty_buffer = False - elif isinstance(sources[0], io.BytesIO): - for bio in sources: - if not isinstance(bio, io.BytesIO): - raise ValueError("All sources must be of the same type!") - c_buffer = bio.getbuffer() # check if empty? - c_host_buffers.push_back(host_buffer(&c_buffer[0], - c_buffer.shape[0])) - else: - raise ValueError("Sources must be a list of str/paths, " - "bytes, or io.BytesIO") - - if empty_buffer is True: - c_host_buffers.push_back(host_buffer(NULL, 0)) - - self.c_obj = source_info(c_host_buffers) diff --git a/python/cudf/cudf/_lib/pylibcudf/utils/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/utils/__init__.pxd deleted file mode 100644 index 1205cd74690..00000000000 --- a/python/cudf/cudf/_lib/pylibcudf/utils/__init__.pxd +++ /dev/null @@ -1,3 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. - -from . 
cimport variant diff --git a/python/cudf/cudf/_lib/utils.pyx b/python/cudf/cudf/_lib/utils.pyx index 3dbe47da76d..fc22bf31881 100644 --- a/python/cudf/cudf/_lib/utils.pyx +++ b/python/cudf/cudf/_lib/utils.pyx @@ -322,10 +322,10 @@ cdef data_from_pylibcudf_io(tbl_with_meta, column_names=None, index_names=None): into a dict of columns and an Index (cuDF format) """ if column_names is None: - column_names = tbl_with_meta.column_names + column_names = tbl_with_meta.column_names(include_children=False) return _data_from_columns( columns=[Column.from_pylibcudf(plc) for plc in tbl_with_meta.columns], - column_names=tbl_with_meta.column_names(include_children=False), + column_names=column_names, index_names=index_names ) diff --git a/python/pylibcudf/pylibcudf/io/datasource.pxd b/python/pylibcudf/pylibcudf/io/datasource.pxd index 97e1bba8ef2..c08f36693c7 100644 --- a/python/pylibcudf/pylibcudf/io/datasource.pxd +++ b/python/pylibcudf/pylibcudf/io/datasource.pxd @@ -1,7 +1,5 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. -from libcpp.memory cimport shared_ptr - from pylibcudf.libcudf.io.datasource cimport datasource diff --git a/python/pylibcudf/pylibcudf/io/datasource.pyx b/python/pylibcudf/pylibcudf/io/datasource.pyx index 0db85ad2353..02418444caa 100644 --- a/python/pylibcudf/pylibcudf/io/datasource.pyx +++ b/python/pylibcudf/pylibcudf/io/datasource.pyx @@ -1,9 +1,8 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. -from libcpp.memory cimport shared_ptr - from pylibcudf.libcudf.io.datasource cimport datasource + cdef class Datasource: cdef datasource* get_datasource(self) except * nogil: with gil: diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 1600a805b37..563a02761da 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -130,6 +130,7 @@ cdef class TableWithMetadata: """ return self.metadata.per_file_user_data + cdef class SourceInfo: """A class containing details on a source to read from. diff --git a/python/pylibcudf/pylibcudf/libcudf/io/orc_metadata.pxd b/python/pylibcudf/pylibcudf/libcudf/io/orc_metadata.pxd index ab6d6f18ee1..9302ffe2f80 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/orc_metadata.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/orc_metadata.pxd @@ -1,12 +1,10 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. 
-cimport pylibcudf.libcudf.io.types as cudf_io_types from libc.stdint cimport int32_t, int64_t, uint32_t, uint64_t from libcpp cimport bool from libcpp.optional cimport optional from libcpp.string cimport string from libcpp.vector cimport vector - from pylibcudf.libcudf.io cimport types as cudf_io_types from pylibcudf.variant cimport monostate, variant From 79ca12b65522ceda4d2851564721c31fd256c5de Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 28 Aug 2024 23:58:08 +0000 Subject: [PATCH 5/9] Some minor cleanup --- python/cudf/cudf/tests/test_orc.py | 2 -- python/cudf/cudf/utils/ioutils.py | 1 - 2 files changed, 3 deletions(-) diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 6aa1e4001fa..f6ab8e5479a 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -184,8 +184,6 @@ def test_orc_read_statistics(datadir): pytest.skip(".orc file is not found: %s" % e) # Check numberOfValues - # assert_eq(file_statistics[0]["int1"]["number_of_values"], 11_000) - print(file_statistics[0]) assert_eq(file_statistics[0]["int1"].number_of_values, 11_000) assert_eq( file_statistics[0]["int1"].number_of_values, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 645871c1c21..933d545a121 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1873,7 +1873,6 @@ def _apply_filter_bool_eq(val, col_stats): def _apply_filter_not_eq(val, col_stats): - print(col_stats) return ("minimum" in col_stats and val < col_stats["minimum"]) or ( "maximum" in col_stats and val > col_stats["maximum"] ) From 0927adc7e3fa2cee28887c41f0e0791011e1a320 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 29 Aug 2024 00:08:44 +0000 Subject: [PATCH 6/9] Fix style --- python/cudf/cudf/_lib/orc.pyx | 32 +++++---------------------- python/pylibcudf/pylibcudf/io/orc.pxd | 5 ++--- python/pylibcudf/pylibcudf/io/orc.pyx | 10 +++------ 3 files changed, 10 insertions(+), 37 deletions(-) diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index 69631e09e46..6988d55b096 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -15,8 +15,8 @@ try: except ImportError: import json -cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view cimport pylibcudf.libcudf.io.types as cudf_io_types +cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view from pylibcudf.libcudf.io.data_sink cimport data_sink from pylibcudf.libcudf.io.orc cimport ( chunked_orc_writer_options, @@ -24,21 +24,6 @@ from pylibcudf.libcudf.io.orc cimport ( orc_writer_options, write_orc as libcudf_write_orc, ) -from pylibcudf.libcudf.io.orc_metadata cimport ( - binary_statistics, - bucket_statistics, - column_statistics, - date_statistics, - decimal_statistics, - double_statistics, - integer_statistics, - no_statistics, - parsed_orc_statistics, - read_parsed_orc_statistics as libcudf_read_parsed_orc_statistics, - statistics_type, - string_statistics, - timestamp_statistics, -) from pylibcudf.libcudf.io.types cimport ( column_in_metadata, compression_type, @@ -46,24 +31,17 @@ from pylibcudf.libcudf.io.types cimport ( table_input_metadata, ) from pylibcudf.libcudf.table.table_view cimport table_view -from pylibcudf.libcudf.types cimport data_type, size_type, type_id -from pylibcudf.variant cimport get_if as std_get_if, holds_alternative from cudf._lib.column cimport Column -from cudf._lib.io.utils cimport ( - make_sink_info, - 
make_source_info, - update_col_struct_field_names, -) +from cudf._lib.io.utils cimport make_sink_info, update_col_struct_field_names from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table +import pylibcudf as plc + import cudf -from cudf.core.buffer import acquire_spill_lock from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES from cudf._lib.utils import _index_level_name, generate_pandas_metadata - -import pylibcudf as plc - +from cudf.core.buffer import acquire_spill_lock cpdef read_parsed_orc_statistics(filepath_or_buffer): diff --git a/python/pylibcudf/pylibcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/io/orc.pxd index 6920999e418..b111d617b1b 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/io/orc.pxd @@ -4,15 +4,14 @@ from libcpp cimport bool from libcpp.optional cimport optional from libcpp.string cimport string from libcpp.vector cimport vector - from pylibcudf.io.types cimport SourceInfo, TableWithMetadata -from pylibcudf.libcudf.types cimport size_type -from pylibcudf.types cimport DataType from pylibcudf.libcudf.io.orc_metadata cimport ( column_statistics, parsed_orc_statistics, statistics_type, ) +from pylibcudf.libcudf.types cimport size_type +from pylibcudf.types cimport DataType cpdef TableWithMetadata read_orc( diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx index 0c035e39ebb..101529fb4c9 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyx +++ b/python/pylibcudf/pylibcudf/io/orc.pyx @@ -11,13 +11,6 @@ from pylibcudf.libcudf.io.orc cimport ( orc_reader_options, read_orc as cpp_read_orc, ) -from pylibcudf.libcudf.io.types cimport table_with_metadata -from pylibcudf.libcudf.types cimport size_type, type_id -from pylibcudf.types cimport DataType -from pylibcudf.libcudf.io.orc cimport ( - orc_reader_options, - read_orc as cpp_read_orc, -) from pylibcudf.libcudf.io.orc_metadata cimport ( binary_statistics, bucket_statistics, @@ -32,6 +25,9 @@ from pylibcudf.libcudf.io.orc_metadata cimport ( string_statistics, timestamp_statistics, ) +from pylibcudf.libcudf.io.types cimport table_with_metadata +from pylibcudf.libcudf.types cimport size_type, type_id +from pylibcudf.types cimport DataType from pylibcudf.variant cimport get_if, holds_alternative From 4328460b9f9eeaf88417d616f76a006c6e6e5ca8 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 29 Aug 2024 16:39:24 +0000 Subject: [PATCH 7/9] Fix import in pylibcudf tests --- python/pylibcudf/pylibcudf/tests/io/test_orc.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_orc.py b/python/pylibcudf/pylibcudf/tests/io/test_orc.py index d7d93fc313a..42b14b1feff 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_orc.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_orc.py @@ -1,10 +1,9 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
import pyarrow as pa +import pylibcudf as plc import pytest from utils import _convert_types, assert_table_and_meta_eq, make_source -import cudf._lib.pylibcudf as plc - # Shared kwargs to pass to make_source _COMMON_ORC_SOURCE_KWARGS = {"format": "orc"} From a371f936dafbd5a216e05250a4b79eef98c8bbd3 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 3 Sep 2024 22:17:46 +0000 Subject: [PATCH 8/9] Address reviews --- python/cudf/cudf/_lib/orc.pyx | 25 +++------ python/pylibcudf/pylibcudf/io/orc.pyx | 55 +++++++++++++------ python/pylibcudf/pylibcudf/libcudf/io/orc.pxd | 1 + .../pylibcudf/pylibcudf/tests/common/utils.py | 9 +-- .../pylibcudf/pylibcudf/tests/io/test_csv.py | 3 +- 5 files changed, 51 insertions(+), 42 deletions(-) diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index 6988d55b096..f88c48ce989 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -44,6 +44,7 @@ from cudf._lib.utils import _index_level_name, generate_pandas_metadata from cudf.core.buffer import acquire_spill_lock +# TODO: Consider inlining this function since it seems to only be used in one place. cpdef read_parsed_orc_statistics(filepath_or_buffer): """ Cython function to call into libcudf API, see `read_parsed_orc_statistics`. @@ -59,21 +60,7 @@ cpdef read_parsed_orc_statistics(filepath_or_buffer): ) ) - file_stats = parsed.file_stats - stripes_stats = parsed.stripes_stats - - parsed_file_stats = [ - file_stat - for file_stat in file_stats - ] - - parsed_stripes_stats = [ - [col_stat - for col_stat in stripes_stat] - for stripes_stat in stripes_stats - ] - - return parsed.column_names, parsed_file_stats, parsed_stripes_stats + return parsed.column_names, parsed.file_stats, parsed.stripes_stats cpdef read_orc(object filepaths_or_buffers, @@ -89,6 +76,11 @@ cpdef read_orc(object filepaths_or_buffers, See Also -------- cudf.read_orc + + Notes + ----- + Currently this function only considers the metadata of the first file in the list of + filepaths_or_buffers. """ if columns is not None: @@ -167,9 +159,6 @@ cdef tuple _get_index_from_metadata( object skip_rows, object num_rows): - # TODO: consider metadata from more than the first file? - # Note: This code used to use the deprecated user_data member on - # table_metadata (which only considers the first file) meta = None index_col = None is_range_index = False diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx index 101529fb4c9..fbecd986c5e 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyx +++ b/python/pylibcudf/pylibcudf/io/orc.pyx @@ -32,6 +32,13 @@ from pylibcudf.variant cimport get_if, holds_alternative cdef class OrcColumnStatistics: + def __init__(self): + raise TypeError( + "OrcColumnStatistics should not be instantiated by users. If it is " + "being constructed in Cython from a preexisting libcudf object, " + "use `OrcColumnStatistics.from_libcudf` instead." 
+ ) + @property def number_of_values(self): if self.number_of_values_c.has_value(): @@ -161,8 +168,8 @@ cdef class OrcColumnStatistics: def __contains__(self, item): return item in self.column_stats - def get(self, item, alt=None): - return self.column_stats.get(item, alt) + def get(self, item, default=None): + return self.column_stats.get(item, default) @staticmethod cdef OrcColumnStatistics from_libcudf(column_statistics& col_stats): @@ -182,20 +189,20 @@ cdef class ParsedOrcStatistics: @property def file_stats(self): - stats_lst = [] - for i in range(self.c_obj.file_stats.size()): - stats_lst.append(OrcColumnStatistics.from_libcudf(self.c_obj.file_stats[i])) - return stats_lst + return [ + OrcColumnStatistics.from_libcudf(self.c_obj.file_stats[i]) + for i in range(self.c_obj.file_stats.size()) + ] @property def stripes_stats(self): - stats_lst = [] - for stripe_stats_c in self.c_obj.stripes_stats: - stripe_stats = [] - for i in range(stripe_stats_c.size()): - stripe_stats.append(OrcColumnStatistics.from_libcudf(stripe_stats_c[i])) - stats_lst.append(stripe_stats) - return stats_lst + return [ + [ + OrcColumnStatistics.from_libcudf(stripe_stats_c[i]) + for i in range(stripe_stats_c.size()) + ] + for stripe_stats_c in self.c_obj.stripes_stats + ] @staticmethod cdef ParsedOrcStatistics from_libcudf(parsed_orc_statistics& orc_stats): @@ -233,6 +240,10 @@ cpdef TableWithMetadata read_orc( Whether to use the row index to speed up reading. use_np_dtypes : bool, default True Whether to use numpy compatible dtypes. + timestamp_type : DataType, default None + The timestamp type to use for the timestamp columns. + decimal128_columns : list, default None + List of column names to be read as 128-bit decimals. Returns ------- @@ -253,18 +264,26 @@ cpdef TableWithMetadata read_orc( if stripes is not None: c_stripes = stripes opts.set_stripes(c_stripes) - if timestamp_type.id() != type_id.EMPTY: + if timestamp_type.id() is not None: opts.set_timestamp_type(timestamp_type.c_obj) + cdef vector[string] c_decimal128_columns + if decimal128_columns is not None and len(decimal128_columns) > 0: + c_decimal128_columns.reserve(len(decimal128_columns)) + for col in decimal128_columns: + if not isinstance(col, str): + raise TypeError("Decimal 128 column names must be strings!") + c_decimal128_columns.push_back(col.encode()) + opts.set_decimal128_columns(c_decimal128_columns) + cdef vector[string] c_column_names - if columns is not None: + if columns is not None and len(columns) > 0: c_column_names.reserve(len(columns)) for col in columns: if not isinstance(col, str): raise TypeError("Column names must be strings!") - c_column_names.push_back(str(col).encode()) - if len(columns) > 0: - opts.set_columns(c_column_names) + c_column_names.push_back(col.encode()) + opts.set_columns(c_column_names) cdef table_with_metadata c_result diff --git a/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd index e4a09b8feb2..dca24c7f665 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd @@ -35,6 +35,7 @@ cdef extern from "cudf/io/orc.hpp" \ void enable_use_index(bool val) except + void enable_use_np_dtypes(bool val) except + void set_timestamp_type(data_type type) except + + void set_decimal128_columns(vector[string] val) except + @staticmethod orc_reader_options_builder builder( diff --git a/python/pylibcudf/pylibcudf/tests/common/utils.py b/python/pylibcudf/pylibcudf/tests/common/utils.py index caaa192cd3b..9f389fa42c4 100644 
--- a/python/pylibcudf/pylibcudf/tests/common/utils.py +++ b/python/pylibcudf/pylibcudf/tests/common/utils.py @@ -245,14 +245,15 @@ def is_nested_list(typ): def _convert_types(pa_table, input_pred, result_type): """ - Useful little helper for testing the - dtypes option in I/O readers. + Useful little helper for testing the dtypes option in I/O readers. - Returns a tuple containing the pylibcudf dtypes - and the new pyarrow schema + Returns a tuple containing the pylibcudf dtypes and the new pyarrow schema based on + the data in the table. Parameters ---------- + pa_table : pyarrow.Table + The table from which to extract the dtypes input_pred : function Predicate that evaluates to true for types to replace result_type : pa.DataType diff --git a/python/pylibcudf/pylibcudf/tests/io/test_csv.py b/python/pylibcudf/pylibcudf/tests/io/test_csv.py index fc661128d8f..ab26f23418d 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_csv.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_csv.py @@ -150,8 +150,7 @@ def test_read_csv_dtypes(csv_table_data, source_or_sink, usecols): dtypes, new_fields = _convert_types( pa_table, - lambda typ: pa.types.is_unsigned_integer(typ) - or pa.types.is_integer(typ), + lambda t: (pa.types.is_unsigned_integer(t) or pa.types.is_integer(t)), pa.float64(), ) # Extract the dtype out of the (name, type, child_types) tuple From d917492d8f3bb88dd099565d8b2c9df868e85e47 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 21 Sep 2024 01:25:57 +0000 Subject: [PATCH 9/9] Fix timestamp type --- python/pylibcudf/pylibcudf/io/orc.pyx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx index fbecd986c5e..01a5e4b04a1 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyx +++ b/python/pylibcudf/pylibcudf/io/orc.pyx @@ -26,7 +26,7 @@ from pylibcudf.libcudf.io.orc_metadata cimport ( timestamp_statistics, ) from pylibcudf.libcudf.io.types cimport table_with_metadata -from pylibcudf.libcudf.types cimport size_type, type_id +from pylibcudf.libcudf.types cimport size_type from pylibcudf.types cimport DataType from pylibcudf.variant cimport get_if, holds_alternative @@ -219,7 +219,7 @@ cpdef TableWithMetadata read_orc( size_type nrows = -1, bool use_index = True, bool use_np_dtypes = True, - DataType timestamp_type = DataType(type_id.EMPTY), + DataType timestamp_type = None, list decimal128_columns = None, ): """Reads an ORC file into a :py:class:`~.types.TableWithMetadata`. @@ -264,7 +264,7 @@ cpdef TableWithMetadata read_orc( if stripes is not None: c_stripes = stripes opts.set_stripes(c_stripes) - if timestamp_type.id() is not None: + if timestamp_type is not None: opts.set_timestamp_type(timestamp_type.c_obj) cdef vector[string] c_decimal128_columns
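For reference, a minimal usage sketch of the new pylibcudf ORC reader introduced by this patch series. The file path and column names below are hypothetical placeholders; the calls mirror the `read_orc` signature and the `TableWithMetadata` accessors shown in the diffs above, and are an illustration rather than part of the patches.

    import pylibcudf as plc

    # Read two hypothetical columns from a hypothetical ORC file, capping the row count.
    tbl_w_meta = plc.io.orc.read_orc(
        plc.io.SourceInfo(["example.orc"]),  # paths, bytes, or Datasource objects
        columns=["int1", "string1"],         # optional column selection
        nrows=100,                           # read at most 100 rows
    )

    cols = tbl_w_meta.columns                                # list of pylibcudf Columns
    names = tbl_w_meta.column_names(include_children=False)  # top-level column names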