Faster ingestion from Parquet #346

Closed
jonashaag opened this issue Feb 1, 2024 · 3 comments

Comments

@jonashaag
Contributor

Question

I'm using the new pyiceberg write functionality. I wonder if there is any way to make it faster in my scenario:

I have around 1 TiB of Parquet files (zstd level 3 compressed) that I want to ingest into Iceberg.

Table sizes are roughly power-law distributed: the largest table is about 25 % of the total size, and there are around 100 tables.

Since Iceberg wants to repartition the data, I don't see a way to have it use my Parquet files without rewriting them.

Is it possible to use multiple cores for writing the Parquet files? I don't think that's something PyArrow supports natively, but it might be possible to run multiple PyArrow writers in parallel?
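
Something along these lines is what I have in mind — just a rough sketch with placeholder paths and helper names, running one PyArrow writer per process:

```python
from concurrent.futures import ProcessPoolExecutor

import pyarrow.parquet as pq


def rewrite_one(src: str, dst: str) -> str:
    # Read one source file and write it back out with zstd compression;
    # stand-in for whatever repartitioning/rewriting Iceberg actually needs.
    pq.write_table(pq.read_table(src), dst, compression="zstd")
    return dst


def rewrite_all(sources: list[str], destinations: list[str], workers: int = 8) -> list[str]:
    # One writer per process: module-level functions are picklable, so
    # ProcessPoolExecutor.map can fan the rewrites out across cores.
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(rewrite_one, sources, destinations))
```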

@kevinjqliu
Contributor

Hello! As of right now, the PyIceberg write API requires the input to be read into PyArrow tables and can only write to unpartitioned tables. Partitioned write support is currently being worked on in #208.
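
For reference, the current write path looks roughly like this (the catalog and table names below are just placeholders), which is why the data ends up being read into memory and rewritten:

```python
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog

# Placeholder catalog/table names. The Parquet data is read into an Arrow
# table in memory and then written back out as new Parquet files by PyIceberg.
catalog = load_catalog("default")
table = catalog.load_table("db.events")  # unpartitioned tables only, for now

arrow_table = pq.read_table("data/part-000.zstd.parquet")
table.append(arrow_table)
```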

With that said, I wonder if there's a way to create Iceberg tables without rewriting the data files. Technically, Iceberg is a metadata layer on top of the actual data files, and the metadata is created from the data files' stats. For example, this is how a snapshot commit builds its metadata today:

def commit(self) -> Snapshot:
    new_manifests = self._manifests()
    next_sequence_number = self._table.next_sequence_number()
    summary = self._summary()
    manifest_list_file_path = _generate_manifest_list_path(
        location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid
    )
    with write_manifest_list(
        format_version=self._table.metadata.format_version,
        output_file=self._table.io.new_output(manifest_list_file_path),
        snapshot_id=self._snapshot_id,
        parent_snapshot_id=self._parent_snapshot_id,
        sequence_number=next_sequence_number,
    ) as writer:
        writer.add_manifests(new_manifests)
    snapshot = Snapshot(
        snapshot_id=self._snapshot_id,
        parent_snapshot_id=self._parent_snapshot_id,
        manifest_list=manifest_list_file_path,
        sequence_number=next_sequence_number,
        summary=summary,
        schema_id=self._table.schema().schema_id,
    )
    with self._table.transaction() as tx:
        tx.add_snapshot(snapshot=snapshot)
        tx.set_ref_snapshot(
            snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
        )
    return snapshot

It would be super fast to just read the input files' footers, collect the metadata, and create the necessary Iceberg metadata files.
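
A rough sketch of the stats-collection half, assuming the Parquet footer already has everything we need (the dict keys here are illustrative, not the exact Iceberg manifest fields):

```python
import pyarrow.parquet as pq


def collect_file_stats(path: str) -> dict:
    # Only the footer is read here; no data pages are decompressed.
    md = pq.ParquetFile(path).metadata
    first_row_group = md.row_group(0)
    return {
        "file_path": path,
        "record_count": md.num_rows,
        "num_row_groups": md.num_row_groups,
        # Per-column min/max/null counts live in the row-group statistics;
        # a real implementation would aggregate them across all row groups.
        "column_stats": {
            first_row_group.column(i).path_in_schema: first_row_group.column(i).statistics
            for i in range(md.num_columns)
        },
    }
```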

@kevinjqliu
Contributor

Another avenue to look into: on the Spark side of Iceberg, there's an add_files procedure that can populate an Iceberg table without rewriting the Parquet files.

From its documentation: "This command will create metadata for the new files and will not move them."

And it supports table partitioning!
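
For example, something along these lines from PySpark (the catalog, table, and path are placeholders):

```python
from pyspark.sql import SparkSession

# Assumes Spark is already configured with the Iceberg runtime and a catalog
# named "my_catalog"; the table name and path below are placeholders.
spark = SparkSession.builder.getOrCreate()
spark.sql("""
    CALL my_catalog.system.add_files(
        table => 'db.events',
        source_table => '`parquet`.`s3://my-bucket/path/to/events/`'
    )
""")
```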

@Fokko
Contributor

Fokko commented May 13, 2024

Fixed in #444
