GH-39984: [Python] Add ChunkedArray import/export to/from C #39985
Conversation
python/pyarrow/table.pxi
Outdated
if requested_schema is not None:
    out_schema = Schema._import_from_c_capsule(requested_schema)
    if self.schema != out_schema:
        table = self.cast(out_schema)
Not strictly necessary, but it would be nicer (both for memory consumption and for latency) to cast each batch when required, rather than the whole table up front.
You could simply use RecordBatchReader.from_batches with a generator that casts each batch in turn. Something like:
batches = self.to_batches()
schema = self.schema
if requested_schema is not None:
    out_schema = Schema._import_from_c_capsule(requested_schema)
    if schema != out_schema:
        batches = (batch.cast(out_schema) for batch in batches)
        schema = out_schema
return RecordBatchReader.from_batches(schema, batches)
(or you can fold the functionality directly into PyRecordBatchReader)
python/pyarrow/table.pxi
Outdated
@@ -4932,7 +4994,13 @@ cdef class Table(_Tabular):
         -------
         PyCapsule
         """
-        return self.to_reader().__arrow_c_stream__(requested_schema)
+        cdef Table table = self
This is probably not required.
python/pyarrow/table.pxi
Outdated
if requested_schema is not None:
    out_type = DataType._import_from_c_capsule(requested_schema)
    if self.type != out_type:
        chunked = self.cast(out_type)
Same remark as in Table.__arrow_c_stream__.
Is this as much of a concern as with a Table? I can't think of any clean way to do this lazily on a per-chunk basis, although I'm happy to remove the feature if it's that bad of an idea.
A quick check suggests casting up front is faster... perhaps because it obliterates the chunks:
import pyarrow as pa
import numpy as np

n = int(1e6)
n_chunks = 1000
per_chunk = n // n_chunks
chunks = [np.random.random(per_chunk) for _ in range(n_chunks)]
chunked = pa.chunked_array(chunks)

def roundtrip_chunked():
    stream_capsule = chunked.__arrow_c_stream__()
    chunked._import_from_c_capsule(stream_capsule)

%timeit roundtrip_chunked()
#> 3.72 ms ± 15.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

def roundtrip_chunked_cast():
    stream_capsule = chunked.cast(pa.float32()).__arrow_c_stream__()
    chunked._import_from_c_capsule(stream_capsule)

%timeit roundtrip_chunked_cast()
#> 1.52 ms ± 1.79 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Well, I gave a possible solution above.
Also, your benchmark is bizarre: where is the casting in roundtrip_chunked?
Nowhere! A roundtrip without casting is almost certainly faster than a roundtrip with casting, and it is already twice as slow as the cast + roundtrip. I'm sure there's room to make a better benchmark (I'm also running a C++ debug build), but I'm personally convinced that the cast + export solution is not so bad that it should not be attempted.
> Well, I gave a possible solution above.

For RecordBatches? I don't think we have a way to do that for a stream of Array in Arrow C++ or in pyarrow.
I'm happy to remove the feature as well and leave it to be implemented properly later... I didn't anticipate it being controversial.
> A roundtrip without casting is almost certainly faster than a roundtrip with casting, and it is already twice as slow as the cast + roundtrip.
If it doesn't make sense then there's something wrong.
> I'm happy to remove the feature as well and leave it to be implemented properly later... I didn't anticipate it being controversial.
It's not controversial. The implementation is. I'm sure for simple benchmarks with a small dataset, an otherwise idle machine, and enough RAM to hold multiple copies of the dataset, casting everything at once can seem slightly faster because it saves some overhead. That doesn't make it a viable strategy in the general case.
Got it! I dropped the cast and we can circle back and do it right. I opened a PR to do it properly for the batch-wise RecordBatchReader export and that might serve as a template for how this should work, too.
python/pyarrow/tests/test_cffi.py
Outdated
batch = make_batch()
requested_schema = pa.schema([('ints', pa.list_(pa.int64()))])
requested_capsule = requested_schema.__arrow_c_schema__()
# RecordBatch has no cast() method
This should be fixed instead of working around it.
@paleolimbot I think it would be nice to add a test_nanoarrow.py to exercise nanoarrow integration.
> Not strictly necessary, but it would be nicer (both for memory consumption and for latency) to cast each batch when required, rather than the whole table up front.
I removed table casting for now... I'm still game to try in #40066, maybe with a CastingRecordBatchReader : public RecordBatchReader that would solve the issue for streams, too.
> I think it would be nice to add a test_nanoarrow.py to exercise nanoarrow integration.
I opened #40065... probably best done when nanoarrow's Python is a little more stable (i.e., when there is a nanoarrow.Array and a nanoarrow.ArrayStream, neither of which currently exist).
Thank you @paleolimbot !
After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit 91bf1c9. There was 1 benchmark result with an error.
There were no benchmark performance regressions. 🎉 The full Conbench report has more details.
GH-39984: [Python] Add ChunkedArray import/export to/from C (apache#39985)

### Rationale for this change

ChunkedArrays have an unambiguous representation as a stream of arrays. apache#39455 added the ability to import/export in C++; this PR wires up the new functions in pyarrow.

### What changes are included in this PR?

- Added `__arrow_c_stream__()` and `_import_from_c_capsule()` to the `ChunkedArray`

### Are these changes tested?

Yes! Tests were added.

### Are there any user-facing changes?

Yes! But I'm not sure where the protocol methods are documented.

```python
import pyarrow as pa
import nanoarrow as na

chunked = pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])])
[na.c_array_view(item) for item in na.c_array_stream(chunked)]
```

```
[<nanoarrow.c_lib.CArrayView>
 - storage_type: 'int64'
 - length: 3
 - offset: 0
 - null_count: 0
 - buffers[2]:
   - <bool validity[0 b] >
   - <int64 data[24 b] 0 1 2>
 - dictionary: NULL
 - children[0]:,
 <nanoarrow.c_lib.CArrayView>
 - storage_type: 'int64'
 - length: 3
 - offset: 0
 - null_count: 0
 - buffers[2]:
   - <bool validity[0 b] >
   - <int64 data[24 b] 3 4 5>
 - dictionary: NULL
 - children[0]:]
```

```python
stream_capsule = chunked.__arrow_c_stream__()
chunked2 = chunked._import_from_c_capsule(stream_capsule)
chunked2
```

```
<pyarrow.lib.ChunkedArray object at 0x105bb70b0>
[
  [0, 1, 2],
  [3, 4, 5]
]
```

* Closes: apache#39984

Lead-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
Rationale for this change
ChunkedArrays have an unambiguous representation as a stream of arrays. #39455 added the ability to import/export in C++...this PR wires up the new functions in pyarrow.
What changes are included in this PR?
- Added `__arrow_c_stream__()` and `_import_from_c_capsule()` to the `ChunkedArray`
Are these changes tested?
Yes! Tests were added.
Are there any user-facing changes?
Yes! But I'm not sure where the protocol methods are documented.