[HUDI-7120] Performance improvements in deltastreamer executor code path (apache#10135)
lokeshj1703 authored Nov 23, 2023
1 parent 405be17 commit b77eff2
Showing 6 changed files with 186 additions and 164 deletions.
@@ -62,9 +62,11 @@ private List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(HoodieBaseFile
 
   public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
     HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+    String commitTime = baseFile.getCommitTime();
+    String fileId = baseFile.getFileId();
     return fetchRecordKeysWithPositions(baseFile).stream()
         .map(entry -> Pair.of(entry.getLeft(),
-            new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId(), entry.getRight())));
+            new HoodieRecordLocation(commitTime, fileId, entry.getRight())));
   }
 
   public Stream<Pair<String, HoodieRecordGlobalLocation>> globalLocations() {
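The hunk above hoists baseFile.getCommitTime() and baseFile.getFileId() out of the per-record lambda, so each accessor runs once per base file rather than once per record. A minimal standalone sketch of the same hoisting pattern (BaseFileLike and tag are hypothetical stand-ins, not Hudi types):

import java.util.List;
import java.util.stream.Collectors;

class HoistingSketch {
  // Hypothetical stand-in for HoodieBaseFile; the accessors may do real work
  // (e.g. parse the file name), which is why per-record calls add up.
  interface BaseFileLike {
    String getCommitTime();

    String getFileId();
  }

  static List<String> tag(BaseFileLike baseFile, List<String> keys) {
    // Bind the loop-invariant values once, outside the per-element lambda,
    // instead of re-computing them for every streamed record.
    String commitTime = baseFile.getCommitTime();
    String fileId = baseFile.getFileId();
    return keys.stream()
        .map(k -> commitTime + "/" + fileId + "/" + k)
        .collect(Collectors.toList());
  }
}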
@@ -18,6 +18,7 @@
 
 package org.apache.hudi
 
+import org.apache.avro.Schema.Type
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.{JsonProperties, Schema}
 import org.apache.hudi.HoodieSparkUtils.sparkAdapter
@@ -242,4 +243,12 @@ object AvroConversionUtils {
     val nameParts = qualifiedName.split('.')
     (nameParts.last, nameParts.init.mkString("."))
   }
+
+  private def handleUnion(schema: Schema): Schema = {
+    if (schema.getType == Type.UNION) {
+      val index = if (schema.getTypes.get(0).getType == Schema.Type.NULL) 1 else 0
+      return schema.getTypes.get(index)
+    }
+    schema
+  }
 }
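The new handleUnion helper returns the non-null branch of a two-branch union by index instead of streaming over the types. A standalone Java sketch of the same unwrapping against the Avro Schema API (unwrapNullable is an illustrative name, assuming a [null, T] or [T, null] union):

import java.util.Arrays;

import org.apache.avro.Schema;

class UnionSketch {
  // Return the non-null branch of a nullable union, or the schema unchanged.
  static Schema unwrapNullable(Schema schema) {
    if (schema.getType() == Schema.Type.UNION) {
      int index = schema.getTypes().get(0).getType() == Schema.Type.NULL ? 1 : 0;
      return schema.getTypes().get(index);
    }
    return schema;
  }

  public static void main(String[] args) {
    Schema nullableString = Schema.createUnion(Arrays.asList(
        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
    System.out.println(unwrapNullable(nullableString)); // prints "string"
  }
}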
@@ -249,6 +249,11 @@ public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullNam
   }
 
   List<Schema> innerTypes = schema.getTypes();
+  if (innerTypes.size() == 2 && isNullable(schema)) {
+    // this is a basic nullable field so handle it more efficiently
+    return resolveNullableSchema(schema);
+  }
+
   Schema nonNullType =
       innerTypes.stream()
           .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName))
@@ -286,18 +291,19 @@ public static Schema resolveNullableSchema(Schema schema) {
   }
 
   List<Schema> innerTypes = schema.getTypes();
-  Schema nonNullType =
-      innerTypes.stream()
-          .filter(it -> it.getType() != Schema.Type.NULL)
-          .findFirst()
-          .orElse(null);
 
-  if (innerTypes.size() != 2 || nonNullType == null) {
+  if (innerTypes.size() != 2) {
     throw new AvroRuntimeException(
         String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
   }
-
-  return nonNullType;
+  Schema firstInnerType = innerTypes.get(0);
+  Schema secondInnerType = innerTypes.get(1);
+  if ((firstInnerType.getType() != Schema.Type.NULL && secondInnerType.getType() != Schema.Type.NULL)
+      || (firstInnerType.getType() == Schema.Type.NULL && secondInnerType.getType() == Schema.Type.NULL)) {
+    throw new AvroRuntimeException(
+        String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
+  }
+  return firstInnerType.getType() == Schema.Type.NULL ? secondInnerType : firstInnerType;
 }
 
 /**
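Both hunks replace a stream pipeline with direct index checks for the common [null, T] case: a two-branch union needs no iteration, only a test of which branch is null. A rough standalone sketch of the new branch logic (mirroring the diff, not the full Hudi class):

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;

class ResolveSketch {
  static Schema resolveNullable(Schema schema) {
    if (schema.getTypes().size() != 2) {
      throw new AvroRuntimeException("Expected a two-type union: " + schema);
    }
    Schema first = schema.getTypes().get(0);
    Schema second = schema.getTypes().get(1);
    boolean firstIsNull = first.getType() == Schema.Type.NULL;
    boolean secondIsNull = second.getType() == Schema.Type.NULL;
    if (firstIsNull == secondIsNull) {
      // either both branches are null or neither is
      throw new AvroRuntimeException("Expected exactly one null branch: " + schema);
    }
    return firstIsNull ? second : first;
  }
}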
58 changes: 27 additions & 31 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -265,7 +265,8 @@ public static Schema addMetadataFields(Schema schema) {
    * @param withOperationField Whether to include the '_hoodie_operation' field
    */
   public static Schema addMetadataFields(Schema schema, boolean withOperationField) {
-    List<Schema.Field> parentFields = new ArrayList<>();
+    int newFieldsSize = HoodieRecord.HOODIE_META_COLUMNS.size() + (withOperationField ? 1 : 0);
+    List<Schema.Field> parentFields = new ArrayList<>(schema.getFields().size() + newFieldsSize);
 
     Schema.Field commitTimeField =
         new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
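Presizing the list avoids the incremental array growth (and copying) that appending schema.getFields().size() + newFieldsSize elements would otherwise trigger; a default-constructed ArrayList starts at capacity 10 and copies its backing array on every expansion. The general pattern, as a sketch with illustrative values:

import java.util.ArrayList;
import java.util.List;

class PresizeListSketch {
  static List<String> withMetaColumns(List<String> existing, int extraColumns) {
    // Allocate the exact final capacity up front so no resize ever happens.
    List<String> out = new ArrayList<>(existing.size() + extraColumns);
    out.addAll(existing);
    for (int i = 0; i < extraColumns; i++) {
      out.add("_hoodie_meta_" + i); // illustrative names only
    }
    return out;
  }
}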
@@ -439,12 +440,6 @@ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSch
       copyOldValueOrSetDefault(oldRecord, newRecord, f);
     }
   }
-
-  if (!ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) {
-    throw new SchemaCompatibilityException(
-        "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
-  }
-
   return newRecord;
 }
 
@@ -455,10 +450,6 @@ public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecor
   }
   // do not preserve FILENAME_METADATA_FIELD
   newRecord.put(HoodieRecord.FILENAME_META_FIELD_ORD, fileName);
-  if (!GenericData.get().validate(newSchema, newRecord)) {
-    throw new SchemaCompatibilityException(
-        "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
-  }
   return newRecord;
 }
 
@@ -494,7 +485,7 @@ public static GenericRecord removeFields(GenericRecord record, Set<String> field
 private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
   Schema oldSchema = oldRecord.getSchema();
   Field oldSchemaField = oldSchema.getField(field.name());
-  Object fieldValue = oldSchemaField == null ? null : oldRecord.get(field.name());
+  Object fieldValue = oldSchemaField == null ? null : oldRecord.get(oldSchemaField.pos());
 
   if (fieldValue != null) {
     // In case field's value is a nested record, we have to rewrite it as well
@@ -508,11 +499,14 @@ private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRec
       } else {
         newFieldValue = fieldValue;
       }
-      newRecord.put(field.name(), newFieldValue);
+      newRecord.put(field.pos(), newFieldValue);
     } else if (field.defaultVal() instanceof JsonProperties.Null) {
-      newRecord.put(field.name(), null);
+      newRecord.put(field.pos(), null);
     } else {
-      newRecord.put(field.name(), field.defaultVal());
+      if (!isNullable(field.schema()) && field.defaultVal() == null) {
+        throw new SchemaCompatibilityException("Field " + field.name() + " has no default value and is null in old record");
+      }
+      newRecord.put(field.pos(), field.defaultVal());
     }
   }
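The puts now address fields by position instead of by name; GenericData.Record resolves a name-based put or get through a schema field lookup on every call, while a positional access indexes the underlying array directly. A small sketch against the Avro API (the Row schema is hypothetical):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

class PositionalAccessSketch {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("Row").fields()
        .requiredString("id")
        .requiredLong("ts")
        .endRecord();
    GenericRecord rec = new GenericData.Record(schema);
    Schema.Field id = schema.getField("id"); // resolve the field once
    rec.put(id.pos(), "key-1");              // positional put, no name lookup
    rec.put(schema.getField("ts").pos(), 42L);
    System.out.println(rec.get(id.pos()));   // positional get
  }
}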

@@ -562,7 +556,8 @@ public static Object getFieldVal(GenericRecord record, String key) {
  * it is consistent with avro after 1.10
  */
 public static Object getFieldVal(GenericRecord record, String key, boolean returnNullIfNotFound) {
-  if (record.getSchema().getField(key) == null) {
+  Schema.Field field = record.getSchema().getField(key);
+  if (field == null) {
     if (returnNullIfNotFound) {
       return null;
     } else {
@@ -572,7 +567,7 @@ public static Object getFieldVal(GenericRecord record, String key, boolean retur
       throw new AvroRuntimeException("Not a valid schema field: " + key);
     }
   } else {
-    return record.get(key);
+    return record.get(field.pos());
   }
 }

@@ -874,15 +869,16 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
   }
   // try to get real schema for union type
   Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
-  Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames, validate);
+  Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames);
+  // validation is recursive so it only needs to be called on the original input
   if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) {
     throw new SchemaCompatibilityException(
         "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
   }
   return newRecord;
 }
 
-private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames, boolean validate) {
+private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
   switch (newSchema.getType()) {
     case RECORD:
       ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type");
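Dropping the validate flag from the recursive calls relies on Avro validation itself being recursive: one validate call at the root walks nested records, arrays, and maps, so per-level validation during the rewrite is redundant. A sketch of the top-level-only check (using stock GenericData; the Hudi code uses its own ConvertingGenericData.INSTANCE):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

class RootValidationSketch {
  static GenericRecord validateOnceAtRoot(GenericRecord rewritten, Schema newSchema) {
    // GenericData.validate recurses into nested values on its own, so a
    // single call here covers the entire rewritten tree.
    if (!GenericData.get().validate(newSchema, rewritten)) {
      throw new IllegalStateException("Record does not conform to " + newSchema);
    }
    return rewritten;
  }
}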
@@ -893,17 +889,17 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem
         Schema.Field field = fields.get(i);
         String fieldName = field.name();
         fieldNames.push(fieldName);
-        if (oldSchema.getField(field.name()) != null && !renameCols.containsKey(field.name())) {
-          Schema.Field oldField = oldSchema.getField(field.name());
-          newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
+        Schema.Field oldField = oldSchema.getField(field.name());
+        if (oldField != null && !renameCols.containsKey(field.name())) {
+          newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, false));
         } else {
           String fieldFullName = createFullName(fieldNames);
-          String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, "");
+          String fieldNameFromOldSchema = renameCols.get(fieldFullName);
           // deal with rename
-          if (oldSchema.getField(fieldNameFromOldSchema) != null) {
+          Schema.Field oldFieldRenamed = fieldNameFromOldSchema == null ? null : oldSchema.getField(fieldNameFromOldSchema);
+          if (oldFieldRenamed != null) {
             // find rename
-            Schema.Field oldField = oldSchema.getField(fieldNameFromOldSchema);
-            newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
+            newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldFieldRenamed.pos()), oldFieldRenamed.schema(), fields.get(i).schema(), renameCols, fieldNames, false));
           } else {
             // deal with default value
             if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
@@ -927,25 +923,25 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem
     case ARRAY:
       ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type");
       Collection array = (Collection) oldRecord;
-      List<Object> newArray = new ArrayList(array.size());
+      List<Object> newArray = new ArrayList<>(array.size());
       fieldNames.push("element");
       for (Object element : array) {
-        newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames, validate));
+        newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames, false));
       }
       fieldNames.pop();
       return newArray;
     case MAP:
       ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type");
       Map<Object, Object> map = (Map<Object, Object>) oldRecord;
-      Map<Object, Object> newMap = new HashMap<>(map.size(), 1);
+      Map<Object, Object> newMap = new HashMap<>(map.size(), 1.0f);
       fieldNames.push("value");
       for (Map.Entry<Object, Object> entry : map.entrySet()) {
-        newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames, validate));
+        newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames, false));
       }
       fieldNames.pop();
       return newMap;
     case UNION:
-      return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames, validate);
+      return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames, false);
     default:
       return rewritePrimaryType(oldRecord, oldSchema, newSchema);
   }
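new HashMap<>(map.size(), 1.0f) sizes the table so that copying exactly map.size() entries never rehashes: the capacity rounds up to a power of two at least map.size(), and with load factor 1.0f the resize threshold equals that capacity. A sketch of the pattern:

import java.util.HashMap;
import java.util.Map;

class PresizeMapSketch {
  static <K, V> Map<K, V> presizedCopy(Map<K, V> source) {
    // Threshold = capacity * 1.0f >= source.size(), so putAll below
    // finishes without a single resize.
    Map<K, V> copy = new HashMap<>(source.size(), 1.0f);
    copy.putAll(source);
    return copy;
  }
}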
@@ -87,6 +87,8 @@ public class FSUtils {
   private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
   private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
 
+  private static final String LOG_FILE_EXTENSION = ".log";
+
   private static final PathFilter ALLOW_ALL_FILTER = file -> true;
 
   public static Configuration prepareHadoopConf(Configuration conf) {
@@ -474,8 +476,11 @@ public static boolean isLogFile(Path logPath) {
   }
 
   public static boolean isLogFile(String fileName) {
-    Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
-    return fileName.contains(".log") && matcher.find();
+    if (fileName.contains(LOG_FILE_EXTENSION)) {
+      Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
+      return matcher.find();
+    }
+    return false;
   }
 
   /**
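Guarding the regex with the cheap substring test means a Matcher is built and run only for names that can possibly be log files; for data files (e.g. parquet) the method now returns after a plain String.contains. A standalone sketch (the pattern below is a simplified stand-in for Hudi's real LOG_FILE_PATTERN):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LogFileCheckSketch {
  // Simplified stand-in for Hudi's LOG_FILE_PATTERN, for illustration only.
  private static final Pattern LOG_FILE_PATTERN = Pattern.compile("\\.log\\.(\\d+)");
  private static final String LOG_FILE_EXTENSION = ".log";

  static boolean isLogFile(String fileName) {
    if (fileName.contains(LOG_FILE_EXTENSION)) { // cheap check first
      Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
      return matcher.find();                     // regex only for candidates
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(isLogFile("file-1_1-0-1.log.1")); // true
    System.out.println(isLogFile("file-1.parquet"));     // false, no regex run
  }
}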