
[HUDI-2883] Refactor hive sync tool and config
Rajesh Mahindra authored and xushiyan committed Mar 19, 2022
1 parent 6843e6d commit 09f0556
Showing 38 changed files with 1,412 additions and 1,176 deletions.
34 changes: 18 additions & 16 deletions docker/demo/sparksql-incremental.commands
@@ -21,6 +21,8 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.spark.sql.SaveMode;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hadoop.fs.FileSystem;

@@ -43,14 +45,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor").
option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");
@@ -75,14 +77,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor_bs").
option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
@@ -104,7 +104,7 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

public static final ConfigProperty<String> TBL_NAME = ConfigProperty
.key("hoodie.table.name")
.key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY)
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");

@@ -82,6 +82,8 @@ public class HoodieTableConfig extends HoodieConfig {

public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup";
public static final String HOODIE_WRITE_TABLE_NAME_KEY = "hoodie.datasource.write.table.name";
public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";

public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
.key("hoodie.database.name")
@@ -90,7 +92,7 @@ public class HoodieTableConfig extends HoodieConfig {
+ "we can set it to limit the table name under a specific database");

public static final ConfigProperty<String> NAME = ConfigProperty
.key("hoodie.table.name")
.key(HOODIE_TABLE_NAME_KEY)
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with Hive. Needs to be same across runs.");

@@ -18,17 +18,19 @@

package org.apache.hudi.integ.testsuite.dag.nodes;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.sync.common.HoodieSyncConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
* A hive query node in the DAG of operations for a workflow. used to perform a hive query with given config.
@@ -46,13 +48,14 @@ public HiveQueryNode(DeltaConfig.Config config) {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive query node {}", this.getName());
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
HiveSyncConfig hiveSyncConfig = DataSourceUtils
.buildHiveSyncConfig(executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getProps(),
executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().targetBasePath,
executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
TypedProperties properties = new TypedProperties();
properties.putAll(executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getProps());
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH, executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().targetBasePath);
properties.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties);
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
Connection con = DriverManager.getConnection(hiveSyncConfig.jdbcUrl, hiveSyncConfig.hiveUser,
hiveSyncConfig.hivePass);
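
The HiveQueryNode change above is the template for the whole refactor: instead of DataSourceUtils.buildHiveSyncConfig, callers fill a TypedProperties bag, add the table base path and base file format under the HoodieSyncConfig keys, and pass the properties to the HiveSyncConfig constructor. A condensed sketch of that pattern with placeholder inputs; the constructor, the two keys, and the jdbcUrl/hiveUser/hivePass fields all appear in this diff, and .key() is used for both puts here:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;

public class HiveSyncConfigExample {

  // Builds a HiveSyncConfig the new way: everything travels through TypedProperties.
  static HiveSyncConfig buildConfig(TypedProperties writerProps, String basePath, String baseFileFormat) {
    TypedProperties props = new TypedProperties();
    props.putAll(writerProps);                                                // copy over writer/sync properties
    props.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), basePath);          // table base path to sync
    props.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), baseFileFormat);
    return new HiveSyncConfig(props);
  }

  // The resulting config still exposes the JDBC fields HiveQueryNode uses right after this hunk.
  static Connection openHiveConnection(HiveSyncConfig cfg) throws SQLException {
    return DriverManager.getConnection(cfg.jdbcUrl, cfg.hiveUser, cfg.hivePass);
  }
}
```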
@@ -21,6 +21,9 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.service.server.HiveServer2;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.testutils.HiveTestService;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
@@ -46,12 +49,17 @@ public void startLocalHiveServiceIfNeeded(Configuration configuration) throws IO
}

public void syncToLocalHiveIfNeeded(HoodieTestSuiteWriter writer) {
HiveSyncTool hiveSyncTool;
if (this.config.isHiveLocal()) {
writer.getDeltaStreamerWrapper().getDeltaSyncService().getDeltaSync()
.syncHive(getLocalHiveServer().getHiveConf());
hiveSyncTool = new HiveSyncTool(writer.getWriteConfig().getProps(),
getLocalHiveServer().getHiveConf(),
FSUtils.getFs(writer.getWriteConfig().getBasePath(), getLocalHiveServer().getHiveConf()));
} else {
writer.getDeltaStreamerWrapper().getDeltaSyncService().getDeltaSync().syncHive();
hiveSyncTool = new HiveSyncTool(writer.getWriteConfig().getProps(),
getLocalHiveServer().getHiveConf(),
FSUtils.getFs(writer.getWriteConfig().getBasePath(), writer.getConfiguration()));
}
hiveSyncTool.syncHoodieTable();
}

public void stopLocalHiveServiceIfNeeded() throws IOException {
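
HiveServiceProvider above shows the new entry point for running the sync itself: HiveSyncTool is constructed directly from the writer's properties, a HiveConf, and the table's FileSystem, and syncHoodieTable() performs the metastore registration. A minimal sketch of that call sequence with placeholder inputs, using only the constructor and method shown in the hunk above:

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncTool;

public class HiveSyncToolExample {

  static void sync(TypedProperties props, HiveConf hiveConf, String basePath) {
    FileSystem fs = FSUtils.getFs(basePath, hiveConf);        // resolve the filesystem for the table base path
    HiveSyncTool tool = new HiveSyncTool(props, hiveConf, fs);
    tool.syncHoodieTable();                                    // registers/updates the table in the Hive metastore
  }
}
```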
@@ -22,6 +22,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
@@ -173,10 +174,10 @@ private static TypedProperties getProperties() {
// Make path selection test suite specific
props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL().key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), "table1");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "datestr");
props.setProperty(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1");
props.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), "table1");
props.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
props.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), TimestampBasedKeyGenerator.class.getName());

props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
@@ -32,8 +32,6 @@
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
@@ -59,7 +57,6 @@
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -269,32 +266,4 @@ public static List<WriteStatus> getWriteStatuses(ControlMessage.ParticipantInfo
ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
}

/**
* Build Hive Sync Config
* Note: This method is a temporary solution.
* Future solutions can be referred to: https://issues.apache.org/jira/browse/HUDI-3199
*/
public static HiveSyncConfig buildSyncConfig(TypedProperties props, String tableBasePath) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.basePath = tableBasePath;
hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(KafkaConnectConfigs.HIVE_USE_PRE_APACHE_INPUT_FORMAT, false);
hiveSyncConfig.databaseName = props.getString(KafkaConnectConfigs.HIVE_DATABASE, "default");
hiveSyncConfig.tableName = props.getString(KafkaConnectConfigs.HIVE_TABLE, "");
hiveSyncConfig.hiveUser = props.getString(KafkaConnectConfigs.HIVE_USER, "");
hiveSyncConfig.hivePass = props.getString(KafkaConnectConfigs.HIVE_PASS, "");
hiveSyncConfig.jdbcUrl = props.getString(KafkaConnectConfigs.HIVE_URL, "");
hiveSyncConfig.partitionFields = props.getStringList(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, ",", Collections.emptyList());
hiveSyncConfig.partitionValueExtractorClass =
props.getString(KafkaConnectConfigs.HIVE_PARTITION_EXTRACTOR_CLASS, SlashEncodedDayPartitionValueExtractor.class.getName());
hiveSyncConfig.useJdbc = props.getBoolean(KafkaConnectConfigs.HIVE_USE_JDBC, true);
if (props.containsKey(KafkaConnectConfigs.HIVE_SYNC_MODE)) {
hiveSyncConfig.syncMode = props.getString(KafkaConnectConfigs.HIVE_SYNC_MODE);
}
hiveSyncConfig.autoCreateDatabase = props.getBoolean(KafkaConnectConfigs.HIVE_AUTO_CREATE_DATABASE, true);
hiveSyncConfig.ignoreExceptions = props.getBoolean(KafkaConnectConfigs.HIVE_IGNORE_EXCEPTIONS, false);
hiveSyncConfig.skipROSuffix = props.getBoolean(KafkaConnectConfigs.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE, false);
hiveSyncConfig.supportTimestamp = props.getBoolean(KafkaConnectConfigs.HIVE_SUPPORT_TIMESTAMP_TYPE, false);
return hiveSyncConfig;
}
}
@@ -30,22 +30,17 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -54,7 +49,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
@@ -167,43 +161,10 @@ private void syncMeta() {
if (connectConfigs.isMetaSyncEnabled()) {
Set<String> syncClientToolClasses = new HashSet<>(
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
for (String impl : syncClientToolClasses) {
impl = impl.trim();
switch (impl) {
case "org.apache.hudi.hive.HiveSyncTool":
syncHive();
break;
default:
FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
Properties properties = new Properties();
properties.putAll(connectConfigs.getProps());
properties.put("basePath", tableBasePath);
AbstractSyncTool syncTool = (AbstractSyncTool) ReflectionUtils.loadClass(impl, new Class[] {Properties.class, FileSystem.class}, properties, fs);
syncTool.syncHoodieTable();
}
SyncUtilHelpers.createAndSyncHoodieMeta(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());
}
}
}

private void syncHive() {
HiveSyncConfig hiveSyncConfig = KafkaConnectUtils.buildSyncConfig(new TypedProperties(connectConfigs.getProps()), tableBasePath);
String url;
if (!StringUtils.isNullOrEmpty(hiveSyncConfig.syncMode) && HiveSyncMode.of(hiveSyncConfig.syncMode) == HiveSyncMode.HMS) {
url = hadoopConf.get(KafkaConnectConfigs.HIVE_METASTORE_URIS);
} else {
url = hiveSyncConfig.jdbcUrl;
}

LOG.info("Syncing target hoodie table with hive table("
+ hiveSyncConfig.tableName
+ "). Hive URL :"
+ url
+ ", basePath :" + tableBasePath);
LOG.info("Hive Sync Conf => " + hiveSyncConfig);
FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
}
}
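
With the Kafka Connect specific syncHive() path and the reflection-based fallback removed, every configured meta-sync implementation now goes through SyncUtilHelpers.createAndSyncHoodieMeta, which loads the named tool and runs the sync. A short sketch of the generalized call with placeholder inputs; the argument list mirrors the hunk above, while the exact props type the helper accepts is an assumption here:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;

public class MetaSyncExample {

  // Runs every configured sync tool class against the same table, as syncMeta() does above.
  static void syncAll(String syncToolClasses, TypedProperties props, Configuration hadoopConf, String basePath) {
    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
    for (String impl : syncToolClasses.split(",")) {           // e.g. "org.apache.hudi.hive.HiveSyncTool"
      SyncUtilHelpers.createAndSyncHoodieMeta(impl.trim(), props, hadoopConf, fs, basePath,
          HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());
    }
  }
}
```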
@@ -270,6 +270,9 @@ public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRD
return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
}

/** @deprecated Use {@link HiveSyncConfig} constructor directly and provide the props,
* and set {@link HoodieSyncConfig.META_SYNC_BASE_PATH} and {@link HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT}*/
@Deprecated
public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath, String baseFileFormat) {
checkRequiredProperties(props, Collections.singletonList(DataSourceWriteOptions.HIVE_TABLE().key()));
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
(The remaining changed files in this commit are not shown here.)
