[Python] Segfault in to_pandas() on batch from IPC stream in specific edge cases #41469

Open
Tom-Newton opened this issue Apr 30, 2024 · 14 comments


@Tom-Newton
Contributor

Tom-Newton commented Apr 30, 2024

Describe the bug, including details regarding any error messages, version, and platform.

So far I've only been able to reproduce this with pyspark, but I think the bug is probably on the Arrow side. The problem was introduced by #15210, and reverting that change on top of the 16.0.0 release fixes the problem.

Reproduce

The smallest reproducer I've found is the following:
reproduce_pyspark.py.txt (it has a .txt extension because GitHub doesn't allow uploading .py files)
Versions:

pandas==2.2.2
pyspark==3.5.1
pyarrow==16.0.0

python 3.10.14 

Error is:

py4j.protocol.Py4JJavaError: An error occurred while calling o63.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 32) (192.168.1.222 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault

Current thread 0x00007f3d6621a740 (most recent call first):
  File "/home/tomnewton/segault-venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 188 in arrow_to_pandas

full_stdout.txt

A few things I've noticed:

  1. Reproducible with various combinations of nullable nested arrays that may contain null, where the data is null at some level before the final layer of nesting (see the sketch after this list).
  2. Adding a second row which is not-null avoids the problem.
  3. I was unable to reproduce the problem just by creating a chunked array that looks the same and calling to_pandas on it.
  4. I was unable to reproduce with an IPC stream fully in python created from a pyarrow table.
  5. I was unable to reproduce with highly nested struct or map types.
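
For concreteness, the shape described in (1) and (2) looks roughly like this in pyarrow (a sketch for illustration only; per (3), this kind of construction by itself does not reproduce the crash):

import pyarrow as pa

nested = pa.list_(pa.list_(pa.list_(pa.float64())))

# (1) a single row that is null at an outer level of nesting
single_null_row = pa.array([None], type=nested)

# (2) adding a second, non-null row avoids the problem
with_extra_row = pa.array([None, [[[1.0]]]], type=nested)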

Component(s)

C++, Python, Java

@jorisvandenbossche jorisvandenbossche changed the title Segfault in to_pandas() on batch from IPC stream in specific edge cases [Python] Segfault in to_pandas() on batch from IPC stream in specific edge cases May 1, 2024
@jorisvandenbossche
Member

jorisvandenbossche commented May 1, 2024

Thanks for the report!

I was unable to reproduce with an IPC stream fully in python created from a pyarrow table.

Could you try creating an IPC file from the pyspark dataframe? (I don't know if pyspark provides the functionality for that) Or can you convert the pyspark dataframe to pyarrow first (not going through pandas), and then save it?

And something else you could try: does it still reproduce after a roundtrip to Parquet?

@jorisvandenbossche
Member

A quick attempt at reproducing this in pyarrow, which, as you also observed, doesn't crash:

import pyarrow as pa

typ = pa.struct([
    pa.field("id", pa.int64()),
    pa.field("value", pa.list_(pa.list_(pa.list_(pa.list_(pa.list_(pa.float64()))))))
])
arr = pa.array([{"id": 0, "value": None}], typ)
arr.to_pandas()
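
As a quick check (reusing arr from the snippet above), this pyarrow-built array does carry offsets buffers all the way down, even for the empty child lists, which is worth keeping in mind for the inspection of the Spark-produced stream further below:

list_arr = arr.field("value")   # the nested list child of the struct
list_arr.offsets                # a single null entry -> offsets [0, 0]
list_arr.values.offsets         # the empty child list still has one offset: [0]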

@Tom-Newton
Contributor Author

Tom-Newton commented May 1, 2024

Thanks for the suggestions. PySpark doesn't really support this, but I can hack it in.

does it still reproduce after a roundtrip to Parquet?

No, after a roundtrip to Parquet the problem no longer occurs. To test, I modified this section of PySpark: https://github.com/apache/spark/blob/v3.5.1/python/pyspark/sql/pandas/serializers.py#L323-L324

Approximately the normal code:
for batch in batches:
    pyarrow_table = pa.Table.from_batches([batch])
    yield [self.arrow_to_pandas(c) for c in pyarrow_table.itercolumns()]
With a round trip through Parquet:
for batch in batches:
    import tempfile
    import pyarrow.parquet
    with tempfile.TemporaryFile() as tempdir:
        pyarrow_table = pa.Table.from_batches([batch])
        pyarrow.parquet.write_table(pyarrow_table, tempdir)
        read_back_pyarrow_table = pyarrow.parquet.read_table(tempdir)
    yield [self.arrow_to_pandas(c) for c in read_back_pyarrow_table.itercolumns()]

Could you try creating an IPC file from the pyspark dataframe? (I don't know if pyspark provides the functionality for that) Or can you convert the pyspark dataframe to pyarrow first (not going through pandas), and then save it?

I managed to hack something in

My hack:
      reader = pa.ipc.open_stream(stream)

      with open("/tmp/arrow_stream", "wb") as write_file:
          with pa.ipc.new_stream(write_file, reader.schema) as writer:
              for batch in reader:
                  writer.write_batch(batch)

      with open("/tmp/arrow_stream", "rb") as read_file:
          with pa.ipc.open_stream(read_file) as reader:
              for batch in reader:
                  yield batch

Original is https://github.com/apache/spark/blob/v3.5.1/python/pyspark/sql/pandas/serializers.py#L108-L113

And now it can be reproduced with just pyarrow.

import pyarrow as pa

def record_batches_to_chunked_array(batch):
    for c in pa.Table.from_batches([batch]).itercolumns():
        yield c

with open("/tmp/arrow_stream", "rb") as read_file:
    with pa.ipc.open_stream(read_file) as reader:
        schema = reader.schema

        for batch in reader:
            [c.to_pandas(date_as_object=True) for c in record_batches_to_chunked_array(batch)]

arrow_stream.txt (the .txt extension is just so GitHub lets me upload it; it's actually a binary dump of the Arrow stream created with the hack I mentioned above).

@Tom-Newton
Contributor Author

Wait... there is something else weird going on. This smaller reproducer doesn't always work, depending on the Python environment.

@Tom-Newton
Contributor Author

Tom-Newton commented May 1, 2024

It turns out the bug is only reproducible when numpy is imported prior to pyarrow. So my current smallest reproducer is:

import numpy as np
import pyarrow as pa

with open("/tmp/arrow_stream", "rb") as read_file:
    with pa.ipc.open_stream(read_file) as reader:
        schema = reader.schema

        for batch in reader:
            batch.to_pandas()

print("SUCCESS")

The file is arrow_stream.txt from my previous comment.

@Tom-Newton
Contributor Author

Tom-Newton commented May 1, 2024

I managed to attach a debugger, so I can see a bit about why it's segfaulting.

Ultimately the segfault is at arrow/array/array_nested.h:90. Suspiciously, the value of data_->offset is -189153672 in the call to value_offset that segfaults; all the previous calls had data_->offset equal to 0. -189153672 is a deterministic value when I run with the arrow_stream file I got out of Spark. Interestingly, we still get this -189153672 value without the numpy import, but without numpy it does not segfault.

For an IPC file created from Python, data_->offset is always 0 and there is no segfault. So potentially the problem is actually on the Java side.

Python code to create a similar IPC file:

schema = pa.schema(
    [pa.field("value", pa.list_(pa.list_(pa.list_(pa.list_(pa.list_(pa.list_(pa.float64())))))))]
)
pyarrow_table = pa.Table.from_arrays([pa.array([None])], schema=schema)
with open("/tmp/arrow_stream2", "wb") as write_file:
    with pa.ipc.new_stream(write_file, schema) as writer:
        for batch in pyarrow_table.to_batches():
            writer.write_batch(batch)

arrow_stream2.txt

@jorisvandenbossche
Member

Thanks, I can now reproduce it as well!

I think you are right in the observation that this seems to be a problem with the data generated on the Java/Spark side (although it is still strange that whether it segfaults depends on numpy being imported first or not).

When reading your IPC stream file without converting to pandas and inspecting it, we can see that the data is indeed invalid:

import pyarrow as pa

with pa.ipc.open_stream("../Downloads/arrow_stream.txt") as reader:
    batch = reader.read_next_batch()

arr = batch["value"]

>>> arr
<pyarrow.lib.ListArray object at 0x7fe352a8d1e0>
[
  null,
  null,
  null
]

# Validating / inspecting the parent array
>>> arr.validate(full=True)
>>> arr.offsets
<pyarrow.lib.Int32Array object at 0x7fe29daa0460>
[
  0,
  0,
  0,
  0
]
>>> arr.values
<pyarrow.lib.ListArray object at 0x7fe29d84c0a0>
[]

# Validating / inspecting the first child array
>>> arr.values.validate(full=True)
>>> arr.values.offsets
<pyarrow.lib.Int32Array object at 0x7fe29f3ef880>
<Invalid array: Buffer #1 too small in array of type int32 and length 1: expected at least 4 byte(s), got 0
>>> arr.values.values
<pyarrow.lib.ListArray object at 0x7fe29f238760>
[]

So the offsets of the child array are missing. This child array has a length of 0, but per the format the offsets buffer still needs to have a length of 1.
I seem to remember this is a case that has come up before.
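
For comparison, a zero-length list array built by pyarrow shows what the format expects here (a minimal check, independent of the stream above):

import pyarrow as pa

empty = pa.array([], type=pa.list_(pa.float64()))
empty.offsets              # still contains the leading offset: [0]
empty.validate(full=True)  # passes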

@jorisvandenbossche
Member

A different way to inspect the data using nanoarrow (using the arr from the code example above), where we can also see that each of the list child arrays has a "data_offset" buffer of 0 bytes:

>>> import nanoarrow as na
>>> na.array(arr).inspect()
<ArrowArray list<element: list<element: list<element: list<element: l>
- length: 3
- offset: 0
- null_count: 3
- buffers[2]:
  - validity <bool[1 b] 00000000>
  - data_offset <int32[16 b] 0 0 0 0>
- dictionary: NULL
- children[1]:
  'element': <ArrowArray list<element: list<element: list<element: list<elemen>
    - length: 0
    - offset: 0
    - null_count: 0
    - buffers[2]:
      - validity <bool[0 b] >
      - data_offset <int32[0 b] >
    - dictionary: NULL
    - children[1]:
      'element': <ArrowArray list<element: list<element: list<element: double>>
        - length: 0
        - offset: 0
        - null_count: 0
        - buffers[2]:
          - validity <bool[0 b] >
          - data_offset <int32[0 b] >
        - dictionary: NULL
        - children[1]:
          'element': <ArrowArray list<element: list<element: double>>>
            - length: 0
            - offset: 0
            - null_count: 0
            - buffers[2]:
              - validity <bool[0 b] >
              - data_offset <int32[0 b] >
            - dictionary: NULL
            - children[1]:
              'element': <ArrowArray list<element: double>>
                - length: 0
                - offset: 0
                - null_count: 0
                - buffers[2]:
                  - validity <bool[0 b] >
                  - data_offset <int32[0 b] >
                - dictionary: NULL
                - children[1]:
                  'element': <ArrowArray double>
                    - length: 0
                    - offset: 0
                    - null_count: 0
                    - buffers[2]:
                      - validity <bool[0 b] >
                      - data <double[0 b] >
                    - dictionary: NULL
                    - children[0]:

@jorisvandenbossche
Member

Related issue: #31396

@jorisvandenbossche
Member

jorisvandenbossche commented May 2, 2024

And a more recent issue: #40038, with a fix in Arrow Java 16.0 for this (at least for strings, not sure if the lists were automatically fixed as well). However, that discussion was only about the C Data Interface, so for IPC the behaviour might be unchanged (I didn't check the PR in detail).

@Tom-Newton
Contributor Author

And a more recent issue: #40038, with a fix in Arrow Java 16.0 for this (at least for strings, not sure if the lists were automatically fixed as well).

I did try building a custom PySpark based on Arrow 16.0.0 on the Java side. Unfortunately, it still fails in the same way; in fact, the resulting stream was identical to the one from the official PySpark release, which uses Arrow 12.0.1 on the Java side.

I will probably try to create a minimal reproducer for the bad IPC file with Arrow Java and without Spark.

@jorisvandenbossche
Member

jorisvandenbossche commented May 7, 2024

We are having some discussion about this on the Zulip chat, and the conclusion might be that the C++ library is generally forgiving about this and accepts it as input.
But if it does that, we should of course ensure we handle it properly in all cases internally and don't crash on such data (or at least raise a proper error complaining about invalid data, instead of segfaulting). I can try to take a look at fixing it in the pyarrow->pandas conversion.
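
In the meantime, the Parquet roundtrip tested earlier in this thread can serve as a workaround. A self-contained sketch using an in-memory buffer (the helper name is just for illustration; write_table and read_table both accept file-like objects):

import io

import pyarrow as pa
import pyarrow.parquet as pq

def roundtrip_through_parquet(batch: pa.RecordBatch) -> pa.Table:
    # Writing to Parquet and reading back rebuilds the arrays, which, per the
    # test above, avoids the segfault in the subsequent to_pandas() call.
    table = pa.Table.from_batches([batch])
    buf = io.BytesIO()
    pq.write_table(table, buf)
    return pq.read_table(io.BytesIO(buf.getvalue()))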

@jorisvandenbossche jorisvandenbossche added this to the 18.0.0 milestone Aug 27, 2024
@jorisvandenbossche jorisvandenbossche modified the milestones: 18.0.0, 19.0.0 Oct 2, 2024
@paleolimbot
Member

I suspect that the call to arr.value_offset(0) here might cause this:

// values() does not account for offsets, so we need to slice into it.
// We can't use Flatten(), because it removes the values behind a null list
// value, and that makes the offsets into original list values and our
// flattened_values array different.
std::shared_ptr<Array> flattened_values = arr.values()->Slice(
    arr.value_offset(0), arr.value_offset(arr.length()) - arr.value_offset(0));
if (arr.value_type()->id() == Type::EXTENSION) {
  const auto& arr_ext = checked_cast<const ExtensionArray&>(*flattened_values);
  value_arrays.emplace_back(arr_ext.storage());
} else {
  value_arrays.emplace_back(flattened_values);
}

...probably we can just bail before that if arr->length() == 0.

I can't seem to reproduce the crash here, although perhaps it's my environment:

import urllib.request

import numpy as np
import pyarrow as pa

with urllib.request.urlopen("https://github.com/apache/arrow/files/15181244/arrow_stream2.txt") as f:
    with open("/tmp/arrow_stream", "wb") as fout:
        fout.write(f.read())

with open("/tmp/arrow_stream", "rb") as read_file:
    with pa.ipc.open_stream(read_file) as reader:
        schema = reader.schema

        for batch in reader:
            batch.to_pandas()

print("SUCCESS")
#> SUCCESS

An experiment at creating something similar to see where some errors occur:

import io

import numpy

import nanoarrow as na
from nanoarrow import ipc  # requires about-to-be-released nanoarrow for writer
import pyarrow as pa
from pyarrow import ipc as pa_ipc

bad_arr = na.c_array_from_buffers(
    na.list_(na.list_(na.float64())),
    3,
    [na.c_buffer([False, False, False], na.bool_()), na.c_buffer([0, 0, 0, 0], na.int32())],
    null_count=3,
    children=[
        na.c_array_from_buffers(
            na.list_(na.float64()),
            0,
            [None, None],
            children=[na.c_array([], na.float64())]
        )
    ]
)

# This will error
# pa.array(bad_arr)
# ArrowInvalid: ArrowArrayStruct contains null data pointer for a buffer with non-zero computed size

bad_batch = na.c_array_from_buffers(
    na.struct({"col": bad_arr.schema}),
    3,
    [],
    null_count=0,
    children=[bad_arr]
)

buf = io.BytesIO()
with ipc.StreamWriter.from_writable(buf) as writer:
    writer.write_stream(bad_batch)

with pa_ipc.open_stream(buf.getvalue()) as reader:
    batch = reader.read_next_batch()
    bad_pyarr = batch.column(0)

bad_pyarr.values.offsets
#> <pyarrow.lib.Int32Array object at 0x10d463580>
#> <Invalid array: Buffer #1 too small in array of type int32 and length 1: expected at least 4 byte(s), got 0>

@Tom-Newton
Contributor Author

Tom-Newton commented Oct 4, 2024

I just tried it and I also couldn't reproduce. So I created a new stream in the same way as last time, and with the new stream I can reproduce again.

import urllib.request
import numpy as np  # For some reason this needs to be imported before pyarrow to reproduce the segfault
import pyarrow as pa

with urllib.request.urlopen("https://github.com/user-attachments/files/17258296/arrow_stream_2024_10_04.txt") as f:
    with open("/tmp/arrow_stream", "wb") as fout:
        fout.write(f.read())

with open("/tmp/arrow_stream", "rb") as read_file:
    with pa.ipc.open_stream(read_file) as reader:
        schema = reader.schema
        print(schema)

        for batch in reader:
            print(batch.to_pandas())

print("SUCCESS")

arrow_stream_2024_10_04.txt

This was a fresh venv using Python 3.10.14, then installing the latest versions of pyarrow and pandas:

$ pip freeze
numpy==2.1.1
pandas==2.2.3
pyarrow==17.0.0
python-dateutil==2.9.0.post0
pytz==2024.2
six==1.16.0
tzdata==2024.2
