
[SUPPORT] Spark structured streaming ingestion into Hudi fails after an upgrade to 0.12.2 #8890

Closed
psendyk opened this issue Jun 5, 2023 · 20 comments
Labels
metadata (metadata table), spark-streaming (spark structured streaming related), stability

Comments

@psendyk

psendyk commented Jun 5, 2023

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at [email protected].

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

After an upgrade from 0.12.1 to 0.13.0, ingestion from Kafka into a Hudi table via Spark structured streaming fails on the second micro-batch. When the job is restarted, it fails on the first micro-batch. After reverting the version to 0.12.1 the issue goes away. Each time the upgrade is attempted, the first micro-batch succeeds and the second one fails. The issue seems to occur on an attempt to expand small files which do not exist in the underlying storage.

To Reproduce

Steps to reproduce the behavior:

Use the write options provided in the section below to write data via Spark structured streaming. The job should fail when writing data in the second micro-batch.

Expected behavior

The ingestion job should continue to ingest more micro-batches.

Environment Description

  • Hudi version : 0.13.0

  • Spark version : 3.3.0

  • Hive version : 3.1.3

  • Hadoop version : 3.3.3

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

Table services are set up asynchronously in separate jobs but were not running at the time, and there was only one writer to the table. Below are the full write options for the streaming ingestion (some values redacted):

        "checkpointLocation" -> "<REDACTED>",
        DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key() -> 200,
        DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key() -> "false",
        HoodieMetricsConfig.TURN_METRICS_ON.key() -> "true",
        HoodieMetricsConfig.METRICS_REPORTER_CLASS_NAME
          .key() -> "<REDACTED>",
        HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() -> "DATADOG",
        DatadogStatsdMetricsOptions.STATSD_HOST.key() -> writeOptionsArgs.statsDHost,
        DatadogStatsdMetricsOptions.PREFIX.key() -> "<REDACTED>",
        HoodieWriteConfig.TBL_NAME.key -> dataLakeRecord.tableName,
        DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
        DataSourceWriteOptions.RECORDKEY_FIELD.key -> dataLakeRecord.recordKey.mkString(","),
        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> dataLakeRecord.keyGeneratorClassName,
        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> dataLakeRecord.partitionPathKey,
        DataSourceWriteOptions.PRECOMBINE_FIELD.key -> dataLakeRecord.precombineKey,
        DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key() -> "true",
        DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key() -> "false",
        HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key() -> "false",
        HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
        HoodieArchivalConfig.AUTO_ARCHIVE.key() -> "false",
        HoodieMetadataConfig.ENABLE.key() -> "false",
        HoodieCleanConfig.AUTO_CLEAN.key() -> "false",
        HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
        DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key() -> "<REDACTED>",
        DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key() -> s"${tableName}-${awsRegion}",
        DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key() -> "us-east-1",
        HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name(),
        HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> HoodieFailedWritesCleaningPolicy.LAZY.name(),
        DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER.key() -> s"${writeOptionsArgs.topicName}-ingestion"
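
For reference, a minimal sketch of how options like these are typically wired into the streaming write; the DataFrame, trigger interval, and base path below are illustrative placeholders rather than our exact code:

        import org.apache.spark.sql.DataFrame
        import org.apache.spark.sql.streaming.Trigger

        // Sketch only: kafkaDf, writeOptions, and basePath are placeholders.
        def startHudiStream(kafkaDf: DataFrame, writeOptions: Map[String, String], basePath: String) =
          kafkaDf.writeStream
            .format("hudi")                                  // Hudi Spark datasource sink
            .options(writeOptions)                           // the options listed above
            .outputMode("append")
            .trigger(Trigger.ProcessingTime("60 seconds"))   // assumed micro-batch interval
            .start(basePath)                                 // S3 base path of the table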

Stacktrace

The exact partition values were redacted.

org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :379
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:336)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:342)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:253)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.NoSuchElementException: FileID 1fdf04cc-229d-48a8-8b85-a6951b484fc0-0 of partition path env_id=<REDACTED>/week=<REDACTED> does not exist.
	at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:156)
	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:122)
	at org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:64)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:385)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:362)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
	... 29 more

(Spark UI screenshot attached)

@psendyk changed the title from "[SUPPORT]" to "[SUPPORT] Spark structured streaming ingestion into Hudi fails with org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition" on Jun 5, 2023
@psendyk changed the title to "[SUPPORT] Spark structured streaming ingestion into Hudi fails after an upgrade to 0.13.0" on Jun 5, 2023
@danny0405
Contributor

There seems to be some inconsistency between the data table and the metadata table regarding the file handle list.

@danny0405 added the spark-streaming (spark structured streaming related), metadata (metadata table), and stability labels on Jun 6, 2023
@psendyk changed the title to "[SUPPORT] Spark structured streaming ingestion into Hudi fails after an upgrade to 0.12.2" on Jun 6, 2023
@psendyk
Author

psendyk commented Jun 6, 2023

I've tried upgrading from 0.12.1 to 0.12.2 first and hit the same error. I've updated the issue title to reflect that; the problem appears to be introduced by that minor version upgrade.

@danny0405
Contributor

Did you have a chance to try the 0.12.3 release? I don't believe it is caused by the version upgrade itself; the inconsistency is likely a bug.

@psendyk
Author

psendyk commented Jun 7, 2023

I've tried both 0.12.2 and 0.13.0; would you like me to also test with 0.12.3?

@danny0405
Contributor

Yeah, we have a bunch of fixes going into 0.12.3 and 0.13.1.

@psendyk
Author

psendyk commented Jun 8, 2023

I've tested it with 0.12.3 and it fails with the same error.

@zyclove

zyclove commented Jun 20, 2023

@danny0405 I have the same error. Is there any solution, please? The job is stuck.

@psendyk Excuse me, are you ok now? Is there a solution?

@danny0405
Contributor

@ad1happy2go Is there any chance you could try to reproduce this issue if you have spare time to help? That would be great.
The user is in a fairly urgent situation.

@psendyk
Author

psendyk commented Jun 20, 2023

@psendyk Excuse me, are you ok now? Is there a solution?

@zyclove We've rolled back to 0.12.1 and are holding off on the upgrade until the issue is resolved in a later version.

@zyclove

zyclove commented Jun 21, 2023

@psendyk Try adding these settings; maybe they will help.

                .option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(), true)
                .option(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), true)
                .option(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), true)
                .option(HoodieCommonConfig.RECONCILE_SCHEMA.key(), true)
                .option(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true)
                .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.SIMPLE.name())
                .option(HoodieLayoutConfig.LAYOUT_TYPE.key(), HoodieStorageLayout.LayoutType.DEFAULT.name())

and query with these configs:

set hoodie.schema.on.read.enable=true;
set hoodie.datasource.read.extract.partition.values.from.path=true;
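
As an illustration, a rough sketch of applying those read-side configs from a Spark session; the table path and query below are placeholders, not taken from the failing job:

        // Sketch only: basePath is a placeholder for the Hudi table location.
        spark.sql("set hoodie.schema.on.read.enable=true")
        spark.sql("set hoodie.datasource.read.extract.partition.values.from.path=true")

        val df = spark.read.format("hudi").load(basePath)
        df.createOrReplaceTempView("hudi_tbl")
        spark.sql("select count(*) from hudi_tbl").show()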

@psendyk
Author

psendyk commented Jul 19, 2023

I tested it again using the options @zyclove posted above and the job still fails with the same error. Also, this time I tested it on a fresh table to make sure there were no issues with our production table. I ingested ~1B records from Kafka to a new S3 location, written across ~18k partitions. So it should be reproducible; let me know if you need any additional details.

@ad1happy2go
Collaborator

@psendyk Thanks. I will try reproducing it.

@ad1happy2go
Collaborator

@psendyk Wanted to confirm: to reproduce, should we recreate the table using 0.12.1 and then upgrade? Did you follow a similar process when you tested again, since you mentioned you tested on a fresh table?

Also, to ingest into a Hudi table we suggest using DeltaStreamer. Is there any limitation that forces you to use Spark Structured Streaming?

@psendyk
Author

psendyk commented Jul 19, 2023

Yes, I created a new table and ingested ~1B records using Hudi 0.12.1. Then I restarted the job with 0.13.0 (the same issue happened with 0.12.2 and 0.12.3); the first micro-batch succeeded and the next one failed.

We haven't looked into DeltaStreamer, to be honest, as all of our jobs are written in Spark. Our ingestion job does a fair amount of transformation and filtering and adds computed columns. It seems this might be possible with a transformer class in DeltaStreamer (see the sketch below), but we haven't explored it; we hadn't had any issues with Spark Streaming until now.
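
For context, a rough sketch of what such a transformer might look like; the Transformer interface signature here is recalled from the DeltaStreamer docs rather than verified against our Hudi version, and the filter and computed column are made-up placeholders:

        import org.apache.hudi.common.config.TypedProperties
        import org.apache.hudi.utilities.transform.Transformer
        import org.apache.spark.api.java.JavaSparkContext
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.{Dataset, Row, SparkSession}

        // Sketch only: the filter and the computed column are placeholders.
        class OurTransformer extends Transformer {
          override def apply(jsc: JavaSparkContext,
                             spark: SparkSession,
                             rows: Dataset[Row],
                             props: TypedProperties): Dataset[Row] =
            rows.filter(col("event_type").isNotNull)
                .withColumn("week", weekofyear(col("event_ts")))
        }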

@ehurheap

Any update here, @ad1happy2go? We are blocked on upgrades.

@ad1happy2go
Collaborator

@ehurheap @psendyk I tried to reproduce using an input stream with a smaller dataset but wasn't able to. I am now trying with a larger dataset.
@psendyk Does this also happen with a smaller dataset, in case you have tried that?

@ad1happy2go
Collaborator

ad1happy2go commented Jul 26, 2023

@psendyk This is the gist with the code I am using to reproduce: https://gist.github.com/ad1happy2go/a2df5b11c3aff1a15b205b458b6b480a
Can you review it and see if I am missing anything needed to reproduce this issue?

@psendyk
Author

psendyk commented Jul 27, 2023

@ad1happy2go Our initial upgrade attempt only failed for one out of four of our tables; the other three have much lower incoming data volume, so perhaps it's related to that. I just tried reproducing the error on another fresh table with less data -- I ingested a single micro-batch (which also created the table) using 0.12.1 and then continued the ingestion with 0.13.0. This time the 0.13.0 job continued to make progress for a couple of micro-batches until I killed it; it didn't run into the issue.
Also, the exception only happens for some partitions in the micro-batch while others are written successfully. Perhaps it's related to partition cardinality or the file size distribution across partitions; each micro-batch in our job writes to ~12-15k partitions, and the number of records per partition varies quite significantly, probably from a few records at the low end to ~10,000s at the high end. I haven't verified this, but given that the issue seems to be "missing small files," I suspect the error might only happen for partitions with less data/more small files. Perhaps you can attempt to reproduce it by modifying the partitioning schema in your snippet (see the sketch below); I'm not sure how much data you're ingesting, but file sizing is probably more uniform when only partitioning on year. Let me know if I can provide any other info that would help with reproducing. Thanks!
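
To illustrate the kind of skew I have in mind, a rough sketch of a test input stream whose record counts vary heavily across partitions; the column names, cardinality, and skew factor here are made up:

        import org.apache.spark.sql.functions._

        // Sketch only: columns, cardinality, and skew factor are made-up values.
        val input = spark.readStream.format("rate")
          .option("rowsPerSecond", "5000")
          .load()
          // ~15k possible partition values...
          .withColumn("env_id", (rand() * 15000).cast("int"))
          // ...but ~90% of records land in only ~100 of them, leaving many tiny partitions
          .withColumn("env_id", when(rand() < 0.9, col("env_id") % 100).otherwise(col("env_id")))
          .withColumn("week", weekofyear(col("timestamp")))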

@psendyk
Author

psendyk commented Aug 3, 2023

I've narrowed down the bug to a specific commit -- I believe it's 218240e from this PR. After I reverted it on the release-0.12.2 branch, built the JAR, and ran the ingestion again, it did not run into any issues. While I don't fully understand why this was a breaking change for us, the commit touches AbstractTableFileSystemView.getLatestFileSlicesBeforeOrOn, which is invoked by SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates when computing small files. Fwiw, I don't see anything wrong with the commit itself; perhaps it exposed a different bug.
It seems there's already a fix for it, though. 0.13.1 still runs into the same issue, but when I ran a master build it proceeded with no issues. Is there an ETA on the 0.14.0 release? Thanks!

Also, I realize the commit I linked was released in 0.12.1, not 0.12.2... Apologies about that; we manage our JARs in our own artifact repository, and the JAR must have been mislabeled as 0.12.1 while we were actually running 0.12.0. To confirm, I downloaded the 0.12.1 version from the Maven repo and ran into the same issue, but 0.12.0 worked fine. It shouldn't affect reproducibility, though, since (as @danny0405 already mentioned) it's not a version-incompatibility issue but rather a bug; even when I create the table using 0.12.1, the job still fails after a while.

@ad1happy2go
Collaborator

Already fixed by #9879 in master.

Closing this.
