remove the tightBound config
codope committed Nov 13, 2024
1 parent c6d9713 commit 9f8f90f
Showing 4 changed files with 74 additions and 100 deletions.
@@ -358,15 +358,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Parallelism to use, when generating partition stats index.");

public static final ConfigProperty<Boolean> PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE = ConfigProperty
.key(METADATA_PREFIX + ".index.partition.stats.consolidate.on.every.write")
.defaultValue(true)
.sinceVersion("1.0.0")
.withDocumentation("When enabled, partition stats is consolidated is computed on every commit for the min/max value of every column "
+ "at the storage partition level. Typically, the min/max range for each column can become wider (i.e. the minValue is <= all valid values "
+ "in the file, and the maxValue >= all valid values in the file) with updates and deletes. If this config is enabled, "
+ "the min/max range will be updated to the tight bound of the valid values after every commit for the partitions touched.");

public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP = ConfigProperty
.key(METADATA_PREFIX + ".index.secondary.enable")
.defaultValue(false)
@@ -534,10 +525,6 @@ public int getPartitionStatsIndexParallelism() {
return getInt(PARTITION_STATS_INDEX_PARALLELISM);
}

public boolean isPartitionStatsIndexConsolidationEnabledOnEveryWrite() {
return isPartitionStatsIndexEnabled() && getBooleanOrDefault(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE);
}

public boolean isSecondaryIndexEnabled() {
// Secondary index is enabled if and only if record index (primary key index) is also enabled
return isRecordIndexEnabled() && getBoolean(SECONDARY_INDEX_ENABLE_PROP);
@@ -749,11 +736,6 @@ public Builder withPartitionStatsIndexParallelism(int parallelism) {
return this;
}

public Builder withPartitionStatsIndexConsolidationEnabledOnEveryWrite(boolean enable) {
metadataConfig.setValue(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE, String.valueOf(enable));
return this;
}

public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
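For intuition on the behavior described in the removed config's documentation (min/max ranges only widen as updates and deletes land, while a tight-bound consolidation recomputes them from the values that are still valid), here is a minimal, hypothetical Java sketch. The class, the Range type, and the helper methods are invented for illustration and are not Hudi API; the values mirror the test scenario further down.

import java.util.List;

/** Hypothetical illustration of widened vs. tight min/max ranges; not Hudi code. */
public class PartitionStatsRangeSketch {

  record Range(int min, int max) { }

  // Widening merge: an existing range only grows as new commits add values.
  static Range widen(Range existing, Range incoming) {
    return new Range(Math.min(existing.min(), incoming.min()),
                     Math.max(existing.max(), incoming.max()));
  }

  // Tight bound: recompute min/max from the values that remain valid after updates/deletes.
  static Range tightBound(List<Integer> validValues) {
    int min = validValues.stream().mapToInt(Integer::intValue).min().orElseThrow();
    int max = validValues.stream().mapToInt(Integer::intValue).max().orElseThrow();
    return new Range(min, max);
  }

  public static void main(String[] args) {
    Range stats = new Range(3000, 3500);
    // An update raises one price to 4000: the widened range becomes [3000, 4000].
    stats = widen(stats, new Range(4000, 4000));
    System.out.println(stats); // Range[min=3000, max=4000]
    // The row holding 4000 is deleted; recomputing a tight bound shrinks the max back.
    stats = tightBound(List.of(3000));
    System.out.println(stats); // Range[min=3000, max=3000]
  }
}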
@@ -2251,8 +2251,7 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(Ho
.collect(Collectors.toList());

int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
&& metadataConfig.isPartitionStatsIndexConsolidationEnabledOnEveryWrite();
boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient);
HoodieTableMetadata tableMetadata;
if (shouldScanColStatsForTightBound) {
tableMetadata = HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(), metadataConfig, dataMetaClient.getBasePath().toString());
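The change above drops the config check from the gating condition: tight-bound scanning of column stats is now triggered whenever the column stats metadata partition is available. A schematic Java sketch of the before/after logic (method and parameter names are illustrative; only the boolean structure mirrors the diff):

/** Schematic of the gating change in convertMetadataToPartitionStatsRecords; names are illustrative, not Hudi API. */
public class TightBoundGatingSketch {

  // Before this commit: required both the column stats partition and the (now removed) flag.
  static boolean shouldScanBefore(boolean columnStatsPartitionAvailable, boolean consolidateOnEveryWrite) {
    return columnStatsPartitionAvailable && consolidateOnEveryWrite;
  }

  // After this commit: depends only on the column stats partition being available.
  static boolean shouldScanAfter(boolean columnStatsPartitionAvailable) {
    return columnStatsPartitionAvailable;
  }

  public static void main(String[] args) {
    System.out.println(shouldScanBefore(true, false)); // false -> tight bound could be skipped
    System.out.println(shouldScanAfter(true));          // true  -> tight bound always computed
  }
}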
@@ -285,89 +285,85 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
* 2. Update to widen the bounds: Perform updates to increase the price values, causing the min/max stats to widen.
* 3. Update to remove or lower the max value: Delete or update the record that holds the max value to simulate a
* scenario where the bounds need to be tightened.
* 4. Trigger stats recomputation: Assuming the config for tighter bounds is enabled, validate that the partition stats
* 4. Trigger stats recomputation: Since tight bounds are now computed on every write, validate that the partition stats
* have adjusted correctly after scanning and recomputing accurate stats.
*/
test("Test partition stats index with tight bound") {
Seq("cow", "mor").foreach { tableType =>
Seq("true", "false").foreach { isPartitionStatsIndexConsolidationEnabledOnEveryWrite =>
withTempDir { tmp =>
val tableName = generateTableName + s"_tight_bound_$isPartitionStatsIndexConsolidationEnabledOnEveryWrite"
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price int,
| ts long
|) using hudi
|partitioned by (ts)
|tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'price',
| hoodie.metadata.index.partition.stats.enable = 'true',
| hoodie.metadata.index.partition.stats.consolidate.on.every.write = '$isPartitionStatsIndexConsolidationEnabledOnEveryWrite',
| hoodie.metadata.index.column.stats.enable = 'true',
| hoodie.metadata.index.column.stats.column.list = 'price'
|)
|location '$tablePath'
|""".stripMargin
)
withTempDir { tmp =>
val tableName = generateTableName + s"_tight_bound_$tableType"
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price int,
| ts long
|) using hudi
|partitioned by (ts)
|tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'price',
| hoodie.metadata.index.partition.stats.enable = 'true',
| hoodie.metadata.index.column.stats.enable = 'true',
| hoodie.metadata.index.column.stats.column.list = 'price'
|)
|location '$tablePath'
|""".stripMargin
)

/**
* Insert: Insert values for price across multiple partitions (ts=10, ts=20, ts=30):
*
* Partition ts=10: price = 1000, 1500
* Partition ts=20: price = 2000, 2500
* Partition ts=30: price = 3000, 3500
*
* This will initialize the partition stats with bounds like [1000, 1500], [2000, 2500], and [3000, 3500].
*/
spark.sql(s"insert into $tableName values (1, 'a1', 1000, 10), (2, 'a2', 1500, 10)")
spark.sql(s"insert into $tableName values (3, 'a3', 2000, 20), (4, 'a4', 2500, 20)")
spark.sql(s"insert into $tableName values (5, 'a5', 3000, 30), (6, 'a6', 3500, 30)")

// validate partition stats initialization
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500),
Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3500)
)
/**
* Insert: Insert values for price across multiple partitions (ts=10, ts=20, ts=30):
*
* Partition ts=10: price = 1000, 1500
* Partition ts=20: price = 2000, 2500
* Partition ts=30: price = 3000, 3500
*
* This will initialize the partition stats with bounds like [1000, 1500], [2000, 2500], and [3000, 3500].
*/
spark.sql(s"insert into $tableName values (1, 'a1', 1000, 10), (2, 'a2', 1500, 10)")
spark.sql(s"insert into $tableName values (3, 'a3', 2000, 20), (4, 'a4', 2500, 20)")
spark.sql(s"insert into $tableName values (5, 'a5', 3000, 30), (6, 'a6', 3500, 30)")

// validate partition stats initialization
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500),
Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3500)
)

// First Update (widen the bounds): Update the price in partition ts=30, where price = 4000 for id=6.
// This will widen the max bounds in ts=30 from 3500 to 4000.
spark.sql(s"update $tableName set price = 4000 where id = 6")
// Validate widened stats
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean),
Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4000, isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean)
)
// verify file pruning
var metaClient = HoodieTableMetaClient.builder()
.setBasePath(tablePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true)

// Second update (reduce max value)
spark.sql(s"delete from $tableName where id = 6")
// Validate that stats have recomputed and tightened
// if tighter bound, note that record with prev max was deleted
val expectedMaxValue = if(isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean) 3000 else 4000
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean),
Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, expectedMaxValue, isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean)
)
// verify file pruning
metaClient = HoodieTableMetaClient.reload(metaClient)
verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true, isNoScanExpected = true)
}
// First update (widen the bounds): set price = 4000 for id = 6 in partition ts=30.
// This widens the max bound in ts=30 from 3500 to 4000.
spark.sql(s"update $tableName set price = 4000 where id = 6")
// Validate widened stats
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4000, true)
)
// verify file pruning
var metaClient = HoodieTableMetaClient.builder()
.setBasePath(tablePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true)

// Second update (reduce max value)
spark.sql(s"delete from $tableName where id = 6")
// Validate that stats have been recomputed and tightened:
// the record holding the previous max (4000) was deleted, so the max shrinks back to 3000
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3000, true)
)
// verify file pruning
metaClient = HoodieTableMetaClient.reload(metaClient)
verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true, isNoScanExpected = true)
}
}
}
@@ -407,9 +407,6 @@ public void testPartitionStatsValidation(String tableType) {
writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
writeOptions.put("hoodie.table.name", "test_table");
writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType);
if (tableType.equals("COPY_ON_WRITE")) {
writeOptions.put(HoodieMetadataConfig.PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE.key(), "true");
}
writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
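After this commit, a writer configuration like the one in the test above no longer sets hoodie.metadata.index.partition.stats.consolidate.on.every.write. A hedged Java sketch of the remaining options, using only keys that appear in this commit's diffs (the option values are placeholders, not a recommended configuration):

import java.util.HashMap;
import java.util.Map;

/** Illustrative writer options after this commit; keys are taken from the diffs above. */
public class WriteOptionsSketch {
  public static void main(String[] args) {
    Map<String, String> writeOptions = new HashMap<>();
    writeOptions.put("hoodie.table.name", "test_table");
    writeOptions.put("hoodie.metadata.index.partition.stats.enable", "true");
    writeOptions.put("hoodie.metadata.index.column.stats.enable", "true");
    writeOptions.put("hoodie.metadata.index.column.stats.column.list", "price");
    // "hoodie.metadata.index.partition.stats.consolidate.on.every.write" was removed by this
    // commit: tight-bound consolidation now runs on every write whenever column stats are available.
    System.out.println(writeOptions);
  }
}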
