From f2aa624dd36edd5d24a7971ce5f369fa0f1846b6 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Wed, 7 Feb 2024 14:04:58 -0400
Subject: [PATCH 01/11] draft

---
 python/pyarrow/includes/libarrow.pxd |  3 ++
 python/pyarrow/table.pxi             | 64 ++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+)

diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index d92f09da779b6..814d573c3740c 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2840,6 +2840,9 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
     CResult[shared_ptr[CRecordBatchReader]] ImportRecordBatchReader(
         ArrowArrayStream*)
 
+    CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*)
+    CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*)
+
 
 cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
     CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index abda784fb7c18..b4257961c5b8d 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -1327,6 +1327,70 @@ cdef class ChunkedArray(_PandasConvertible):
             result += self.chunk(i).to_pylist()
         return result
 
+    def __arrow_c_stream__(self, requested_schema=None):
+        """
+        Export to a C ArrowArrayStream PyCapsule.
+
+        Parameters
+        ----------
+        requested_schema : PyCapsule, default None
+            The schema to which the stream should be cast, passed as a
+            PyCapsule containing a C ArrowSchema representation of the
+            requested schema.
+
+        Returns
+        -------
+        PyCapsule
+            A capsule containing a C ArrowArrayStream struct.
+        """
+        cdef:
+            ArrowArrayStream* c_stream
+            ChunkedArray chunked
+
+        if requested_schema is not None:
+            out_schema = DataType._import_from_c_capsule(requested_schema)
+            if self.schema != out_schema:
+                chunked = self.cast(type)
+            else:
+                chunked = self
+
+        stream_capsule = alloc_c_stream(&c_stream)
+
+        with nogil:
+            check_status(ExportChunkedArray(chunked.sp_chunked_array, c_stream))
+
+        return stream_capsule
+
+    @staticmethod
+    def _import_from_c_capsule(stream):
+        """
+        Import ChunkedArray from a C ArrowArrayStream PyCapsule.
+
+        Parameters
+        ----------
+        stream : PyCapsule
+            A capsule containing a C ArrowArrayStream struct.
+
+        Returns
+        -------
+        ChunkedArray
+        """
+        cdef:
+            ArrowArrayStream* c_stream
+            shared_ptr[CChunkedArray] c_chunked_array
+            ChunkedArray self
+
+        c_stream = <ArrowArrayStream*>PyCapsule_GetPointer(
+            stream, 'arrow_array_stream'
+        )
+
+        with nogil:
+            c_chunked_array = GetResultValue(ImportChunkedArray(c_stream))
+
+        self = ChunkedArray.__new__(ChunkedArray)
+        self.init(c_chunked_array)
+        return self
+
 
 def chunked_array(arrays, type=None):
     """

From 75892d5e5ba64deba776435d31fa112007251f27 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 8 Feb 2024 16:02:10 -0400
Subject: [PATCH 02/11] fix export

---
 python/pyarrow/table.pxi | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index b4257961c5b8d..0f4fab089a5f0 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -1353,6 +1353,8 @@ cdef class ChunkedArray(_PandasConvertible):
                 chunked = self.cast(type)
             else:
                 chunked = self
+        else:
+            chunked = self
 
         stream_capsule = alloc_c_stream(&c_stream)
 

From 1d5e5e75f353cb695ae6b4017beb65920f23c05b Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 8 Feb 2024 16:35:48 -0400
Subject: [PATCH 03/11] fix casting

---
 demo.ipynb               | 103 +++++++++++++++++++++++++++++++++++++++
 demo.md                  |  58 ++++++++++++++++++++++
 python/pyarrow/table.pxi |  22 +++++----
 3 files changed, 173 insertions(+), 10 deletions(-)
 create mode 100644 demo.ipynb
 create mode 100644 demo.md

diff --git a/demo.ipynb b/demo.ipynb
new file mode 100644
index 0000000000000..cd2976628a8ae
--- /dev/null
+++ b/demo.ipynb
@@ -0,0 +1,103 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[\n",
+       " - storage_type: 'struct'\n",
+       " - length: 6\n",
+       " - offset: 0\n",
+       " - null_count: 0\n",
+       " - buffers[1]:\n",
+       " - \n",
+       " - dictionary: NULL\n",
+       " - children[1]:\n",
+       " - \n",
+       " - storage_type: 'int32'\n",
+       " - length: 6\n",
+       " - offset: 0\n",
+       " - null_count: 0\n",
+       " - buffers[2]:\n",
+       " - \n",
+       " - \n",
+       " - dictionary: NULL\n",
+       " - children[0]:]"
+      ]
+     },
+     "execution_count": 2,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "import pyarrow as pa\n",
+    "import nanoarrow as na\n",
+    "chunked = pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])])\n",
+    "table = pa.table([chunked], names=[\"col\"])\n",
+    "\n",
+    "sch = pa.schema({\"col\": pa.int32()})\n",
+    "[na.c_array_view(item) for item in na.c_array_stream(table, sch)]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "\n",
+       "[\n",
+       "  [\n",
+       "    0,\n",
+       "    1,\n",
+       "    2\n",
+       "  ],\n",
+       "  [\n",
+       "    3,\n",
+       "    4,\n",
+       "    5\n",
+       "  ]\n",
+       "]"
+      ]
+     },
+     "execution_count": 5,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "stream_capsule = chunked.__arrow_c_stream__()\n",
+    "chunked2 = chunked._import_from_c_capsule(stream_capsule)\n",
+    "chunked2"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": ".venv",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.12.1"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/demo.md b/demo.md
new file mode 100644
index 0000000000000..0453ada367393
--- /dev/null
+++ b/demo.md
@@ -0,0 +1,58 @@
+```python
+import pyarrow as pa
+import nanoarrow as na
+chunked = pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])])
+[na.c_array_view(item) for item in na.c_array_stream(chunked)]
+```
+
+
+
+
+    [
+    - storage_type: 'int64'
+    - length: 3
+    - offset: 0
+    - null_count: 0
+    - buffers[2]:
+    - 
+    - 
+    - dictionary: NULL
+    - children[0]:,
+
+    - storage_type: 'int64'
+    - length: 3
+    - offset: 0
+    - null_count: 0
+    - buffers[2]:
+    - 
+    - 
+    - dictionary: NULL
+    - children[0]:]
+
+
+
+
+```python
+stream_capsule = chunked.__arrow_c_stream__()
+chunked2 = chunked._import_from_c_capsule(stream_capsule)
+chunked2
+```
+
+
+
+
+
+    [
+      [
+        0,
+        1,
+        2
+      ],
+      [
+        3,
+        4,
+        5
+      ]
+    ]
+
+
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 0f4fab089a5f0..9226c9358b200 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -1344,17 +1344,13 @@ cdef class ChunkedArray(_PandasConvertible):
             A capsule containing a C ArrowArrayStream struct.
         """
         cdef:
-            ArrowArrayStream* c_stream
-            ChunkedArray chunked
+            ArrowArrayStream* c_stream = NULL
+            ChunkedArray chunked = self
 
         if requested_schema is not None:
-            out_schema = DataType._import_from_c_capsule(requested_schema)
-            if self.schema != out_schema:
-                chunked = self.cast(type)
-            else:
-                chunked = self
-        else:
-            chunked = self
+            out_type = DataType._import_from_c_capsule(requested_schema)
+            if self.type != out_type:
+                chunked = self.cast(out_type)
 
         stream_capsule = alloc_c_stream(&c_stream)
 
@@ -4998,7 +4994,13 @@ cdef class Table(_Tabular):
         -------
         PyCapsule
         """
-        return self.to_reader().__arrow_c_stream__(requested_schema)
+        cdef Table table = self
+        if requested_schema is not None:
+            out_schema = Schema._import_from_c_capsule(requested_schema)
+            if self.schema != out_schema:
+                table = self.cast(out_schema)
+
+        return table.to_reader().__arrow_c_stream__()
 
 
 def _reconstruct_table(arrays, schema):

From e56d3d0e3df674d68662c755ffc97b7ccbacdc49 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 8 Feb 2024 16:54:35 -0400
Subject: [PATCH 04/11] add tests

---
 python/pyarrow/tests/test_cffi.py | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py
index ff81b06440f03..e830670553f5a 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -601,3 +601,25 @@ def test_roundtrip_batch_reader_capsule():
     assert imported_reader.read_next_batch().equals(batch)
     with pytest.raises(StopIteration):
         imported_reader.read_next_batch()
+
+
+def test_roundtrip_chunked_array_capsule():
+    chunked = pa.chunked_array([pa.array(['a', 'b', 'c'])])
+
+    capsule = chunked.__arrow_c_stream__()
+    assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
+    imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
+    assert imported_chunked.type == chunked.type
+    assert imported_chunked == chunked
+
+
+def test_roundtrip_chunked_array_capsule_requested_schema():
+    chunked = pa.chunked_array([pa.array(['a', 'b', 'c'])])
+    requested_type = pa.binary()
+
+    requested_capsule = requested_type.__arrow_c_schema__()
+    capsule = chunked.__arrow_c_stream__(requested_capsule)
+    assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
+    imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
+    assert imported_chunked.type == requested_type
+    assert imported_chunked == chunked.cast(requested_type)

From 72606109a8695e5613dcd6f287fc702d8674a450 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 8 Feb 2024 17:04:33 -0400
Subject: [PATCH 05/11] test batch/table requested schema

---
 python/pyarrow/tests/test_cffi.py | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py
index e830670553f5a..cf70c88a0558b 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -603,6 +603,22 @@ def test_roundtrip_batch_reader_capsule():
         imported_reader.read_next_batch()
 
 
+def test_roundtrip_batch_reader_capsule_requested_schema():
+    batch = make_batch()
+    requested_schema = pa.schema([('ints', pa.list_(pa.int64()))])
+    requested_capsule = requested_schema.__arrow_c_schema__()
+    # RecordBatch has no cast() method
+    batch_as_requested = list(pa.Table.from_batches([batch]).cast(requested_schema).to_batches())[0]
+
+    capsule = batch.__arrow_c_stream__(requested_capsule)
+    assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
+    imported_reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
+    assert imported_reader.schema == requested_schema
+    assert imported_reader.read_next_batch().equals(batch_as_requested)
+    with pytest.raises(StopIteration):
+        imported_reader.read_next_batch()
+
+
 def test_roundtrip_chunked_array_capsule():
     chunked = pa.chunked_array([pa.array(['a', 'b', 'c'])])
 
@@ -616,8 +632,8 @@ def test_roundtrip_chunked_array_capsule():
 def test_roundtrip_chunked_array_capsule_requested_schema():
     chunked = pa.chunked_array([pa.array(['a', 'b', 'c'])])
     requested_type = pa.binary()
-
     requested_capsule = requested_type.__arrow_c_schema__()
+
     capsule = chunked.__arrow_c_stream__(requested_capsule)
     assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
     imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)

From 15b0f683a70db214aa5d2c232bf45b6b1bdcc96d Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 8 Feb 2024 17:09:04 -0400
Subject: [PATCH 06/11] fix long line

---
 python/pyarrow/tests/test_cffi.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py
index cf70c88a0558b..151803ecffa61 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -608,7 +608,9 @@ def test_roundtrip_batch_reader_capsule_requested_schema():
     requested_schema = pa.schema([('ints', pa.list_(pa.int64()))])
     requested_capsule = requested_schema.__arrow_c_schema__()
     # RecordBatch has no cast() method
-    batch_as_requested = list(pa.Table.from_batches([batch]).cast(requested_schema).to_batches())[0]
+    batch_as_requested = list(
+        pa.Table.from_batches([batch]).cast(requested_schema).to_batches()
+    )[0]
 
     capsule = batch.__arrow_c_stream__(requested_capsule)
     assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1

From fc6bbb7457ad1802e820499aa1e8701d2a53ada0 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 8 Feb 2024 21:43:04 -0400
Subject: [PATCH 07/11] remove accidental addition

---
 demo.ipynb | 103 -----------------------------------------------------
 demo.md    |  58 ------------------------------
 2 files changed, 161 deletions(-)
 delete mode 100644 demo.ipynb
 delete mode 100644 demo.md

diff --git a/demo.ipynb b/demo.ipynb
deleted file mode 100644
index cd2976628a8ae..0000000000000
--- a/demo.ipynb
+++ /dev/null
@@ -1,103 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "code",
-   "execution_count": 2,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "[\n",
-       " - storage_type: 'struct'\n",
-       " - length: 6\n",
-       " - offset: 0\n",
-       " - null_count: 0\n",
-       " - buffers[1]:\n",
-       " - \n",
-       " - dictionary: NULL\n",
-       " - children[1]:\n",
-       " - \n",
-       " - storage_type: 'int32'\n",
-       " - length: 6\n",
-       " - offset: 0\n",
-       " - null_count: 0\n",
-       " - buffers[2]:\n",
-       " - \n",
-       " - \n",
-       " - dictionary: NULL\n",
-       " - children[0]:]"
-      ]
-     },
-     "execution_count": 2,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "import pyarrow as pa\n",
-    "import nanoarrow as na\n",
-    "chunked = pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])])\n",
-    "table = pa.table([chunked], names=[\"col\"])\n",
-    "\n",
-    "sch = pa.schema({\"col\": pa.int32()})\n",
-    "[na.c_array_view(item) for item in na.c_array_stream(table, sch)]"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 5,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "\n",
-       "[\n",
-       "  [\n",
-       "    0,\n",
-       "    1,\n",
-       "    2\n",
-       "  ],\n",
-       "  [\n",
-       "    3,\n",
-       "    4,\n",
-       "    5\n",
-       "  ]\n",
-       "]"
-      ]
-     },
-     "execution_count": 5,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "stream_capsule = chunked.__arrow_c_stream__()\n",
-    "chunked2 = chunked._import_from_c_capsule(stream_capsule)\n",
-    "chunked2"
-   ]
-  }
- ],
- "metadata": {
-  "kernelspec": {
-   "display_name": ".venv",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "file_extension": ".py",
-   "mimetype": "text/x-python",
-   "name": "python",
-   "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.12.1"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 2
-}
diff --git a/demo.md b/demo.md
deleted file mode 100644
index 0453ada367393..0000000000000
--- a/demo.md
+++ /dev/null
@@ -1,58 +0,0 @@
-```python
-import pyarrow as pa
-import nanoarrow as na
-chunked = pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])])
-[na.c_array_view(item) for item in na.c_array_stream(chunked)]
-```
-
-
-
-
-    [
-    - storage_type: 'int64'
-    - length: 3
-    - offset: 0
-    - null_count: 0
-    - buffers[2]:
-    - 
-    - 
-    - dictionary: NULL
-    - children[0]:,
-
-    - storage_type: 'int64'
-    - length: 3
-    - offset: 0
-    - null_count: 0
-    - buffers[2]:
-    - 
-    - 
-    - dictionary: NULL
-    - children[0]:]
-
-
-
-
-```python
-stream_capsule = chunked.__arrow_c_stream__()
-chunked2 = chunked._import_from_c_capsule(stream_capsule)
-chunked2
-```
-
-
-
-
-
-    [
-      [
-        0,
-        1,
-        2
-      ],
-      [
-        3,
-        4,
-        5
-      ]
-    ]
-
-

From a8c9be63a737050adc1a6231884e2f6b1575dc8f Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Tue, 13 Feb 2024 13:30:53 -0400
Subject: [PATCH 08/11] remove casting for tables

---
 python/pyarrow/table.pxi          |  8 +-------
 python/pyarrow/tests/test_cffi.py | 18 ------------------
 2 files changed, 1 insertion(+), 25 deletions(-)

diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 9226c9358b200..ad046c58134e4 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -4994,13 +4994,7 @@ cdef class Table(_Tabular):
         -------
         PyCapsule
         """
-        cdef Table table = self
-        if requested_schema is not None:
-            out_schema = Schema._import_from_c_capsule(requested_schema)
-            if self.schema != out_schema:
-                table = self.cast(out_schema)
-
-        return table.to_reader().__arrow_c_stream__()
+        return self.to_reader().__arrow_c_stream__(requested_schema)
 
 
 def _reconstruct_table(arrays, schema):
diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py
index 151803ecffa61..3d07877e36254 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -603,24 +603,6 @@ def test_roundtrip_batch_reader_capsule():
         imported_reader.read_next_batch()
 
 
-def test_roundtrip_batch_reader_capsule_requested_schema():
-    batch = make_batch()
-    requested_schema = pa.schema([('ints', pa.list_(pa.int64()))])
-    requested_capsule = requested_schema.__arrow_c_schema__()
-    # RecordBatch has no cast() method
-    batch_as_requested = list(
-        pa.Table.from_batches([batch]).cast(requested_schema).to_batches()
-    )[0]
-
-    capsule = batch.__arrow_c_stream__(requested_capsule)
-    assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
-    imported_reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
-    assert imported_reader.schema == requested_schema
-    assert imported_reader.read_next_batch().equals(batch_as_requested)
-    with pytest.raises(StopIteration):
-        imported_reader.read_next_batch()
-
-
 def test_roundtrip_chunked_array_capsule():
     chunked = pa.chunked_array([pa.array(['a', 'b', 'c'])])
 

From e9966b42d0e60c92653339af2760bfd06b4dfb46 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Tue, 13 Feb 2024 17:14:15 -0400
Subject: [PATCH 09/11] remove cast option

---
 python/pyarrow/table.pxi          |   5 +-
 python/pyarrow/tests/test_cffi.py | 152 ++++++++++++++++--------------
 2 files changed, 85 insertions(+), 72 deletions(-)

diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index ad046c58134e4..ee3872aa3a242 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -1345,17 +1345,16 @@ cdef class ChunkedArray(_PandasConvertible):
         """
         cdef:
             ArrowArrayStream* c_stream = NULL
-            ChunkedArray chunked = self
 
         if requested_schema is not None:
             out_type = DataType._import_from_c_capsule(requested_schema)
             if self.type != out_type:
-                chunked = self.cast(out_type)
+                raise NotImplementedError("Casting to requested_schema")
 
         stream_capsule = alloc_c_stream(&c_stream)
 
         with nogil:
-            check_status(ExportChunkedArray(chunked.sp_chunked_array, c_stream))
+            check_status(ExportChunkedArray(self.sp_chunked_array, c_stream))
 
         return stream_capsule
 
diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py
index 3d07877e36254..93a4e12248273 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -21,6 +21,7 @@
 import gc
 
 import pyarrow as pa
+
 try:
     from pyarrow.cffi import ffi
 except ImportError:
@@ -35,17 +36,19 @@
 pd = tm = None
 
 
-needs_cffi = pytest.mark.skipif(ffi is None,
-                                reason="test needs cffi package installed")
+needs_cffi = pytest.mark.skipif(ffi is None, reason="test needs cffi package installed")
 
 
 assert_schema_released = pytest.raises(
-    ValueError, match="Cannot import released ArrowSchema")
+    ValueError, match="Cannot import released ArrowSchema"
+)
 
 assert_array_released = pytest.raises(
-    ValueError, match="Cannot import released ArrowArray")
+    ValueError, match="Cannot import released ArrowArray"
+)
 
 assert_stream_released = pytest.raises(
-    ValueError, match="Cannot import released ArrowArrayStream")
+    ValueError, match="Cannot import released ArrowArrayStream"
+)
 
 def PyCapsule_IsValid(capsule, name):
@@ -65,8 +68,7 @@ class ParamExtType(pa.ExtensionType):
 
     def __init__(self, width):
         self._width = width
-        super().__init__(pa.binary(width),
-                         "pyarrow.tests.test_cffi.ParamExtType")
+        super().__init__(pa.binary(width), "pyarrow.tests.test_cffi.ParamExtType")
 
     @property
     def width(self):
@@ -82,19 +84,18 @@ def __arrow_ext_deserialize__(cls, storage_type, serialized):
 
 
 def make_schema():
-    return pa.schema([('ints', pa.list_(pa.int32()))],
-                     metadata={b'key1': b'value1'})
+    return pa.schema([("ints", pa.list_(pa.int32()))], metadata={b"key1": b"value1"})
 
 
 def make_extension_schema():
-    return pa.schema([('ext', ParamExtType(3))],
-                     metadata={b'key1': b'value1'})
+    return pa.schema([("ext", ParamExtType(3))], metadata={b"key1": b"value1"})
 
 
 def make_extension_storage_schema():
     # Should be kept in sync with make_extension_schema
-    return pa.schema([('ext', ParamExtType(3).storage_type)],
-                     metadata={b'key1': b'value1'})
+    return pa.schema(
+        [("ext", ParamExtType(3).storage_type)], metadata={b"key1": b"value1"}
+    )
 
 
 def make_batch():
@@ -103,8 +104,7 @@ def make_extension_batch():
     schema = make_extension_schema()
-    ext_col = schema[0].type.wrap_array(pa.array([b"foo", b"bar"],
-                                                 type=pa.binary(3)))
+    ext_col = schema[0].type.wrap_array(pa.array([b"foo", b"bar"], type=pa.binary(3)))
     return pa.record_batch([ext_col], schema)
 
 
@@ -149,8 +149,7 @@ def test_export_import_type():
     pa.int32()._export_to_c(ptr_schema)
     bad_format = ffi.new("char[]", b"zzz")
     c_schema.format = bad_format
-    with pytest.raises(ValueError,
-                       match="Invalid or unsupported format string"):
+    with pytest.raises(ValueError, match="Invalid or unsupported format string"):
         pa.DataType._import_from_c(ptr_schema)
     # Now released
     with assert_schema_released:
@@ -250,8 +249,7 @@ def check_export_import_schema(schema_factory, expected_schema_factory=None):
 
     # Not a struct type
     pa.int32()._export_to_c(ptr_schema)
-    with pytest.raises(ValueError,
-                       match="ArrowSchema describes non-struct type"):
+    with pytest.raises(ValueError, match="ArrowSchema describes non-struct type"):
         pa.Schema._import_from_c(ptr_schema)
     # Now released
     with assert_schema_released:
@@ -266,8 +264,7 @@ def test_export_import_schema():
 @needs_cffi
 def test_export_import_schema_with_extension():
     # Extension type is unregistered => the storage type is imported
-    check_export_import_schema(make_extension_schema,
-                               make_extension_storage_schema)
+    check_export_import_schema(make_extension_schema, make_extension_storage_schema)
 
     # Extension type is registered => the extension type is imported
     with registered_extension_type(ParamExtType(1)):
@@ -335,8 +332,7 @@ def check_export_import_batch(batch_factory):
     # Not a struct type
     pa.int32()._export_to_c(ptr_schema)
     batch_factory()._export_to_c(ptr_array)
-    with pytest.raises(ValueError,
-                       match="ArrowSchema describes non-struct type"):
+    with pytest.raises(ValueError, match="ArrowSchema describes non-struct type"):
         pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
     # Now released
     with assert_schema_released:
@@ -395,9 +391,9 @@ def make_py_record_batch_reader(schema, batches):
 
 
 @needs_cffi
-@pytest.mark.parametrize('reader_factory',
-                         [make_ipc_stream_reader,
-                          make_py_record_batch_reader])
+@pytest.mark.parametrize(
+    "reader_factory", [make_ipc_stream_reader, make_py_record_batch_reader]
+)
 def test_export_import_batch_reader(reader_factory):
     c_stream = ffi.new("struct ArrowArrayStream*")
     ptr_stream = int(ffi.cast("uintptr_t", c_stream))
@@ -426,9 +422,9 @@ def test_export_import_exception_reader():
     def gen():
         if True:
             try:
-                raise ValueError('foo')
+                raise ValueError("foo")
             except ValueError as e:
-                raise NotImplementedError('bar') from e
+                raise NotImplementedError("bar") from e
         else:
             yield from make_batches()
 
@@ -440,10 +436,10 @@ def gen():
         reader.read_next_batch()
 
     # inner *and* outer exception should be present
-    assert 'ValueError: foo' in str(exc_info.value)
-    assert 'NotImplementedError: bar' in str(exc_info.value)
+    assert "ValueError: foo" in str(exc_info.value)
+    assert "NotImplementedError: bar" in str(exc_info.value)
     # Stacktrace containing line of the raise statement
-    assert 'raise ValueError(\'foo\')' in str(exc_info.value)
+    assert "raise ValueError('foo')" in str(exc_info.value)
 
     assert pa.total_allocated_bytes() == old_allocated
 
@@ -453,9 +449,11 @@ def test_imported_batch_reader_error():
     c_stream = ffi.new("struct ArrowArrayStream*")
    ptr_stream = int(ffi.cast("uintptr_t", c_stream))
 
-    schema = pa.schema([('foo', pa.int32())])
-    batches = [pa.record_batch([[1, 2, 3]], schema=schema),
-               pa.record_batch([[4, 5, 6]], schema=schema)]
+    schema = pa.schema([("foo", pa.int32())])
+    batches = [
+        pa.record_batch([[1, 2, 3]], schema=schema),
+        pa.record_batch([[4, 5, 6]], schema=schema),
+    ]
     buf = make_serialized(schema, batches)
 
     # Open a corrupt/incomplete stream and export it
@@ -466,9 +464,9 @@ def test_imported_batch_reader_error():
     reader_new = pa.RecordBatchReader._import_from_c(ptr_stream)
     batch = reader_new.read_next_batch()
     assert batch == batches[0]
-    with pytest.raises(OSError,
-                       match="Expected to be able to read 16 bytes "
-                             "for message body, got 8"):
+    with pytest.raises(
+        OSError, match="Expected to be able to read 16 bytes " "for message body, got 8"
+    ):
         reader_new.read_next_batch()
 
     # Again, but call read_all()
@@ -477,15 +475,17 @@ def test_imported_batch_reader_error():
     del reader
 
     reader_new = pa.RecordBatchReader._import_from_c(ptr_stream)
-    with pytest.raises(OSError,
-                       match="Expected to be able to read 16 bytes "
-                             "for message body, got 8"):
+    with pytest.raises(
+        OSError, match="Expected to be able to read 16 bytes " "for message body, got 8"
+    ):
         reader_new.read_all()
 
 
-@pytest.mark.parametrize('obj', [pa.int32(), pa.field('foo', pa.int32()),
-                                 pa.schema({'foo': pa.int32()})],
-                         ids=['type', 'field', 'schema'])
+@pytest.mark.parametrize(
+    "obj",
+    [pa.int32(), pa.field("foo", pa.int32()), pa.schema({"foo": pa.int32()})],
+    ids=["type", "field", "schema"],
+)
 def test_roundtrip_schema_capsule(obj):
     gc.collect()  # Make sure no Arrow data dangles in a ref cycle
     old_allocated = pa.total_allocated_bytes()
@@ -505,15 +505,19 @@ def test_roundtrip_schema_capsule(obj):
     assert pa.total_allocated_bytes() == old_allocated
 
 
-@pytest.mark.parametrize('arr,schema_accessor,bad_type,good_type', [
-    (pa.array(['a', 'b', 'c']), lambda x: x.type, pa.int32(), pa.string()),
-    (
-        pa.record_batch([pa.array(['a', 'b', 'c'])], names=['x']),
-        lambda x: x.schema,
-        pa.schema({'x': pa.int32()}),
-        pa.schema({'x': pa.string()})
-    ),
-], ids=['array', 'record_batch'])
+@pytest.mark.parametrize(
+    "arr,schema_accessor,bad_type,good_type",
+    [
+        (pa.array(["a", "b", "c"]), lambda x: x.type, pa.int32(), pa.string()),
+        (
+            pa.record_batch([pa.array(["a", "b", "c"])], names=["x"]),
+            lambda x: x.schema,
+            pa.schema({"x": pa.int32()}),
+            pa.schema({"x": pa.string()}),
+        ),
+    ],
+    ids=["array", "record_batch"],
+)
 def test_roundtrip_array_capsule(arr, schema_accessor, bad_type, good_type):
     gc.collect()  # Make sure no Arrow data dangles in a ref cycle
     old_allocated = pa.total_allocated_bytes()
@@ -537,22 +541,28 @@ def test_roundtrip_array_capsule(arr, schema_accessor, bad_type, good_type):
     del capsule
     assert pa.total_allocated_bytes() == old_allocated
 
-    with pytest.raises(ValueError,
-                       match=r"Could not cast.* string to requested .* int32"):
+    with pytest.raises(
+        ValueError, match=r"Could not cast.* string to requested .* int32"
+    ):
         arr.__arrow_c_array__(bad_type.__arrow_c_schema__())
 
     schema_capsule, array_capsule = arr.__arrow_c_array__(
-        good_type.__arrow_c_schema__())
+        good_type.__arrow_c_schema__()
+    )
     arr_out = import_array(schema_capsule, array_capsule)
     assert schema_accessor(arr_out) == good_type
 
 
 # TODO: implement requested_schema for stream
-@pytest.mark.parametrize('constructor', [
-    pa.RecordBatchReader.from_batches,
-    # Use a lambda because we need to re-order the parameters
-    lambda schema, batches: pa.Table.from_batches(batches, schema),
-], ids=['recordbatchreader', 'table'])
+@pytest.mark.parametrize(
+    "constructor",
+    [
+        pa.RecordBatchReader.from_batches,
+        # Use a lambda because we need to re-order the parameters
+        lambda schema, batches: pa.Table.from_batches(batches, schema),
+    ],
+    ids=["recordbatchreader", "table"],
+)
 def test_roundtrip_reader_capsule(constructor):
     batches = make_batches()
     schema = batches[0].schema
@@ -578,12 +588,12 @@ def test_roundtrip_reader_capsule(constructor):
     obj = constructor(schema, batches)
 
     # TODO: turn this to ValueError once we implement validation.
-    bad_schema = pa.schema({'ints': pa.int32()})
+    bad_schema = pa.schema({"ints": pa.int32()})
     with pytest.raises(NotImplementedError):
         obj.__arrow_c_stream__(bad_schema.__arrow_c_schema__())
 
     # Can work with matching schema
-    matching_schema = pa.schema({'ints': pa.list_(pa.int32())})
+    matching_schema = pa.schema({"ints": pa.list_(pa.int32())})
     capsule = obj.__arrow_c_stream__(matching_schema.__arrow_c_schema__())
     imported_reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
     assert imported_reader.schema == matching_schema
@@ -604,7 +614,7 @@ def test_roundtrip_batch_reader_capsule():
 
 
 def test_roundtrip_chunked_array_capsule():
-    chunked = pa.chunked_array([pa.array(['a', 'b', 'c'])])
+    chunked = pa.chunked_array([pa.array(["a", "b", "c"])])
 
     capsule = chunked.__arrow_c_stream__()
     assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
@@ -614,12 +624,16 @@ def test_roundtrip_chunked_array_capsule():
     assert imported_chunked == chunked
 
 
 def test_roundtrip_chunked_array_capsule_requested_schema():
-    chunked = pa.chunked_array([pa.array(['a', 'b', 'c'])])
-    requested_type = pa.binary()
-    requested_capsule = requested_type.__arrow_c_schema__()
+    chunked = pa.chunked_array([pa.array(["a", "b", "c"])])
+
+    # Requesting the same type should work
+    requested_capsule = chunked.type.__arrow_c_schema__()
     capsule = chunked.__arrow_c_stream__(requested_capsule)
-    assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
     imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
-    assert imported_chunked.type == requested_type
-    assert imported_chunked == chunked.cast(requested_type)
+    assert imported_chunked == chunked
+
+    # Casting to something else should error
+    requested_type = pa.binary()
+    requested_capsule = requested_type.__arrow_c_schema__()
+    with pytest.raises(NotImplementedError):
+        chunked.__arrow_c_stream__(requested_capsule)

From 5e243669292cd0350f07934046947bbaff66984a Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Tue, 13 Feb 2024 20:58:56 -0400
Subject: [PATCH 10/11] fix accidental reformat

---
 python/pyarrow/tests/test_cffi.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py
index 93a4e12248273..d991c3c1aac3a 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -637,3 +637,29 @@ def test_roundtrip_chunked_array_capsule_requested_schema():
     requested_capsule = requested_type.__arrow_c_schema__()
     with pytest.raises(NotImplementedError):
         chunked.__arrow_c_stream__(requested_capsule)
+
+
+def test_roundtrip_chunked_array_capsule():
+    chunked = pa.chunked_array([pa.array(["a", "b", "c"])])
+
+    capsule = chunked.__arrow_c_stream__()
+    assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
+    imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
+    assert imported_chunked.type == chunked.type
+    assert imported_chunked == chunked
+
+
+def test_roundtrip_chunked_array_capsule_requested_schema():
+    chunked = pa.chunked_array([pa.array(["a", "b", "c"])])
+
+    # Requesting the same type should work
+    requested_capsule = chunked.type.__arrow_c_schema__()
+    capsule = chunked.__arrow_c_stream__(requested_capsule)
+    imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
+    assert imported_chunked == chunked
+
+    # Casting to something else should error
+    requested_type = pa.binary()
+    requested_capsule = requested_type.__arrow_c_schema__()
+    with pytest.raises(NotImplementedError):
+        chunked.__arrow_c_stream__(requested_capsule)

From 6ac69abae084ea1c007e40df0474ac5ae3562ffb Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Tue, 13 Feb 2024 20:59:53 -0400
Subject: [PATCH 11/11] maybe actually fix reformat

---
 python/pyarrow/tests/test_cffi.py | 160 ++++++++++++------------------
 1 file changed, 62 insertions(+), 98 deletions(-)

diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py
index d991c3c1aac3a..3a0c7b5b7152f 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -21,7 +21,6 @@
 import gc
 
 import pyarrow as pa
-
 try:
     from pyarrow.cffi import ffi
 except ImportError:
@@ -36,19 +35,17 @@
 pd = tm = None
 
 
-needs_cffi = pytest.mark.skipif(ffi is None, reason="test needs cffi package installed")
+needs_cffi = pytest.mark.skipif(ffi is None,
+                                reason="test needs cffi package installed")
 
 
 assert_schema_released = pytest.raises(
-    ValueError, match="Cannot import released ArrowSchema"
-)
+    ValueError, match="Cannot import released ArrowSchema")
 
 assert_array_released = pytest.raises(
-    ValueError, match="Cannot import released ArrowArray"
-)
+    ValueError, match="Cannot import released ArrowArray")
 
 assert_stream_released = pytest.raises(
-    ValueError, match="Cannot import released ArrowArrayStream"
-)
+    ValueError, match="Cannot import released ArrowArrayStream")
 
 def PyCapsule_IsValid(capsule, name):
@@ -68,7 +65,8 @@ class ParamExtType(pa.ExtensionType):
 
     def __init__(self, width):
         self._width = width
-        super().__init__(pa.binary(width), "pyarrow.tests.test_cffi.ParamExtType")
+        super().__init__(pa.binary(width),
+                         "pyarrow.tests.test_cffi.ParamExtType")
 
     @property
     def width(self):
@@ -84,18 +82,19 @@ def __arrow_ext_deserialize__(cls, storage_type, serialized):
 
 
 def make_schema():
-    return pa.schema([("ints", pa.list_(pa.int32()))], metadata={b"key1": b"value1"})
+    return pa.schema([('ints', pa.list_(pa.int32()))],
+                     metadata={b'key1': b'value1'})
 
 
 def make_extension_schema():
-    return pa.schema([("ext", ParamExtType(3))], metadata={b"key1": b"value1"})
+    return pa.schema([('ext', ParamExtType(3))],
+                     metadata={b'key1': b'value1'})
 
 
 def make_extension_storage_schema():
     # Should be kept in sync with make_extension_schema
-    return pa.schema(
-        [("ext", ParamExtType(3).storage_type)], metadata={b"key1": b"value1"}
-    )
+    return pa.schema([('ext', ParamExtType(3).storage_type)],
+                     metadata={b'key1': b'value1'})
 
 
 def make_batch():
@@ -104,7 +103,8 @@ def make_extension_batch():
     schema = make_extension_schema()
-    ext_col = schema[0].type.wrap_array(pa.array([b"foo", b"bar"], type=pa.binary(3)))
+    ext_col = schema[0].type.wrap_array(pa.array([b"foo", b"bar"],
+                                                 type=pa.binary(3)))
     return pa.record_batch([ext_col], schema)
 
 
@@ -149,7 +149,8 @@ def test_export_import_type():
     pa.int32()._export_to_c(ptr_schema)
     bad_format = ffi.new("char[]", b"zzz")
     c_schema.format = bad_format
-    with pytest.raises(ValueError, match="Invalid or unsupported format string"):
+    with pytest.raises(ValueError,
+                       match="Invalid or unsupported format string"):
         pa.DataType._import_from_c(ptr_schema)
     # Now released
     with assert_schema_released:
@@ -249,7 +250,8 @@ def check_export_import_schema(schema_factory, expected_schema_factory=None):
 
     # Not a struct type
     pa.int32()._export_to_c(ptr_schema)
-    with pytest.raises(ValueError, match="ArrowSchema describes non-struct type"):
+    with pytest.raises(ValueError,
+                       match="ArrowSchema describes non-struct type"):
         pa.Schema._import_from_c(ptr_schema)
     # Now released
     with assert_schema_released:
@@ -264,7 +266,8 @@ def test_export_import_schema():
 @needs_cffi
 def test_export_import_schema_with_extension():
     # Extension type is unregistered => the storage type is imported
-    check_export_import_schema(make_extension_schema, make_extension_storage_schema)
+    check_export_import_schema(make_extension_schema,
+                               make_extension_storage_schema)
 
     # Extension type is registered => the extension type is imported
     with registered_extension_type(ParamExtType(1)):
@@ -332,7 +335,8 @@ def check_export_import_batch(batch_factory):
     # Not a struct type
     pa.int32()._export_to_c(ptr_schema)
     batch_factory()._export_to_c(ptr_array)
-    with pytest.raises(ValueError, match="ArrowSchema describes non-struct type"):
+    with pytest.raises(ValueError,
+                       match="ArrowSchema describes non-struct type"):
         pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
     # Now released
     with assert_schema_released:
@@ -391,9 +395,9 @@ def make_py_record_batch_reader(schema, batches):
 
 
 @needs_cffi
-@pytest.mark.parametrize(
-    "reader_factory", [make_ipc_stream_reader, make_py_record_batch_reader]
-)
+@pytest.mark.parametrize('reader_factory',
+                         [make_ipc_stream_reader,
+                          make_py_record_batch_reader])
 def test_export_import_batch_reader(reader_factory):
     c_stream = ffi.new("struct ArrowArrayStream*")
     ptr_stream = int(ffi.cast("uintptr_t", c_stream))
@@ -422,9 +426,9 @@ def test_export_import_exception_reader():
     def gen():
         if True:
             try:
-                raise ValueError("foo")
+                raise ValueError('foo')
             except ValueError as e:
-                raise NotImplementedError("bar") from e
+                raise NotImplementedError('bar') from e
         else:
             yield from make_batches()
 
@@ -436,10 +440,10 @@ def gen():
         reader.read_next_batch()
 
     # inner *and* outer exception should be present
-    assert "ValueError: foo" in str(exc_info.value)
-    assert "NotImplementedError: bar" in str(exc_info.value)
+    assert 'ValueError: foo' in str(exc_info.value)
+    assert 'NotImplementedError: bar' in str(exc_info.value)
     # Stacktrace containing line of the raise statement
-    assert "raise ValueError('foo')" in str(exc_info.value)
+    assert 'raise ValueError(\'foo\')' in str(exc_info.value)
 
     assert pa.total_allocated_bytes() == old_allocated
 
@@ -449,11 +453,9 @@ def test_imported_batch_reader_error():
     c_stream = ffi.new("struct ArrowArrayStream*")
     ptr_stream = int(ffi.cast("uintptr_t", c_stream))
 
-    schema = pa.schema([("foo", pa.int32())])
-    batches = [
-        pa.record_batch([[1, 2, 3]], schema=schema),
-        pa.record_batch([[4, 5, 6]], schema=schema),
-    ]
+    schema = pa.schema([('foo', pa.int32())])
+    batches = [pa.record_batch([[1, 2, 3]], schema=schema),
+               pa.record_batch([[4, 5, 6]], schema=schema)]
     buf = make_serialized(schema, batches)
 
     # Open a corrupt/incomplete stream and export it
@@ -464,9 +466,9 @@ def test_imported_batch_reader_error():
     reader_new = pa.RecordBatchReader._import_from_c(ptr_stream)
     batch = reader_new.read_next_batch()
     assert batch == batches[0]
-    with pytest.raises(
-        OSError, match="Expected to be able to read 16 bytes " "for message body, got 8"
-    ):
+    with pytest.raises(OSError,
+                       match="Expected to be able to read 16 bytes "
+                             "for message body, got 8"):
         reader_new.read_next_batch()
 
     # Again, but call read_all()
@@ -475,17 +477,15 @@ def test_imported_batch_reader_error():
     del reader
 
     reader_new = pa.RecordBatchReader._import_from_c(ptr_stream)
-    with pytest.raises(
-        OSError, match="Expected to be able to read 16 bytes " "for message body, got 8"
-    ):
+    with pytest.raises(OSError,
+                       match="Expected to be able to read 16 bytes "
+                             "for message body, got 8"):
         reader_new.read_all()
 
 
-@pytest.mark.parametrize(
-    "obj",
-    [pa.int32(), pa.field("foo", pa.int32()), pa.schema({"foo": pa.int32()})],
-    ids=["type", "field", "schema"],
-)
+@pytest.mark.parametrize('obj', [pa.int32(), pa.field('foo', pa.int32()),
+                                 pa.schema({'foo': pa.int32()})],
+                         ids=['type', 'field', 'schema'])
 def test_roundtrip_schema_capsule(obj):
     gc.collect()  # Make sure no Arrow data dangles in a ref cycle
     old_allocated = pa.total_allocated_bytes()
@@ -505,19 +505,15 @@ def test_roundtrip_schema_capsule(obj):
     assert pa.total_allocated_bytes() == old_allocated
 
 
-@pytest.mark.parametrize(
-    "arr,schema_accessor,bad_type,good_type",
-    [
-        (pa.array(["a", "b", "c"]), lambda x: x.type, pa.int32(), pa.string()),
-        (
-            pa.record_batch([pa.array(["a", "b", "c"])], names=["x"]),
-            lambda x: x.schema,
-            pa.schema({"x": pa.int32()}),
-            pa.schema({"x": pa.string()}),
-        ),
-    ],
-    ids=["array", "record_batch"],
-)
+@pytest.mark.parametrize('arr,schema_accessor,bad_type,good_type', [
+    (pa.array(['a', 'b', 'c']), lambda x: x.type, pa.int32(), pa.string()),
+    (
+        pa.record_batch([pa.array(['a', 'b', 'c'])], names=['x']),
+        lambda x: x.schema,
+        pa.schema({'x': pa.int32()}),
+        pa.schema({'x': pa.string()})
+    ),
+], ids=['array', 'record_batch'])
 def test_roundtrip_array_capsule(arr, schema_accessor, bad_type, good_type):
     gc.collect()  # Make sure no Arrow data dangles in a ref cycle
     old_allocated = pa.total_allocated_bytes()
@@ -541,28 +537,22 @@ def test_roundtrip_array_capsule(arr, schema_accessor, bad_type, good_type):
     del capsule
     assert pa.total_allocated_bytes() == old_allocated
 
-    with pytest.raises(
-        ValueError, match=r"Could not cast.* string to requested .* int32"
-    ):
+    with pytest.raises(ValueError,
+                       match=r"Could not cast.* string to requested .* int32"):
         arr.__arrow_c_array__(bad_type.__arrow_c_schema__())
 
     schema_capsule, array_capsule = arr.__arrow_c_array__(
-        good_type.__arrow_c_schema__()
-    )
+        good_type.__arrow_c_schema__())
    arr_out = import_array(schema_capsule, array_capsule)
     assert schema_accessor(arr_out) == good_type
 
 
 # TODO: implement requested_schema for stream
-@pytest.mark.parametrize(
-    "constructor",
-    [
-        pa.RecordBatchReader.from_batches,
-        # Use a lambda because we need to re-order the parameters
-        lambda schema, batches: pa.Table.from_batches(batches, schema),
-    ],
-    ids=["recordbatchreader", "table"],
-)
+@pytest.mark.parametrize('constructor', [
+    pa.RecordBatchReader.from_batches,
+    # Use a lambda because we need to re-order the parameters
+    lambda schema, batches: pa.Table.from_batches(batches, schema),
+], ids=['recordbatchreader', 'table'])
 def test_roundtrip_reader_capsule(constructor):
     batches = make_batches()
     schema = batches[0].schema
@@ -588,12 +578,12 @@ def test_roundtrip_reader_capsule(constructor):
     obj = constructor(schema, batches)
 
     # TODO: turn this to ValueError once we implement validation.
-    bad_schema = pa.schema({"ints": pa.int32()})
+    bad_schema = pa.schema({'ints': pa.int32()})
     with pytest.raises(NotImplementedError):
         obj.__arrow_c_stream__(bad_schema.__arrow_c_schema__())
 
     # Can work with matching schema
-    matching_schema = pa.schema({"ints": pa.list_(pa.int32())})
+    matching_schema = pa.schema({'ints': pa.list_(pa.int32())})
     capsule = obj.__arrow_c_stream__(matching_schema.__arrow_c_schema__())
     imported_reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
     assert imported_reader.schema == matching_schema
@@ -637,29 +627,3 @@ def test_roundtrip_chunked_array_capsule_requested_schema():
     requested_capsule = requested_type.__arrow_c_schema__()
     with pytest.raises(NotImplementedError):
         chunked.__arrow_c_stream__(requested_capsule)
-
-
-def test_roundtrip_chunked_array_capsule():
-    chunked = pa.chunked_array([pa.array(["a", "b", "c"])])
-
-    capsule = chunked.__arrow_c_stream__()
-    assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
-    imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
-    assert imported_chunked.type == chunked.type
-    assert imported_chunked == chunked
-
-
-def test_roundtrip_chunked_array_capsule_requested_schema():
-    chunked = pa.chunked_array([pa.array(["a", "b", "c"])])
-
-    # Requesting the same type should work
-    requested_capsule = chunked.type.__arrow_c_schema__()
-    capsule = chunked.__arrow_c_stream__(requested_capsule)
-    imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
-    assert imported_chunked == chunked
-
-    # Casting to something else should error
-    requested_type = pa.binary()
-    requested_capsule = requested_type.__arrow_c_schema__()
-    with pytest.raises(NotImplementedError):
-        chunked.__arrow_c_stream__(requested_capsule)
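
For reviewers, a minimal usage sketch of the behavior this series ends up with (plain Python; assumes a pyarrow build that includes these patches and mirrors the tests and demo above rather than adding new behavior):

```python
import pyarrow as pa

# Export a two-chunk ChunkedArray as a PyCapsule wrapping a C ArrowArrayStream.
chunked = pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])])
capsule = chunked.__arrow_c_stream__()

# Importing consumes the capsule; each chunk comes back as one array in the
# stream, so the chunk boundaries survive the roundtrip.
roundtripped = pa.ChunkedArray._import_from_c_capsule(capsule)
assert roundtripped == chunked
assert roundtripped.num_chunks == 2

# As of PATCH 09, a requested_schema that differs from the array's type
# raises instead of casting (casting is left as a TODO).
try:
    chunked.__arrow_c_stream__(pa.binary().__arrow_c_schema__())
except NotImplementedError:
    pass
```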