Skip to content

Commit

Permalink
[HUDI-4571] Fix partition extractor infer function when partition field mismatch (#6333)
Browse files Browse the repository at this point in the history

Infer META_SYNC_PARTITION_FIELDS and META_SYNC_PARTITION_EXTRACTOR_CLASS
from hoodie.table.partition.fields first; if that is not set, fall back to
hoodie.datasource.write.partitionpath.field.

Co-authored-by: Raymond Xu <[email protected]>
  • Loading branch information
codope and xushiyan authored Aug 9, 2022
1 parent 417cca9 commit 75f6266
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hudi.hive.replication;

import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.hive.testutils.HiveTestCluster;

import org.apache.hadoop.fs.Path;
Expand All @@ -41,6 +42,7 @@
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -75,6 +77,7 @@ private HiveSyncGlobalCommitParams getGlobalCommitConfig(String commitTime) thro
params.loadedProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
params.loadedProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
params.loadedProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
params.loadedProps.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), SlashEncodedDayPartitionValueExtractor.class.getName());
return params;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.table.HoodieTableConfig.BASE_FILE_FORMAT;
import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_NAME_KEY;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY;
import static org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;

/**
* Configs needed to sync data into external meta stores, catalogs, etc.
Expand Down Expand Up @@ -72,31 +75,38 @@ public class HoodieSyncConfig extends HoodieConfig {
public static final ConfigProperty<String> META_SYNC_TABLE_NAME = ConfigProperty
.key("hoodie.datasource.hive_sync.table")
.defaultValue("unknown")
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(HOODIE_WRITE_TABLE_NAME_KEY))
.or(() -> Option.ofNullable(cfg.getString(HOODIE_TABLE_NAME_KEY))))
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(HOODIE_TABLE_NAME_KEY))
.or(() -> Option.ofNullable(cfg.getString(HOODIE_WRITE_TABLE_NAME_KEY))))
.withDocumentation("The name of the destination table that we should sync the hudi table to.");

public static final ConfigProperty<String> META_SYNC_BASE_FILE_FORMAT = ConfigProperty
.key("hoodie.datasource.hive_sync.base_file_format")
.defaultValue("PARQUET")
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(HoodieTableConfig.BASE_FILE_FORMAT)))
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(BASE_FILE_FORMAT)))
.withDocumentation("Base file format for the sync.");

public static final ConfigProperty<String> META_SYNC_PARTITION_FIELDS = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_fields")
.defaultValue("")
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)))
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
.or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
.withDocumentation("Field in the table to use for determining hive partition columns.");

public static final ConfigProperty<String> META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_extractor_class")
.defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
.withInferFunction(cfg -> {
if (StringUtils.nonEmpty(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))) {
int numOfPartFields = cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME).split(",").length;
Option<String> partitionFieldsOpt = Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
.or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
if (!partitionFieldsOpt.isPresent()) {
return Option.empty();
}
String partitionFields = partitionFieldsOpt.get();
if (StringUtils.nonEmpty(partitionFields)) {
int numOfPartFields = partitionFields.split(",").length;
if (numOfPartFields == 1
&& cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
&& cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) {
&& cfg.contains(HIVE_STYLE_PARTITIONING_ENABLE)
&& cfg.getString(HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) {
return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
} else {
return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
Expand All @@ -106,7 +116,7 @@ public class HoodieSyncConfig extends HoodieConfig {
}
})
.withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
+ "default 'SlashEncodedDayPartitionValueExtractor'.");
+ "default 'org.apache.hudi.hive.MultiPartKeysValueExtractor'.");

public static final ConfigProperty<String> META_SYNC_ASSUME_DATE_PARTITION = ConfigProperty
.key("hoodie.datasource.hive_sync.assume_date_partitioning")
Expand All @@ -117,7 +127,7 @@ public class HoodieSyncConfig extends HoodieConfig {
public static final ConfigProperty<Boolean> META_SYNC_DECODE_PARTITION = ConfigProperty
.key("hoodie.meta.sync.decode_partition")
.defaultValue(false)
.withInferFunction(cfg -> Option.ofNullable(cfg.getBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING)))
.withInferFunction(cfg -> Option.ofNullable(cfg.getBoolean(URL_ENCODE_PARTITIONING)))
.withDocumentation("If true, meta sync will url-decode the partition path, as it is deemed as url-encoded. Default to false.");

public static final ConfigProperty<Boolean> META_SYNC_USE_FILE_LISTING_FROM_METADATA = ConfigProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,35 +76,69 @@ void testInferBaseFileFormat() {

@Test
void testInferPartitionFields() {
Properties props0 = new Properties();
HoodieSyncConfig config0 = new HoodieSyncConfig(props0, new Configuration());
assertEquals("", config0.getStringOrDefault(META_SYNC_PARTITION_FIELDS),
String.format("should get default value due to absence of both %s and %s",
HoodieTableConfig.PARTITION_FIELDS.key(), KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));

Properties props1 = new Properties();
props1.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo,bar");
props1.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "foo,bar,baz");
HoodieSyncConfig config1 = new HoodieSyncConfig(props1, new Configuration());
assertEquals("foo,bar", config1.getStringOrDefault(META_SYNC_PARTITION_FIELDS));
assertEquals("foo,bar,baz", config1.getStringOrDefault(META_SYNC_PARTITION_FIELDS),
String.format("should infer from %s", HoodieTableConfig.PARTITION_FIELDS.key()));

Properties props2 = new Properties();
props2.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo,bar");
HoodieSyncConfig config2 = new HoodieSyncConfig(props2, new Configuration());
assertEquals("foo,bar", config2.getStringOrDefault(META_SYNC_PARTITION_FIELDS),
String.format("should infer from %s", KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));

Properties props3 = new Properties();
props3.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "foo,bar,baz");
props3.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo,bar");
HoodieSyncConfig config3 = new HoodieSyncConfig(props3, new Configuration());
assertEquals("foo,bar,baz", config3.getStringOrDefault(META_SYNC_PARTITION_FIELDS),
String.format("should infer from %s, which has higher precedence.", HoodieTableConfig.PARTITION_FIELDS.key()));

}

@Test
void testInferPartitonExtractorClass() {
Properties props0 = new Properties();
HoodieSyncConfig config0 = new HoodieSyncConfig(props0, new Configuration());
assertEquals("org.apache.hudi.hive.MultiPartKeysValueExtractor",
config0.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
String.format("should get default value due to absence of both %s and %s",
HoodieTableConfig.PARTITION_FIELDS.key(), KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));

Properties props1 = new Properties();
props1.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo,bar");
props1.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "");
HoodieSyncConfig config1 = new HoodieSyncConfig(props1, new Configuration());
assertEquals("org.apache.hudi.hive.MultiPartKeysValueExtractor",
config1.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
assertEquals("org.apache.hudi.hive.NonPartitionedExtractor",
config1.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
String.format("should infer from %s", HoodieTableConfig.PARTITION_FIELDS.key()));

Properties props2 = new Properties();
props2.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo");
props2.setProperty(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), "true");
props2.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo,bar");
HoodieSyncConfig config2 = new HoodieSyncConfig(props2, new Configuration());
assertEquals("org.apache.hudi.hive.HiveStylePartitionValueExtractor",
config2.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
assertEquals("org.apache.hudi.hive.MultiPartKeysValueExtractor",
config2.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
String.format("should infer from %s", KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));

HoodieSyncConfig config3 = new HoodieSyncConfig(new Properties(), new Configuration());
Properties props3 = new Properties();
props3.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "");
props3.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo,bar");
HoodieSyncConfig config3 = new HoodieSyncConfig(props3, new Configuration());
assertEquals("org.apache.hudi.hive.NonPartitionedExtractor",
config3.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
config3.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
String.format("should infer from %s, which has higher precedence.", HoodieTableConfig.PARTITION_FIELDS.key()));

Properties props4 = new Properties();
props4.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "");
props4.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "foo");
props4.setProperty(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key(), "true");
HoodieSyncConfig config4 = new HoodieSyncConfig(props4, new Configuration());
assertEquals("org.apache.hudi.hive.NonPartitionedExtractor",
assertEquals("org.apache.hudi.hive.HiveStylePartitionValueExtractor",
config4.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
}

Expand Down

0 comments on commit 75f6266

Please sign in to comment.