
[SUPPORT] Huge Performance Issue With BLOOM Index On A 1.6 Billion COW Table #11875

Open
silly-carbon opened this issue Sep 4, 2024 · 9 comments

Comments

@silly-carbon
Contributor

silly-carbon commented Sep 4, 2024

Describe the problem you faced

Spark Config:
spark.driver.cores=1
spark.driver.memory=18g
spark.executor.cores=10
spark.executor.memory=32g
spark.driver.maxResultSize=8g
spark.default.parallelism=400
spark.sql.shuffle.partitions=400
spark.dynamicAllocation.maxExecutors=20
spark.executor.memoryOverhead=3g
spark.kryoserializer.buffer.max=1024m

But Hudi spends a lot of time on HoodieBloomIndex.tagLocation:

[screenshot: Spark UI stage timing for HoodieBloomIndex.tagLocation]

The previous stages are rather quick:
[screenshot: Spark UI durations of the earlier stages]

And with GC issues:
[screenshot: Spark UI GC time]
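For reference, the tagLocation lookup has a few documented knobs that are usually worth checking at this scale; this is only a sketch, with option names taken from Hudi's hoodie.bloom.index.* configuration (defaults and exact behavior should be verified against 0.14.1):

-- Raise the parallelism of the bloom-index lookup (tagLocation) stage.
SET hoodie.bloom.index.parallelism = 400;
-- Keep key-range pruning on so only candidate files are probed; this helps most
-- when record keys are laid out in sorted order within the data files.
SET hoodie.bloom.index.prune.by.ranges = true;
-- Split large lookups into sub-partitions to avoid a few huge, skewed tasks.
SET hoodie.bloom.index.bucketized.checking = true;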

To Reproduce

Steps to reproduce the behavior:

  1. Create a table
    CREATE TABLE temp_db.xxxxxxxxxxx (
    _hoodie_is_deleted BOOLEAN,
    t_pre_combine_field BIGINT,
    order_type INT,
    order_no INT,
    profile_no INT,
    profile_type STRING,
    profile_cat STRING,
    u_version STRING,
    order_line_no INT,
    profile_c STRING,
    profile_i INT,
    profile_f DECIMAL(20,8),
    profile_d TIMESTAMP,
    active STRING,
    entry_datetime TIMESTAMP,
    entry_id INT,
    h_version INT)
    USING hudi
    CLUSTERED BY (order_no, profile_type, profile_no, order_type, profile_cat)
    INTO 2 BUCKETS
    TBLPROPERTIES (
    'primaryKey' = 'order_no,profile_type,profile_no,order_type,profile_cat',
    'hoodie.cleaner.policy.failed.writes' = 'LAZY',
    'type' = 'cow',
    'hoodie.write.lock.filesystem.expire' = '15',
    'preCombineField' = 't_pre_combine_field',
    'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',
    'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
    'hoodie.index.type' = 'BLOOM'
    );
  2. BULK_INSERT 1.6 billion rows; this is quick, taking 12 mins (see the sort-mode note after these steps)

SET spark.sql.parquet.datetimeRebaseModeInWrite = CORRECTED;

set hoodie.datasource.write.operation = bulk_insert;

SET hoodie.combine.before.insert=false;

INSERT OVERWRITE temp_db.xxxxxxxxxxxxx
SELECT FALSE, 1, * FROM ods_us.xxxxxx_source;

  3. Insert 1 million rows; this is the step with the performance issue mentioned above.
INSERT INTO temp_db.xxxxxxxxx
(
SELECT TRUE AS _hoodie_is_deleted, * -- 0 rows
FROM ods_us.xxxxxxxx_dddd
UNION ALL
SELECT FALSE AS _hoodie_is_deleted,* -- 1 million rows
FROM ods_us.xxxxxxxxx_stage
)
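
Regarding the sort-mode note in step 2: the layout produced by the bulk insert affects how well the later bloom-index lookup can prune files. A sketch only, assuming the documented hoodie.bulkinsert.sort.mode option applies to this write path (GLOBAL_SORT lays records out by key, so each file covers a narrow key range, which is what range pruning relies on):

-- Sort by record key during bulk insert so that key-range pruning can skip
-- most files when the bloom index tags incoming records later.
SET hoodie.bulkinsert.sort.mode = GLOBAL_SORT;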

Expected behavior

The UPSERT completes quickly.

Environment Description

  • Hudi version : hudi-spark3.2-bundle_2.12-0.14.1.jar

  • Spark version : 3.2

  • Hive version : 3.0

  • Hadoop version : 3.0

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : no

Additional context

I've also tried the BUCKET index, but BULK_INSERT takes 1h 30 mins, and the INSERT failed with this exception:

Job aborted due to stage failure: Task 8 in stage 2.0 failed 4 times, most recent failure: Lost task 8.3 in stage 2.0 (TID 41) (gsc-bissssss.org executor 1): java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "f2c9f2eb"
at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NumberFormatException: For input string: "f2c9f2eb"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)
at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.lambda$loadBucketIdToFileIdMappingForPartition$0(HoodieSimpleBucketIndex.java:60)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.loadBucketIdToFileIdMappingForPartition(HoodieSimpleBucketIndex.java:56)
at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:94)
at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:87)
at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
... 25 more

The Record Level Index failed during BULK_INSERT as well, with an OOM issue.

Stacktrace

Add the stacktrace of the error.

@silly-carbon
Contributor Author

Update: this stage completed after 1.6 hours.
All the other stages completed very quickly.
The performance is so bad and I cannot figure out why. Could anyone help with this? Thanks in advance.

[screenshot: Spark UI showing the stage duration]

@danny0405
Contributor

It might be the false positives; can you try the bucket index or simple index instead?
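
For reference, switching the index is a write-configuration change; a rough sketch with assumed values (the bucket count is purely illustrative, and option names follow Hudi's documented index configs):

-- Simple index: joins incoming keys against existing keys, no bloom filters.
SET hoodie.index.type = SIMPLE;

-- Or bucket index: hashes the record key into a fixed number of buckets,
-- so no lookup against existing files is needed at all.
SET hoodie.index.type = BUCKET;
SET hoodie.bucket.index.num.buckets = 256;  -- illustrative value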

@silly-carbon
Contributor Author

silly-carbon commented Sep 4, 2024

It might be the false positives; can you try the bucket index or simple index instead?

@danny0405 I've also tried the BUCKET index, but BULK_INSERT takes 1h 30 mins, and a later UPSERT of 1 million rows failed with this exception:

Job aborted due to stage failure: Task 8 in stage 2.0 failed 4 times, most recent failure: Lost task 8.3 in stage 2.0 (TID 41) (gsc-bissssss.org executor 1): java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "f2c9f2eb"
at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NumberFormatException: For input string: "f2c9f2eb"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)
at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.lambda$loadBucketIdToFileIdMappingForPartition$0(HoodieSimpleBucketIndex.java:60)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.loadBucketIdToFileIdMappingForPartition(HoodieSimpleBucketIndex.java:56)
at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:94)
at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:87)
at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
... 25 more

For the BUCKET index, if I don't use BULK_INSERT to load the initial data, it still fails with an OOM issue.

The Record Level Index failed during BULK_INSERT as well, with an OOM issue.

With the SIMPLE index, it never finishes running, and also has an OOM issue.
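
For completeness, the record-level index is built inside the metadata table; a sketch of the write options involved, using config names as documented for 0.14.x (whether this avoids the OOM depends on executor sizing):

-- Build the record-level index in the metadata table during the write...
SET hoodie.metadata.enable = true;
SET hoodie.metadata.record.index.enable = true;
-- ...and use it to tag incoming records instead of bloom filter lookups.
SET hoodie.index.type = RECORD_INDEX;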

@danny0405
Contributor

This error indicates that the bucket index does not take effect:

Caused by: java.lang.NumberFormatException: For input string: "f2c9f2eb"
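
For context, BucketIdentifier.bucketIdFromFileId (visible in the stack trace) derives the bucket number from a numeric prefix of the file group ID; a UUID-style ID like "f2c9f2eb" cannot be parsed as an integer, which suggests the existing file groups were written without the bucket layout. A minimal sketch of declaring the table with the bucket index from the start, so the first bulk_insert already produces bucket-prefixed file groups (the table name, column list, and bucket count are purely illustrative):

CREATE TABLE temp_db.bucket_demo (
  order_no INT,
  profile_type STRING,
  t_pre_combine_field BIGINT
) USING hudi
TBLPROPERTIES (
  'primaryKey' = 'order_no,profile_type',
  'preCombineField' = 't_pre_combine_field',
  'hoodie.index.type' = 'BUCKET',
  'hoodie.bucket.index.num.buckets' = '256'  -- illustrative bucket count
);

-- Load the initial data with bulk_insert into this table, then run the UPSERT;
-- every file group created here should already carry its numeric bucket prefix.
SET hoodie.datasource.write.operation = bulk_insert;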

@silly-carbon
Contributor Author

This error indicates that the bucket index does not take effect:

Caused by: java.lang.NumberFormatException: For input string: "f2c9f2eb"

Hi @danny0405
I think this may be a new issue, and it is easy to reproduce:

  1. BULK_INSERT & Overwrite into a BUCKET index table
  2. UPSERT some data into this table again

Then this error shows up. Maybe BULK_INSERT is not compatible with the BUCKET index?

@danny0405
Contributor

BULK_INSERT & Overwrite into a BUCKET index table

The bucket index for Spark bulk insert was introduced in release 0.14.0; did you use that release?

@silly-carbon
Contributor Author

BULK_INSERT & Overwrite into a BUCKET index table

The bucket index for Spark bulk insert was introduced in release 0.14.0; did you use that release?

Yes, I use 0.14.1 @danny0405

@silly-carbon
Contributor Author

Hi @nsivabalan Could you please help with this?

And I reduced the size of the base table to 100 million rows; the UPSERT still took very long to finish (33 mins).

@ad1happy2go
Collaborator

@silly-carbon Do we know how many file groups the job is touching? Is it possible to attach a zip of the .hoodie directory (without the metadata dir) or share one commit file so we can look further into it?
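
To get a rough file-group count without attaching anything, Hudi's Spark SQL procedures may help; a sketch only, assuming the show_commits procedure and its table/limit arguments are available in the release in use:

-- Show recent commits with their write stats (files added/updated, records
-- written), which indicates roughly how many file groups each upsert touched.
CALL show_commits(table => 'temp_db.xxxxxxxxx', limit => 5);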
