Flink: Fix duplicate data in Flink's upsert writer for format V2
zhongqishang committed Jun 20, 2024
1 parent 3d3e565 commit f55d507
Showing 6 changed files with 112 additions and 75 deletions.
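The thread running through the hunks below: the stream writer now stamps each flushed WriteResult with the checkpoint that produced it, and the committer buckets incoming results per checkpoint instead of funneling everything into a single list. The apparent failure mode being fixed is that buffered results from one checkpoint could be attributed to another and, after a restore, applied twice, producing duplicate rows in upsert mode. A minimal sketch of that bucketing idea, using a simplified stand-in for WriteResult (the real class, changed below, also carries data files, delete files, and referenced data files):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class CheckpointBucketingSketch {
      // Simplified stand-in for org.apache.iceberg.io.WriteResult.
      record Result(long checkpointId, String file) {}

      // Before this commit: a single List<Result> mixed results of different checkpoints.
      private final Map<Long, List<Result>> byCheckpoint = new HashMap<>();

      void onResult(Result result) {
        byCheckpoint
            .computeIfAbsent(result.checkpointId(), k -> new ArrayList<>())
            .add(result);
      }

      // Only the committing checkpoint's own results go into its manifest.
      List<Result> resultsFor(long checkpointId) {
        return byCheckpoint.getOrDefault(checkpointId, List.of());
      }
    }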
core/src/main/java/org/apache/iceberg/io/WriteResult.java (22 additions & 2 deletions)
@@ -28,17 +28,30 @@
 import org.apache.iceberg.util.CharSequenceSet;

 public class WriteResult implements Serializable {
+  private long checkpointId;
   private DataFile[] dataFiles;
   private DeleteFile[] deleteFiles;
   private CharSequence[] referencedDataFiles;

   private WriteResult(
-      List<DataFile> dataFiles, List<DeleteFile> deleteFiles, CharSequenceSet referencedDataFiles) {
+      long checkpointId,
+      List<DataFile> dataFiles,
+      List<DeleteFile> deleteFiles,
+      CharSequenceSet referencedDataFiles) {
+    this.checkpointId = checkpointId;
     this.dataFiles = dataFiles.toArray(new DataFile[0]);
     this.deleteFiles = deleteFiles.toArray(new DeleteFile[0]);
     this.referencedDataFiles = referencedDataFiles.toArray(new CharSequence[0]);
   }

+  public void checkpointId(long checkpointId) {
+    this.checkpointId = checkpointId;
+  }
+
+  public long checkpointId() {
+    return checkpointId;
+  }
+
   public DataFile[] dataFiles() {
     return dataFiles;
   }
@@ -56,16 +69,23 @@ public static Builder builder() {
   }

   public static class Builder {
+    private long checkpointId;
     private final List<DataFile> dataFiles;
     private final List<DeleteFile> deleteFiles;
     private final CharSequenceSet referencedDataFiles;

     private Builder() {
+      this.checkpointId = -1;
       this.dataFiles = Lists.newArrayList();
       this.deleteFiles = Lists.newArrayList();
       this.referencedDataFiles = CharSequenceSet.empty();
     }

+    public Builder checkpointId(long checkpoint) {
+      this.checkpointId = checkpoint;
+      return this;
+    }
+
     public Builder add(WriteResult result) {
       addDataFiles(result.dataFiles);
       addDeleteFiles(result.deleteFiles);
@@ -110,7 +130,7 @@ public Builder addReferencedDataFiles(Iterable<CharSequence> files) {
     }

     public WriteResult build() {
-      return new WriteResult(dataFiles, deleteFiles, referencedDataFiles);
+      return new WriteResult(checkpointId, dataFiles, deleteFiles, referencedDataFiles);
     }
   }
 }
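Taken together, the WriteResult changes above add a checkpoint id next to the existing file arrays, settable both through the builder and after construction. An illustrative use of the extended API, assuming the pre-existing addDataFiles(Iterable) builder method; the empty file list is just a stand-in for real flushed files:

    import java.util.Collections;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.io.WriteResult;

    class WriteResultUsageSketch {
      static WriteResult tagged() {
        WriteResult result =
            WriteResult.builder()
                .addDataFiles(Collections.<DataFile>emptyList()) // pre-existing builder method
                .checkpointId(42L)                               // new in this commit
                .build();
        result.checkpointId(43L); // new setter; the writer uses this after complete()
        return result;            // result.checkpointId() now returns 43
      }
    }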
(changed test file; path not shown)

@@ -766,7 +766,7 @@ public void testFlinkManifests() throws Exception {
   public void testDeleteFiles() throws Exception {
     assumeThat(formatVersion)
         .as("Only support equality-delete in format v2 or later.")
-        .isGreaterThan(2);
+        .isGreaterThan(1);

     long timestamp = 0;
     long checkpoint = 10;
@@ -837,7 +837,7 @@ public void testDeleteFiles() throws Exception {
   public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     assumeThat(formatVersion)
         .as("Only support equality-delete in format v2 or later.")
-        .isGreaterThan(2);
+        .isGreaterThan(1);

     long timestamp = 0;
     long checkpoint = 10;
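One subtlety in this test fix: the assumption message says "v2 or later", which corresponds to formatVersion > 1, so the old guard .isGreaterThan(2) silently skipped format v2, the very version the upsert fix targets (the identical hunks that follow apply the same correction to a second copy of these tests). A restated version of the corrected guard, assuming AssertJ's Assumptions as used here:

    import static org.assertj.core.api.Assumptions.assumeThat;

    class FormatVersionGuardSketch {
      void requireEqualityDeletes(int formatVersion) {
        // Passes for 2, 3, ...; the old guard (> 2) wrongly excluded v2.
        assumeThat(formatVersion).isGreaterThan(1);
      }
    }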
(second changed test file; path not shown)

@@ -766,7 +766,7 @@ public void testFlinkManifests() throws Exception {
   public void testDeleteFiles() throws Exception {
     assumeThat(formatVersion)
         .as("Only support equality-delete in format v2 or later.")
-        .isGreaterThan(2);
+        .isGreaterThan(1);

     long timestamp = 0;
     long checkpoint = 10;
@@ -837,7 +837,7 @@ public void testDeleteFiles() throws Exception {
   public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     assumeThat(formatVersion)
         .as("Only support equality-delete in format v2 or later.")
-        .isGreaterThan(2);
+        .isGreaterThan(1);

     long timestamp = 0;
     long checkpoint = 10;
IcebergFilesCommitter.java (path not shown)

@@ -96,7 +96,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>

   // The completed files cache for current checkpoint. Once the snapshot barrier received, it will
   // be flushed to the 'dataFilesPerCheckpoint'.
-  private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+  private final Map<Long, List<WriteResult>> writeResultsOfCurrentCkpt = Maps.newHashMap();
   private final String branch;

   // It will have an unique identifier for one job.
@@ -259,28 +259,28 @@ private void commitUpToCheckpoint(
       long checkpointId)
       throws IOException {
     NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
-    List<ManifestFile> manifests = Lists.newArrayList();
-    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
     for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+      List<ManifestFile> manifests = Lists.newArrayList();
+      NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+
       if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
         // Skip the empty flink manifest.
         continue;
-      }
-
-      DeltaManifests deltaManifests =
-          SimpleVersionedSerialization.readVersionAndDeSerialize(
-              DeltaManifestsSerializer.INSTANCE, e.getValue());
-      pendingResults.put(
-          e.getKey(),
-          FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
-      manifests.addAll(deltaManifests.manifests());
+      } else {
+        DeltaManifests deltaManifests =
+            SimpleVersionedSerialization.readVersionAndDeSerialize(
+                DeltaManifestsSerializer.INSTANCE, e.getValue());
+        pendingResults.put(
+            e.getKey(),
+            FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
+        manifests.addAll(deltaManifests.manifests());
+      }
+
+      CommitSummary summary = new CommitSummary(pendingResults);
+      commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, e.getKey());
+      committerMetrics.updateCommitSummary(summary);
+      deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
     }

-    CommitSummary summary = new CommitSummary(pendingResults);
-    commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
-    committerMetrics.updateCommitSummary(summary);
     pendingMap.clear();
-    deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
   }

   private void commitPendingResult(
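The reworked loop above commits each pending checkpoint separately, passing e.getKey() instead of attributing every pending result to the newest checkpointId, and it still selects the pending work with headMap(checkpointId, true). A small standalone demo of that inclusive headMap view, with placeholder manifest bytes:

    import java.util.NavigableMap;
    import java.util.TreeMap;

    class HeadMapDemo {
      public static void main(String[] args) {
        NavigableMap<Long, byte[]> pendingManifests = new TreeMap<>();
        pendingManifests.put(8L, new byte[0]);
        pendingManifests.put(9L, new byte[0]);
        pendingManifests.put(10L, new byte[0]);

        // Everything at or below the completed checkpoint is committed; 10 stays pending.
        NavigableMap<Long, byte[]> toCommit = pendingManifests.headMap(9L, true);
        System.out.println(toCommit.keySet()); // [8, 9]
      }
    }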
@@ -427,7 +427,11 @@ private void commitOperation(

   @Override
   public void processElement(StreamRecord<WriteResult> element) {
-    this.writeResultsOfCurrentCkpt.add(element.getValue());
+    WriteResult writeResult = element.getValue();
+    List<WriteResult> writeResults =
+        writeResultsOfCurrentCkpt.computeIfAbsent(
+            writeResult.checkpointId(), k -> Lists.newArrayList());
+    writeResults.add(writeResult);
   }

   @Override
@@ -448,8 +452,8 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
     if (writeResultsOfCurrentCkpt.isEmpty()) {
       return EMPTY_MANIFEST_DATA;
     }
-
-    WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
+    List<WriteResult> writeResults = writeResultsOfCurrentCkpt.get(checkpointId);
+    WriteResult result = WriteResult.builder().addAll(writeResults).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
             result, () -> manifestOutputFileFactory.create(checkpointId), spec);
(changed stream-writer file; path not shown)

@@ -63,7 +63,7 @@ public void open() {

   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
-    flush();
+    flush(checkpointId);
     this.writer = taskWriterFactory.create();
   }

@@ -89,7 +89,7 @@ public void endInput() throws IOException {
     // Note that if the task is not closed after calling endInput, checkpoint may be triggered again
     // causing files to be sent repeatedly, the writer is marked as null after the last file is sent
    // to guard against duplicated writes.
-    flush();
+    flush(Long.MAX_VALUE);
   }

   @Override
@@ -102,13 +102,14 @@ public String toString() {
   }

   /** close all open files and emit files to downstream committer operator */
-  private void flush() throws IOException {
+  private void flush(long checkpointId) throws IOException {
     if (writer == null) {
       return;
     }

     long startNano = System.nanoTime();
     WriteResult result = writer.complete();
+    result.checkpointId(checkpointId);
     writerMetrics.updateFlushResult(result);
     output.collect(new StreamRecord<>(result));
     writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
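On the writer side, flush is now tied to the checkpoint that triggered it: prepareSnapshotPreBarrier passes the real checkpoint id, while endInput, which runs without a checkpoint, passes Long.MAX_VALUE, presumably so the end-of-input results can never collide with (and will sort after) any real checkpoint id. A compact sketch of that contract; the real operator also completes the task writer, records metrics, and emits the stamped WriteResult downstream:

    class WriterFlushContractSketch {
      private long lastFlushedCheckpoint = -1L;

      void prepareSnapshotPreBarrier(long checkpointId) {
        flush(checkpointId); // real id, stamped onto the emitted WriteResult
      }

      void endInput() {
        flush(Long.MAX_VALUE); // sentinel for "after every real checkpoint"
      }

      private void flush(long checkpointId) {
        lastFlushedCheckpoint = checkpointId;
      }
    }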
(the remaining changed file was not rendered on this page)
