[HUDI-7127] Fixing set up and tear down in tests (apache#10146)
nsivabalan authored Nov 21, 2023
1 parent eaba114 commit 0c4f3a3
Showing 2 changed files with 16 additions and 44 deletions.
@@ -86,13 +86,9 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionSupport {
   @BeforeEach
   override def setUp() {
     setTableName("hoodie_test")
-    initPath()
-    initSparkContexts()
-    spark = sqlContext.sparkSession
-    initTestDataGenerator()
-    initFileSystem()
-    initMetaClient()
+    super.setUp()

     queryOpts = queryOpts ++ Map("path" -> basePath)
   }
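For readers unfamiliar with the pattern this hunk adopts, the sketch below shows the delegate-to-super test lifecycle in isolation: a shared base class owns resource setup and teardown, and a suite overrides setUp() only to add its own configuration around a single super.setUp() call. The names here (SparkTestBase, ExampleSuite, resourcesReady) are purely illustrative and are not the actual Hudi test-utils API; only the super.setUp() delegation mirrors the diff above.

```scala
// Illustrative sketch of the delegate-to-super lifecycle pattern (hypothetical names).
abstract class SparkTestBase {
  var resourcesReady: Boolean = false

  // The base class owns shared resource setup (in the real code base: Spark contexts,
  // file system, meta client, test data generator).
  def setUp(): Unit = {
    resourcesReady = true
  }

  def tearDown(): Unit = {
    resourcesReady = false
  }
}

class ExampleSuite extends SparkTestBase {
  override def setUp(): Unit = {
    super.setUp()  // one call instead of repeating each init helper in every suite
    // suite-specific configuration would go here
  }
}

object LifecycleSketch extends App {
  val suite = new ExampleSuite
  suite.setUp()
  println(s"resources ready: ${suite.resourcesReady}")  // true: base setup ran via super
  suite.tearDown()
}
```

Centralizing setup and teardown in one base class keeps individual suites from drifting, which is what replacing the per-suite init calls above with super.setUp() achieves.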
@@ -19,53 +19,29 @@ package org.apache.hudi

 import org.apache.spark.sql._
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
-import org.apache.spark.util.{AccumulatorV2}
+import org.apache.spark.util.AccumulatorV2
 import org.apache.spark.SparkContext

-import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}


-import org.junit.jupiter.api.Assertions.{assertEquals}
-import org.junit.jupiter.api.{BeforeEach}
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{EnumSource}
+import org.junit.jupiter.params.provider.EnumSource

-class TestHoodieParquetBloomFilter {
-
-  var spark: SparkSession = _
-  var sqlContext: SQLContext = _
-  var sc: SparkContext = _
-
-  def initSparkContext(): Unit = {
-    val sparkConf = getSparkConfForTest(getClass.getSimpleName)
-
-    spark = SparkSession.builder()
-      .withExtensions(new HoodieSparkSessionExtension)
-      .config(sparkConf)
-      .getOrCreate()
-
-    sc = spark.sparkContext
-    sc.setLogLevel("ERROR")
-    sqlContext = spark.sqlContext
-  }
-
-  @BeforeEach
-  def setUp() {
-    initSparkContext()
-  }
+class TestHoodieParquetBloomFilter extends HoodieSparkClientTestBase with ScalaAssertionSupport {

   @ParameterizedTest
   @EnumSource(value = classOf[WriteOperationType], names = Array("BULK_INSERT", "INSERT", "UPSERT", "INSERT_OVERWRITE"))
   def testBloomFilter(operation: WriteOperationType): Unit = {
     // setup hadoop conf with bloom col enabled
-    spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true")
-    spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.expected.ndv#bloom_col", "2")
+    jsc.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true")
+    jsc.hadoopConfiguration.set("parquet.bloom.filter.expected.ndv#bloom_col", "2")
     // ensure nothing but bloom can trigger read skip
-    spark.sql("set parquet.filter.columnindex.enabled=false")
-    spark.sql("set parquet.filter.stats.enabled=false")
+    sparkSession.sql("set parquet.filter.columnindex.enabled=false")
+    sparkSession.sql("set parquet.filter.stats.enabled=false")

     val basePath = java.nio.file.Files.createTempDirectory("hoodie_bloom_source_path").toAbsolutePath.toString
     val opts = Map(
@@ -75,7 +51,7 @@ class TestHoodieParquetBloomFilter {
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition"
     )
-    val inputDF = spark.sql(
+    val inputDF = sparkSession.sql(
       """select '0' as _row_key, '1' as bloom_col, '2' as partition, '3' as ts
         |union
         |select '1', '2', '3', '4'
@@ -86,19 +62,19 @@ class TestHoodieParquetBloomFilter {
       .save(basePath)

     val accu = new NumRowGroupsAcc
-    spark.sparkContext.register(accu)
+    sparkSession.sparkContext.register(accu)

     // this one shall skip partition scanning thanks to bloom when spark >=3
-    spark.read.format("hudi").load(basePath).filter("bloom_col = '3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
+    sparkSession.read.format("hudi").load(basePath).filter("bloom_col = '3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
     assertEquals(if (currentSparkSupportParquetBloom()) 0 else 1, accu.value)

     // this one will trigger one partition scan
-    spark.read.format("hudi").load(basePath).filter("bloom_col = '2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
+    sparkSession.read.format("hudi").load(basePath).filter("bloom_col = '2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
     assertEquals(1, accu.value)
   }

   def currentSparkSupportParquetBloom(): Boolean = {
-    Integer.valueOf(spark.version.charAt(0)) >= 3
+    Integer.valueOf(sparkSession.version.charAt(0)) >= 3
   }
 }
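As a side note on what this test exercises, here is a hedged, Hudi-free sketch of the same parquet bloom filter mechanics. The parquet.bloom.filter.enabled#<column> and parquet.bloom.filter.expected.ndv#<column> properties come straight from the diff above; the SparkSession setup, sample data, and output path are made up for illustration.

```scala
// Sketch only: plain Spark parquet write with a column-level bloom filter enabled.
import org.apache.spark.sql.SparkSession

object ParquetBloomSketch extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("parquet-bloom-sketch")
    .getOrCreate()

  // parquet-mr column-level properties, as set in the test: enable a bloom filter
  // for bloom_col and hint its expected number of distinct values.
  spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true")
  spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.expected.ndv#bloom_col", "2")

  import spark.implicits._
  val out = java.nio.file.Files.createTempDirectory("bloom_sketch").toAbsolutePath.toString
  Seq(("0", "1"), ("1", "2")).toDF("_row_key", "bloom_col").write.mode("overwrite").parquet(out)

  // On Spark 3+, an equality predicate on a value that was never written lets the
  // parquet reader use the bloom filter to skip whole row groups.
  spark.read.parquet(out).filter("bloom_col = '3'").show()

  spark.stop()
}
```

The test above verifies the same behavior through Hudi by registering an accumulator that counts visited row groups and asserting it stays at zero when the bloom filter can rule the value out.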
