Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-6079] Improve the code of HMSDDLExecutor, HiveQueryDDLExecutor. #8460

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface Type extends Serializable {
*/
enum TypeID {
RECORD, ARRAY, MAP, FIXED, STRING, BINARY,
INT, LONG, FLOAT, DOUBLE, DATE, BOOLEAN, TIME, TIMESTAMP, DECIMAL, UUID;
INT, LONG, FLOAT, DOUBLE, DATE, BOOLEAN, TIME, TIMESTAMP, DECIMAL, UUID, BIGINT;
private String name;

TypeID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,27 @@ public String toString() {
}
}

/**
 * Bigint primitive type (maps to Hive's BIGINT / a 64-bit integer column).
 *
 * <p>NOTE(review): the original comment said "Long primitive type." — that looks
 * copy-pasted from the sibling LongType class; corrected here.
 */
public static class BigIntType extends PrimitiveType {
  // Stateless, so a single shared instance suffices.
  private static final BigIntType INSTANCE = new BigIntType();

  /** Returns the shared singleton instance of this type. */
  public static BigIntType get() {
    return INSTANCE;
  }

  @Override
  public TypeID typeId() {
    return TypeID.BIGINT;
  }

  @Override
  public String toString() {
    return "bigint";
  }
}

/**
* Float primitive type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,13 @@ private void createAdbConnection() {

@Override
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
String outputFormatClass, String serdeClass,
Map<String, String> serdeProperties, Map<String, String> tableProperties) {
String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
Map<String, String> tableProperties) {
try {
LOG.info("Creating table:{}", tableName);
String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema,
config, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
LOG.info("Creating table {}.{}.", databaseName, tableName);
String createSQLQuery = HiveSchemaUtil.generateCreateTableDDL(config, databaseName,
tableName, storageSchema, inputFormatClass, outputFormatClass, serdeClass,
serdeProperties, tableProperties);
executeAdbSql(createSQLQuery);
} catch (IOException e) {
throw new HoodieException("Fail to create table:" + tableName, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import javax.annotation.concurrent.Immutable;

import java.util.List;
import java.util.Properties;

/**
Expand Down Expand Up @@ -202,4 +203,36 @@ public TypedProperties toProps() {
/**
 * Validates the hive-sync configuration parameters.
 *
 * <p>Currently only checks that {@code HIVE_BATCH_SYNC_PARTITION_NUM} is strictly
 * positive; {@code ValidationUtils.checkArgument} presumably throws an
 * {@code IllegalArgumentException} on violation — TODO confirm.
 */
public void validateParameters() {
  ValidationUtils.checkArgument(getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) > 0, "batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
}

/**
 * Returns the number of partitions to sync to Hive per batch, falling back to the
 * default declared on {@code HIVE_BATCH_SYNC_PARTITION_NUM} when unset.
 */
public int getHiveBatchSyncPartitionNum() {
  return getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
}

/**
 * Returns the partition field names configured via {@code META_SYNC_PARTITION_FIELDS}.
 *
 * <p>Parsed by {@code getSplitStrings} — presumably a comma-separated list; TODO confirm
 * the delimiter against the base config class.
 */
public List<String> getMetaSyncPartitionFields() {
  return getSplitStrings(META_SYNC_PARTITION_FIELDS);
}

/**
 * Returns the base path of the table being synced ({@code META_SYNC_BASE_PATH}).
 * May be {@code null} if the option is unset — TODO confirm {@code getString} semantics.
 */
public String getMetaSyncBasePath() {
  return getString(META_SYNC_BASE_PATH);
}

/**
 * Returns whether the synced Hive table should be created as a managed (internal)
 * table, per {@code HIVE_CREATE_MANAGED_TABLE}.
 */
public boolean getHiveCreateManagedTable() {
  return getBoolean(HIVE_CREATE_MANAGED_TABLE);
}

/**
 * Returns whether the synced Hive table should be created as an EXTERNAL table.
 * This is defined as the complement of the managed-table setting.
 */
public boolean getHiveCreateExternalTable() {
  final boolean managed = getHiveCreateManagedTable();
  return !managed;
}

/**
 * Returns the fully-qualified class name of the partition value extractor,
 * falling back to the default declared on {@code META_SYNC_PARTITION_EXTRACTOR_CLASS}.
 */
public String getMetaSyncPartitionExtractorClass() {
  return getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
}

/**
 * Returns the bucket sync specification ({@code HIVE_SYNC_BUCKET_SYNC_SPEC});
 * may be {@code null} when unset.
 *
 * <p>NOTE(review): the method name "getBuckets" does not match the option name
 * "bucket sync spec" — consider whether a clearer name is feasible for new callers.
 */
public String getBuckets() {
  return getString(HIVE_SYNC_BUCKET_SYNC_SPEC);
}

/**
 * Returns whether Hive TIMESTAMP type support is enabled
 * ({@code HIVE_SUPPORT_TIMESTAMP_TYPE}).
 *
 * <p>NOTE(review): idiomatic Java would name this {@code isHiveSupportTimestampType};
 * kept as-is to avoid breaking callers.
 */
public boolean getIsHiveSupportTimestampType() {
  return getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void updateTableProperties(String tableName, Map<String, String> tablePro
client.alter_table(databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to update table properties for table: "
+ tableName, e);
+ tableId(databaseName, tableName), e);
}
}

Expand All @@ -152,7 +152,8 @@ public void updateSerdeProperties(String tableName, Map<String, String> serdePro
boolean different = serdeProperties.entrySet().stream().anyMatch(e ->
!parameters.containsKey(e.getKey()) || !parameters.get(e.getKey()).equals(e.getValue()));
if (!different) {
LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update serde properties.");
LOG.debug("Table {} serdeProperties already up to date, skip update serde properties.",
tableId(databaseName, tableName));
return;
}
}
Expand All @@ -163,7 +164,7 @@ public void updateSerdeProperties(String tableName, Map<String, String> serdePro
client.alter_table(databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to update table serde info for table: "
+ tableName, e);
+ tableId(databaseName, tableName), e);
}
}

Expand All @@ -180,7 +181,8 @@ public List<Partition> getAllPartitions(String tableName) {
.map(p -> new Partition(p.getValues(), p.getSd().getLocation()))
.collect(Collectors.toList());
} catch (TException e) {
throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
throw new HoodieHiveSyncException("Failed to get all partitions for table "
+ tableId(databaseName, tableName), e);
}
}

Expand Down Expand Up @@ -218,7 +220,8 @@ public boolean tableExists(String tableName) {
try {
return client.tableExists(databaseName, tableName);
} catch (TException e) {
throw new HoodieHiveSyncException("Failed to check if table exists " + tableName, e);
throw new HoodieHiveSyncException("Failed to check if table exists "
+ tableId(databaseName, tableName), e);
}
}

Expand Down Expand Up @@ -257,25 +260,28 @@ public Option<String> getLastReplicatedTime(String tableName) {
Table table = client.getTable(databaseName, tableName);
return Option.ofNullable(table.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP, null));
} catch (NoSuchObjectException e) {
LOG.warn("the said table not found in hms " + tableId(databaseName, tableName));
LOG.warn("the said table {} not found in hms.", tableId(databaseName, tableName));
return Option.empty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last replicated time from the table " + tableName, e);
throw new HoodieHiveSyncException("Failed to get the last replicated time from the table "
+ tableId(databaseName, tableName), e);
}
}

public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
if (getActiveTimeline().getInstantsAsStream().noneMatch(i -> i.getTimestamp().equals(timeStamp))) {
throw new HoodieHiveSyncException(
"Not a valid completed timestamp " + timeStamp + " for table " + tableName);
"Not a valid completed timestamp " + timeStamp + " for table "
+ tableId(databaseName, tableName));
}
try {
Table table = client.getTable(databaseName, tableName);
table.putToParameters(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
client.alter_table(databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException(
"Failed to update last replicated time to " + timeStamp + " for " + tableName, e);
"Failed to update last replicated time to " + timeStamp + " for " +
tableId(databaseName, tableName), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,69 +33,71 @@
public interface DDLExecutor extends AutoCloseable {

/**
* Create a database with the given name.
*
* @param databaseName name of database to be created.
*/
void createDatabase(String databaseName);

/**
* Creates a table with the following properties.
*
* @param tableName
* @param tableName table name.
* @param storageSchema
* @param inputFormatClass
* @param outputFormatClass
* @param serdeClass
* @param serdeProperties
* @param tableProperties
*/
void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
String outputFormatClass, String serdeClass,
Map<String, String> serdeProperties, Map<String, String> tableProperties);
void createTable(String tableName, MessageType storageSchema,
String inputFormatClass, String outputFormatClass, String serdeClass,
Map<String, String> serdeProperties, Map<String, String> tableProperties);

/**
* Updates the table with the newSchema.
*
* @param tableName
* @param tableName table name.
* @param newSchema
*/
void updateTableDefinition(String tableName, MessageType newSchema);

/**
* Fetches tableSchema for a table.
*
* @param tableName
* @param tableName table name.
* @return
*/
Map<String, String> getTableSchema(String tableName);

/**
* Adds partition to table.
*
* @param tableName
* @param tableName table name.
* @param partitionsToAdd
*/
void addPartitionsToTable(String tableName, List<String> partitionsToAdd);

/**
* Updates partitions for a given table.
*
* @param tableName
* @param tableName table name.
* @param changedPartitions
*/
void updatePartitionsToTable(String tableName, List<String> changedPartitions);

/**
* Drop partitions for a given table.
*
* @param tableName
* @param tableName table name.
* @param partitionsToDrop
*/
void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);

/**
* update table comments
*
* @param tableName
* @param tableName table name.
* @param newSchema Map key: field name, Map value: [field type, field comment]
*/
void updateTableComments(String tableName, Map<String, Pair<String, String>> newSchema);
Expand Down
Loading