
[FEAT] iceberg writes unpartitioned #2016

Merged: 15 commits into main on Mar 20, 2024
Conversation

samster25 (Member)

No description provided.

github-actions bot added the enhancement (New feature or request) label on Mar 19, 2024
# We perform the merge here since IcebergTable is not pickle-able
merge = _MergingSnapshotProducer(operation=operation, table=table)

builder = self._builder.write_iceberg(table)
samster25 (Member Author)

@Fokko can you take a look to see if this is fine?

Since we can't pickle the Iceberg Table and we still want distributed writes, we're having daft write out the parquet files and then return pyiceberg DataFiles in the distributed dataframe. Then we commit the data files with _MergingSnapshotProducer.
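A minimal sketch of that pattern, with hypothetical helper names (this is not Daft's actual code path, just the shape of the idea):

import os
import pyarrow as pa
import pyarrow.parquet as pq

def write_task(arrow_table: pa.Table, path: str) -> dict:
    # Runs on each worker: write the data out as parquet and return plain,
    # picklable metadata instead of shipping the IcebergTable around.
    pq.write_table(arrow_table, path)
    meta = pq.read_metadata(path)
    return {"path": path, "record_count": meta.num_rows, "size_bytes": os.path.getsize(path)}

# Back on the driver, the collected metadata is turned into pyiceberg
# DataFile objects and committed once, e.g. via _MergingSnapshotProducer.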

Fokko (Contributor)

Hey @samster25

> Since we can't pickle the Iceberg Table and we still want distributed writes, we're having daft write out the parquet files and then return pyiceberg DataFiles in the distributed dataframe.

That sounds like a good approach!

> Then we commit the data files with _MergingSnapshotProducer.

Typically you want to use a higher-level API, for example the transaction API: https://github.com/apache/iceberg-python/blob/a7f207f7e5831b3be02bd023c4b33babc3ea13f6/pyiceberg/table/__init__.py#L1153-L1161
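For illustration, a rough sketch of that higher-level path with a recent pyiceberg release (the catalog and table names are made up, and the exact API surface depends on the pyiceberg version):

from pyiceberg.catalog import load_catalog
import pyarrow as pa

catalog = load_catalog("default")
table = catalog.load_table("db.points")

df = pa.table({"lat": [52.37], "long": [4.89]})

# Let pyiceberg build and commit the snapshot instead of driving
# _MergingSnapshotProducer directly.
table.append(df)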


return pa.table(columns, schema=input_schema)


def write_iceberg(
samster25 (Member Author)

@Fokko This is where we are writing out the Iceberg files. We should be correctly writing out the field_id via schema_to_pyarrow.
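For context, this is roughly how the field ids end up in the written files (a sketch, not the exact Daft code; `iceberg_table` and `arrow_data` are placeholders):

import pyarrow.parquet as pq
from pyiceberg.io.pyarrow import schema_to_pyarrow

# Convert the Iceberg schema to an Arrow schema whose fields carry
# PARQUET:field_id metadata; parquet written with that schema embeds the
# field ids that Iceberg readers rely on.
file_schema = schema_to_pyarrow(iceberg_table.schema())

# `arrow_data` is assumed to already match the target field names/order.
pq.write_table(arrow_data.cast(file_schema), "data-file-0.parquet")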

Fokko (Contributor)

That's the most important part I was looking for. You can easily check this using parq:

parq 00000-0-39ec4caa-4d45-46b9-a6e9-c35cfb4e9290-0.parquet --schema

 # Schema 
 <pyarrow._parquet.ParquetSchema object at 0x131ca1c80>
required group field_id=1 schema {
  optional double field_id=2 lat;
  optional double field_id=3 long;
}
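The same check can be done without parq, using pyarrow directly (printing the ParquetSchema shows the field_id of each column, as in the output above):

import pyarrow.parquet as pq

print(pq.ParquetFile("00000-0-39ec4caa-4d45-46b9-a6e9-c35cfb4e9290-0.parquet").schema)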

else:
    raise ValueError(f"Only support `append` or `overwrite` mode. {mode} is unsupported")

# We perform the merge here since IcebergTable is not pickle-able

@@ -527,3 +520,185 @@ def file_visitor(written_file, i=i):
for c_name in partition_values.column_names():
    data_dict[c_name] = partition_values.get_column(c_name).take(partition_idx_series)
return MicroPartition.from_pydict(data_dict)


def coerce_pyarrow_table_to_schema(pa_table: pa.Table, input_schema: pa.Schema) -> pa.Table:
Fokko (Contributor)

I find this very frustrating about Arrow: you cannot just cast the table. I've already done quite a bit of work on this on the Arrow side, but it is still not complete :(
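In the meantime, a rough sketch of what such a coercion can look like when rebuilt column by column (an illustration under the signature quoted above, not necessarily Daft's implementation):

import pyarrow as pa

def coerce_pyarrow_table_to_schema(pa_table: pa.Table, input_schema: pa.Schema) -> pa.Table:
    # Cast each column individually and rebuild the table so the result
    # carries the target schema's field order, types, and field metadata
    # (e.g. PARQUET:field_id), which a plain Table.cast does not always handle.
    columns = [pa_table.column(field.name).cast(field.type) for field in input_schema]
    return pa.table(columns, schema=input_schema)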

Comment on lines +626 to +628
# TODO: these should be populated by `properties` but pyiceberg doesn't support them yet
target_file_size = 512 * 1024 * 1024
TARGET_ROW_GROUP_SIZE = 128 * 1024 * 1024
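As a side note, pyarrow's parquet writer expresses row-group size in rows rather than bytes, so a byte target like the one above has to be translated; a sketch of one way to do that (not necessarily how Daft maps it):

import pyarrow as pa
import pyarrow.parquet as pq

TARGET_ROW_GROUP_SIZE = 128 * 1024 * 1024  # bytes

def write_with_row_group_target(table: pa.Table, path: str) -> None:
    # Estimate bytes per row from the in-memory table and convert the
    # byte target into pyarrow's rows-based row_group_size parameter.
    bytes_per_row = max(table.nbytes // max(table.num_rows, 1), 1)
    rows_per_group = max(TARGET_ROW_GROUP_SIZE // bytes_per_row, 1)
    pq.write_table(table, path, row_group_size=rows_per_group)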

codecov bot commented Mar 20, 2024

Codecov Report

Attention: Patch coverage is 23.87097%, with 118 lines in your changes missing coverage. Please review.

Project coverage is 81.31%. Comparing base (d2f28d6) to head (0a620a4).
Report is 1 commit behind head on main.

❗ Current head 0a620a4 differs from pull request most recent head b3cd0fc. Consider uploading reports for the commit b3cd0fc to get more accurate results

Additional details and impacted files


@@            Coverage Diff             @@
##             main    #2016      +/-   ##
==========================================
- Coverage   82.71%   81.31%   -1.40%     
==========================================
  Files          62       62              
  Lines        6623     6766     +143     
==========================================
+ Hits         5478     5502      +24     
- Misses       1145     1264     +119     
Files                                       Coverage Δ
daft/execution/physical_plan.py             94.69% <50.00%> (-0.49%) ⬇️
daft/execution/rust_physical_plan_shim.py   94.44% <50.00%> (-4.05%) ⬇️
daft/execution/execution_step.py            91.36% <59.09%> (-2.20%) ⬇️
daft/logical/builder.py                     83.19% <8.33%> (-8.40%) ⬇️
daft/dataframe/dataframe.py                 82.70% <4.34%> (-6.11%) ⬇️
daft/table/table_io.py                      75.17% <23.80%> (-14.78%) ⬇️

@samster25 samster25 enabled auto-merge (squash) March 20, 2024 07:17
@samster25 samster25 merged commit c2db062 into main Mar 20, 2024
29 checks passed
@samster25 samster25 deleted the sammy/iceberg-writes-unpartitioned branch March 20, 2024 07:36