Core: Add Catalog Transactions API #6948
enum IsolationLevel {

  /**
   * All reads that are being made will see the last committed values that existed when the table
I have doubts about this definition:

"All reads that are being made will see the last committed values that existed when the table was loaded first inside the catalog transaction."

This is different from the definition in other places, like here:

"SNAPSHOT isolation specifies that data read within a transaction will never reflect changes made by other simultaneous transactions."

The key difference is "when the table was loaded first inside the catalog transaction", which means there can be a time difference between when 2 tables are loaded in the transaction.
Consider the following case of 2 processes:

process 1:
t0: CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
    Catalog txCatalog = catalogTransaction.asCatalog();
t1: Table table1 = txCatalog.load(TableIdentifier.of("db", "table1"))
t2: Table table2 = txCatalog.load(TableIdentifier.of("db", "table2"))

process 2:
t1.5: table2.newAppend().addFiles(...).commit()

This means the states of the tables read within the same transaction in process 1 are inconsistent. Translated to a real-life SQL use case:

process 1:
SELECT * FROM table1 JOIN table2 ON ...

process 2:
INSERT INTO table2 ...

This has a chance to cause a phantom read in process 1, and that clearly violates the isolation guarantee.
@jackye1995, I think the behavior for the situation you're describing is an open question for the snapshot isolation level. First, though, I want to cover some background on how we think about serializable isolation to make sure we are thinking about "simultaneous" the same way.
For serializable, the requirement is that there exists some ordering of transactions that produces the correct result. Say transaction 1 starts and reads from a table, then transaction 2 commits to that table, and finally transaction 1 attempts to commit. Iceberg will allow the commit as long as the result of the commit is not affected by the changes made by transaction 2. That is, Iceberg reorders the transactions if the reads would have produced the same result. This relies on checking conflicts in tables that have been updated. And the idea of when a transaction happens or starts is flexible in this model.
Another complication is the idea of a point in time in a catalog. There's no guarantee that catalogs have an order of changes that applies across tables. The only requirement imposed by Iceberg is a linear history for any single table; there isn't a general happens-before relationship between commits across tables. There is, however, a limited relationship between tables involved in transactions.
I think there's a good argument that we can't just use that as a reason not to do anything. After all, I can have situations where completely ignoring the point-in-time concern is clearly wrong:
CREATE TABLE a (id bigint);
CREATE TABLE b (id bigint);
INSERT INTO a VALUES (1);
-- Time T1
BEGIN TRANSACTION;
INSERT INTO b SELECT id FROM a;
TRUNCATE TABLE a;
COMMIT;
-- Time T2
If I were to read from a at T1 and read from b at T2, then I could see row(id=1) twice, even though there was never a "time" when it was in both tables, because the transaction was atomic. I don't think I'd call this a "dirty read", because that term means uncommitted data was read, but it's still clearly a problem.
I think there are 3 options:
- Coordinate table loading with the catalog
- Use validations that data read has not changed
- Define snapshot isolation differently, or change the name to something else
Right now, the difference between 2 and 3 is basically adding checks so we don't have to decide right away. That's why it's like this today.
1. Coordinate table loading with the catalog
To coordinate table loading, we'd add something like startTransaction() to get a transaction state, then pass that state back to the catalog in order to load tables as of the start time. You can imagine the catalog handing back what is essentially a transaction ID and loading tables using the state of that TID.
I don't think this is a viable option. It's a lot of work to define the APIs, and there are still catalogs that can't implement it. The result is that we'd end up with inconsistent behavior across catalogs, where some catalogs just load the latest copy of the table because that's all they can do.
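To make the shape of option 1 concrete, here is a minimal sketch of what such an API could look like. TransactionalCatalog, TransactionState, and startTransaction() are hypothetical names for illustration; none of this exists in the PR:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical API sketch: the catalog pins a point-in-time view per transaction
interface TransactionalCatalog extends Catalog {
  // returns an opaque token (essentially a transaction ID) for the current catalog state
  TransactionState startTransaction();

  // loads the table as it existed when the transaction state was created
  Table loadTable(TableIdentifier identifier, TransactionState state);
}

// opaque marker for the catalog's point-in-time state
interface TransactionState {}
```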
2. Use validations that data read has not changed
This option is similar to how serializable is handled. The transaction "start" time floats: you can use data that was read as long as, when the transaction commits, the outcome would be the same.
The main drawback of this approach is that it isn't clear whether it is actually distinct from serializable, which basically does the same validation.
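As a rough illustration of option 2, here is an assumption-laden sketch, not the PR's implementation; it is also stricter than the floating-start model described above because it fails on any change rather than reordering when the outcome is unaffected:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;

// Hypothetical sketch: remember the snapshot each table was read at,
// then verify at commit time that the table has not advanced since
class ReadValidations {
  private final Map<String, Long> readSnapshots = new HashMap<>();

  void recordRead(Table table) {
    Snapshot current = table.currentSnapshot();
    readSnapshots.put(table.name(), current == null ? -1L : current.snapshotId());
  }

  void validateAtCommit(Table table) {
    table.refresh();
    Snapshot current = table.currentSnapshot();
    long latest = current == null ? -1L : current.snapshotId();
    Long readAt = readSnapshots.get(table.name());
    ValidationException.check(
        readAt == null || readAt == latest,
        "Table %s changed after it was read: snapshot %s is now %s",
        table.name(), readAt, latest);
  }
}
```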
3. Define snapshot isolation differently or rename it
This option may seem crazy, but there's a good argument for it. If there's little difference between serializable and snapshot isolation with option 2, then serializable should be used for cases like the SQL above. However, we clearly want to be able to relax the constraints for certain cases:
- Simultaneous DELETE FROM and INSERT INTO -- it makes little sense to fail a delete because a matching row was just inserted, when a slightly different order (delete commits first) would have been perfectly fine
- External coordination -- running a scheduled MERGE INTO that selects from a source table that's constantly updated should generally succeed, even if the source table is modified. Most of the time, consumption will be incremental and coordinated externally. The important thing is knowing what version of the source table was used, not failing if it is updated during the job.
These two cases align with the existing behavior of snapshot isolation in single-table commits -- but there it makes sense because "snapshot" applies to a single table. If the table is simultaneously modified, the original version is used and new data is ignored. However, deletions are validated.
Last comment
I think that captures the background for the decision that we need to make on this. Right now, I think the best approach is to collect use cases and think about how they should be handled, rather than making a decision.
Sorry for my late reply on this topic.
The current implementation does perform option 2: it detects that read data hasn't been changed. This is done for snapshot as well as serializable isolation.
The only difference between snapshot and serializable isolation in the current implementation is that serializable also detects and avoids write skew (two concurrent transactions each read overlapping data, then write disjoint updates based on what they read; each passes its own read validation, but the combined result could not have been produced by any serial order).
@jackye1995, did you have a chance to look at the implementation and the tests, or is the definition of snapshot isolation confusing?
@@ -99,7 +99,7 @@ protected Schema tableSchema() {
     return schema;
   }

-  protected TableScanContext context() {
+  public TableScanContext context() {
This should not be made public. I think we'll need to find a different way to report context.
/**
 * Create a new {@link CatalogTransaction} with the given {@link IsolationLevel}.
 *
 * @param isolationLevel The isolation level to use.
We usually omit the trailing . in situations like this.
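Applied to the quoted Javadoc, that would be:

```java
/**
 * Create a new {@link CatalogTransaction} with the given {@link IsolationLevel}.
 *
 * @param isolationLevel The isolation level to use
 */
```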
Discussed offline with Ryan, Daniel, and Eduard. I think we should be good here. The requirement to have all table snapshots consistent at the starting time of the transaction is not a hard requirement in the definition of snapshot or serializable isolation. For the non-repeatable or phantom read issue raised, the issue has to happen in a repeated read of the same table in the same transaction. So in the example I gave, table2 is only read once, so whatever state exists at catalog load time can be considered the valid state rather than a non-repeatable or phantom read. There are 2 cases which could cause a phantom read:
1. The same table is scanned more than once within a single query.
2. The same table is read by multiple statements in a multi-statement transaction.
In the first case, although not strictly enforced, most query engines cache the metadata when fetching a table from the catalog, so it is not possible to read new data in a second scan of the table. This is something that ideally engines should enforce, but definitely not something that the catalog should enforce. In the second case, a multi-statement transaction must implement the proposed transaction API, so we should be good there. I will continue to think about any other edge cases, but at least we are not blocked by this.
DataFiles.builder(SPEC)
    .withPath("/path/to/data-a.parquet")
    .withFileSizeInBytes(10)
    .withPartitionPath("data_bucket=0") // easy way to set partition data for now
I don't think the comment is still true, although there's no need to change the code. You should now be able to do something like this:
.withPartition(TestHelpers.Row.of(0))
}

@Test
public void catalogTxWithSingleOp() {
Style: unnecessary abbreviation makes code harder to read. Generally prefer "transaction" over "tx" or "txn".
@Test
public void catalogTxWithSingleOp() {
  catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
Did you intend to import SNAPSHOT as well as SERIALIZABLE? Usually, we'd import IsolationLevel so there is a bit more context for both: IsolationLevel.SERIALIZABLE.
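For instance (assuming CatalogTransaction and its nested IsolationLevel enum live in org.apache.iceberg.catalog, as the test snippet suggests):

```java
import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel;

// both constants then carry the same context at the call site
catalogTxWithSingleOp(IsolationLevel.SNAPSHOT);
catalogTxWithSingleOp(IsolationLevel.SERIALIZABLE);
```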
for (String tbl : tables) {
  TableMetadata current =
      ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
  assertThat(current.snapshots()).isEmpty();
I don't think that this should cast to BaseTable so much. You could use the Table API for this assertion; it is much more reliable than casting to an assumed implementation.
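For example, a sketch of the same assertion using only the public Table API:

```java
for (String tbl : tables) {
  Table table = catalog().loadTable(TableIdentifier.of("ns", tbl));
  table.refresh();
  assertThat(table.snapshots()).isEmpty(); // no cast to BaseTable needed
}
```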
assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());

txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
I generally prefer for tests to be more targeted, with just enough changes to exercise the behavior you want to demonstrate. I get wanting to have a multi-table transaction here, but this doesn't require other modifications to third, and it doesn't require 3 tables.
Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
    .isInstanceOf(ValidationException.class)
    .hasMessageContaining(
        "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
Why is this different than the CommitFailedException?
There are two possible failures:
- The schema update conflicts, so the REST catalog sends back CommitFailedException
- A scan projected the dropped column before the concurrent schema update, so a CommitFailedException is sent by the REST catalog and the commit fails on retry because the underlying data has changed

I think the exception here is too broad and should be one of those cases.
You may need a helper to perform checks on the schema. I think what you'd need to do is to validate that the projected columns (by ID) are still present in the latest schema.
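A minimal sketch of such a helper, assuming it is handed the projected schema and the table's latest schema (the name and placement are hypothetical):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Types;

// Hypothetical helper: verify every projected column ID still exists in the latest schema
private static void validateProjectedColumns(Schema projected, Schema latest) {
  for (Types.NestedField field : projected.columns()) {
    ValidationException.check(
        latest.findField(field.fieldId()) != null,
        "Column %s (id=%s) was projected but no longer exists in the latest schema",
        field.name(), field.fieldId());
  }
}
```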
// perform updates outside the catalog TX
three.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
Snapshot snapshot = ((BaseTable) three).operations().refresh().currentSnapshot();
assertThat(snapshot).isNotNull();
I think you can assume that normal operations that do not go through CatalogTransaction behave the way you expect. That will allow you to avoid longer tests by omitting those assertions. It shouldn't be necessary to assert that the latest snapshot is non-null, nor should it be necessary to check that the data column was deleted in the example above.
Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
    .isInstanceOf(ValidationException.class)
    .hasMessageContaining(
        "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
Are these tests just checking that the table was modified at all?
// perform updates outside catalog TX but before table has been read inside the catalog TX
one.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();

Snapshot snapshot = ((BaseTable) one).operations().refresh().currentSnapshot();
This can use the Table API.
assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("2");

// this should not fail with any isolation level
txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
It would be helpful to use names that tie tables and identifiers together, like oneIdent, so that it is clear that this is loading table one.
Also, this adds the same file to the table twice, which is probably not a good thing to do in tests because it doesn't match real-world behavior.
}

@Test
public void readOnlyTxWithSerializableOnBranchShouldNotFail() {
We may want to use the same approach as some of the branch tests and have a commit method that can use a branch or can use main, making it easy to parameterize the whole suite to run on branches.
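A rough sketch of that pattern, assuming the suite is parameterized by a nullable branch name (the helper name and signature are illustrative):

```java
import org.apache.iceberg.SnapshotUpdate;

// Hypothetical helper: apply a snapshot update to the parameterized branch, or to main when null
private void commit(SnapshotUpdate<?> update, String branch) {
  if (branch != null) {
    update.toBranch(branch);
  }
  update.commit();
}
```

A call site would then look like commit(table.newAppend().appendFile(FILE_A), branch).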
table.manageSnapshots().createBranch(branch, table.currentSnapshot().snapshotId()).commit();
}

TableIdentifier first = TableIdentifier.of("ns", "a");
Some of these could be moved out of test cases and into constants.
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.catalog;
Many of these tests are good validations that the transaction is isolated. You might want to separate these into more specific suites, like tests of isolation for uncommitted transactions. This is going to get pretty large otherwise.
What's missing are tests of real-world cases that will help us look at the behavior of transactions. For that, I think it would be helpful to refactor and introduce methods with high-level names, like createTestTable(A_IDENT) or removeColumn(t.load(A_IDENT), "data").
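Such helpers might look like this (the names, SCHEMA, and SPEC are illustrative, following the suggestion above):

```java
// Hypothetical high-level test helpers
private Table createTestTable(TableIdentifier ident) {
  return catalog().createTable(ident, SCHEMA, SPEC);
}

private void removeColumn(Table table, String columnName) {
  table.updateSchema().deleteColumn(columnName).commit();
}
```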
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
I have written up a design doc, which is available here.
I think eventually we'd want to split this into 2 PRs, so that the introduced APIs can be reviewed independently from the REST-specific things.