From 65d15683ec3b8084330a6df7e121ca4218b83b2f Mon Sep 17 00:00:00 2001
From: "public (bdcee5037027)"
Date: Tue, 5 Jul 2022 15:17:54 +0800
Subject: [PATCH] fix comments

---
 .../org/apache/hudi/client/BaseHoodieWriteClient.java  |  3 ++-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java     | 10 ++++++----
 .../apache/hudi/common/config/HoodieCommonConfig.java  |  7 +++++++
 .../main/scala/org/apache/hudi/DataSourceOptions.scala |  7 +------
 4 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index cea145bcfdf68..24400c5eda90a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -39,6 +39,7 @@
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -276,7 +277,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
     TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
     String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
-    if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString("hoodie.datasource.write.reconcile.schema"))) {
+    if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
       InternalSchema internalSchema;
       Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
       if (historySchemaStr.isEmpty()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a2d94239ec097..fa65461bfdb0e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -745,15 +745,18 @@ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord,
    * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
    *
    * @param oldRecord oldRecord to be rewritten
+   * @param oldAvroSchema old avro schema of oldRecord; may be a union wrapping the actual record schema
    * @param newSchema newSchema used to rewrite oldRecord
    * @param renameCols a map store all rename cols, (k, v)-> (colNameFromNewSchema, colNameFromOldSchema)
    * @param fieldNames track the full name of visited field when we travel new schema.
   * @return newRecord for new Schema
   */
-  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
+  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
     if (oldRecord == null) {
       return null;
     }
+    // try to get the real schema for a union type
+    Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
     switch (newSchema.getType()) {
       case RECORD:
         if (!(oldRecord instanceof IndexedRecord)) {
@@ -797,7 +800,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
         List<Object> newArray = new ArrayList();
         fieldNames.push("element");
         for (Object element : array) {
-          newArray.add(rewriteRecordWithNewSchema(element, getActualSchemaFromUnion(oldSchema.getElementType(), element), newSchema.getElementType(), renameCols, fieldNames));
+          newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
         }
         fieldNames.pop();
         return newArray;
@@ -809,8 +812,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
         Map<Object, Object> newMap = new HashMap<>();
         fieldNames.push("value");
         for (Map.Entry<Object, Object> entry : map.entrySet()) {
-          newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(),
-              getActualSchemaFromUnion(oldSchema.getValueType(), entry.getValue()), newSchema.getValueType(), renameCols, fieldNames));
+          newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
         }
         fieldNames.pop();
         return newMap;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index cc62bcc32824f..917cfe621f11e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -36,6 +36,13 @@ public class HoodieCommonConfig extends HoodieConfig {
       .defaultValue(false)
       .withDocumentation("Enables support for Schema Evolution feature");
 
+  public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
+      .key("hoodie.datasource.write.reconcile.schema")
+      .defaultValue(false)
+      .withDocumentation("When a new batch of writes has records with an old schema but the table schema has "
+          + "evolved, this config will upgrade the records to the latest table schema (default values will be "
+          + "injected into missing fields). If not set, the write batch would fail.");
+
   public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
       .key("hoodie.common.spillable.diskmap.type")
       .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 819c4b55a9e4f..89b6423a455f1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -399,12 +399,7 @@ object DataSourceWriteOptions {
     .defaultValue(classOf[HiveSyncTool].getName)
     .withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.")
Defaults to Hive.") - val RECONCILE_SCHEMA: ConfigProperty[Boolean] = ConfigProperty - .key("hoodie.datasource.write.reconcile.schema") - .defaultValue(false) - .withDocumentation("When a new batch of write has records with old schema, but latest table schema got " - + "evolved, this config will upgrade the records to leverage latest table schema(default values will be " - + "injected to missing fields). If not, the write batch would fail.") + val RECONCILE_SCHEMA: ConfigProperty[Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA // HIVE SYNC SPECIFIC CONFIGS // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes