Commit

fix props and tests
xushiyan committed Mar 19, 2022
1 parent 09f0556 commit 309116a
Showing 18 changed files with 179 additions and 110 deletions.

@@ -92,6 +92,24 @@ public static Object loadClass(String clazz, Class<?>[] constructorArgTypes, Obj
}
}

+/**
+ * Checks whether the given class has a constructor matching the given argument types.
+ *
+ * When catching {@link HoodieException} from {@link #loadClass}, it is hard to tell whether the exception
+ * was thrown by the instantiation logic itself or because a matching constructor is missing.
+ *
+ * TODO: ReflectionUtils should throw a specific exception to indicate a reflection problem.
+ */
+public static boolean hasConstructor(String clazz, Class<?>[] constructorArgTypes) {
+  try {
+    getClass(clazz).getConstructor(constructorArgTypes);
+    return true;
+  } catch (NoSuchMethodException e) {
+    LOG.warn("Unable to find constructor for class " + clazz, e);
+    return false;
+  }
+}
+
/**
* Creates an instance of the given class. Constructor arg types are inferred.
*/
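For context: the new helper lets a caller probe for a specific constructor up front instead of calling loadClass and guessing why a HoodieException was thrown. A minimal sketch of the intended call pattern, assuming props, hadoopConf, and fs are already in scope; the org.example.MySyncTool name is hypothetical:

    // Probe for the richer constructor first; fall back to the legacy shape if it is absent.
    Class<?>[] newArgTypes = new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class};
    Object tool;
    if (ReflectionUtils.hasConstructor("org.example.MySyncTool", newArgTypes)) {
      tool = ReflectionUtils.loadClass("org.example.MySyncTool", newArgTypes, props, hadoopConf, fs);
    } else {
      tool = ReflectionUtils.loadClass("org.example.MySyncTool",
          new Class<?>[] {Properties.class, FileSystem.class}, props, fs);
    }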

@@ -51,7 +51,7 @@ public void execute(ExecutionContext executionContext, int curItrCount) throws E
TypedProperties properties = new TypedProperties();
properties.putAll(executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getProps());
-properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH, executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
+properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().targetBasePath);
properties.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper()
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
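This .key() fix matters because META_SYNC_BASE_PATH is now a ConfigProperty object rather than a plain String (see the HoodieSyncConfig diff below): putting the object itself into TypedProperties stores the value under the object, not under the config key, so later lookups by key find nothing. A minimal sketch of the difference; the path is illustrative:

    TypedProperties properties = new TypedProperties();
    // Buggy: the ConfigProperty object itself becomes the map key.
    properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH, "/tmp/hoodie/tbl");
    // Fixed: the value lands under the actual key string.
    properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "/tmp/hoodie/tbl");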

@@ -39,7 +39,7 @@ public HiveSyncNode(Config config) {
@Override
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive sync node");
-SyncUtilHelpers.createAndSyncHoodieMeta(HiveSyncTool.class.getName(), new TypedProperties(executionContext.getHoodieTestSuiteWriter().getProps()),
+SyncUtilHelpers.runHoodieMetaSync(HiveSyncTool.class.getName(), new TypedProperties(executionContext.getHoodieTestSuiteWriter().getProps()),
executionContext.getHoodieTestSuiteWriter().getConfiguration(),
new Path(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath).getFileSystem(executionContext.getHoodieTestSuiteWriter().getConfiguration()),
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath, executionContext.getHoodieTestSuiteWriter().getCfg().baseFileFormat);

@@ -163,7 +163,7 @@ private void syncMeta() {
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
for (String impl : syncClientToolClasses) {
-SyncUtilHelpers.createAndSyncHoodieMeta(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());
+SyncUtilHelpers.runHoodieMetaSync(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());
}
}
}
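The connect-side loop runs every configured sync tool by fully qualified class name. A small sketch of the pattern, assuming props, hadoopConf, fs, and tableBasePath are in scope; the class list is a hypothetical config value:

    // Comma-separated sync tool implementations; each entry is trimmed and run in turn.
    String metaSyncClasses = "org.apache.hudi.hive.HiveSyncTool,org.example.CustomSyncTool";
    for (String impl : metaSyncClasses.split(",")) {
      SyncUtilHelpers.runHoodieMetaSync(impl.trim(), props, hadoopConf, fs, tableBasePath,
          HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());
    }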

@@ -42,6 +42,7 @@
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.util.DataTypeUtils;


@@ -270,8 +271,10 @@ public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRD
return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
}

-/** @deprecated Use {@link HiveSyncConfig} constructor directly and provide the props,
- * and set {@link HoodieSyncConfig.META_SYNC_BASE_PATH} and {@link HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT}*/
+/**
+ * @deprecated Use {@link HiveSyncConfig} constructor directly and provide the props,
+ * and set {@link HoodieSyncConfig#META_SYNC_BASE_PATH} and {@link HoodieSyncConfig#META_SYNC_BASE_FILE_FORMAT} instead.
+ */
@Deprecated
public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath, String baseFileFormat) {
checkRequiredProperties(props, Collections.singletonList(DataSourceWriteOptions.HIVE_TABLE().key()));

@@ -313,7 +316,7 @@ public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String b
hiveSyncConfig.isConditionalSync = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().key(),
DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().defaultValue()));
hiveSyncConfig.bucketSpec = props.getBoolean(DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().key(),
-    (boolean) DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
+    DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())) : null;
if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION())) {
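Per the corrected deprecation note, new callers should build a HiveSyncConfig from properties rather than through buildHiveSyncConfig. A minimal sketch of the recommended path, using the props-based HiveSyncConfig constructor shown in the diff below; the path and table name are illustrative:

    TypedProperties props = new TypedProperties();
    props.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "/data/hoodie/my_table");
    props.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), "PARQUET");
    props.put(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "my_table");
    HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(props);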

@@ -45,12 +45,12 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.log4j.LogManager
-import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
+import org.apache.spark.{SPARK_VERSION, SparkContext}

import scala.collection.JavaConversions._
import scala.collection.mutable

@@ -564,15 +564,15 @@ object HoodieSparkSqlWriter {
val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
val properties = new TypedProperties()
properties.putAll(hoodieConfig.getProps)
-properties.put(HiveSyncConfig.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD, spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD).toString)
-properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION, SPARK_VERSION)
-properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA, hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
+properties.put(HiveSyncConfig.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key, spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD).toString)
+properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, SPARK_VERSION)
+properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))

val hiveConf: HiveConf = new HiveConf()
hiveConf.addResource(fs.getConf)

syncClientToolClassSet.foreach(impl => {
-SyncUtilHelpers.createAndSyncHoodieMeta(impl.trim, properties, hiveConf, fs, basePath.toString, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue)
+SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, hiveConf, fs, basePath.toString, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue)
})
}
true

@@ -129,7 +129,7 @@ public class HiveSyncConfig extends HoodieSyncConfig {
+ "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to org.apache.hudi input format");

/**
- * @deprecated Use {@link HIVE_SYNC_MODE} instead of this config from 0.9.0
+ * @deprecated Use {@link #HIVE_SYNC_MODE} instead of this config from 0.9.0
*/
@Deprecated
public static final ConfigProperty<String> HIVE_USE_JDBC = ConfigProperty

@@ -240,41 +240,10 @@ public HiveSyncConfig(TypedProperties props) {
this.syncAsSparkDataSourceTable = getBooleanOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE);
this.sparkSchemaLengthThreshold = getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
this.createManagedTable = getBooleanOrDefault(HIVE_CREATE_MANAGED_TABLE);
-this.bucketSpec = props.getString(HIVE_SYNC_BUCKET_SYNC_SPEC.key(), null);
+this.bucketSpec = getStringOrDefault(HIVE_SYNC_BUCKET_SYNC_SPEC);
this.syncComment = getBooleanOrDefault(HIVE_SYNC_COMMENT);
}

-// enhance the similar function in child class
-public static HiveSyncConfig copy(HiveSyncConfig cfg) {
-  HiveSyncConfig newConfig = new HiveSyncConfig();
-  newConfig.basePath = cfg.basePath;
-  newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
-  newConfig.databaseName = cfg.databaseName;
-  newConfig.hivePass = cfg.hivePass;
-  newConfig.hiveUser = cfg.hiveUser;
-  newConfig.partitionFields = cfg.partitionFields;
-  newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
-  newConfig.jdbcUrl = cfg.jdbcUrl;
-  newConfig.metastoreUris = cfg.metastoreUris;
-  newConfig.tableName = cfg.tableName;
-  newConfig.bucketSpec = cfg.bucketSpec;
-  newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
-  newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
-  newConfig.supportTimestamp = cfg.supportTimestamp;
-  newConfig.decodePartition = cfg.decodePartition;
-  newConfig.tableProperties = cfg.tableProperties;
-  newConfig.serdeProperties = cfg.serdeProperties;
-  newConfig.createManagedTable = cfg.createManagedTable;
-  newConfig.batchSyncNum = cfg.batchSyncNum;
-  newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable;
-  newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
-  newConfig.withOperationField = cfg.withOperationField;
-  newConfig.isConditionalSync = cfg.isConditionalSync;
-  newConfig.sparkVersion = cfg.sparkVersion;
-  newConfig.syncComment = cfg.syncComment;
-  return newConfig;
-}
-
@Override
public String toString() {
return "HiveSyncConfig{"
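The bucketSpec change above swaps a raw props lookup for the HoodieConfig helper. A sketch of the assumed difference in fallback behavior, using a hypothetical ConfigProperty with a declared default:

    ConfigProperty<String> spec = ConfigProperty
        .key("hoodie.example.bucket.spec")
        .defaultValue("bucket_id,8")
        .withDocumentation("Illustrative property only.");

    TypedProperties props = new TypedProperties();
    String viaRawLookup = props.getString(spec.key(), null); // null: the declared default is ignored
    // getStringOrDefault(spec), called inside a HoodieConfig subclass, would instead
    // fall back to "bucket_id,8", the default declared on the property.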

@@ -18,15 +18,24 @@

package org.apache.hudi.hive.replication;

-import com.beust.jcommander.Parameter;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;

+import com.beust.jcommander.Parameter;
+
public class GlobalHiveSyncConfig extends HiveSyncConfig {
@Parameter(names = {"--replicated-timestamp"}, description = "Add globally replicated timestamp to enable consistent reads across clusters")
public String globallyReplicatedTimeStamp;

+public GlobalHiveSyncConfig() {
+}
+
+public GlobalHiveSyncConfig(TypedProperties props) {
+  super(props);
+}
+
public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) {
-GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig();
+GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig(cfg.getProps());
newConfig.basePath = cfg.basePath;
newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
newConfig.databaseName = cfg.databaseName;

@@ -90,7 +90,7 @@
public class HiveTestUtil {

public static final String DB_NAME = "testdb";
-public static String TABLE_NAME = "test1";
+public static final String TABLE_NAME = "test1";
public static String basePath;
public static TypedProperties hiveSyncProps;
public static HiveTestService hiveTestService;

@@ -125,7 +125,7 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_PASS.key(), "");
hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), DB_NAME);
hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME);
-hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH, basePath);
+hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), basePath);
hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");

@@ -41,7 +41,7 @@ public AbstractSyncTool(TypedProperties props, Configuration conf, FileSystem fs

@Deprecated
public AbstractSyncTool(Properties props, FileSystem fileSystem) {
-this(new TypedProperties(props), new Configuration(), fileSystem);
+this(new TypedProperties(props), fileSystem.getConf(), fileSystem);
}

public abstract void syncHoodieTable();
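The one-line fix above hands the sync tool the configuration the passed-in FileSystem was built with; the old new Configuration() silently dropped any tuned settings. A minimal sketch of the difference; the setting name is illustrative:

    Configuration conf = new Configuration();
    conf.set("hoodie.example.setting", "enabled");
    FileSystem fs = FileSystem.getLocal(conf);
    fs.getConf().get("hoodie.example.setting");        // "enabled" -- preserved
    new Configuration().get("hoodie.example.setting"); // null -- lost by the old code path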

@@ -28,7 +28,7 @@

import com.beust.jcommander.Parameter;

-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.function.Function;


@@ -37,15 +37,13 @@
*/
public class HoodieSyncConfig extends HoodieConfig {

-public static final String META_SYNC_BASE_PATH = "meta.sync.base.path";
-
-@Parameter(names = {"--database"}, description = "name of the target database in Hive", required = true)
+@Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
public String databaseName;

@Parameter(names = {"--table"}, description = "name of the target table in Hive", required = true)
@Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
public String tableName;

@Parameter(names = {"--base-path"}, description = "Basepath of hoodie table to sync", required = true)
@Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
public String basePath;

@Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")

@@ -71,9 +69,14 @@ public class HoodieSyncConfig extends HoodieConfig {
@Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
public Boolean isConditionalSync;

@Parameter(names = {"--spark-version"}, description = "The spark version", required = false)
@Parameter(names = {"--spark-version"}, description = "The spark version")
public String sparkVersion;

+public static final ConfigProperty<String> META_SYNC_BASE_PATH = ConfigProperty
+    .key("hoodie.datasource.meta.sync.base.path")
+    .defaultValue("")
+    .withDocumentation("Base path of the hoodie table to sync");
+
public static final ConfigProperty<String> META_SYNC_ENABLED = ConfigProperty
.key("hoodie.datasource.meta.sync.enable")
.defaultValue("false")

@@ -166,11 +169,11 @@ public HoodieSyncConfig(TypedProperties props) {
super(props);
setDefaults();

-this.basePath = props.getString(META_SYNC_BASE_PATH, "");
+this.basePath = getStringOrDefault(META_SYNC_BASE_PATH);
this.databaseName = getStringOrDefault(META_SYNC_DATABASE_NAME);
this.tableName = getStringOrDefault(META_SYNC_TABLE_NAME);
this.baseFileFormat = getStringOrDefault(META_SYNC_BASE_FILE_FORMAT);
-this.partitionFields = props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", new ArrayList<>());
+this.partitionFields = props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", Collections.emptyList());
this.partitionValueExtractorClass = getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
this.assumeDatePartitioning = getBooleanOrDefault(META_SYNC_ASSUME_DATE_PARTITION);
this.decodePartition = getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING);
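Note that moving from the plain string constant to a ConfigProperty also changes the effective key: the removed constant was "meta.sync.base.path", while the property defined above uses "hoodie.datasource.meta.sync.base.path". That is why every call site in this commit switches to META_SYNC_BASE_PATH.key(). A sketch of the consequence for a straggler still using the old literal:

    TypedProperties props = new TypedProperties();
    props.put("meta.sync.base.path", "/data/tbl");                       // old key: no longer read
    // new HoodieSyncConfig(props).basePath would be "" (the declared default)
    props.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "/data/tbl");  // picked up as expected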

@@ -8,60 +8,65 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;

import java.util.Properties;

/**
* Helper class for syncing Hudi commit data with external metastores.
*/
public class SyncUtilHelpers {
+private static final Logger LOG = LogManager.getLogger(SyncUtilHelpers.class);

/**
* Create an instance of an implementation of {@link AbstractSyncTool} that will sync all the relevant meta information
* with an external metastore such as Hive etc. to ensure Hoodie tables can be queried or read via external systems.
*
-* @param metaSyncClass The class that implements the sync of the metadata.
+* @param metaSyncFQCN Fully-qualified name of the class that implements the metadata sync.
* @param props property map.
* @param hadoopConfig Hadoop confs.
* @param fs Filesystem used.
* @param targetBasePath The target base path that contains the hoodie table.
* @param baseFileFormat The file format used by the hoodie table (defaults to PARQUET).
-* @return
*/
-public static void createAndSyncHoodieMeta(String metaSyncClass,
-                                           TypedProperties props,
-                                           Configuration hadoopConfig,
-                                           FileSystem fs,
-                                           String targetBasePath,
-                                           String baseFileFormat) {
+public static void runHoodieMetaSync(String metaSyncFQCN,
+                                     TypedProperties props,
+                                     Configuration hadoopConfig,
+                                     FileSystem fs,
+                                     String targetBasePath,
+                                     String baseFileFormat) {
try {
-  createMetaSyncClass(metaSyncClass, props, hadoopConfig, fs, targetBasePath, baseFileFormat).syncHoodieTable();
+  instantiateMetaSyncTool(metaSyncFQCN, props, hadoopConfig, fs, targetBasePath, baseFileFormat).syncHoodieTable();
} catch (Throwable e) {
throw new HoodieException("Could not sync using the meta sync class " + metaSyncClass, e);
throw new HoodieException("Could not sync using the meta sync class " + metaSyncFQCN, e);
}
}

-static AbstractSyncTool createMetaSyncClass(String metaSyncClass,
-                                            TypedProperties props,
-                                            Configuration hadoopConfig,
-                                            FileSystem fs,
-                                            String targetBasePath,
-                                            String baseFileFormat) {
+static AbstractSyncTool instantiateMetaSyncTool(String metaSyncFQCN,
+                                                TypedProperties props,
+                                                Configuration hadoopConfig,
+                                                FileSystem fs,
+                                                String targetBasePath,
+                                                String baseFileFormat) {
TypedProperties properties = new TypedProperties();
properties.putAll(props);
-properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH, targetBasePath);
-properties.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, baseFileFormat);
+properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), targetBasePath);
+properties.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), baseFileFormat);

-try {
-  return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncClass,
+if (ReflectionUtils.hasConstructor(metaSyncFQCN,
+    new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class})) {
+  return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class},
properties, hadoopConfig, fs));
-} catch (HoodieException e) {
-  // fallback to old interface
-  return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncClass,
-      new Class<?>[] {Properties.class, FileSystem.class}, properties, fs));
-} catch (Throwable e) {
-  throw new HoodieException("Could not load meta sync class " + metaSyncClass, e);
+} else {
+  LOG.warn("Falling back to deprecated constructor for class: " + metaSyncFQCN);
+  try {
+    return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
+        new Class<?>[] {Properties.class, FileSystem.class}, properties, fs));
+  } catch (Throwable t) {
+    throw new HoodieException("Could not load meta sync class " + metaSyncFQCN, t);
+  }
+}
}
}
}
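The refactored factory now probes for the (TypedProperties, Configuration, FileSystem) constructor via ReflectionUtils.hasConstructor and only falls back to the deprecated (Properties, FileSystem) shape, with a warning. A sketch of a custom tool that satisfies the preferred contract; the class is hypothetical and assumes AbstractSyncTool lives in org.apache.hudi.sync.common:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.sync.common.AbstractSyncTool;

    public class MyCatalogSyncTool extends AbstractSyncTool {

      // This exact signature is what hasConstructor probes for, so
      // runHoodieMetaSync takes the modern path instead of the fallback.
      public MyCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
        super(props, conf, fs);
      }

      @Override
      public void syncHoodieTable() {
        // Push table metadata to the external catalog (elided in this sketch).
      }
    }

It would then be invoked like the built-in tools: SyncUtilHelpers.runHoodieMetaSync(MyCatalogSyncTool.class.getName(), props, hadoopConf, fs, targetBasePath, "PARQUET");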
