Revert "[HUDI-2883] Refactor hive sync tool / config to use reflectio…
Browse files Browse the repository at this point in the history
…n and standardize configs (apache#4175)"

This reverts commit 5f570ea.
Alexey Kudinkin committed Mar 23, 2022
1 parent 0f6b38f commit 6accc83
Showing 43 changed files with 1,219 additions and 1,523 deletions.
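
The refactor being reverted had routed the Spark datasource hive-sync options through HoodieSyncConfig / HiveSyncConfig keys; this revert restores the DataSourceWriteOptions-based options, as the demo script diff below shows. A minimal sketch, not part of this commit, of configuring hive sync through the restored constants; the table name, URL, credentials and path are placeholders, and the usual write options (record key, partition path, precombine field) are omitted for brevity:

// Minimal sketch, not part of this commit: hive-sync options via the restored
// DataSourceWriteOptions constants. All values below are placeholders.
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class HiveSyncWriteSketch {
  public static void write(Dataset<Row> df) {
    df.write().format("org.apache.hudi")
        .option(HoodieWriteConfig.TBL_NAME.key(), "example_table")
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED().key(), "true")
        .option(DataSourceWriteOptions.HIVE_DATABASE().key(), "default")
        .option(DataSourceWriteOptions.HIVE_TABLE().key(), "example_table")
        .option(DataSourceWriteOptions.HIVE_URL().key(), "jdbc:hive2://hiveserver:10000")
        .option(DataSourceWriteOptions.HIVE_USER().key(), "hive")
        .option(DataSourceWriteOptions.HIVE_PASS().key(), "hive")
        .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "datestr")
        .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
            MultiPartKeysValueExtractor.class.getCanonicalName())
        .mode(SaveMode.Append)
        .save("/tmp/hudi/example_table");
  }
}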
34 changes: 16 additions & 18 deletions docker/demo/sparksql-incremental.commands
@@ -21,8 +21,6 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.spark.sql.SaveMode;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hadoop.fs.FileSystem;

@@ -45,14 +43,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor").
option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");
@@ -77,14 +75,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfig.HIVE_USER.key(), "hive").
option(HiveSyncConfig.HIVE_PASS.key(), "hive").
option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor_bs").
option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
@@ -105,7 +105,7 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

public static final ConfigProperty<String> TBL_NAME = ConfigProperty
.key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY)
.key("hoodie.table.name")
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");

@@ -82,8 +82,6 @@ public class HoodieTableConfig extends HoodieConfig {

public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup";
public static final String HOODIE_WRITE_TABLE_NAME_KEY = "hoodie.datasource.write.table.name";
public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";

public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
.key("hoodie.database.name")
@@ -92,7 +90,7 @@ public class HoodieTableConfig extends HoodieConfig {
+ "we can set it to limit the table name under a specific database");

public static final ConfigProperty<String> NAME = ConfigProperty
.key(HOODIE_TABLE_NAME_KEY)
.key("hoodie.table.name")
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with Hive. Needs to be same across runs.");

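The two hunks above restore the literal key "hoodie.table.name" in place of the HOODIE_TABLE_NAME_KEY constant that this revert removes. A small sketch, not part of this commit, showing that HoodieWriteConfig.TBL_NAME still resolves to that key when building writer properties (the table name is a placeholder):

// Sketch, not part of this commit: both ConfigProperty definitions resolve to
// the literal "hoodie.table.name" key after the revert.
import java.util.Properties;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class TableNameKeySketch {
  public static Properties withTableName(String tableName) {
    Properties props = new Properties();
    // HoodieWriteConfig.TBL_NAME.key() and HoodieTableConfig.NAME.key()
    // both return "hoodie.table.name".
    props.setProperty(HoodieWriteConfig.TBL_NAME.key(), tableName);
    return props;
  }
}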
@@ -92,24 +92,6 @@ public static Object loadClass(String clazz, Class<?>[] constructorArgTypes, Obj
}
}

/**
* Check if the clazz has the target constructor or not.
*
* When catch {@link HoodieException} from {@link #loadClass}, it's inconvenient to say if the exception was thrown
* due to the instantiation's own logic or missing constructor.
*
* TODO: ReflectionUtils should throw a specific exception to indicate Reflection problem.
*/
public static boolean hasConstructor(String clazz, Class<?>[] constructorArgTypes) {
try {
getClass(clazz).getConstructor(constructorArgTypes);
return true;
} catch (NoSuchMethodException e) {
LOG.warn("Unable to instantiate class " + clazz, e);
return false;
}
}

/**
* Creates an instance of the given class. Constructor arg types are inferred.
*/
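
The removed javadoc above notes that a HoodieException thrown by loadClass does not tell the caller whether the constructor was missing or failed in its own logic, which is what the hasConstructor probe (also removed here) addressed. A hypothetical caller sketch, not part of this diff, of how such a probe pairs with loadClass:

// Hypothetical caller sketch, not part of this diff: probe for the constructor
// first, so a later HoodieException from loadClass points at the constructor's
// own logic rather than at a missing constructor.
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;

public class SyncToolLoaderSketch {
  public static Object loadIfSupported(String clazz, Class<?>[] argTypes, Object[] args) {
    if (!ReflectionUtils.hasConstructor(clazz, argTypes)) {
      return null; // class does not expose the expected constructor
    }
    try {
      return ReflectionUtils.loadClass(clazz, argTypes, args);
    } catch (HoodieException e) {
      throw new HoodieException("Instantiation of " + clazz + " failed", e);
    }
  }
}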
@@ -18,19 +18,17 @@

package org.apache.hudi.integ.testsuite.dag.nodes;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.sync.common.HoodieSyncConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;

/**
* A hive query node in the DAG of operations for a workflow. used to perform a hive query with given config.
@@ -48,14 +46,13 @@ public HiveQueryNode(DeltaConfig.Config config) {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive query node {}", this.getName());
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
TypedProperties properties = new TypedProperties();
properties.putAll(executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getProps());
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().targetBasePath);
properties.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties);
HiveSyncConfig hiveSyncConfig = DataSourceUtils
.buildHiveSyncConfig(executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getProps(),
executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().targetBasePath,
executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
Connection con = DriverManager.getConnection(hiveSyncConfig.jdbcUrl, hiveSyncConfig.hiveUser,
hiveSyncConfig.hivePass);
@@ -18,30 +18,31 @@

package org.apache.hudi.integ.testsuite.dag.nodes;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;

import org.apache.hadoop.fs.Path;

/**
* Represents a hive sync node in the DAG of operations for a workflow. Helps to sync hoodie data to hive table.
*/
public class HiveSyncNode extends DagNode<Boolean> {

private HiveServiceProvider hiveServiceProvider;

public HiveSyncNode(Config config) {
this.config = config;
this.hiveServiceProvider = new HiveServiceProvider(config);
}

@Override
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive sync node");
SyncUtilHelpers.runHoodieMetaSync(HiveSyncTool.class.getName(), new TypedProperties(executionContext.getHoodieTestSuiteWriter().getProps()),
executionContext.getHoodieTestSuiteWriter().getConfiguration(),
new Path(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath).getFileSystem(executionContext.getHoodieTestSuiteWriter().getConfiguration()),
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath, executionContext.getHoodieTestSuiteWriter().getCfg().baseFileFormat);
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
}

public HiveServiceProvider getHiveServiceProvider() {
return hiveServiceProvider;
}
}
@@ -21,9 +21,6 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.service.server.HiveServer2;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.testutils.HiveTestService;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
@@ -49,17 +46,12 @@ public void startLocalHiveServiceIfNeeded(Configuration configuration) throws IO
}

public void syncToLocalHiveIfNeeded(HoodieTestSuiteWriter writer) {
HiveSyncTool hiveSyncTool;
if (this.config.isHiveLocal()) {
hiveSyncTool = new HiveSyncTool(writer.getWriteConfig().getProps(),
getLocalHiveServer().getHiveConf(),
FSUtils.getFs(writer.getWriteConfig().getBasePath(), getLocalHiveServer().getHiveConf()));
writer.getDeltaStreamerWrapper().getDeltaSyncService().getDeltaSync()
.syncHive(getLocalHiveServer().getHiveConf());
} else {
hiveSyncTool = new HiveSyncTool(writer.getWriteConfig().getProps(),
getLocalHiveServer().getHiveConf(),
FSUtils.getFs(writer.getWriteConfig().getBasePath(), writer.getConfiguration()));
writer.getDeltaStreamerWrapper().getDeltaSyncService().getDeltaSync().syncHive();
}
hiveSyncTool.syncHoodieTable();
}

public void stopLocalHiveServiceIfNeeded() throws IOException {
@@ -22,7 +22,6 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
@@ -174,10 +173,10 @@ private static TypedProperties getProperties() {
// Make path selection test suite specific
props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
// Hive Configs
props.setProperty(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1");
props.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), "table1");
props.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_URL().key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), "table1");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "datestr");
props.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), TimestampBasedKeyGenerator.class.getName());

props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
@@ -32,6 +32,8 @@
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
@@ -57,6 +59,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -266,4 +269,32 @@ public static List<WriteStatus> getWriteStatuses(ControlMessage.ParticipantInfo
ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
}

/**
* Build Hive Sync Config
* Note: This method is a temporary solution.
* Future solutions can be referred to: https://issues.apache.org/jira/browse/HUDI-3199
*/
public static HiveSyncConfig buildSyncConfig(TypedProperties props, String tableBasePath) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.basePath = tableBasePath;
hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(KafkaConnectConfigs.HIVE_USE_PRE_APACHE_INPUT_FORMAT, false);
hiveSyncConfig.databaseName = props.getString(KafkaConnectConfigs.HIVE_DATABASE, "default");
hiveSyncConfig.tableName = props.getString(KafkaConnectConfigs.HIVE_TABLE, "");
hiveSyncConfig.hiveUser = props.getString(KafkaConnectConfigs.HIVE_USER, "");
hiveSyncConfig.hivePass = props.getString(KafkaConnectConfigs.HIVE_PASS, "");
hiveSyncConfig.jdbcUrl = props.getString(KafkaConnectConfigs.HIVE_URL, "");
hiveSyncConfig.partitionFields = props.getStringList(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, ",", Collections.emptyList());
hiveSyncConfig.partitionValueExtractorClass =
props.getString(KafkaConnectConfigs.HIVE_PARTITION_EXTRACTOR_CLASS, SlashEncodedDayPartitionValueExtractor.class.getName());
hiveSyncConfig.useJdbc = props.getBoolean(KafkaConnectConfigs.HIVE_USE_JDBC, true);
if (props.containsKey(KafkaConnectConfigs.HIVE_SYNC_MODE)) {
hiveSyncConfig.syncMode = props.getString(KafkaConnectConfigs.HIVE_SYNC_MODE);
}
hiveSyncConfig.autoCreateDatabase = props.getBoolean(KafkaConnectConfigs.HIVE_AUTO_CREATE_DATABASE, true);
hiveSyncConfig.ignoreExceptions = props.getBoolean(KafkaConnectConfigs.HIVE_IGNORE_EXCEPTIONS, false);
hiveSyncConfig.skipROSuffix = props.getBoolean(KafkaConnectConfigs.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE, false);
hiveSyncConfig.supportTimestamp = props.getBoolean(KafkaConnectConfigs.HIVE_SUPPORT_TIMESTAMP_TYPE, false);
return hiveSyncConfig;
}
}
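
The buildSyncConfig helper restored above maps the Kafka Connect sink properties onto the legacy field-based HiveSyncConfig. A usage sketch, not part of this commit, with placeholder values; the enclosing utility class and its package are assumed to be org.apache.hudi.connect.utils.KafkaConnectUtils:

// Usage sketch, not part of this commit: populate the Kafka Connect hive-sync
// keys and let buildSyncConfig map them onto the field-based HiveSyncConfig.
// Database, table, URL and base path values are placeholders.
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.connect.utils.KafkaConnectUtils; // assumed enclosing class
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.hive.HiveSyncConfig;

public class ConnectHiveSyncSketch {
  public static HiveSyncConfig exampleConfig() {
    TypedProperties props = new TypedProperties();
    props.setProperty(KafkaConnectConfigs.HIVE_DATABASE, "default");
    props.setProperty(KafkaConnectConfigs.HIVE_TABLE, "connect_table");
    props.setProperty(KafkaConnectConfigs.HIVE_URL, "jdbc:hive2://hiveserver:10000");
    props.setProperty(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, "datestr");
    return KafkaConnectUtils.buildSyncConfig(props, "/tmp/hudi/connect_table");
  }
}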