
Pyarrow type error #541

Closed · dev-goyal opened this issue Mar 23, 2024 · 8 comments · Fixed by #848

dev-goyal commented Mar 23, 2024

Apache Iceberg version

0.6.0 (latest release)

Please describe the bug 🐞

Given a table like so:

In [36]: table
Out[36]:
matches(
 ...
  14: player_last_session: optional timestamptz,
...
  30: subject_last_session: optional timestamptz,
),
partition by: [run_date, player_agg_cluster_name, initiating_at],
sort order: [],
snapshot: Operation.APPEND: id=6595288807809068528, schema_id=0

I get the following error:

In [25]: table.scan().to_arrow()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[25], line 1
----> 1 table.scan().to_arrow()

File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/table/__init__.py:1418, in DataScan.to_arrow(self)
   1415 def to_arrow(self) -> pa.Table:
   1416     from pyiceberg.io.pyarrow import project_table
-> 1418     return project_table(
   1419         self.plan_files(),
   1420         self.table,
   1421         self.row_filter,
   1422         self.projection(),
   1423         case_sensitive=self.case_sensitive,
   1424         limit=self.limit,
   1425     )

File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:1114, in project_table(tasks, table, row_filter, projected_schema, case_sensitive, limit)
   1111 if limit is not None:
   1112     _ = [f.cancel() for f in futures if not f.done()]
-> 1114 tables = [f.result() for f in completed_futures if f.result()]
   1116 if len(tables) < 1:
   1117     return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema))

File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:1114, in <listcomp>(.0)
   1111 if limit is not None:
   1112     _ = [f.cancel() for f in futures if not f.done()]
-> 1114 tables = [f.result() for f in completed_futures if f.result()]
   1116 if len(tables) < 1:
   1117     return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema))

File ~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:449, in Future.result(self, timeout)
    447     raise CancelledError()
    448 elif self._state == FINISHED:
--> 449     return self.__get_result()
    451 self._condition.wait(timeout)
    453 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:957, in _task_to_table(fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, row_counts, limit, name_mapping)
    954 if metadata := physical_schema.metadata:
    955     schema_raw = metadata.get(ICEBERG_SCHEMA)
    956 file_schema = (
--> 957     Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema, name_mapping)
    958 )
    960 pyarrow_filter = None
    961 if bound_row_filter is not AlwaysTrue():

File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:655, in pyarrow_to_schema(schema, name_mapping)
    651 else:
    652     raise ValueError(
    653         "Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
    654     )
--> 655 return visit_pyarrow(schema, visitor)

File ~/.pyenv/versions/3.11.7/lib/python3.11/functools.py:909, in singledispatch.<locals>.wrapper(*args, **kw)
    905 if not args:
    906     raise TypeError(f'{funcname} requires at least '
    907                     '1 positional argument')
--> 909 return dispatch(args[0].__class__)(*args, **kw)

File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:676, in _(obj, visitor)
    674 @visit_pyarrow.register(pa.Schema)
    675 def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T:
--> 676     return visitor.schema(obj, visit_pyarrow(pa.struct(obj), visitor))

File ~/.pyenv/versions/3.11.7/lib/python3.11/functools.py:909, in singledispatch.<locals>.wrapper(*args, **kw)
    905 if not args:
    906     raise TypeError(f'{funcname} requires at least '
    907                     '1 positional argument')
--> 909 return dispatch(args[0].__class__)(*args, **kw)

File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:685, in _(obj, visitor)
    683 for field in obj:
    684     visitor.before_field(field)
--> 685     result = visit_pyarrow(field.type, visitor)
    686     results.append(visitor.field(field, result))
    687     visitor.after_field(field)

File ~/.pyenv/versions/3.11.7/lib/python3.11/functools.py:909, in singledispatch.<locals>.wrapper(*args, **kw)
    905 if not args:
    906     raise TypeError(f'{funcname} requires at least '
    907                     '1 positional argument')
--> 909 return dispatch(args[0].__class__)(*args, **kw)

File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:718, in _(obj, visitor)
    716 if pa.types.is_nested(obj):
    717     raise TypeError(f"Expected primitive type, got: {type(obj)}")
--> 718 return visitor.primitive(obj)

File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:891, in _ConvertToIceberg.primitive(self, primitive)
    888     primitive = cast(pa.FixedSizeBinaryType, primitive)
    889     return FixedType(primitive.byte_width)
--> 891 raise TypeError(f"Unsupported type: {primitive}")

TypeError: Unsupported type: timestamp[ns]

After some debugging, at this line I find:

ipdb> physical_schema
player_last_session: timestamp[ns]
...
subject_last_session: timestamp[ns]

I imagine the fix is to do something like this on this line, but currently those overrides are not exposed. Am I on the right track?
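
For reference, a minimal sketch that reproduces the failing conversion path (it goes through the private visitor shown in the traceback, so treat it as illustrative only):

import pyarrow as pa
from pyiceberg.io.pyarrow import visit_pyarrow, _ConvertToIceberg  # private API, per the traceback

# A schema with a nanosecond-precision timestamp, like the physical schema above
schema = pa.schema([pa.field("player_last_session", pa.timestamp("ns"))])

# The visitor rejects the ns unit before it ever checks field ids
visit_pyarrow(schema, _ConvertToIceberg())  # TypeError: Unsupported type: timestamp[ns]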

I believe that this issue is somewhat similar to #520

Fokko (Contributor) commented Mar 23, 2024

@dev-goyal Thanks for raising this. It looks like that's a timestamp with nanosecond precision. Support for nanosecond timestamps is currently being added in the latest specification. Can I ask how you wrote the Parquet file?

dev-goyal (Author) commented Mar 23, 2024

Thanks @Fokko, makes sense! I was able to simply reduce precision on my end so it's not a big deal, but I figured it couldn't hurt to raise this.

I wrote the data with DBT into an Iceberg table (Athena/Trino as the engine), sourced from a CDC topic (hence the nanosecond precision); the columns were originally represented as Timestamp.
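
(If useful to anyone else, "reducing precision" can be done on the pyarrow side before writing; a sketch, where the table and column are made-up stand-ins:)

import pyarrow as pa

# Downcast a nanosecond column to microseconds before handing the table to pyiceberg
tbl = pa.table({"ts": pa.array([1_700_000_000_000_000_000], type=pa.timestamp("ns", tz="UTC"))})
tbl = tbl.set_column(0, "ts", tbl["ts"].cast(pa.timestamp("us", tz="UTC")))
print(tbl.schema)  # ts: timestamp[us, tz=UTC]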

kevinjqliu (Contributor) commented

Hey @dev-goyal, do you mind posting a snippet of your example above? I think this is very similar to #520

In #520, the Iceberg table is created from a pyarrow schema. Internally, Iceberg converts the schema and "downcasts" certain types (large_string -> string, nanosecond timestamp -> microsecond timestamp). So when the pyarrow data (still carrying the original schema) is written to the Iceberg table, there is a schema mismatch: the Iceberg schema has been downcast/translated while the pyarrow data schema is unchanged.
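
To make the mismatch concrete, a pure-pyarrow sketch of the two schemas being compared (column names are illustrative):

import pyarrow as pa

# The schema the pyarrow data carries...
data_schema = pa.schema([("ts", pa.timestamp("ns")), ("name", pa.large_string())])
# ...versus what the Iceberg-side conversion effectively produces
iceberg_schema = pa.schema([("ts", pa.timestamp("us")), ("name", pa.string())])
assert not data_schema.equals(iceberg_schema)  # hence the mismatch on write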

#523 should help solve this

bigluck commented Apr 8, 2024

I'm facing a similar issue in my code.

Tested using main@7fcdb8d25dfa2498ba98a2b8e8d2b327d85fa7c9 (the commit after "Minor fixes, #523 followup" (#563) and "Cast data to Iceberg Table's pyarrow schema" (#523)).

In my case I'm creating a new table from this arrow schema:

ds: timestamp[ns]
yhat: double
yhat_lower: double
yhat_upper: double
-- schema metadata --

This is the full stacktrace:

Traceback (most recent call last):
  File "/bpln/cba723bb/82eddf43/pip/runtime/s3write/invoke.py", line 62, in invoke
    self._pyiceberg_write_model(model)
  File "/bpln/cba723bb/82eddf43/pip/runtime/s3write/invoke.py", line 137, in _pyiceberg_write_model
    pyiceberg_table = catalog.create_table(
                      ^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/runtime/shared/pyiceberg_patch.py", line 105, in create_table
    iceberg_schema = self._convert_schema_if_needed(schema)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/catalog/__init__.py", line 559, in _convert_schema_if_needed
    schema: Schema = visit_pyarrow(schema, _ConvertToIcebergWithoutIDs())  # type: ignore
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 909, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/io/pyarrow.py", line 682, in _
    return visitor.schema(obj, visit_pyarrow(pa.struct(obj), visitor))
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 909, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/io/pyarrow.py", line 691, in _
    result = visit_pyarrow(field.type, visitor)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 909, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/io/pyarrow.py", line 726, in _
    return visitor.primitive(obj)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/io/pyarrow.py", line 899, in primitive
    raise TypeError(f"Unsupported type: {primitive}")
TypeError: Unsupported type: timestamp[ns]

bigluck commented Apr 8, 2024

So, timestamp_ns & timestamptz_ns have been added in v3 of the Iceberg spec; pyiceberg currently supports v1 & v2.

In my case, the column was generated by this user-provided snippet:

df['ds'] = pd.to_datetime(df["pickup_datetime"]).dt.date

Unfortunately, I cannot control what a user writes or how she produces the table.

What's the recommended solution for downcasting unsupported column types into something less precise, without raising an error?
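
(For context on why pandas keeps triggering this: pandas datetime columns default to nanosecond precision, so any pandas-sourced table tends to arrive as timestamp[ns]. A quick demonstration:)

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"ds": pd.to_datetime(["2024-01-01", "2024-01-02"])})
print(pa.Table.from_pandas(df).schema.field("ds").type)  # timestamp[ns], the pandas default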

Fokko (Contributor) commented Apr 8, 2024

Ciao @bigluck. Thanks for jumping in here. Until v3 is finalized, we can add a flag to cast nanosecond timestamps to microsecond precision. Would that work for you?
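
(As a sketch of what such a flag could look like from the user side; the property name below is hypothetical, not a committed API:)

from pyiceberg.catalog import load_catalog

# Hypothetical property telling pyiceberg to downcast ns -> us on write instead of raising
catalog = load_catalog("default", **{"downcast-ns-timestamp-to-us-on-write": "true"})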

bigluck commented Apr 8, 2024

@Fokko it sounds good to me! :)

kp-tom-sc commented Jul 4, 2024

For anyone else who stumbles across this, you can downcast the offending columns before writing:

    import pyarrow as pa

    # Downcast every timestamp column to microsecond precision and replace
    # null-typed columns with strings (nulls are also unsupported)
    timestamp_fields = [field.name for field in tbl.schema if pa.types.is_timestamp(field.type)]
    null_fields = [field.name for field in tbl.schema if pa.types.is_null(field.type)]
    fields = []

    for field in tbl.schema:
        if field.name in timestamp_fields:
            # Preserve the timezone (if any) while reducing the precision
            fields.append(pa.field(field.name, pa.timestamp("us", tz=field.type.tz)))
        elif field.name in null_fields:
            fields.append(pa.field(field.name, pa.string()))
        else:
            fields.append(field)

    tbl = tbl.cast(pa.schema(fields))

(where tbl is a pyarrow Table)
