diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
index d51ab37719af..cbb5ee2240d8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
@@ -34,7 +34,7 @@ public interface Type extends Serializable {
    */
   enum TypeID {
     RECORD, ARRAY, MAP, FIXED, STRING, BINARY,
-    INT, LONG, FLOAT, DOUBLE, DATE, BOOLEAN, TIME, TIMESTAMP, DECIMAL, UUID;
+    INT, LONG, FLOAT, DOUBLE, DATE, BOOLEAN, TIME, TIMESTAMP, DECIMAL, UUID, BIGINT;
     private String name;
 
     TypeID() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
index 3d2774bc19bc..40633583a01f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
@@ -100,6 +100,27 @@ public String toString() {
     }
   }
 
+  /**
+   * Bigint primitive type (64-bit long).
+   */
+  public static class BigIntType extends PrimitiveType {
+    private static final BigIntType INSTANCE = new BigIntType();
+
+    public static BigIntType get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public TypeID typeId() {
+      return TypeID.BIGINT;
+    }
+
+    @Override
+    public String toString() {
+      return "bigint";
+    }
+  }
+
   /**
    * Float primitive type.
    */
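Note: BIGINT complements the existing LONG id, and Types.BigIntType is a stateless singleton whose toString() yields the Hive-facing type name used by the schema conversion below. A minimal sketch of the intended behaviour (illustrative only, not part of the patch):

    // Sketch: the new type id and its Hive-facing name (assumed usage).
    Types.BigIntType bigint = Types.BigIntType.get();
    assert bigint.typeId() == Type.TypeID.BIGINT;
    assert "bigint".equals(bigint.toString()); // same literal as HiveSchemaUtil.BIGINT_TYPE_NAME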
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
index 69ccc49528fd..18dbef5fdb5b 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
@@ -105,12 +105,13 @@ private void createAdbConnection() {
 
   @Override
   public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
-                          String outputFormatClass, String serdeClass,
-                          Map<String, String> serdeProperties, Map<String, String> tableProperties) {
+                          String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
+                          Map<String, String> tableProperties) {
     try {
-      LOG.info("Creating table:{}", tableName);
-      String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema,
-          config, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
+      LOG.info("Creating table {}.{}.", databaseName, tableName);
+      String createSQLQuery = HiveSchemaUtil.generateCreateTableDDL(config, databaseName,
+          tableName, storageSchema, inputFormatClass, outputFormatClass, serdeClass,
+          serdeProperties, tableProperties);
       executeAdbSql(createSQLQuery);
     } catch (IOException e) {
       throw new HoodieException("Fail to create table:" + tableName, e);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index b5d13bce9dd9..a049d58c7af3 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -32,6 +32,7 @@
 
 import javax.annotation.concurrent.Immutable;
 
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -202,4 +203,36 @@ public TypedProperties toProps() {
   public void validateParameters() {
     ValidationUtils.checkArgument(getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) > 0, "batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
   }
+
+  public int getHiveBatchSyncPartitionNum() {
+    return getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
+  }
+
+  public List<String> getMetaSyncPartitionFields() {
+    return getSplitStrings(META_SYNC_PARTITION_FIELDS);
+  }
+
+  public String getMetaSyncBasePath() {
+    return getString(META_SYNC_BASE_PATH);
+  }
+
+  public boolean getHiveCreateManagedTable() {
+    return getBoolean(HIVE_CREATE_MANAGED_TABLE);
+  }
+
+  public boolean getHiveCreateExternalTable() {
+    return !getHiveCreateManagedTable();
+  }
+
+  public String getMetaSyncPartitionExtractorClass() {
+    return getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
+  }
+
+  public String getBuckets() {
+    return getString(HIVE_SYNC_BUCKET_SYNC_SPEC);
+  }
+
+  public boolean getIsHiveSupportTimestampType() {
+    return getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE);
+  }
 }
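Note: the new accessors wrap the raw getString/getBoolean lookups so that callers such as HMSDDLExecutor and QueryBasedDDLExecutor no longer need to import the individual HiveSyncConfigHolder keys. A rough usage sketch (property values are made up; imports and other required sync options are elided):

    // Sketch only: build a config from properties and read it through the new getters.
    Properties props = new Properties();
    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "dt,hh");
    HiveSyncConfig config = new HiveSyncConfig(props);
    List<String> partitionFields = config.getMetaSyncPartitionFields(); // ["dt", "hh"]
    boolean external = config.getHiveCreateExternalTable(); // true unless a managed table was requested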
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 11f9f6636376..67a6cc7a1187 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -133,7 +133,7 @@ public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
       client.alter_table(databaseName, tableName, table);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to update table properties for table: "
-          + tableName, e);
+          + tableId(databaseName, tableName), e);
     }
   }
 
@@ -152,7 +152,8 @@ public void updateSerdeProperties(String tableName, Map<String, String> serdeProperties) {
       boolean different = serdeProperties.entrySet().stream().anyMatch(e ->
          !parameters.containsKey(e.getKey()) || !parameters.get(e.getKey()).equals(e.getValue()));
       if (!different) {
-        LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update serde properties.");
+        LOG.debug("Table {} serdeProperties already up to date, skip update serde properties.",
+            tableId(databaseName, tableName));
         return;
       }
     }
@@ -163,7 +164,7 @@ public void updateSerdeProperties(String tableName, Map<String, String> serdeProperties) {
       client.alter_table(databaseName, tableName, table);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to update table serde info for table: "
-          + tableName, e);
+          + tableId(databaseName, tableName), e);
     }
   }
 
@@ -180,7 +181,8 @@ public List<Partition> getAllPartitions(String tableName) {
           .map(p -> new Partition(p.getValues(), p.getSd().getLocation()))
           .collect(Collectors.toList());
     } catch (TException e) {
-      throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
+      throw new HoodieHiveSyncException("Failed to get all partitions for table "
+          + tableId(databaseName, tableName), e);
     }
   }
 
@@ -218,7 +220,8 @@ public boolean tableExists(String tableName) {
     try {
       return client.tableExists(databaseName, tableName);
     } catch (TException e) {
-      throw new HoodieHiveSyncException("Failed to check if table exists " + tableName, e);
+      throw new HoodieHiveSyncException("Failed to check if table exists "
+          + tableId(databaseName, tableName), e);
     }
   }
 
@@ -257,17 +260,19 @@ public Option<String> getLastReplicatedTime(String tableName) {
       Table table = client.getTable(databaseName, tableName);
       return Option.ofNullable(table.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP, null));
     } catch (NoSuchObjectException e) {
-      LOG.warn("the said table not found in hms " + tableId(databaseName, tableName));
+      LOG.warn("Table {} not found in HMS.", tableId(databaseName, tableName));
       return Option.empty();
     } catch (Exception e) {
-      throw new HoodieHiveSyncException("Failed to get the last replicated time from the table " + tableName, e);
+      throw new HoodieHiveSyncException("Failed to get the last replicated time from the table "
+          + tableId(databaseName, tableName), e);
     }
   }
 
   public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
     if (getActiveTimeline().getInstantsAsStream().noneMatch(i -> i.getTimestamp().equals(timeStamp))) {
       throw new HoodieHiveSyncException(
-          "Not a valid completed timestamp " + timeStamp + " for table " + tableName);
+          "Not a valid completed timestamp " + timeStamp + " for table "
+              + tableId(databaseName, tableName));
     }
     try {
       Table table = client.getTable(databaseName, tableName);
@@ -275,7 +280,8 @@ public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
       client.alter_table(databaseName, tableName, table);
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
-          "Failed to update last replicated time to " + timeStamp + " for " + tableName, e);
+          "Failed to update last replicated time to " + timeStamp + " for "
+              + tableId(databaseName, tableName), e);
     }
   }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
index dac82911800c..b63084ac58fc 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
@@ -33,6 +33,8 @@ public interface DDLExecutor extends AutoCloseable {
 
   /**
+   * Create a database with the given name.
+   *
    * @param databaseName name of database to be created.
    */
   void createDatabase(String databaseName);
@@ -40,7 +42,7 @@ public interface DDLExecutor extends AutoCloseable {
   /**
    * Creates a table with the following properties.
    *
-   * @param tableName
+   * @param tableName table name.
    * @param storageSchema
    * @param inputFormatClass
    * @param outputFormatClass
@@ -48,14 +50,14 @@ public interface DDLExecutor extends AutoCloseable {
    * @param serdeProperties
    * @param tableProperties
    */
-  void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
-                   String outputFormatClass, String serdeClass,
-                   Map<String, String> serdeProperties, Map<String, String> tableProperties);
+  void createTable(String tableName, MessageType storageSchema,
+                   String inputFormatClass, String outputFormatClass, String serdeClass,
+                   Map<String, String> serdeProperties, Map<String, String> tableProperties);
 
   /**
    * Updates the table with the newSchema.
    *
-   * @param tableName
+   * @param tableName table name.
    * @param newSchema
    */
   void updateTableDefinition(String tableName, MessageType newSchema);
@@ -63,7 +65,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * Fetches tableSchema for a table.
    *
-   * @param tableName
+   * @param tableName table name.
    * @return
    */
   Map<String, String> getTableSchema(String tableName);
@@ -71,7 +73,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * Adds partition to table.
    *
-   * @param tableName
+   * @param tableName table name.
    * @param partitionsToAdd
    */
   void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
@@ -79,7 +81,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * Updates partitions for a given table.
    *
-   * @param tableName
+   * @param tableName table name.
    * @param changedPartitions
    */
   void updatePartitionsToTable(String tableName, List<String> changedPartitions);
@@ -87,7 +89,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * Drop partitions for a given table.
    *
-   * @param tableName
+   * @param tableName table name.
    * @param partitionsToDrop
    */
   void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
@@ -95,7 +97,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * update table comments
    *
-   * @param tableName
+   * @param tableName table name.
    * @param newSchema Map key: field name, Map value: [field type, field comment]
    */
   void updateTableComments(String tableName, Map<String, Pair<String, String>> newSchema);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index b86ab6c6e8b1..3684e1e6f391 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -35,13 +35,11 @@
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.thrift.TException;
@@ -49,14 +47,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
@@ -75,16 +70,17 @@ public class HMSDDLExecutor implements DDLExecutor {
   private final IMetaStoreClient client;
   private final PartitionValueExtractor partitionValueExtractor;
 
-  public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient) throws HiveException, MetaException {
+  public HMSDDLExecutor(HiveSyncConfig syncConfig, IMetaStoreClient metaStoreClient) {
     this.syncConfig = syncConfig;
     this.databaseName = syncConfig.getStringOrDefault(META_SYNC_DATABASE_NAME);
     this.client = metaStoreClient;
+    String className = syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
     try {
       this.partitionValueExtractor =
-          (PartitionValueExtractor) Class.forName(syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance();
+          (PartitionValueExtractor) Class.forName(className).newInstance();
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
-          "Failed to initialize PartitionValueExtractor class " + syncConfig.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS), e);
+          "Failed to initialize PartitionValueExtractor class " + className, e);
     }
   }
 
@@ -94,23 +90,25 @@ public void createDatabase(String databaseName) {
       Database database = new Database(databaseName, "automatically created by hoodie", null, null);
       client.createDatabase(database);
     } catch (Exception e) {
-      LOG.error("Failed to create database " + databaseName, e);
+      LOG.error("Failed to create database {}", databaseName, e);
       throw new HoodieHiveSyncException("Failed to create database " + databaseName, e);
     }
   }
 
   @Override
-  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
-                          Map<String, String> tableProperties) {
+  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass,
+                          String serdeClass, Map<String, String> serdeProperties, Map<String, String> tableProperties) {
     try {
-      LinkedHashMap<String, String> mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, syncConfig.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
+      LinkedHashMap<String, String> mapSchema =
+          HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, syncConfig.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
       List<FieldSchema> fieldSchema = HiveSchemaUtil.convertMapSchemaToHiveFieldSchema(mapSchema, syncConfig);
-      List<FieldSchema> partitionSchema = syncConfig.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
+      List<FieldSchema> partitionSchema = syncConfig.getMetaSyncPartitionFields().stream().map(partitionKey -> {
         String partitionKeyType = HiveSchemaUtil.getPartitionKeyType(mapSchema, partitionKey);
         return new FieldSchema(partitionKey, partitionKeyType.toLowerCase(), "");
       }).collect(Collectors.toList());
+
       Table newTb = new Table();
       newTb.setDbName(databaseName);
       newTb.setTableName(tableName);
@@ -120,13 +118,13 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
       storageDescriptor.setCols(fieldSchema);
       storageDescriptor.setInputFormat(inputFormatClass);
       storageDescriptor.setOutputFormat(outputFormatClass);
-      storageDescriptor.setLocation(syncConfig.getString(META_SYNC_BASE_PATH));
+      storageDescriptor.setLocation(syncConfig.getMetaSyncBasePath());
       serdeProperties.put("serialization.format", "1");
       storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties));
       newTb.setSd(storageDescriptor);
       newTb.setPartitionKeys(partitionSchema);
 
-      if (!syncConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
+      if (syncConfig.getHiveCreateExternalTable()) {
        newTb.putToParameters("EXTERNAL", "TRUE");
        newTb.setTableType(TableType.EXTERNAL_TABLE.toString());
       }
@@ -136,7 +134,7 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
       }
       client.createTable(newTb);
     } catch (Exception e) {
-      LOG.error("failed to create table " + tableName, e);
+      LOG.error("failed to create table {}.", tableName, e);
       throw new HoodieHiveSyncException("failed to create table " + tableName, e);
     }
   }
@@ -152,7 +150,7 @@ public void updateTableDefinition(String tableName, MessageType newSchema) {
       table.setSd(sd);
       EnvironmentContext environmentContext = new EnvironmentContext();
       if (cascade) {
-        LOG.info("partition table,need cascade");
+        LOG.info("partition table, need cascade");
         environmentContext.putToProperties(StatsSetupConst.CASCADE, StatsSetupConst.TRUE);
       }
       client.alter_table_with_environmentContext(databaseName, tableName, table, environmentContext);
@@ -169,51 +167,48 @@ public Map<String, String> getTableSchema(String tableName) {
       // get the Schema of the table.
       final long start = System.currentTimeMillis();
       Table table = this.client.getTable(databaseName, tableName);
-      Map<String, String> partitionKeysMap =
-          table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
-
-      Map<String, String> columnsMap =
-          table.getSd().getCols().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
-
-      Map<String, String> schema = new HashMap<>();
-      schema.putAll(columnsMap);
-      schema.putAll(partitionKeysMap);
+      List<FieldSchema> allColumns = new ArrayList<>();
+      allColumns.addAll(table.getPartitionKeys());
+      allColumns.addAll(table.getSd().getCols());
+      Map<String, String> schema = allColumns.stream().collect(
+          Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
       final long end = System.currentTimeMillis();
-      LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
+      LOG.info("Time taken to getTableSchema: {} ms.", (end - start));
       return schema;
     } catch (Exception e) {
-      throw new HoodieHiveSyncException("Failed to get table schema for : " + tableName, e);
+      LOG.error("{}.{} get table schema failed.", databaseName, tableName, e);
+      throw new HoodieHiveSyncException(databaseName + "." + tableName + " get table schema failed", e);
     }
   }
 
   @Override
-  public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
-    if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + tableName);
+  public void addPartitionsToTable(String tableName, List<String> addPartitions) {
+    if (addPartitions.isEmpty()) {
+      LOG.info("No partitions to add for {}.", tableName);
       return;
     }
-    LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
+    LOG.info("Adding partitions {} to table {}.", addPartitions.size(), tableName);
     try {
       StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
-      int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
-      for (List<String> batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) {
+      int batchSyncPartitionNum = syncConfig.getHiveBatchSyncPartitionNum();
+      for (List<String> batch : CollectionUtils.batches(addPartitions, batchSyncPartitionNum)) {
         List<Partition> partitionList = new ArrayList<>();
-        batch.forEach(x -> {
+        batch.forEach(partition -> {
           StorageDescriptor partitionSd = new StorageDescriptor();
           partitionSd.setCols(sd.getCols());
           partitionSd.setInputFormat(sd.getInputFormat());
           partitionSd.setOutputFormat(sd.getOutputFormat());
           partitionSd.setSerdeInfo(sd.getSerdeInfo());
-          String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.getString(META_SYNC_BASE_PATH), x).toString();
-          List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x);
+          String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.getString(META_SYNC_BASE_PATH), partition).toString();
+          List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
           partitionSd.setLocation(fullPartitionPath);
           partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null));
         });
         client.add_partitions(partitionList, true, false);
-        LOG.info("HMSDDLExecutor add a batch partitions done: " + partitionList.size());
+        LOG.info("HMSDDLExecutor add a batch partitions done: {}.", partitionList.size());
       }
     } catch (TException e) {
-      LOG.error(databaseName + "." + tableName + " add partition failed", e);
+      LOG.error("{}.{} add partition failed.", databaseName, tableName, e);
       throw new HoodieHiveSyncException(databaseName + "." + tableName + " add partition failed", e);
     }
   }
@@ -221,10 +216,10 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
   @Override
   public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
     if (changedPartitions.isEmpty()) {
-      LOG.info("No partitions to change for " + tableName);
+      LOG.info("No partitions to change for {}.", tableName);
       return;
     }
-    LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
+    LOG.info("Changing partitions {} on {}.", changedPartitions.size(), tableName);
     try {
       StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
       List<Partition> partitionList = changedPartitions.stream().map(partition -> {
@@ -239,30 +234,26 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
       }).collect(Collectors.toList());
       client.alter_partitions(databaseName, tableName, partitionList, null);
     } catch (TException e) {
-      LOG.error(databaseName + "." + tableName + " update partition failed", e);
+      LOG.error("{}.{} update partition failed.", databaseName, tableName, e);
       throw new HoodieHiveSyncException(databaseName + "." + tableName + " update partition failed", e);
     }
   }
 
   @Override
-  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
-    if (partitionsToDrop.isEmpty()) {
-      LOG.info("No partitions to drop for " + tableName);
+  public void dropPartitionsToTable(String tableName, List<String> dropPartitions) {
+    if (dropPartitions.isEmpty()) {
+      LOG.info("No partitions to drop for {}.", tableName);
       return;
     }
-
-    LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
+    LOG.info("Dropping partitions {} on {}.", dropPartitions.size(), tableName);
     try {
-      for (String dropPartition : partitionsToDrop) {
-        if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) {
-          String partitionClause =
-              HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
-          client.dropPartition(databaseName, tableName, partitionClause, false);
-        }
-        LOG.info("Drop partition " + dropPartition + " on " + tableName);
+      for (String dropPartition : dropPartitions) {
+        HivePartitionUtil.dropPartition(databaseName, tableName, dropPartition,
+            partitionValueExtractor, syncConfig, client);
+        LOG.info("Drop partition {} on {}.", dropPartition, tableName);
       }
     } catch (TException e) {
-      LOG.error(databaseName + "." + tableName + " drop partition failed", e);
+      LOG.error("{}.{} drop partition failed.", databaseName, tableName, e);
       throw new HoodieHiveSyncException(databaseName + "." + tableName + " drop partition failed", e);
     }
   }
@@ -283,8 +274,8 @@ public void updateTableComments(String tableName, Map updateHiveSQLs(List sqls) {
-    List responses = new ArrayList<>();
     try {
-      for (String sql : sqls) {
-        if (hiveDriver != null) {
-          HoodieTimer timer = HoodieTimer.start();
-          responses.add(hiveDriver.run(sql));
-          LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, timer.endTimer()));
-        }
+      if (hiveDriver != null) {
+        HoodieTimer timer = HoodieTimer.start();
+        hiveDriver.run(sql);
+        LOG.info("Time taken to execute [{}]: {} ms.", sql, timer.endTimer());
       }
     } catch (Exception e) {
-      throw new HoodieHiveSyncException("Failed in executing SQL", e);
+      throw new HoodieHiveSyncException("Failed in executing SQL.", e);
     }
-    return responses;
   }
 
-  //TODO Duplicating it here from HMSDLExecutor as HiveQueryQL has no way of doing it on its own currently. Need to refactor it
   @Override
   public Map<String, String> getTableSchema(String tableName) {
-    try {
-      // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
-      // get the Schema of the table.
-      final long start = System.currentTimeMillis();
-      Table table = metaStoreClient.getTable(databaseName, tableName);
-      Map<String, String> partitionKeysMap =
-          table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
-
-      Map<String, String> columnsMap =
-          table.getSd().getCols().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
-
-      Map<String, String> schema = new HashMap<>();
-      schema.putAll(columnsMap);
-      schema.putAll(partitionKeysMap);
-      final long end = System.currentTimeMillis();
-      LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
-      return schema;
-    } catch (Exception e) {
-      throw new HoodieHiveSyncException("Failed to get table schema for : " + tableName, e);
-    }
+    return hmsddlExecutor.getTableSchema(tableName);
   }
 
   @Override
   public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
-    if (partitionsToDrop.isEmpty()) {
-      LOG.info("No partitions to drop for " + tableName);
-      return;
-    }
-
-    LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
-    try {
-      for (String dropPartition : partitionsToDrop) {
-        if (HivePartitionUtil.partitionExists(metaStoreClient, tableName, dropPartition, partitionValueExtractor,
-            config)) {
-          String partitionClause =
-              HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
-          metaStoreClient.dropPartition(databaseName, tableName, partitionClause, false);
-        }
-        LOG.info("Drop partition " + dropPartition + " on " + tableName);
-      }
-    } catch (Exception e) {
-      LOG.error(tableId(databaseName, tableName) + " drop partition failed", e);
-      throw new HoodieHiveSyncException(tableId(databaseName, tableName) + " drop partition failed", e);
-    }
+    hmsddlExecutor.dropPartitionsToTable(tableName, partitionsToDrop);
   }
 
   @Override
   public void close() {
+    if (hmsddlExecutor != null) {
+      hmsddlExecutor.close();
+    }
     if (metaStoreClient != null) {
       Hive.closeCurrent();
     }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 5b56bc97b654..d4d6ab7e873f 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -36,7 +36,6 @@
 import java.util.Map;
 import java.util.Objects;
 
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
@@ -161,7 +160,7 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
 
   private List<String> constructDropPartitions(String tableName, List<String> partitions) {
     List<String> result = new ArrayList<>();
-    int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
+    int batchSyncPartitionNum = config.getHiveBatchSyncPartitionNum();
     StringBuilder alterSQL = getAlterTableDropPrefix(tableName);
 
     for (int i = 0; i < partitions.size(); i++) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index 1c4dcec592e7..4cc745804496 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -38,7 +38,6 @@
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
 import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -63,7 +62,7 @@ public QueryBasedDDLExecutor(HiveSyncConfig config) {
     this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
     try {
       this.partitionValueExtractor =
-          (PartitionValueExtractor) Class.forName(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS)).newInstance();
+          (PartitionValueExtractor) Class.forName(config.getMetaSyncPartitionExtractorClass()).newInstance();
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
           "Failed to initialize PartitionValueExtractor class " + config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS), e);
@@ -76,19 +75,27 @@ public QueryBasedDDLExecutor(HiveSyncConfig config) {
    */
   public abstract void runSQL(String sql);
 
+  /**
+   * Create a database with the given name.
+   *
+   * @param databaseName name of database to be created.
+   */
   @Override
   public void createDatabase(String databaseName) {
-    runSQL("create database if not exists " + databaseName);
+    String createTableSQL = HiveSchemaUtil.generateCreateDataBaseDDL(databaseName);
+    LOG.info("Creating database with {}.", createTableSQL);
+    runSQL(createTableSQL);
   }
 
   @Override
-  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
-                          Map<String, String> tableProperties) {
+  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
+                          String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
+                          Map<String, String> tableProperties) {
     try {
       String createSQLQuery =
-          HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, config, inputFormatClass,
+          HiveSchemaUtil.generateCreateTableDDL(config, databaseName, tableName, storageSchema, inputFormatClass,
               outputFormatClass, serdeClass, serdeProperties, tableProperties);
-      LOG.info("Creating table with " + createSQLQuery);
+      LOG.info("Creating table with {}.", createSQLQuery);
       runSQL(createSQLQuery);
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to create table " + tableName, e);
@@ -127,10 +134,10 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
   @Override
   public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
     if (changedPartitions.isEmpty()) {
-      LOG.info("No partitions to change for " + tableName);
+      LOG.info("No partitions to change for {}.{}.", databaseName, tableName);
       return;
     }
-    LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
+    LOG.info("Changing partitions {} on {}.{}.", changedPartitions.size(), databaseName, tableName);
     List<String> sqls = constructChangePartitions(tableName, changedPartitions);
     for (String sql : sqls) {
       runSQL(sql);
@@ -145,19 +152,29 @@ public void updateTableComments(String tableName, Map constructAddPartitions(String tableName, List partitions) {
     List<String> result = new ArrayList<>();
-    int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
+    int batchSyncPartitionNum = config.getHiveBatchSyncPartitionNum();
     StringBuilder alterSQL = getAlterTablePrefix(tableName);
 
     for (int i = 0; i < partitions.size(); i++) {
       String partitionClause = getPartitionClause(partitions.get(i));
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
index b46b91647e68..3e52b82467d9 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
@@ -76,4 +76,12 @@ public static Boolean partitionExists(IMetaStoreClient client, String tableName,
     }
     return newPartition != null;
   }
+
+  public static void dropPartition(String databaseName, String tableName, String dropPartition,
+      PartitionValueExtractor partitionValueExtractor, HiveSyncConfig config, IMetaStoreClient client) throws TException {
+    if (partitionExists(client, tableName, dropPartition, partitionValueExtractor, config)) {
+      String partitionClause = getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
+      client.dropPartition(databaseName, tableName, partitionClause, false);
+    }
+  }
 }
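Note: HivePartitionUtil.dropPartition centralizes the exists-check, drop-clause construction and metastore call that HMSDDLExecutor (and the query-path executor) previously inlined. The call site now reduces to roughly the following, mirroring the loop in HMSDDLExecutor.dropPartitionsToTable above:

    // Illustrative caller; variable names match the executor fields used in the patch.
    for (String partitionPath : dropPartitions) {
      HivePartitionUtil.dropPartition(databaseName, tableName, partitionPath,
          partitionValueExtractor, syncConfig, client); // no-op when the partition does not exist
    }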
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 17d3a5fa4952..24a8703689f8 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -18,19 +18,26 @@
 
 package org.apache.hudi.hive.util;
 
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.SchemaDifference;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hudi.internal.schema.Types;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.stringtemplate.v4.ST;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,12 +47,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
-import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 
 /**
@@ -54,9 +61,37 @@ public class HiveSchemaUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveSchemaUtil.class);
-  public static final String HIVE_ESCAPE_CHARACTER = "`";
-  public static final String BOOLEAN_TYPE_NAME = "boolean";
 
+  private static final String DATABASE_NAME = "DATABASE_NAME";
+  private static final String EXTERNAL = "external";
+  private static final String TABLE_NAME = "TABLE_NAME";
+  private static final String LIST_COLUMNS = "columns";
+  private static final String PARTITIONS = "partitions";
+  private static final String ROW_FORMAT = "row_format";
+  private static final String LOCATION = "location";
+  private static final String LOCATION_BLOCK = "location_block";
+  private static final String PROPERTIES = "properties";
+
+  private static final String BUCKETS = "buckets";
+
+  private static final String CREATE_DATABASE_STMT =
+      "CREATE DATABASE IF NOT EXISTS <" + DATABASE_NAME + ">";
+
+  private static final String CREATE_TABLE_TEMPLATE =
+      "CREATE <" + EXTERNAL + ">TABLE `<" + DATABASE_NAME + ">`."
+          + "`<" + TABLE_NAME + ">`(\n"
+          + "<" + LIST_COLUMNS + ">)\n"
+          + "<" + PARTITIONS + ">\n"
+          + "<" + BUCKETS + ">\n"
+          + "<" + ROW_FORMAT + ">\n"
+          + "<" + LOCATION_BLOCK + ">"
+          + "TBLPROPERTIES (\n"
+          + "<" + PROPERTIES + ">)";
+
+  private static final String CREATE_TABLE_TEMPLATE_LOCATION = "LOCATION\n"
+      + "<" + LOCATION + ">\n";
+
+  public static final String HIVE_ESCAPE_CHARACTER = "`";
   public static final String INT_TYPE_NAME = "int";
   public static final String BIGINT_TYPE_NAME = "bigint";
   public static final String FLOAT_TYPE_NAME = "float";
@@ -230,9 +265,8 @@ public static List<FieldSchema> convertMapSchemaToHiveFieldSchema(LinkedHashMap<
   private static String convertField(final Type parquetType, boolean supportTimestamp, boolean doFormat) {
     StringBuilder field = new StringBuilder();
     if (parquetType.isPrimitive()) {
-      final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName =
-          parquetType.asPrimitiveType().getPrimitiveTypeName();
       final OriginalType originalType = parquetType.getOriginalType();
+
       if (originalType == OriginalType.DECIMAL) {
         final DecimalMetadata decimalMetadata = parquetType.asPrimitiveType().getDecimalMetadata();
         return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(doFormat ? " , " : ",")
@@ -243,52 +277,7 @@ private static String convertField(final Type parquetType, boolean supportTimest
         return field.append("TIMESTAMP").toString();
       }
 
-      // TODO - fix the method naming here
-      return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter() {
-        @Override
-        public String convertBOOLEAN(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
-          return BOOLEAN_TYPE_NAME;
-        }
-
-        @Override
-        public String convertINT32(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
-          return INT_TYPE_NAME;
-        }
-
-        @Override
-        public String convertINT64(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
-          return BIGINT_TYPE_NAME;
-        }
-
-        @Override
-        public String convertINT96(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
-          return "timestamp-millis";
-        }
-
-        @Override
-        public String convertFLOAT(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
-          return FLOAT_TYPE_NAME;
-        }
-
-        @Override
-        public String convertDOUBLE(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
-          return DOUBLE_TYPE_NAME;
-        }
-
-        @Override
-        public String convertFIXED_LEN_BYTE_ARRAY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
-          return BINARY_TYPE_NAME;
-        }
-
-        @Override
-        public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
-          if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) {
-            return STRING_TYPE_NAME;
-          } else {
-            return BINARY_TYPE_NAME;
-          }
-        }
-      });
+      return convertPrimitiveType(parquetType);
     } else {
       GroupType parquetGroupType = parquetType.asGroupType();
       OriginalType originalType = parquetGroupType.getOriginalType();
@@ -340,6 +329,46 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
     }
   }
 
+  private static String convertPrimitiveType(Type parquetType) {
+
+    final PrimitiveType primitiveType = parquetType.asPrimitiveType();
+    final PrimitiveTypeName primitiveTypeName = primitiveType.getPrimitiveTypeName();
+    final OriginalType originalType = parquetType.getOriginalType();
+
+    switch (primitiveTypeName) {
+      case INT96:
+        return Types.TimestampType.get().toString();
+      case INT32:
+        return Types.IntType.get().toString();
+      case INT64:
+        return Types.BigIntType.get().toString();
+      case BOOLEAN:
+        return Types.BooleanType.get().toString();
+      case FLOAT:
+        return Types.FloatType.get().toString();
+      case DOUBLE:
+        return Types.DoubleType.get().toString();
+      case BINARY:
+        if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) {
+          return Types.StringType.get().toString();
+        } else {
+          return Types.BinaryType.get().toString();
+        }
+      default:
+        throw new UnsupportedOperationException("Unhandled type " + primitiveTypeName.name());
+    }
+  }
+
+  private static String convertPrimitiveType(OriginalType originalType) {
+    /*switch (originalType) {
+      case DECIMAL:
+
+        return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(doFormat ? " , " : ",")
+            .append(decimalMetadata.getScale()).append(")").toString();
+    }*/
+    return "";
+  }
+
   /**
    * Return a 'struct' Hive schema from a list of Parquet fields.
    *
@@ -374,14 +403,20 @@ private static String hiveCompatibleFieldName(String fieldName, boolean isNested
     return doFormat ? tickSurround(result) : result;
   }
 
-  private static String tickSurround(String result) {
-    if (!result.startsWith("`")) {
-      result = "`" + result;
+  /**
+   * Surrounds the given name with backticks if they are not already present.
+   *
+   * @param pString raw field name.
+   * @return the backtick-quoted name.
+   */
+  private static String tickSurround(String pString) {
+    StringBuilder result = new StringBuilder();
+    if (!pString.startsWith("`")) {
+      result.append("`");
     }
-    if (!result.endsWith("`")) {
-      result = result + "`";
+    result.append(pString);
+    if (!pString.endsWith("`")) {
+      result.append("`");
     }
-    return result;
+    return result.toString();
   }
 
   private static String removeSurroundingTick(String result) {
@@ -460,46 +495,53 @@ public static String generateSchemaString(MessageType storageSchema, List serdeProperties,
-      Map<String, String> tableProperties) throws IOException {
-    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE));
-    String columns = generateSchemaString(storageSchema, config.getSplitStrings(META_SYNC_PARTITION_FIELDS), config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE));
-
-    List<String> partitionFields = new ArrayList<>();
-    for (String partitionKey : config.getSplitStrings(META_SYNC_PARTITION_FIELDS)) {
-      String partitionKeyWithTicks = tickSurround(partitionKey);
-      partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ")
-          .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
-    }
+  /**
+   * Generate CreateDataBase SQL.
+   *
+   * @param databaseName databaseName.
+   * @return createDatabase SQL.
+   */
+  public static String generateCreateDataBaseDDL(String databaseName) {
+    ST command = new ST(CREATE_DATABASE_STMT);
+    command.add(DATABASE_NAME, databaseName);
+    return command.render();
+  }
 
-    String partitionsStr = String.join(",", partitionFields);
-    StringBuilder sb = new StringBuilder();
-    if (config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
-      sb.append("CREATE TABLE IF NOT EXISTS ");
-    } else {
-      sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
-    }
-    sb.append(HIVE_ESCAPE_CHARACTER).append(config.getStringOrDefault(META_SYNC_DATABASE_NAME)).append(HIVE_ESCAPE_CHARACTER)
-        .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
-    sb.append("( ").append(columns).append(")");
+  /**
+   * Generate CreateTable SQL.
+   *
+   * @param config HiveSyncConfig.
+   * @param dataBaseName databaseName.
+   * @param tableName tableName.
+   * @param storageSchema parquet Schema.
+   * @param inputFormatClass inputFormatClass.
+   * @param outputFormatClass outputFormatClass.
+   * @param serdeClass serdeClass.
+   * @param serdeProperties serdeProperties.
+   * @param tableProperties tableProperties.
+   * @return createTable SQL.
+   *
+   * @throws IOException
+   */
+  public static String generateCreateTableDDL(HiveSyncConfig config, String dataBaseName, String tableName,
+      MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass,
+      Map<String, String> serdeProperties, Map<String, String> tableProperties) throws IOException {
+    ST command = new ST(CREATE_TABLE_TEMPLATE);
+    command.add(DATABASE_NAME, dataBaseName);
+    command.add(TABLE_NAME, tableName);
+    command.add(EXTERNAL, getExternal(config));
+    command.add(LIST_COLUMNS, getColumns(storageSchema, config));
     if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
-      sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
+      command.add(PARTITIONS, getPartitions(storageSchema, config));
     }
+    command.add(ROW_FORMAT, getRowFormat(serdeClass, serdeProperties,
+        inputFormatClass, outputFormatClass));
     if (config.getString(HIVE_SYNC_BUCKET_SYNC_SPEC) != null) {
-      sb.append(' ' + config.getString(HIVE_SYNC_BUCKET_SYNC_SPEC) + ' ');
-    }
-    sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
-    if (serdeProperties != null && !serdeProperties.isEmpty()) {
-      sb.append(" WITH SERDEPROPERTIES (").append(propertyToString(serdeProperties)).append(")");
+      command.add(BUCKETS, config.getBuckets());
     }
-    sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
-    sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.getAbsoluteBasePath()).append("'");
-
-    if (tableProperties != null && !tableProperties.isEmpty()) {
-      sb.append(" TBLPROPERTIES(").append(propertyToString(tableProperties)).append(")");
-    }
-    return sb.toString();
+    command.add(LOCATION_BLOCK, getLocationBlock(config.getAbsoluteBasePath()));
+    command.add(PROPERTIES, propertyToString(tableProperties));
+    return command.render();
   }
 
   private static String propertyToString(Map<String, String> properties) {
@@ -527,4 +569,58 @@ public static String getPartitionKeyType(Map<String, String> hiveSchema, String
     // Dont do that
     return STRING_TYPE_NAME;
   }
+
+  private static String getExternal(HiveSyncConfig config) {
+    return config.getHiveCreateExternalTable() ? "EXTERNAL " : "";
+  }
+
+  private static String getRowFormat(String serdeClass, Map<String, String> serdeParams,
+      String inputFormatClass, String outputFormatClass) {
+    StringBuilder rowFormat = new StringBuilder();
+    rowFormat.append("ROW FORMAT SERDE \n").append(" '" + serdeClass + "' \n");
+    if (!serdeParams.isEmpty()) {
+      appendSerdeParams(rowFormat, serdeParams);
+      rowFormat.append(" \n");
+    }
+    rowFormat.append("STORED AS INPUTFORMAT \n '" + inputFormatClass + "' \n")
+        .append("OUTPUTFORMAT \n '" + outputFormatClass + "'");
+    return rowFormat.toString();
+  }
+
+  public static void appendSerdeParams(StringBuilder builder, Map<String, String> serdeParams) {
+    SortedMap<String, String> sortedSerdeParams = new TreeMap<>(serdeParams);
+    List<String> serdeCols = new ArrayList<>();
+    for (Map.Entry<String, String> entry : sortedSerdeParams.entrySet()) {
+      serdeCols.add(" '" + entry.getKey() + "'='" + entry.getValue() + "'");
+    }
+    builder.append("WITH SERDEPROPERTIES ( \n")
+        .append(StringUtils.join(serdeCols, ", \n"))
+        .append(')');
+  }
+
+  private static String getPartitions(MessageType storageSchema, HiveSyncConfig config)
+      throws IOException {
+    Map<String, String> hiveSchema =
+        convertParquetSchemaToHiveSchema(storageSchema, config.getIsHiveSupportTimestampType());
+    List<String> partitionFields = new ArrayList<>();
+    for (String partitionKey : config.getSplitStrings(META_SYNC_PARTITION_FIELDS)) {
+      String partitionKeyWithTicks = tickSurround(partitionKey);
+      partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ")
+          .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
+    }
+    String partitionsStr = String.join(",", partitionFields);
+    return "PARTITIONED BY ( \n" + partitionsStr + ")";
+  }
+
+  private static String getColumns(MessageType storageSchema, HiveSyncConfig config)
+      throws IOException {
+    return generateSchemaString(storageSchema,
+        config.getMetaSyncPartitionFields(), config.getIsHiveSupportTimestampType());
+  }
+
+  private static String getLocationBlock(String location) {
+    ST locationBlock = new ST(CREATE_TABLE_TEMPLATE_LOCATION);
+    locationBlock.add(LOCATION, " '" + location + "'");
+    return locationBlock.render();
+  }
 }
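Note: with the StringTemplate-based builder, the DDL is assembled from named blocks instead of string concatenation. For an external, partitioned table the rendered statement looks roughly like the following (database, column list and location are invented placeholders; exact whitespace depends on the templates above):

    CREATE EXTERNAL TABLE `db1`.`hudi_tbl`(
      `_hoodie_commit_time` string, `id` bigint, `name` string)
    PARTITIONED BY ( 
    `dt` string)
    ROW FORMAT SERDE 
     'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
     'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
    OUTPUTFORMAT 
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
     '/warehouse/db1/hudi_tbl'
    TBLPROPERTIES (
      'last_commit_time_sync'='20230101000000')

generateCreateDataBaseDDL renders the simpler template to a statement of the form CREATE DATABASE IF NOT EXISTS db1, with no trailing semicolon.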
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveSchemaUtil.java
index 9c8ffc106db8..04d796b7cbd1 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveSchemaUtil.java
@@ -145,4 +145,15 @@ public void testSchemaDiffForTimestampMicros() {
         schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
     assertTrue(schemaDifference.isEmpty());
   }
+
+  @Test
+  public void testGenerateCreateDataBaseDDL() {
+    String expectedCreateDataBaseSQL = "CREATE DATABASE IF NOT EXISTS test_database";
+    String testDataBase = HiveSchemaUtil.generateCreateDataBaseDDL("test_database");
+    assertEquals(expectedCreateDataBaseSQL, testDataBase);
+  }
+
+  public void testGenerateCreateTableDDL() {
+
+  }
 }
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConstant.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConstant.java
new file mode 100644
index 000000000000..1ed9d58caaca
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConstant.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+public class HoodieSyncConstant {
+}
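Note: testGenerateCreateTableDDL above is left as an empty stub. One possible shape for it, shown only as a sketch (the Parquet schema builder, config keys, format/serde class names and assertions are assumptions and would need to match the exact rendering and required sync options):

    @Test
    public void testGenerateCreateTableDDL() throws IOException {
      // Minimal single-column Parquet schema for the sketch.
      MessageType schema = org.apache.parquet.schema.Types.buildMessage()
          .optional(PrimitiveType.PrimitiveTypeName.INT64).named("id")
          .named("test_schema");
      Properties props = new Properties();
      props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "/tmp/hudi_test_table");
      HiveSyncConfig config = new HiveSyncConfig(props);
      String ddl = HiveSchemaUtil.generateCreateTableDDL(config, "test_db", "test_table", schema,
          "org.apache.hudi.hadoop.HoodieParquetInputFormat",
          "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
          "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
          Collections.emptyMap(), new HashMap<>());
      assertTrue(ddl.startsWith("CREATE EXTERNAL TABLE `test_db`.`test_table`"));
      assertTrue(ddl.contains("bigint"));
    }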