Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add apply interface in transaction #596

Open
ZENOTME opened this issue Sep 1, 2024 · 10 comments
Open

Add apply interface in transaction #596

ZENOTME opened this issue Sep 1, 2024 · 10 comments

Comments

@ZENOTME
Copy link
Contributor

ZENOTME commented Sep 1, 2024

This will work for now but might get problematic later on. Just a heads up.

An important concept for Iceberg is to stack snapshots in a single commit. For example, now with append being added in this PR, we can easily add support for truncate. This would be a delete operation where all the data is being dropped, and then just an append.

Originally posted by @Fokko in #349 (comment)

Transaction should be able reflect the update in time. According to pyiceberg, we can provide a apply interface to update the table metedata.

@ZENOTME
Copy link
Contributor Author

ZENOTME commented Sep 1, 2024

Exactly. Internally you want to stack the changes together. For example, within a single transaction, you add a new field and then write the data, then the latest schema should be taken into account. We do this in PyIceberg here: https://github.com/apache/iceberg-python/blob/03a0d65ac05d556d0815e61a016effc2b8993702/pyiceberg/table/__init__.py#L715

We can implement the update_table_metadata based on #587. cc @liurenjie1024 @Xuanwo @Fokko @c-thiel

@liurenjie1024
Copy link
Collaborator

Hi, @ZENOTME Could you elaborate on this? I'm kind of confusing about the proposal.

@ZENOTME
Copy link
Contributor Author

ZENOTME commented Sep 4, 2024

Hi, @ZENOTME Could you elaborate on this? I'm kind of confusing about the proposal.

For now, transaction can't reflect the update in time so we can stack them together.
e.g.

// table is a v1 table
let tx = Transaction(table);
// This will end up sending two UpgradeFormatVersion into catalog
tx.upgrade_table_version().unwrap().
   .upgrade_table_version().unwrap().commit()

But In pyiceberg, above behaviour will only send one UpgradeFormatVersion and the second one will see that the metadata of table has been updated. The update will be apply into local medata and reflect the change first.

@liurenjie1024
Copy link
Collaborator

We have check to avoid such duplicated case. For metastore tables, it's supposed to apply transaction actions in local, and update metastore pointer. For rest catalog, it should be sent to rest catalog server.

@ZENOTME
Copy link
Contributor Author

ZENOTME commented Sep 8, 2024

We have check to avoid such duplicated case. For metastore tables, it's supposed to apply transaction actions in local, and update metastore pointer. For rest catalog, it should be sent to rest catalog server.

Why for rest catalog, it should be sent to rest catalog server.🤔
According to API from pyiceberg, it seems possible to create a transaction without auto commit , which means that we also can apply transaction actions in local for rest catalog(do I miss something here)

@liurenjie1024
Copy link
Collaborator

Why for rest catalog, it should be sent to rest catalog server.🤔
According to API from pyiceberg, it seems possible to create a transaction without auto commit , which means that we also can apply transaction actions in local for rest catalog(do I miss something here)

Sorry, I don't quite get the point, if updates are sent to rest catalog server, why we need to update it in local first?

@ZENOTME
Copy link
Contributor Author

ZENOTME commented Sep 24, 2024

Sorry, I don't quite get the point, if updates are sent to rest catalog server, why we need to update it in local first?

E.g. the user wants to batch multiple updates and commit them once. For the following, after tx.action2().apply(), the update will not be sent to the catalog, but it should apply this action to table metadata locally so that action3 can realize that.

let tx = table.transaction()
tx.action1().apply() // Update table versi
tx.action2().apply() // Append new data file 
tx.action3().apply()
tx.commit()

@liurenjie1024
Copy link
Collaborator

Sorry, I don't quite get the point, if updates are sent to rest catalog server, why we need to update it in local first?

E.g. the user wants to batch multiple updates and commit them once. For the following, after tx.action2().apply(), the update will not be sent to the catalog, but it should apply this action to table metadata locally so that action3 can realize that.

let tx = table.transaction()
tx.action1().apply() // Update table versi
tx.action2().apply() // Append new data file 
tx.action3().apply()
tx.commit()

This still don't answer in what case we need to apply the updates in local before sending to rest catalog server, the rest catalog spec allows for sending multi updates as one transaction: https://github.com/apache/iceberg/blob/6319712b612b724fedbc5bed41942ac3426ffe48/open-api/rest-catalog-open-api.yaml#L705

@ZENOTME
Copy link
Contributor Author

ZENOTME commented Oct 1, 2024

Sorry, I don't quite get the point, if updates are sent to rest catalog server, why we need to update it in local first?

E.g. the user wants to batch multiple updates and commit them once. For the following, after tx.action2().apply(), the update will not be sent to the catalog, but it should apply this action to table metadata locally so that action3 can realize that.

let tx = table.transaction()
tx.action1().apply() // Update table versi
tx.action2().apply() // Append new data file 
tx.action3().apply()
tx.commit()

This still don't answer in what case we need to apply the updates in local before sending to rest catalog server, the rest catalog spec allows for sending multi updates as one transaction: https://github.com/apache/iceberg/blob/6319712b612b724fedbc5bed41942ac3426ffe48/open-api/rest-catalog-open-api.yaml#L705

Let's use the following pyiceberg example to illustrate that there may be an error in multiple updates if we do not apply the update locally before sending it to rest catalog server.

...
// 1. insert data and delete it in the same transaction, we will send all updates to the rest catalog server until   tx.commit_transaction()
>>> tx = tbl.transaction()
>>> df = pa.Table.from_pylist([{"key":1,"value":1}],schema=schema)
>>> tx.append(df)               // 2. pyiceberg will apply the update locally in https://github.com/apache/iceberg-python/blob/e891bcddb1584c6b7a35b61537ab5802b514ec6d/pyiceberg/table/__init__.py#L276
>>> tx.delete("value==1")  // 3. so that delete can realize that there are some data that need to delete 
>>> tx.commit_transaction()

// 4. Expect result, not data insert 
>>> tbl.scan().to_arrow()
pyarrow.Table
key: int32
value: int32
----
key: []
value: []

If we comment out on the https://github.com/apache/iceberg-python/blob/e891bcddb1584c6b7a35b61537ab5802b514ec6d/pyiceberg/table/__init__.py#L276 to cancel update locally, we can see the unexpected result happened.

>>> tx.append(df)
>>> tx.delete("value=1") // 1. delete can't realized that there are some data that need to delete 
/Users/ze/Project/iceberg-python/pyiceberg/table/__init__.py:622: UserWarning: Delete operation did not match any records
  warnings.warn("Delete operation did not match any records")
>>> tx.commit_transaction()

// 2. We can see that the data will not be delete
>>> table.scan().to_arrow()
/Users/ze/Project/iceberg-python/pyiceberg/avro/decoder.py:185: UserWarning: Falling back to pure Python Avro decoder, missing Cython implementation
  warnings.warn("Falling back to pure Python Avro decoder, missing Cython implementation")
pyarrow.Table
key: int32
value: int32
----
key: [[1]]
value: [[1]]
>>> 

This is my understanding for originally posted by @Fokko in #349 (comment). Please correct me if there are something wrong here.

@liurenjie1024
Copy link
Collaborator

Thanks @ZENOTME 's explaination. I think I've got your point, we need sth like commit in transaction action so that later transaction action could take into account previously happened changes. The requires to build stage only snapshots.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants