
Flink: Fix duplicate data with upsert writer in case of aborted checkpoints #10526

Merged
8 commits merged into apache:main on Aug 26, 2024

Conversation

zhongqishang
Contributor

Resolves #10431

@github-actions github-actions bot added the flink label Jun 18, 2024
@pvary
Contributor

pvary commented Jun 18, 2024

@zhongqishang: Why would this solve the issue?

In my mental model, this only changes when the files are closed. The files are added to the IcebergFilesCommitter operator state in snapshotState and to the table in notifyCheckpointComplete, so I think the issue could be about how calls to these methods are orchestrated.

@zhongqishang
Contributor Author

zhongqishang commented Jun 18, 2024

@zhongqishang: Why would this solve the issue?

I am trying to solve this problem. This scenario is hard to reproduce.

Let me restate the background of the problem. The checkpoint was triggered twice in succession, and the first checkpoint was canceled. Cancelling the checkpoint did not cause the Flink job to fail. As a result, the output of two prepareSnapshotPreBarrier calls ended up in a single commit.

Moving the flush to snapshotState is just to ensure that a successful flush is tied to a successful checkpoint, so that the scenario above cannot occur.

Of course, I'm not sure whether this will solve the problem or cause other problems.

@pvary Thank you very much for your guidance.
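
To make the failure mode concrete: in upsert mode each flush writes new data files plus equality deletes for the keys it touched, and equality deletes only apply to data files with a strictly smaller sequence number. Files committed in the same snapshot share a sequence number, so if the output of two flushes lands in one commit, the second flush's deletes cannot suppress the rows written by the first flush and the duplicate survives. Below is a minimal, self-contained sketch of this buffering behaviour; it is a hypothetical model for illustration, not the actual Iceberg classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the pre-fix flow: the writer flushes on every
// prepareSnapshotPreBarrier, but the committer only drains its buffer when a
// checkpoint actually completes, so files flushed for an aborted checkpoint ride
// along with the next successful one.
class NaiveCommitterDemo {
  private static final List<String> pending = new ArrayList<>();

  // Invoked for every prepareSnapshotPreBarrier, even if the checkpoint is later aborted.
  static void collect(String dataFile) {
    pending.add(dataFile);
  }

  // Only invoked for checkpoints that complete successfully.
  static void commit(long checkpointId) {
    System.out.printf("snapshot for checkpoint %d contains %s%n", checkpointId, pending);
    pending.clear();
  }

  public static void main(String[] args) {
    collect("data-file-flushed-for-cp-86447"); // checkpoint 86447 is aborted after the flush
    collect("data-file-flushed-for-cp-86448"); // the same upserted keys are written again
    commit(86448); // one snapshot now carries both files -> duplicate rows for upsert keys
  }
}
```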

@pvary
Contributor

pvary commented Jun 18, 2024

@zhongqishang: We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint. Could you confirm whether the writers or the source were restarted? Is the source stateful, and is its state handled correctly?

Also, could you please check if the IcebergFilesCommitter operator is restarted, or not in the meantime?

@zhongqishang
Contributor Author

@zhongqishang: We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint. Could you confirm whether the writers or the source were restarted? Is the source stateful, and is its state handled correctly?

Just like you said.

Also, could you please check if the IcebergFilesCommitter operator is restarted, or not in the meantime?

The operator was not restarted. The Flink job is running normally.

@pvary
Contributor

pvary commented Jun 18, 2024

@zhongqishang: We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint. Could you confirm whether the writers or the source were restarted? Is the source stateful, and is its state handled correctly?

Just like you said.

Sadly, I don't understand your answer fully.
What do we know about the writers/sources? Are they restarted?

Also, could you please check if the IcebergFilesCommitter operator is restarted, or not in the meantime?

The operator was not restarted. The Flink job is running normally.

Do we have logs like these?
https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L204C10-L204C63
https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240
https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240

@zhongqishang
Contributor Author

@zhongqishang: We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint. Could you confirm whether the writers or the source were restarted? Is the source stateful, and is its state handled correctly?

Just like you said.

Sadly, I don't understand your answer fully. What do we know about the writers/sources? Are they restarted?

I'm sorry for not expressing myself clearly. I confirmed that the 1st data file was generated in the 1st checkpoint and the 2nd data file in the 2nd checkpoint. I also confirmed that the writers and the source were not restarted.

Do we have logs like these? https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L204C10-L204C63 https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240 https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240

I did not find the logs.

I think snapshotState was not executed for that checkpoint; the checkpoint was aborted after prepareSnapshotPreBarrier had run.

@pvary
Contributor

pvary commented Jun 19, 2024

Thanks for the clarification around the restarts.

I did not find the logs.

I think snapshotState was not executed for that checkpoint; the checkpoint was aborted after prepareSnapshotPreBarrier had run.

That's strange. Could you please double check (log levels, TM logs)? These are the logs which should be there if the changes are committed by the Flink job.

If the logs aren't there, you can cross-check against the snapshot summary. Please check whether the Flink jobId, checkpointId, and operatorId are present or missing on the commit.
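
For reference, that cross-check can be scripted with the Iceberg Java API. A rough sketch, assuming a Hadoop catalog and the summary keys the Flink committer writes (flink.job-id, flink.operator-id, flink.max-committed-checkpoint-id); the warehouse path and table name are placeholders, and key names may differ between Iceberg versions:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Dump the Flink-related summary properties of every snapshot so the jobId, operatorId
// and checkpointId of each commit can be compared against the TM logs.
public class SnapshotSummaryCheck {
  public static void main(String[] args) {
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://namenode/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("ods_iceberg_db", "xx"));

    for (Snapshot snapshot : table.snapshots()) {
      Map<String, String> summary = snapshot.summary();
      System.out.printf(
          "snapshot=%d flink.job-id=%s flink.operator-id=%s flink.max-committed-checkpoint-id=%s%n",
          snapshot.snapshotId(),
          summary.get("flink.job-id"),
          summary.get("flink.operator-id"),
          summary.get("flink.max-committed-checkpoint-id"));
    }
  }
}
```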

@zhongqishang
Contributor Author

zhongqishang commented Jun 19, 2024

That's strange. Could you please double check (log levels, TM logs)? These are the logs which should be there if the changes are committed by the Flink job.

https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L204C10-L204C63 https://github.com/apache/iceberg/blob/apache-iceberg-

For #10431 (comment)

The log level is INFO; a normal checkpoint has a commit log.

Start to flush snapshot state to state backend, table: iceberg_catalog.ods_iceberg_db.xx, checkpointId: 86446
// The log of aborted checkpoint 86447 is missing
Start to flush snapshot state to state backend, table: iceberg_catalog.ods_iceberg_db.xx, checkpointId: 86448

https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240

This log does not exist in any of the TM logs.

@pvary
Contributor

pvary commented Jun 19, 2024

The log level is INFO; a normal checkpoint has a commit log.

Start to flush snapshot state to state backend, table: iceberg_catalog.ods_iceberg_db.xx, checkpointId: 86446
// The log of aborted checkpoint 86447 is missing
Start to flush snapshot state to state backend, table: iceberg_catalog.ods_iceberg_db.xx, checkpointId: 86448

This means that the IcebergFilesCommitter checkpointing was not started at all.
Since the files were created, we can conclude that IcebergStreamWriter.flush was called. Also, since the next write didn't fail, I would expect that prepareSnapshotPreBarrier finished correctly. That would mean that the created data files were collected in the flush method... So everything seems normal.

Again... are we sure that we don't get duplicated records from the input stream for whatever reason?

@zhongqishang
Contributor Author

zhongqishang commented Jun 19, 2024

Again... are we sure that we don't get duplicated records from the input stream for whatever reason?

The task has upsert enabled, and all data enters multiple times from the input stream.
I have hundreds of tasks, and only in the scenario I described do query results show two rows with the same ID; upsert should keep only one.

We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint.

As you described before.

The problem occurs when the results of two prepareSnapshotPreBarrier calls appear in one commit.

#10431 (comment)

As I described before, within that one commit a query returns two rows with the same id.

@pvary
Contributor

pvary commented Jun 19, 2024

So in summary:

  • The duplicated data is expected as this is coming from the input stream
  • The issue is that the WriteResult does not contain the checkpointId, and the IcebergFilesCommitter commits the changes for 2 checkpoints in a single snapshot.

So the fix should be something along these lines:

  • Add checkpointId to the WriteResult
  • Change the IcebergFilesCommitter to consider the checkpointId in the WriteResults and generate multiple commits if needed (see the sketch below).

@rodmeneses: Is this issue solved naturally by the new SinkV2 implementation?
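
A minimal sketch of the second bullet above (names are invented for illustration; this is not the final implementation): the committer keeps pending results keyed by the checkpoint that produced them and, once a checkpoint completes, emits one Iceberg commit per buffered checkpoint, so an aborted checkpoint's files still land in their own snapshot and later equality deletes apply to them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the proposed behaviour: group pending write results by the
// checkpoint that produced them and turn each group into its own commit.
class PerCheckpointCommitterSketch {
  private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();

  void collect(long checkpointId, String dataFile) {
    pendingByCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(dataFile);
  }

  // Would be driven by notifyCheckpointComplete(completedCheckpointId) in the real operator.
  void commitUpTo(long completedCheckpointId) {
    Map<Long, List<String>> ready = pendingByCheckpoint.headMap(completedCheckpointId, true);
    ready.forEach((checkpointId, files) ->
        System.out.printf("separate snapshot for checkpoint %d: %s%n", checkpointId, files));
    ready.clear(); // the headMap view is backed by the TreeMap, so this drops the committed entries
  }

  public static void main(String[] args) {
    PerCheckpointCommitterSketch committer = new PerCheckpointCommitterSketch();
    committer.collect(86447, "data-file-for-cp-86447"); // checkpoint aborted on the Flink side
    committer.collect(86448, "data-file-for-cp-86448");
    committer.commitUpTo(86448); // two snapshots, so the second one's deletes apply to the first
  }
}
```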

@github-actions github-actions bot added the core label Jun 20, 2024
@zhongqishang zhongqishang changed the title from "Flink: move flush operation from prepareSnapshotPreBarrier to snapshotState" to "Flink: Fix duplicate data in Flink's upsert writer for format V2" on Jun 20, 2024
@zhongqishang
Contributor Author

zhongqishang commented Jun 20, 2024

@pvary
Adding checkpointId to WriteResult would change the public API and break serialization compatibility.
So I added a wrapper class, FlinkWriteResult, on the Flink side to carry the checkpointId.
WDYT?

Can you give me some advice?
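
For context, the wrapper approach looks roughly like the following. This is a simplified sketch (the name and details of the actual FlinkWriteResult class added in this PR may differ), showing the idea of carrying the checkpoint id next to the existing WriteResult instead of changing the public WriteResult API:

```java
import java.io.Serializable;
import org.apache.iceberg.io.WriteResult;

// Simplified sketch: keep org.apache.iceberg.io.WriteResult untouched and carry the
// checkpoint id alongside it between the writer and the committer.
public class FlinkWriteResultSketch implements Serializable {
  private final long checkpointId;
  private final WriteResult writeResult;

  public FlinkWriteResultSketch(long checkpointId, WriteResult writeResult) {
    this.checkpointId = checkpointId;
    this.writeResult = writeResult;
  }

  public long checkpointId() {
    return checkpointId;
  }

  public WriteResult writeResult() {
    return writeResult;
  }
}
```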

@zhongqishang zhongqishang marked this pull request as ready for review June 20, 2024 12:16
@stevenzwu
Contributor

@zhongqishang @pvary I have an uber question.

Let's say checkpoint N was cancelled or timed out and checkpoint N+1 completed successfully. In this case, do we know whether all the writer subtasks have flushed data files for checkpoint N and whether all write results have been received by the committer?

@pvary
Contributor

pvary commented Jul 19, 2024

@zhongqishang @pvary I have an uber question.

Let's say checkpoint N was cancelled or timed out and checkpoint N+1 completed successfully. In this case, do we know whether all the writer subtasks have flushed data files for checkpoint N and whether all write results have been received by the committer?

I think we only need to be sure that every writer that has successfully closed on a given checkpoint is included in an Iceberg commit for that checkpoint.
If some of the writers are not closed, they will keep collecting consistent data, and the results of the next checkpoint will be consistent.

What we should be aware of is that the Iceberg commit for a non-successful checkpoint might not contain everything received up to the checkpoint time. I think this is OK, since users can check whether the given checkpoint was successful or not.

@pvary
Contributor

pvary commented Aug 15, 2024

@zhongqishang: Thanks for your patience. I revisited the PR and found one potential issue. Could you please check it?
I think we are getting there. Thanks @gyfora for the clarification.

@pvary
Contributor

pvary commented Aug 23, 2024

@zhongqishang: Could you please rebase?

# Conflicts:
#	flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
#	flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@zhongqishang
Contributor Author

@zhongqishang: Could you please rebase?

@pvary Already rebased, PTAL.

Contributor

@pvary pvary left a comment


+1 from my side.
@stevenzwu: Any more comments?

Shall we add the documentation about the state compatibility/incompatibility in a different PR?

@stevenzwu stevenzwu changed the title from "Flink: Fix duplicate data in Flink's upsert writer for format V2" to "Flink: Fix duplicate data with upsert writer in case of aborted checkpoints" on Aug 26, 2024
@stevenzwu stevenzwu merged commit a7398ab into apache:main Aug 26, 2024
20 checks passed
@stevenzwu
Contributor

stevenzwu commented Aug 26, 2024

Thanks @zhongqishang for finding the root cause and the fix, and thanks @pvary for the review.

@zhongqishang please create a backport PR to 1.18 and 1.20.

Also, let's create a standing PR for the next 1.7 release notes (site/docs/releases.md) to clarify the state incompatibility and that a savepoint is required. We can target that PR at the 1.7 milestone and merge it when 1.7 is ready.

zhongqishang added a commit to zhongqishang/iceberg that referenced this pull request Aug 27, 2024
zhongqishang added a commit to zhongqishang/iceberg that referenced this pull request Aug 27, 2024
pvary pushed a commit that referenced this pull request Aug 30, 2024
@klion26
Member

klion26 commented Sep 14, 2024

@stevenzwu @pvary @zhongqishang As 1.6 is the last version that supports JDK 8 [1], and this patch fixes a critical data-duplication problem, IMHO we need to backport it to 1.6.x. What do you think?

[1] https://lists.apache.org/thread/5qn6pyfmx6xp0w0cytldvk4kj71k410c

@pvary
Contributor

pvary commented Sep 14, 2024

@klion26: It's a tricky question, as the change includes Flink state changes. See: #10526 (comment)

I would try to avoid such an incompatibility in a patch release.

@klion26
Member

klion26 commented Sep 14, 2024

@pvary thanks for the reply

  1. IMHO, it's fine not to backport this to older versions because of the state incompatibility, but 1.6 is the last version that supports JDK 8, and users on JDK 8 would otherwise always suffer from this data-quality problem -- which is an important problem from my side.
  2. For the state incompatibility, the user can stop the job with a savepoint and then upgrade to the new version. We can have a doc for that.

Please let me know what you think about this, thanks.

jenbaldwin pushed a commit to Teradata/iceberg that referenced this pull request Sep 17, 2024
Development

Successfully merging this pull request may close these issues.

Flink sink writes duplicate data in upsert mode
5 participants