
Core: Fixed certain operations failing to add new data files during retries #9230

Merged (8 commits) on Dec 18, 2023

Conversation

@jasonf20 (Contributor) commented Dec 6, 2023

Since PR #6335, FastAppend and subclasses of MergingSnapshotProducer skip newly added data files during retries.

This happens because the cached manifest list is set to an empty list instead of null. If commit is called again, newDataFilesAsManifests sees a non-null (but empty) cache, skips the manifest-writing logic, and returns no data files.
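
The broken caching pattern looks roughly like this (a minimal sketch with simplified names and a hypothetical writeManifests helper, not the actual Iceberg code):

  private List<ManifestFile> cachedNewDataManifests = null;

  private List<ManifestFile> newDataFilesAsManifests() {
    if (cachedNewDataManifests == null) {
      // first call: write manifests for the newly added data files
      this.cachedNewDataManifests = writeManifests(newDataFiles); // hypothetical helper
    }

    // after #6335, cleanup could leave this as an empty (filtered) list
    // instead of null: the check above passes, nothing is rewritten, and
    // a retried commit silently includes no new data files
    return cachedNewDataManifests;
  }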

This commit fixes the issue and adds some tests that fail without the fix.

See #9222 for an earlier discussion.

Initially I thought this bug should be solved with validation, but after further discussion I am re-submitting this with tests.

Fixes #9227

@github-actions bot added the core label on Dec 6, 2023
@jasonf20 changed the title from "Fixed certain operations failing to add new data files during retries" to "Core: Fixed certain operations failing to add new data files during retries" on Dec 6, 2023
Assert.assertTrue("Should commit same new manifest", new File(newManifest.path()).exists());
Assert.assertTrue(
    "Should commit the same new manifest",
    metadata.currentSnapshot().allManifests(FILE_IO).contains(newManifest));
Contributor:

This isn't actually testing that the manifest is reused across commit attempts, because it came from an apply that was called after the latest failure (without a cleanup operation).

Instead of manually exercising this path, why not use a transaction here and fail just once to validate?

@jasonf20 (Contributor, Author) Dec 6, 2023:

Fixed this to not refresh the manifest. Can you clarify what you mean about failing just once? It's significant for this bug report that the entire commitTransaction call fails; the behavior with a single failure and internal retries is different.


Assertions.assertThatThrownBy(append::commit)
    .isInstanceOf(CommitFailedException.class)
    .hasMessage("Injected failure");
Contributor:

Looks like this is likely failing the style checks. You can run ./gradlew spotlessApply to fix it.

@jasonf20 force-pushed the fix-retries-skipping-data branch 2 times, most recently from f89a103 to aa9bee7 on December 6, 2023 at 20:46
@nastra (Contributor) commented Dec 7, 2023

@ConeyLiu could you also please take a look at this?

@@ -895,7 +895,7 @@ private void cleanUncommittedAppends(Set<ManifestFile> committed) {
}
}

-    this.cachedNewDataManifests = committedNewDataManifests;
+    this.cachedNewDataManifests = null;
Contributor:

Rather than setting this to null, I think a better approach would be to handle this exactly like cachedNewDeleteManifests, which is initialized as a linked list and then iterated over, removing files as needed.

I did a quick check with

--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -94,7 +94,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private PartitionSpec dataSpec;
 
   // cache new data manifests after writing
-  private List<ManifestFile> cachedNewDataManifests = null;
+  private final List<ManifestFile> cachedNewDataManifests = Lists.newLinkedList();
   private boolean hasNewDataFiles = false;
 
   // cache new manifests for delete files
@@ -885,17 +885,15 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   private void cleanUncommittedAppends(Set<ManifestFile> committed) {
-    if (cachedNewDataManifests != null) {
-      List<ManifestFile> committedNewDataManifests = Lists.newArrayList();
-      for (ManifestFile manifest : cachedNewDataManifests) {
-        if (committed.contains(manifest)) {
-          committedNewDataManifests.add(manifest);
-        } else {
-          deleteFile(manifest.path());
+    if (!cachedNewDataManifests.isEmpty()) {
+      ListIterator<ManifestFile> dataManifestsIterator = cachedNewDataManifests.listIterator();
+      while (dataManifestsIterator.hasNext()) {
+        ManifestFile dataManifest = dataManifestsIterator.next();
+        if (!committed.contains(dataManifest)) {
+          deleteFile(dataManifest.path());
+          dataManifestsIterator.remove();
         }
       }
-
-      this.cachedNewDataManifests = null;
     }
 
     ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();
@@ -950,12 +948,12 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   private List<ManifestFile> newDataFilesAsManifests() {
-    if (hasNewDataFiles && cachedNewDataManifests != null) {
+    if (hasNewDataFiles && !cachedNewDataManifests.isEmpty()) {
       cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
-      cachedNewDataManifests = null;
+      cachedNewDataManifests.clear();
     }
 
-    if (cachedNewDataManifests == null) {
+    if (cachedNewDataManifests.isEmpty()) {
       try {
         RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
         try {
@@ -968,7 +966,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
           writer.close();
         }
 
-        this.cachedNewDataManifests = writer.toManifestFiles();
+        this.cachedNewDataManifests.addAll(writer.toManifestFiles());

and that seemed to work. We would probably want to use the same approach in FastAppend (but I haven't checked that part for FastAppend)
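
For FastAppend, the analogous change might look something like this (a sketch only; the field and method names are approximations, unverified against FastAppend's actual code):

  // cache new manifests after writing, mirroring MergingSnapshotProducer
  private final List<ManifestFile> newManifests = Lists.newLinkedList();

  private void cleanUncommitted(Set<ManifestFile> committed) {
    ListIterator<ManifestFile> manifestsIterator = newManifests.listIterator();
    while (manifestsIterator.hasNext()) {
      ManifestFile manifest = manifestsIterator.next();
      if (!committed.contains(manifest)) {
        deleteFile(manifest.path());
        manifestsIterator.remove();
      }
    }
  }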

Contributor:

@nastra I think this solution is much better and more solid. We should not set cachedNewDataManifests to null directly. Consider the following case:

    1. Transaction transaction = table.newTransaction();

    2. transaction.newAppend().appendFile(...).commit(); // generates a new manifest file

    3. // a transaction operation fails

    4. transaction.commitTransaction(); // fails to commit

With the changes in this PR, the new manifest files generated in step 2 will not be deleted in step 4. I think the failing UTs verify this.
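
Spelled out against the table API, the scenario looks roughly like this (a sketch; FILE_A and the table fixture are illustrative):

  Transaction transaction = table.newTransaction();

  // step 2: writes a new manifest file for the appended data
  transaction.newAppend().appendFile(FILE_A).commit();

  // steps 3-4: a later operation fails, so commitTransaction fails and
  // cleanup runs; if cachedNewDataManifests was already set to null, the
  // manifest written in step 2 is never matched against the committed set
  // and is never deleted, leaving an orphaned file
  Assertions.assertThatThrownBy(transaction::commitTransaction)
      .isInstanceOf(CommitFailedException.class);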

Contributor (Author):

Makes sense, I pushed a new commit that uses the linked list like your patch.

@jasonf20 (Contributor, Author) Dec 14, 2023:

@nastra Wouldn't it be safer to set the list to empty at the end of this function? If the list isn't emptied, it can lead to the same bug, since newDataFilesAsManifests will think it doesn't need to produce manifests.

I don't think this is an issue right now, because I expect all commits to either fail or succeed, so all files should be deleted. But maybe for safety we should just clear it?

@ConeyLiu If I understand correctly, the files are only cleared once the transaction/operation fully fails, so it should clear everything at the end, right?

@jasonf20 requested a review from nastra on December 7, 2023 at 18:22
@@ -313,6 +313,37 @@ public void testRecoveryWithoutManifestList() {
metadata.currentSnapshot().allManifests(FILE_IO).contains(newManifest));
}

@Test
public void testRecoveryWithTransaction() {
Contributor:

This test doesn't fail; I'm guessing you were planning to update FastAppend?

@jasonf20 (Contributor, Author) Dec 7, 2023:

I had a test and fix for FastAppend that failed in my first commit. It only fails when not using a transaction.

Based on Ryan's comments in Slack, I removed that for now (though it's still a bug, since it can break the table), but I still added a new test to cover the transaction case, which wasn't covered before.

During manual retries (calling commit again on a failed change), the operation would partially succeed. For example, Rewrite may write only the deletes and not the appends.

This commit fixes the issue and adds some tests that fail without the fix.
@jasonf20 (Contributor, Author) commented Dec 14, 2023

Based on further Slack discussions, I re-added the FastAppend fix.

A couple of notes:

  • I think it's better to clear the lists instead of removing one item at a time. If a list is left partially full, it causes the same bug. As far as I can tell this can't happen, since the entire transaction will either fail or succeed, but in terms of protecting ourselves, clearing might be better.
  • I found a similar bug with Transaction: it also failed to clear its cache properly and ended up committing a TableMetadata that pointed to a deleted manifest file. I added a fix for this as well in the PR (4th commit).

Fixed trying to delete an already-deleted file during cleanup. This would cause the cleanup to fail early and skip the deletion of all subsequent files.

This is a similar issue to the bugs with manual retries in SnapshotProducer, but here the transaction was caching a TableMetadata that points to deleted files after cleanup. This change forces the transaction to re-apply its changes, ensuring a new, valid TableMetadata is used.
@danielcweeks added this to the Iceberg 1.4.3 milestone on Dec 15, 2023
@rdblue (Contributor) commented Dec 18, 2023

@jasonf20, sorry for being unclear in my reply earlier, but I don't think the tests in this PR reproduce the error. The tests here reproduce similar errors, but do so by manually calling commit on operations multiple times, which is only allowed by transactions.

After looking at the problem more, I was able to produce a test case that used both the transaction and append APIs correctly and lost data. Here's the test:

  @Test
  public void testTransactionRecommit() {
    // update table settings to merge when there are 3 manifests
    table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "3").commit();

    // create manifests so that the next commit will trigger a merge
    table.newFastAppend().appendFile(FILE_A).commit();
    table.newFastAppend().appendFile(FILE_B).commit();

    // start a transaction with appended files that will merge
    Transaction transaction = Transactions.newTransaction(table.name(), table.ops());

    AppendFiles append = transaction.newAppend().appendFile(FILE_D);
    Snapshot pending = append.apply();

    Assert.assertEquals(
        "Should produce 1 pending merged manifest", 1, pending.allManifests(table.io()).size());

    // because a merge happened, the appended manifest is deleted by the append operation
    append.commit();

    // concurrently commit FILE_A without a transaction to cause the previous append to retry
    table.newAppend().appendFile(FILE_C).commit();
    Assert.assertEquals(
        "Should produce 1 committed merged manifest",
        1,
        table.currentSnapshot().allManifests(table.io()).size());

    transaction.commitTransaction();

    Set<String> paths =
        Sets.newHashSet(
            Iterables.transform(
                table.newScan().planFiles(), task -> task.file().path().toString()));
    Set<String> expectedPaths = Sets.newHashSet(
        FILE_A.path().toString(),
        FILE_B.path().toString(),
        FILE_C.path().toString(),
        FILE_D.path().toString()
    );

    Assert.assertEquals("Should contain all committed files", expectedPaths, paths);

    Assert.assertEquals(
        "Should produce 2 manifests",
        2,
        table.currentSnapshot().allManifests(table.io()).size());
  }

The problem happens when two concurrent commits both compact the latest manifests. When that happens, the new data file manifest is removed by the operation cleanup logic because it was not committed to the transaction. Then, when the transaction needs to re-apply the change, the reuse logic notices that the appended file manifests have already been written and reuses them. However, the list is no longer correct because it was filtered.

There are a few things to take away from this. First, this doesn't affect fast appends because that operation never merges manifests. However, it's still incorrect to filter the added manifests list so we should apply the same fix to FastAppend that we do to MergingSnapshotProducer. Second, we can't recover when any new file manifest has been deleted, so I think the solution is to catch when any manifest is deleted and rewrite the batch by setting the reused manifests to null. That's like your original solution, but the difference is that we need to check whether deleteFunc is called and only set the cached manifests to null if that's the case:

diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e06a491098..bde92daa4a 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -879,16 +879,20 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   private void cleanUncommittedAppends(Set<ManifestFile> committed) {
     if (cachedNewDataManifests != null) {
+      boolean hasDelete = false;
       List<ManifestFile> committedNewDataManifests = Lists.newArrayList();
       for (ManifestFile manifest : cachedNewDataManifests) {
         if (committed.contains(manifest)) {
           committedNewDataManifests.add(manifest);
         } else {
           deleteFile(manifest.path());
+          hasDelete = true;
         }
       }
 
-      this.cachedNewDataManifests = committedNewDataManifests;
+      if (hasDelete) {
+        cachedNewDataManifests = null;
+      }
     }
 
     ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();

This is going to be better for transactions than always setting the cached manifests to null because the manifests are usually in the committed list. We also need to check whether similar logic is needed for cachedNewDeleteManifests, which is also modified. I think we will need it.
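
Applied to the delete-manifest side, the same hasDelete idea might look like this (a sketch, not code from this PR):

  boolean deletedAnyDeleteManifest = false;
  ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();
  while (deleteManifestsIterator.hasNext()) {
    ManifestFile deleteManifest = deleteManifestsIterator.next();
    if (!committed.contains(deleteManifest)) {
      deleteFile(deleteManifest.path());
      deleteManifestsIterator.remove();
      deletedAnyDeleteManifest = true;
    }
  }

  if (deletedAnyDeleteManifest) {
    // the surviving cached delete manifests no longer form a complete batch,
    // so clear them to force the next apply() to rewrite them from scratch
    cachedNewDeleteManifests.clear();
  }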

@jasonf20 (Contributor, Author) commented Dec 18, 2023

Hi @rdblue, thanks for the info.

Good to know about the manifest compaction conflict case. I was looking for a way the list could be partially cleared, and this answers that.

I have pushed another commit returning to my original fix with the hasDelete optimization.

Some more notes/questions:

  1. The transaction test I added did fail before the fix, since cleanup is called with EMPTY_SET anyway if the commit fails, and the manifest list files are also cleaned up in SnapshotProducer::cleanAll(). The cleaned-up manifest lists required adding the forceReApply fix, and the manifest files are fixed by the change in MergingSnapshotProducer. I think this fix is still relevant as the test fails without it; removing either of the fixes causes the test to fail.

  2. I understand that calling commit() again is, in theory, only allowed by transactions, but the API doesn't currently prevent it, and I think it's safer to fix it for now. The only change that is specifically for this use case is the forceReApply I added to the transaction; the other changes are needed regardless. If we decide to block this at some point in the future, the tests I added can be modified to verify that they get an invalid-usage exception.

What do you think?

@rdblue (Contributor) commented Dec 18, 2023

> I think this fix is still relevant as the test fails without it.

To which test case are you referring? I didn't think any of them were valid uses of the API because commit was called multiple times and the behavior is undefined.

> The only change that is specifically for this use case is the forceReApply I added to the transaction

What is the purpose of forceReApply?

I think that we should focus on fixing the bug and then discuss cases where commit or commitTransaction are called multiple times. The important thing is getting a fix we can put in a patch release.

@jasonf20 (Contributor, Author) commented Dec 18, 2023

> To which test case are you referring? I didn't think any of them were valid uses of the API because commit was called multiple times and the behavior is undefined.

To the test cases I added to TestTransaction that call commit multiple times. I understand the behaviour is undefined, but it works with this fix; without it, the table ends up in a state that isn't queryable.

> What is the purpose of forceReApply?

When commit is called again on a transaction that failed and was cleaned up, it commits a manifest list that was already deleted. forceReApply remembers that cleanup ran and applies the transaction operations' changes again, creating a valid state that can be committed.
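
Conceptually, the flag works like this (a rough sketch with hypothetical names; this part of the change was later removed from the PR):

  public void commitTransaction() {
    if (forceReApply) {
      // a previous failed commit already cleaned up the manifests and
      // manifest lists, so the cached TableMetadata points at deleted files;
      // re-apply every pending update to build fresh, valid metadata
      // (applyUpdates is a hypothetical helper)
      this.current = applyUpdates(base);
      this.forceReApply = false;
    }

    // ... existing commit logic using the now-valid current metadata ...
  }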

> I think that we should focus on fixing the bug and then discuss cases where commit or commitTransaction are called multiple times. The important thing is getting a fix we can put in a patch release.

Alright, I pushed a commit that removes forceReApply from the transaction and removes the test cases that call commit multiple times, so this fix can get merged ASAP. The other code path does still break the table, but we can fix it in a separate PR.

Let me know if this is good and I can squash all the commits when ready to merge.

@jasonf20 requested a review from rdblue on December 18, 2023 at 19:27
@rdblue (Contributor) commented Dec 18, 2023

Thanks, @jasonf20! The current version looks great. I'll commit it when tests are passing.

For the issues caused by calling commit more than once, I think we should keep state and fail if someone calls commit a second time on an operation. Transaction creates operations, so we can have it work around that restriction by passing a flag when creating instances. Then we would close off the ability to do weird things by recommitting.
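
A sketch of that guard (hypothetical, not part of this PR) could look like:

  // inside SnapshotProducer (sketch): directly created operations refuse a
  // second commit, while Transaction-created ones opt out via the flag
  private final boolean canReCommit; // true only when created by a Transaction
  private boolean committed = false;

  public void commit() {
    Preconditions.checkState(
        canReCommit || !committed, "Cannot commit the same operation twice");
    this.committed = true;
    // ... existing retry/commit logic ...
  }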

@rdblue merged commit d6eba2a into apache:main on Dec 18, 2023
41 checks passed
nastra pushed a commit to nastra/iceberg that referenced this pull request Dec 19, 2023
nastra added a commit that referenced this pull request Dec 19, 2023
lisirrx pushed a commit to lisirrx/iceberg that referenced this pull request Jan 4, 2024
geruh pushed a commit to geruh/iceberg that referenced this pull request Jan 26, 2024
devangjhabakh pushed a commit to cdouglas/iceberg that referenced this pull request Apr 22, 2024