
[SUPPORT] restart flink job got InvalidAvroMagicException: Not an Avro data file #10285

Closed
zlinsc opened this issue Dec 8, 2023 · 8 comments
Labels: flink, stability, table-service


zlinsc commented Dec 8, 2023

Describe the problem you faced

The Hudi sink job cannot restart normally from a checkpoint because of an InvalidAvroMagicException thrown in CleanFunction.

This sink job writes data and generates the compaction plan. I also set HoodieCleanConfig.AUTO_CLEAN.key() -> "false" and deploy a standalone table compaction server to execute the compaction plan and do the cleaning.

I also wonder why clean commits still show up when I disable AUTO_CLEAN (screenshot omitted).

Here is my Hudi sink job config:

FlinkOptions.PATH.key() -> targetHdfsPath,
FlinkOptions.TABLE_TYPE.key() -> HoodieTableType.MERGE_ON_READ.name(),
FlinkOptions.OPERATION.key() -> WriteOperationType.UPSERT.value(),

FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key() -> "true",
FlinkOptions.COMPACTION_ASYNC_ENABLED.key() -> "false",
FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key() -> FlinkOptions.NUM_OR_TIME,
FlinkOptions.COMPACTION_DELTA_COMMITS.key() -> "5",
FlinkOptions.COMPACTION_DELTA_SECONDS.key() -> "1800",

HoodieCleanConfig.AUTO_CLEAN.key() -> "false",
HoodieArchivalConfig.ASYNC_ARCHIVE.key() -> "true",

And this is my table compaction server config:

val cfg = new FlinkCompactionConfig()
cfg.cleanPolicy = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()
cfg.cleanRetainCommits = 10080

Environment Description

  • Hudi version : 0.14.0

  • Hadoop version : 3.2.1

  • Storage (HDFS/S3/GCS..) : hdfs

  • Running on Docker? (yes/no) : no

Stacktrace

2023-12-08 22:34:28,663 ERROR org.apache.hudi.sink.CleanFunction                           [] - Executor executes action [wait for cleaning finish] error
org.apache.hudi.exception.HoodieException: org.apache.hudi.org.apache.avro.InvalidAvroMagicException: Not an Avro data file
        at org.apache.hudi.common.util.CompactionUtils.getCompactionPlan(CompactionUtils.java:201) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.util.CompactionUtils.getCompactionPlan(CompactionUtils.java:189) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.util.CompactionUtils.lambda$getCompactionPlansByTimeline$4(CompactionUtils.java:163) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_251]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_251]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_251]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_251]
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_251]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_251]
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_251]
        at org.apache.hudi.common.util.CompactionUtils.getCompactionPlansByTimeline(CompactionUtils.java:164) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.util.CompactionUtils.getAllPendingCompactionPlans(CompactionUtils.java:133) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.util.CompactionUtils.getAllPendingCompactionOperations(CompactionUtils.java:213) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:121) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:115) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:109) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemView(FileSystemViewManager.java:176) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$createViewManager$8be8b1a6$1(FileSystemViewManager.java:270) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$getFileSystemView$1(FileSystemViewManager.java:115) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) ~[?:1.8.0_251]
        at org.apache.hudi.common.table.view.FileSystemViewManager.getFileSystemView(FileSystemViewManager.java:114) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.table.HoodieTable.getHoodieView(HoodieTable.java:341) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.table.action.clean.CleanPlanner.<init>(CleanPlanner.java:93) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.requestClean(CleanPlanActionExecutor.java:105) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.requestClean(CleanPlanActionExecutor.java:151) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.execute(CleanPlanActionExecutor.java:177) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.scheduleCleaning(HoodieFlinkCopyOnWriteTable.java:359) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:628) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:751) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:861) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:834) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.sink.CleanFunction.lambda$open$0(CleanFunction.java:70) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_251]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_251]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: org.apache.hudi.org.apache.avro.InvalidAvroMagicException: Not an Avro data file
        at org.apache.hudi.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:57) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:207) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCompactionPlan(TimelineMetadataUtils.java:169) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.common.util.CompactionUtils.getCompactionPlan(CompactionUtils.java:198) ~[hudi-flink1.17-bundle-0.14.0.jar:0.14.0]
        ... 35 more

zlinsc commented Dec 8, 2023

Actually I found that the org.apache.hudi.util.FlinkWriteClients#getHoodieClientConfig method does not call withAutoClean by default, even though I had disabled AUTO_CLEAN in the Hudi sink config (screenshot omitted).

Is there any problem with how the Hudi sink job generates the compaction plan content?
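
For illustration only, here is a minimal Scala sketch of what forwarding the flag could look like when a write config is built explicitly (the HoodieCleanConfig builder call and its placement are my assumptions, not what getHoodieClientConfig currently does):

import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}

// Hedged sketch: forward AUTO_CLEAN explicitly via the clean config builder.
// Assumes HoodieCleanConfig.Builder#withAutoClean is available in this Hudi version;
// targetHdfsPath is the same value used in the sink config above.
val writeConfig = HoodieWriteConfig.newBuilder()
  .withPath(targetHdfsPath)
  .withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(false).build())
  .build()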


danny0405 commented Dec 9, 2023

The clean function is always there, but it will stop cleaning if you set clean.async.enabled to false. It seems you got a corrupt compaction plan file (which is actually the xxx.compaction.requested file); you need to find this file and roll it back manually through the Hudi CLI, or just remove the file by hand.

You can check the Hoodie timeline for existing inflight/requested compaction metadata files, which sit in the .hoodie folder.
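
For example, a minimal sketch (my own, assuming the table base path from the sink config above and Hadoop on the classpath) that lists zero-length compaction plan files in the timeline, which is what typically triggers the InvalidAvroMagicException above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hedged sketch: scan the .hoodie timeline folder for empty *.compaction.requested files;
// an empty plan file cannot be deserialized as Avro and fails as in the stacktrace above.
val timelinePath = new Path(targetHdfsPath, ".hoodie")
val fs: FileSystem = timelinePath.getFileSystem(new Configuration())
fs.listStatus(timelinePath)
  .filter(s => s.getPath.getName.endsWith(".compaction.requested") && s.getLen == 0)
  .foreach(s => println(s"empty compaction plan: ${s.getPath}"))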


zlinsc commented Dec 9, 2023

The clean function is always there, but it will stop cleaning if you set clean.async.enabled to false. It seems you got a corrupt compaction plan file (which is actually the xxx.compaction.requested file); you need to find this file and roll it back manually through the Hudi CLI, or just remove the file by hand.

You can check the Hoodie timeline for existing inflight/requested compaction metadata files, which sit in the .hoodie folder.

Thanks, Danny. I found that the compaction file is empty. After I removed this empty file, the job could start up again. Is there an automatic way to resolve this case in the future, without removing the file manually?

Another question: after I disable clean.async.enabled in the Hudi sink job using

FlinkOptions.CLEAN_ASYNC_ENABLED.key() -> "false",

the job still has a clean_commits step in the job graph. Looking at the implementation of org.apache.hudi.table.HoodieTableSink#getSinkRuntimeProvider (screenshot omitted), when I turn off compaction.async.enabled it will still do clean_commits.
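
A rough Scala-style paraphrase of the branch being referenced (not the actual Hudi source; the resolver and pipeline names are approximate):

// Hedged paraphrase of HoodieTableSink#getSinkRuntimeProvider as described above:
// with compaction.async.enabled set to false, the sink pipeline still appends
// the clean_commits operator instead of the async compaction operators.
if (OptionsResolver.needsAsyncCompaction(conf)) {
  Pipelines.compact(conf, pipeline)  // async compaction operators
} else {
  Pipelines.clean(conf, pipeline)    // clean_commits operator
}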

@danny0405

It only triggers once in the #open method.


zlinsc commented Dec 10, 2023

It only triggers once in the #open method.

@danny0405 I see. So what is the purpose of the Hudi sink triggering this clean action when the job starts? I found that even though I run a compaction service outside the sink job with its own clean trigger strategy, the clean action in the #open method still cleans old files, because the sink job does not use the same trigger strategy. This impact is not what I expected.

@danny0405

It is designed for batch execution jobs; maybe we should add back the decision with a clean.async.enabled check (#6515). We trigger the cleaning in #open because batch and streaming jobs share the same code path; if we could know that the execution has an unbounded source, we could avoid this cleaning.
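
A minimal sketch of the guard being suggested (Scala-style paraphrase; CleanFunction is Java in Hudi and the executor API may differ slightly):

// Hedged sketch of the proposed check in CleanFunction#open: only kick off the one-shot
// clean when clean.async.enabled is true, so a standalone table service keeps control of
// cleaning. Field and method names follow the discussion above, not the real source.
override def open(parameters: Configuration): Unit = {
  super.open(parameters)
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
    executor.execute(() => writeClient.clean(), "wait for cleaning finish")
  }
}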

github-project-automation bot moved this from ⏳ Awaiting Triage to ✅ Done in Hudi Issue Support, Dec 12, 2023

zlinsc commented Dec 12, 2023
