Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2442] Change default values for certain clustering configs #3875

Merged: 1 commit merged on Nov 10, 2021
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Number of partitions to list to create ClusteringPlan");

public static final ConfigProperty<String> PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit")
.defaultValue(String.valueOf(600 * 1024 * 1024L))
.sinceVersion("0.7.0")
.withDocumentation("Files smaller than the size specified here are candidates for clustering");

public static final ConfigProperty<String> PLAN_STRATEGY_CLASS_NAME = ConfigProperty
.key("hoodie.clustering.plan.strategy.class")
.defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy")
.defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy")
.sinceVersion("0.7.0")
.withDocumentation("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+ "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
+ DAYBASED_LOOKBACK_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
+ "i.e select what file groups are being clustered. Default strategy, looks at the clustering small file size limit (determined by "
+ PLAN_STRATEGY_SMALL_FILE_LIMIT.key() + ") to pick the small file slices within partitions for clustering.");

public static final ConfigProperty<String> EXECUTION_STRATEGY_CLASS_NAME = ConfigProperty
.key("hoodie.clustering.execution.strategy.class")
Expand Down Expand Up @@ -86,12 +92,6 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.9.0")
.withDocumentation("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan");

public static final ConfigProperty<String> PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit")
.defaultValue(String.valueOf(600 * 1024 * 1024L))
.sinceVersion("0.7.0")
.withDocumentation("Files smaller than the size specified here are candidates for clustering");

public static final ConfigProperty<String> PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group")
.defaultValue(String.valueOf(2 * 1024 * 1024 * 1024L))
Expand Down Expand Up @@ -133,7 +133,7 @@ public class HoodieClusteringConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
.key("hoodie.clustering.preserve.commit.metadata")
.defaultValue(false)
.defaultValue(true)
.sinceVersion("0.9.0")
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {

// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns("_row_key")
.withClusteringSortColumns("_row_key").withInlineClustering(true)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();

HoodieWriteConfig newWriteConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ public void testDeletesWithDeleteApi() throws Exception {
public void testSimpleClustering(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
Expand All @@ -1365,7 +1365,7 @@ public void testClusteringWithSortColumns(boolean populateMetaFields, boolean pr
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
Expand All @@ -1375,7 +1375,7 @@ public void testClusteringWithSortColumns(boolean populateMetaFields, boolean pr
public void testPendingClusteringRollback(boolean populateMetaFields) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();

// start clustering, but don't commit
List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, populateMetaFields, false);
Expand Down Expand Up @@ -1406,7 +1406,7 @@ public void testPendingClusteringRollback(boolean populateMetaFields) throws Exc
public void testClusteringWithFailingValidator() throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns("_hoodie_record_key")
.withClusteringSortColumns("_hoodie_record_key").withInlineClustering(true)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
try {
testInsertAndClustering(clusteringConfig, true, true, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
Expand All @@ -1420,7 +1420,7 @@ public void testClusteringWithFailingValidator() throws Exception {
public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
try {
testInsertAndClustering(clusteringConfig, false, true, SqlQueryEqualityPreCommitValidator.class.getName(), "", "");
fail("expected pre-commit clustering validation to fail because sql query is not configured");
Expand All @@ -1433,7 +1433,7 @@ public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception {
public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();

testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
"", COUNT_SQL_QUERY_FOR_VALIDATION + "#400");
Expand All @@ -1443,7 +1443,7 @@ public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws
public void testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();

try {
testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0)
.withInlineClustering(true)
.withInlineClusteringNumCommits(1)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build())
.withRollbackUsingMarkers(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,13 +703,7 @@ class TestMORDataSource extends HoodieClientTestBase {
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// option for clustering
.option("hoodie.parquet.small.file.limit", "0")
.option("hoodie.clustering.inline", "true")
.option("hoodie.clustering.inline.max.commits", "1")
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
.option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(12 *1024 * 1024L))
.option("hoodie.clustering.plan.strategy.sort.columns", "begin_lat, begin_lon")
.mode(SaveMode.Overwrite)
.save(basePath)
Expand Down