Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate JSON reader to pylibcudf #15966

Merged
merged 15 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/cudf/cudf/_lib/io/utils.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ cdef source_info make_source_info(list src) except*
cdef sink_info make_sinks_info(
list src, vector[unique_ptr[data_sink]] & data) except*
cdef sink_info make_sink_info(src, unique_ptr[data_sink] & data) except*
# Rename struct fields of ``df``'s columns in place, using the nested
# child-name mapping produced by a pylibcudf reader
# (e.g. ``TableWithMetadata.child_names``).
cdef add_df_col_struct_names(
    df,
    child_names_dict
)
# Legacy path: applies struct field names from libcudf's
# ``column_name_info`` schema. Deprecated in favor of
# ``add_df_col_struct_names`` once a reader is ported to pylibcudf.
cdef update_struct_field_names(
    table,
    vector[column_name_info]& schema_info)
Expand Down
27 changes: 27 additions & 0 deletions python/cudf/cudf/_lib/io/utils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,37 @@ cdef cppclass iobase_data_sink(data_sink):
return buf.tell()


cdef add_df_col_struct_names(df, child_names_dict):
    """Apply struct field names to ``df``'s columns in place.

    ``child_names_dict`` maps a column name to the nested mapping of
    child names reported by the reader (e.g.
    ``TableWithMetadata.child_names``); each named column in ``df._data``
    is replaced by a copy whose struct fields are renamed accordingly.
    """
    for col_name, names in child_names_dict.items():
        df._data[col_name] = update_col_struct_field_names(
            df._data[col_name], names
        )


cdef update_col_struct_field_names(Column col, child_names):
    """Recursively rename ``col``'s struct fields from ``child_names``.

    Children are updated in place via ``set_base_children``; if ``col``
    itself is a struct column, a renamed copy is produced with
    ``_rename_fields``. Returns the (possibly replaced) column.
    """
    if col.children:
        new_children = list(col.children)
        # zip() truncates to the shorter of (children, provided names),
        # so extra children without names are kept untouched.
        for idx, names in zip(range(len(new_children)), child_names.values()):
            new_children[idx] = update_col_struct_field_names(
                new_children[idx], names
            )
        col.set_base_children(tuple(new_children))

    if isinstance(col.dtype, StructDtype):
        col = col._rename_fields(child_names.keys())

    return col


cdef update_struct_field_names(
table,
vector[column_name_info]& schema_info
):
    # Deprecated, remove in favor of add_df_col_struct_names
    # when a reader is ported to pylibcudf
for i, (name, col) in enumerate(table._data.items()):
table._data[name] = update_column_struct_field_names(
col, schema_info[i]
Expand Down
127 changes: 53 additions & 74 deletions python/cudf/cudf/_lib/json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,16 @@ import cudf
from cudf.core.buffer import acquire_spill_lock

from libcpp cimport bool
from libcpp.map cimport map
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types
from cudf._lib.io.utils cimport make_source_info, update_struct_field_names
from cudf._lib.pylibcudf.libcudf.io.json cimport (
json_reader_options,
json_recovery_mode_t,
read_json as libcudf_read_json,
schema_element,
)
from cudf._lib.pylibcudf.libcudf.io.types cimport (
compression_type,
table_with_metadata,
)
from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type
from cudf._lib.io.utils cimport add_df_col_struct_names
from cudf._lib.pylibcudf.io.types cimport compression_type
from cudf._lib.pylibcudf.libcudf.io.json cimport json_recovery_mode_t
from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type
from cudf._lib.pylibcudf.libcudf.types cimport data_type, type_id
from cudf._lib.pylibcudf.types cimport DataType
from cudf._lib.types cimport dtype_to_data_type
from cudf._lib.utils cimport data_from_unique_ptr
from cudf._lib.utils cimport data_from_pylibcudf_io

import cudf._lib.pylibcudf as plc

Expand Down Expand Up @@ -62,6 +52,7 @@ cpdef read_json(object filepaths_or_buffers,
# If input data is a JSON string (or StringIO), hold a reference to
# the encoded memoryview externally to ensure the encoded buffer
# isn't destroyed before calling libcudf `read_json()`

for idx in range(len(filepaths_or_buffers)):
if isinstance(filepaths_or_buffers[idx], io.StringIO):
filepaths_or_buffers[idx] = \
Expand All @@ -71,17 +62,7 @@ cpdef read_json(object filepaths_or_buffers,
filepaths_or_buffers[idx] = filepaths_or_buffers[idx].encode()

# Setup arguments
cdef vector[data_type] c_dtypes_list
cdef map[string, schema_element] c_dtypes_schema_map
cdef cudf_io_types.compression_type c_compression
# Determine byte read offsets if applicable
cdef size_type c_range_offset = (
byte_range[0] if byte_range is not None else 0
)
cdef size_type c_range_size = (
byte_range[1] if byte_range is not None else 0
)
cdef bool c_lines = lines

if compression is not None:
if compression == 'gzip':
Expand All @@ -94,56 +75,50 @@ cpdef read_json(object filepaths_or_buffers,
c_compression = cudf_io_types.compression_type.AUTO
else:
c_compression = cudf_io_types.compression_type.NONE
is_list_like_dtypes = False

processed_dtypes = None

if dtype is False:
raise ValueError("False value is unsupported for `dtype`")
elif dtype is not True:
processed_dtypes = []
if isinstance(dtype, abc.Mapping):
for k, v in dtype.items():
c_dtypes_schema_map[str(k).encode()] = \
_get_cudf_schema_element_from_dtype(v)
# Make sure keys are string
k = str(k)
lib_type, child_types = _get_cudf_schema_element_from_dtype(v)
processed_dtypes.append((k, lib_type, child_types))
elif isinstance(dtype, abc.Collection):
is_list_like_dtypes = True
c_dtypes_list.reserve(len(dtype))
for col_dtype in dtype:
c_dtypes_list.push_back(
_get_cudf_data_type_from_dtype(
col_dtype))
processed_dtypes.append(
# Ignore child columns since we cannot specify their dtypes
# when passing a list
_get_cudf_schema_element_from_dtype(col_dtype)[0]
)
else:
raise TypeError("`dtype` must be 'list like' or 'dict'")

cdef json_reader_options opts = move(
json_reader_options.builder(make_source_info(filepaths_or_buffers))
.compression(c_compression)
.lines(c_lines)
.byte_range_offset(c_range_offset)
.byte_range_size(c_range_size)
.recovery_mode(_get_json_recovery_mode(on_bad_lines))
.build()
table_w_meta = plc.io.json.read_json(
plc.io.SourceInfo(filepaths_or_buffers),
processed_dtypes,
c_compression,
lines,
byte_range_offset = byte_range[0] if byte_range is not None else 0,
byte_range_size = byte_range[1] if byte_range is not None else 0,
keep_quotes = keep_quotes,
mixed_types_as_string = mixed_types_as_string,
prune_columns = prune_columns,
recovery_mode = _get_json_recovery_mode(on_bad_lines)
)
if is_list_like_dtypes:
opts.set_dtypes(c_dtypes_list)
else:
opts.set_dtypes(c_dtypes_schema_map)

opts.enable_keep_quotes(keep_quotes)
opts.enable_mixed_types_as_string(mixed_types_as_string)
opts.enable_prune_columns(prune_columns)

# Read JSON
cdef cudf_io_types.table_with_metadata c_result

with nogil:
c_result = move(libcudf_read_json(opts))

meta_names = [info.name.decode() for info in c_result.metadata.schema_info]
df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=meta_names
))

update_struct_field_names(df, c_result.metadata.schema_info)
df = cudf.DataFrame._from_data(
*data_from_pylibcudf_io(
table_w_meta
)
)

# Post-processing to add in struct column names
add_df_col_struct_names(df, table_w_meta.child_names)
return df


Expand Down Expand Up @@ -192,28 +167,32 @@ def write_json(
)


cdef schema_element _get_cudf_schema_element_from_dtype(object dtype) except *:
cdef schema_element s_element
cdef data_type lib_type
cdef _get_cudf_schema_element_from_dtype(object dtype) except *:
dtype = cudf.dtype(dtype)
if isinstance(dtype, cudf.CategoricalDtype):
raise NotImplementedError(
"CategoricalDtype as dtype is not yet "
"supported in JSON reader"
)
lib_type = dtype_to_data_type(dtype)
s_element.type = lib_type

lib_type = DataType.from_libcudf(dtype_to_data_type(dtype))
child_types = []

if isinstance(dtype, cudf.StructDtype):
for name, child_type in dtype.fields.items():
s_element.child_types[name.encode()] = \
child_lib_type, grandchild_types = \
_get_cudf_schema_element_from_dtype(child_type)
child_types.append((name, child_lib_type, grandchild_types))
elif isinstance(dtype, cudf.ListDtype):
s_element.child_types["offsets".encode()] = \
_get_cudf_schema_element_from_dtype(cudf.dtype("int32"))
s_element.child_types["element".encode()] = \
child_lib_type, grandchild_types = \
_get_cudf_schema_element_from_dtype(dtype.element_type)

return s_element
child_types = [
("offsets", DataType.from_libcudf(data_type(type_id.INT32)), []),
("element", child_lib_type, grandchild_types)
]

return lib_type, child_types


cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *:
Expand Down
23 changes: 21 additions & 2 deletions python/cudf/cudf/_lib/pylibcudf/io/json.pxd
Original file line number Diff line number Diff line change
@@ -1,11 +1,30 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp cimport bool

from cudf._lib.pylibcudf.io.types cimport SinkInfo, TableWithMetadata
from cudf._lib.pylibcudf.io.types cimport (
SinkInfo,
SourceInfo,
TableWithMetadata,
compression_type,
)
from cudf._lib.pylibcudf.libcudf.io.json cimport json_recovery_mode_t
from cudf._lib.pylibcudf.libcudf.types cimport size_type


cpdef TableWithMetadata read_json(
SourceInfo source_info,
list dtypes = *,
compression_type compression = *,
bool lines = *,
size_type byte_range_offset = *,
size_type byte_range_size = *,
bool keep_quotes = *,
bool mixed_types_as_string = *,
bool prune_columns = *,
json_recovery_mode_t recovery_mode = *,
)


cpdef void write_json(
SinkInfo sink_info,
TableWithMetadata tbl,
Expand Down
Loading
Loading