Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
public (bdcee5037027) committed Jul 5, 2022
1 parent 404cb3a commit 65d1568
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
Expand Down Expand Up @@ -276,7 +277,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString("hoodie.datasource.write.reconcile.schema"))) {
if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
InternalSchema internalSchema;
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
if (historySchemaStr.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,15 +745,18 @@ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord,
* b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
*
* @param oldRecord oldRecord to be rewritten
* @param oldAvroSchema the old Avro schema of {@code oldRecord}; may be a union type, in which case the actual member schema is resolved below.
* @param newSchema newSchema used to rewrite oldRecord
* @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
* @param fieldNames tracks the full names of the fields visited while traversing the new schema.
* @return newRecord for new Schema
*/
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
if (oldRecord == null) {
return null;
}
// resolve the actual member schema when the old schema is a union type
Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
switch (newSchema.getType()) {
case RECORD:
if (!(oldRecord instanceof IndexedRecord)) {
Expand Down Expand Up @@ -797,7 +800,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
List<Object> newArray = new ArrayList();
fieldNames.push("element");
for (Object element : array) {
newArray.add(rewriteRecordWithNewSchema(element, getActualSchemaFromUnion(oldSchema.getElementType(), element), newSchema.getElementType(), renameCols, fieldNames));
newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
}
fieldNames.pop();
return newArray;
Expand All @@ -809,8 +812,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
Map<Object, Object> newMap = new HashMap<>();
fieldNames.push("value");
for (Map.Entry<Object, Object> entry : map.entrySet()) {
newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(),
getActualSchemaFromUnion(oldSchema.getValueType(), entry.getValue()), newSchema.getValueType(), renameCols, fieldNames));
newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
}
fieldNames.pop();
return newMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ public class HoodieCommonConfig extends HoodieConfig {
.defaultValue(false)
.withDocumentation("Enables support for Schema Evolution feature");

/**
 * Controls schema reconciliation on write. When an incoming write batch carries records using an
 * older schema while the table schema has since evolved, enabling this upgrades the records to the
 * latest table schema (default values are injected into missing fields) instead of failing the
 * batch. Defaults to {@code false}.
 */
public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
.key("hoodie.datasource.write.reconcile.schema")
.defaultValue(false)
.withDocumentation("When a new batch of write has records with old schema, but latest table schema got "
+ "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
+ "injected to missing fields). If not, the write batch would fail.");

public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.common.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,7 @@ object DataSourceWriteOptions {
.defaultValue(classOf[HiveSyncTool].getName)
.withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.")

val RECONCILE_SCHEMA: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.datasource.write.reconcile.schema")
.defaultValue(false)
.withDocumentation("When a new batch of write has records with old schema, but latest table schema got "
+ "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
+ "injected to missing fields). If not, the write batch would fail.")
// Aliases the single definition in HoodieCommonConfig so the key, default value and
// documentation stay in sync with the common client config.
val RECONCILE_SCHEMA: ConfigProperty[Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA

// HIVE SYNC SPECIFIC CONFIGS
// NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
Expand Down

0 comments on commit 65d1568

Please sign in to comment.