Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-39984: [Python] Add ChunkedArray import/export to/from C #39985

Merged
merged 11 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2840,6 +2840,9 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CResult[shared_ptr[CRecordBatchReader]] ImportRecordBatchReader(
ArrowArrayStream*)

CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*)
CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*)


cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
Expand Down
61 changes: 61 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,67 @@ cdef class ChunkedArray(_PandasConvertible):
result += self.chunk(i).to_pylist()
return result

def __arrow_c_stream__(self, requested_schema=None):
"""
Export to a C ArrowArrayStream PyCapsule.

Parameters
----------
requested_schema : PyCapsule, default None
The schema to which the stream should be casted, passed as a
PyCapsule containing a C ArrowSchema representation of the
requested schema.

Returns
-------
PyCapsule
A capsule containing a C ArrowArrayStream struct.
"""
cdef:
ArrowArrayStream* c_stream = NULL

if requested_schema is not None:
out_type = DataType._import_from_c_capsule(requested_schema)
if self.type != out_type:
raise NotImplementedError("Casting to requested_schema")

stream_capsule = alloc_c_stream(&c_stream)

with nogil:
check_status(ExportChunkedArray(self.sp_chunked_array, c_stream))

return stream_capsule

@staticmethod
def _import_from_c_capsule(stream):
"""
Import ChunkedArray from a C ArrowArrayStream PyCapsule.

Parameters
----------
stream: PyCapsule
A capsule containing a C ArrowArrayStream PyCapsule.

Returns
-------
ChunkedArray
"""
cdef:
ArrowArrayStream* c_stream
shared_ptr[CChunkedArray] c_chunked_array
ChunkedArray self

c_stream = <ArrowArrayStream*>PyCapsule_GetPointer(
stream, 'arrow_array_stream'
)

with nogil:
c_chunked_array = GetResultValue(ImportChunkedArray(c_stream))

self = ChunkedArray.__new__(ChunkedArray)
self.init(c_chunked_array)
return self


def chunked_array(arrays, type=None):
"""
Expand Down
26 changes: 26 additions & 0 deletions python/pyarrow/tests/test_cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,29 @@ def test_roundtrip_batch_reader_capsule():
assert imported_reader.read_next_batch().equals(batch)
with pytest.raises(StopIteration):
imported_reader.read_next_batch()


def test_roundtrip_chunked_array_capsule():
chunked = pa.chunked_array([pa.array(["a", "b", "c"])])

capsule = chunked.__arrow_c_stream__()
assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
assert imported_chunked.type == chunked.type
assert imported_chunked == chunked


def test_roundtrip_chunked_array_capsule_requested_schema():
chunked = pa.chunked_array([pa.array(["a", "b", "c"])])

# Requesting the same type should work
requested_capsule = chunked.type.__arrow_c_schema__()
capsule = chunked.__arrow_c_stream__(requested_capsule)
imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
assert imported_chunked == chunked

# Casting to something else should error
requested_type = pa.binary()
requested_capsule = requested_type.__arrow_c_schema__()
with pytest.raises(NotImplementedError):
chunked.__arrow_c_stream__(requested_capsule)
Loading