Update table metadata throughout transaction (apache#471)
* Update table metadata throughout transaction

This PR adds support for updating the table metadata throughout
the transaction.

This way, if the schema is first evolved and a snapshot is then
created based on the latest schema, the snapshot will be able to
find that schema (see the usage sketch after the change list below).

* Fix integration tests

* Thanks Honah!

* Include the partition evolution

* Cleanup
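
For context, the scenario described above looks roughly like the following sketch. It is not code from this commit; the catalog name, table name, and columns are hypothetical, and it assumes the Transaction API exposes update_schema() and append() (the latter may only be available in later pyiceberg releases).

import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.types import StringType

# Hypothetical catalog and table names, used only for illustration.
catalog = load_catalog("default")
table = catalog.load_table("db.events")  # assume it already has an "id" column

with table.transaction() as tx:
    # Evolve the schema first: add a new "country" column.
    with tx.update_schema() as update:
        update.add_column("country", StringType())

    # Then write against the evolved schema. Before this change, the write
    # path read the schema from the Table object, which still reflected the
    # pre-transaction state; with this change it reads the transaction's
    # up-to-date table metadata, so the new column is visible.
    tx.append(pa.table({"id": [1, 2], "country": ["NL", "BE"]}))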
Fokko authored Feb 29, 2024
1 parent 9a5bb07 commit 6708a6e
Showing 8 changed files with 396 additions and 339 deletions.
21 changes: 11 additions & 10 deletions pyiceberg/io/pyarrow.py
@@ -125,6 +125,7 @@
     visit_with_partner,
 )
 from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
 from pyiceberg.typedef import EMPTY_DICT, Properties, Record
@@ -1720,7 +1721,7 @@ def fill_parquet_file_metadata(
     data_file.split_offsets = split_offsets


-def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[Schema] = None) -> Iterator[DataFile]:
+def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     task = next(tasks)

     try:
@@ -1730,15 +1731,15 @@ def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[S
     except StopIteration:
         pass

-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)

-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
-    file_schema = file_schema or table.schema()
-    arrow_file_schema = schema_to_pyarrow(file_schema)
+    file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
+    schema = table_metadata.schema()
+    arrow_file_schema = schema_to_pyarrow(schema)

-    fo = table.io.new_output(file_path)
+    fo = io.new_output(file_path)
     row_group_size = PropertyUtil.property_as_int(
-        properties=table.properties,
+        properties=table_metadata.properties,
         property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
         default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
     )
@@ -1757,16 +1758,16 @@ def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[S
         # sort_order_id=task.sort_order_id,
         sort_order_id=None,
         # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
+        spec_id=table_metadata.default_spec_id,
         equality_ids=None,
         key_metadata=None,
     )

     fill_parquet_file_metadata(
         data_file=data_file,
         parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(file_schema, table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
+        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(schema),
     )
     return iter([data_file])

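With the new signature, callers hand write_file the FileIO and the transaction's current TableMetadata instead of a Table object, so the write picks up schema and partition-spec changes made earlier in the same transaction. A rough caller sketch follows; the catalog and table names are hypothetical, and the tasks iterator is assumed to have been prepared by the append/overwrite code path rather than constructed here.

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import write_file

# Hypothetical catalog/table names; "tasks" (an Iterator[WriteTask]) is assumed
# to have been built elsewhere by the write path.
table = load_catalog("default").load_table("db.events")

data_files = write_file(
    io=table.io,                    # FileIO used to create the output file
    table_metadata=table.metadata,  # inside a transaction this would be the
                                    # updated, not-yet-committed metadata
    tasks=tasks,
)
for data_file in data_files:
    print(data_file.file_path)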
