[HUDI-5633] Fixing performance regression in HoodieSparkRecord (#7769)
This change addresses a few performance regressions in `HoodieSparkRecord` identified during our recent benchmarking:

1. `HoodieSparkRecord` rewrites records using `rewriteRecord` and `rewriteRecordWithNewSchema`, which traverse the schema for every record. Instead, the schema should be traversed only once to produce a transformer that directly creates the new record from the old one (a minimal sketch of this pattern follows the list).

2. `HoodieRecord`s could currently be rewritten multiple times even in cases where only the meta-fields need to be mixed into the schema (in that case, `HoodieSparkRecord` simply wraps the source `InternalRow` into a `HoodieInternalRow` holding the meta-fields). This is problematic because a) `UnsafeProjection` re-uses a mutable row (as a buffer) to avoid allocating small objects, leading to b) recursive overwriting of the same row.

3. Records are currently copied for every Executor type, even the Simple one, which does not buffer any records and therefore does not require them to be copied.
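
A minimal, hypothetical sketch of the "traverse once, apply per record" pattern referenced in item 1, shown with Avro `GenericRecord` for brevity (the actual change applies the same idea to Spark's `InternalRow` via the row-writing utilities). `RecordTransformers` and `compileProjection` are illustrative names, not Hudi APIs, and the sketch assumes the target schema's fields are a subset of the source schema's:

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper: the schema traversal happens exactly once, inside compileProjection();
// the transformer it returns only does positional copying for each record.
final class RecordTransformers {

  static UnaryOperator<GenericRecord> compileProjection(Schema source, Schema target) {
    // Resolve, up front, where every target field lives in the source schema.
    List<Integer> sourcePositions = target.getFields().stream()
        .map(field -> source.getField(field.name()).pos())
        .collect(Collectors.toList());

    // No schema traversal from here on -- applying the transformer is a tight positional loop.
    return record -> {
      GenericRecord rewritten = new GenericData.Record(target);
      for (int targetPos = 0; targetPos < sourcePositions.size(); targetPos++) {
        rewritten.put(targetPos, record.get(sourcePositions.get(targetPos)));
      }
      return rewritten;
    };
  }
}
```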

To address the aforementioned gaps, the following changes have been implemented:

 1. Row-writing utilities have been revisited to decouple `RowWriter` generation from its actual application to the source row, making per-record application much more efficient. Additionally, a considerable number of row-writing utilities have been eliminated as purely duplicative.
 
 2. The `HoodieRecord.rewriteRecord` API has been renamed to `prependMetaFields` to clearly disambiguate it from `rewriteRecordWithNewSchema`.

 3. `WriteHandle` and `HoodieMergeHelper` implementations have been substantially simplified and streamlined to accommodate being rebased onto `prependMetaFields` (see the sketch after this list).
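
To illustrate gap 3 and the new `ExecutorFactory.isBufferingRecords` check added by this change, below is a rough, self-contained sketch of the transform step handed to the executor: rewrite the record if needed, then copy it only when the configured executor actually buffers records. The `ExecutorType` values mirror Hudi's, but `MergeTransformSketch`, `mergeTransform`, and the `rewrite`/`copy` stand-ins are hypothetical, not Hudi APIs:

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

final class MergeTransformSketch {

  // Mirrors Hudi's executor types; only the queue-based ones buffer records.
  enum ExecutorType { BOUNDED_IN_MEMORY, DISRUPTOR, SIMPLE }

  // Counterpart of ExecutorFactory.isBufferingRecords(writeConfig) introduced by this change.
  static boolean isBufferingRecords(ExecutorType executorType) {
    switch (executorType) {
      case BOUNDED_IN_MEMORY:
      case DISRUPTOR:
        return true;   // queue-based executors hold on to record references
      case SIMPLE:
        return false;  // SimpleExecutor consumes records inline, nothing is buffered
      default:
        throw new IllegalArgumentException("Unsupported executor type " + executorType);
    }
  }

  // Per-record transform handed to the executor: rewrite first (e.g. a schema-evolution
  // transformer), then copy only if the executor buffers records.
  static <R> Function<R, R> mergeTransform(ExecutorType executorType,
                                           UnaryOperator<R> rewrite,
                                           UnaryOperator<R> copy) {
    boolean buffering = isBufferingRecords(executorType);
    return record -> {
      R rewritten = rewrite.apply(record);
      return buffering ? copy.apply(rewritten) : rewritten;
    };
  }
}
```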
alexeykudinkin authored Jan 31, 2023
1 parent d857693 commit 628dc8c
Showing 34 changed files with 904 additions and 843 deletions.
@@ -23,11 +23,11 @@
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.ExecutorFactory;

import java.util.Iterator;
import java.util.List;
@@ -104,7 +104,7 @@ private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRe
// it since these records will be subsequently buffered (w/in the in-memory queue);
// Only case when we don't need to make a copy is when using [[SimpleExecutor]] which
// is guaranteed to not hold on to references to any records
boolean shouldClone = writeConfig.getExecutorType() != ExecutorType.SIMPLE;
boolean shouldClone = ExecutorFactory.isBufferingRecords(writeConfig);

return record -> {
HoodieRecord<T> clonedRecord = shouldClone ? record.copy() : record;
@@ -236,29 +236,33 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
// If the format can not record the operation field, nullify the DELETE payload manually.
boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
Option<HoodieRecord> finalRecord = nullifyPayload ? Option.empty() : Option.of(hoodieRecord);

Option<HoodieRecord> finalRecordOpt = nullifyPayload ? Option.empty() : Option.of(hoodieRecord);
// Check for delete
if (finalRecord.isPresent() && !finalRecord.get().isDelete(schema, recordProperties)) {
// Check for ignore ExpressionPayload
if (finalRecord.get().shouldIgnore(schema, recordProperties)) {
return finalRecord;
if (finalRecordOpt.isPresent() && !finalRecordOpt.get().isDelete(schema, recordProperties)) {
HoodieRecord finalRecord = finalRecordOpt.get();
// Check if the record should be ignored (special case for [[ExpressionPayload]])
if (finalRecord.shouldIgnore(schema, recordProperties)) {
return finalRecordOpt;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
HoodieRecord rewrittenRecord = schemaOnReadEnabled ? finalRecord.get().rewriteRecordWithNewSchema(schema, recordProperties, writeSchemaWithMetaFields)
: finalRecord.get().rewriteRecord(schema, recordProperties, writeSchemaWithMetaFields);

// Prepend meta-fields into the record
MetadataValues metadataValues = populateMetadataFields(finalRecord);
HoodieRecord populatedRecord =
finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);

// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into the recordList(List).
HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord.copy(), writeSchemaWithMetaFields, recordProperties);
finalRecord = Option.of(populatedRecord);
finalRecordOpt = Option.of(populatedRecord.copy());
if (isUpdateRecord || isLogCompaction) {
updatedRecordsWritten++;
} else {
insertRecordsWritten++;
}
recordsWritten++;
} else {
finalRecord = Option.empty();
finalRecordOpt = Option.empty();
recordsDeleted++;
}

@@ -267,15 +271,15 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
// part of marking
// record successful.
hoodieRecord.deflate();
return finalRecord;
return finalRecordOpt;
} catch (Exception e) {
LOG.error("Error writing record " + hoodieRecord, e);
writeStatus.markFailure(hoodieRecord, e, recordMetadata);
}
return Option.empty();
}

private HoodieRecord populateMetadataFields(HoodieRecord<T> hoodieRecord, Schema schema, Properties prop) throws IOException {
private MetadataValues populateMetadataFields(HoodieRecord<T> hoodieRecord) {
MetadataValues metadataValues = new MetadataValues();
if (config.populateMetaFields()) {
String seqId =
@@ -292,7 +296,7 @@ private HoodieRecord populateMetadataFields(HoodieRecord<T> hoodieRecord, Schema
metadataValues.setOperation(hoodieRecord.getOperation().getName());
}

return hoodieRecord.updateMetadataValues(schema, prop, metadataValues);
return metadataValues;
}

private void initNewStatus() {
@@ -18,13 +18,19 @@

package org.apache.hudi.io;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;

/**
* This class is essentially same as Create Handle but overrides two things
* 1) Schema : Metadata bootstrap writes only metadata fields as part of write. So, setup the writer schema accordingly.
@@ -34,14 +40,28 @@
*/
public class HoodieBootstrapHandle<T, I, K, O> extends HoodieCreateHandle<T, I, K, O> {

// NOTE: We have to use schema containing all the meta-fields in here b/c unlike for [[HoodieAvroRecord]],
// [[HoodieSparkRecord]] requires records to always bear either all or no meta-fields in the
// record schema (ie partial inclusion of the meta-fields in the schema is not allowed)
public static final Schema METADATA_BOOTSTRAP_RECORD_SCHEMA = createMetadataBootstrapRecordSchema();

public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
super(config, commitTime, hoodieTable, partitionPath, fileId,
Option.of(HoodieAvroUtils.RECORD_KEY_SCHEMA), taskContextSupplier);
Option.of(METADATA_BOOTSTRAP_RECORD_SCHEMA), taskContextSupplier);
}

@Override
public boolean canWrite(HoodieRecord record) {
return true;
}

private static Schema createMetadataBootstrapRecordSchema() {
List<Schema.Field> fields =
HoodieRecord.HOODIE_META_COLUMNS.stream()
.map(metaField ->
new Schema.Field(metaField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE))
.collect(Collectors.toList());
return Schema.createRecord("HoodieRecordKey", "", "", false, fields);
}
}
@@ -136,24 +136,22 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
if (record.shouldIgnore(schema, config.getProps())) {
return;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
HoodieRecord rewriteRecord;
if (schemaOnReadEnabled) {
rewriteRecord = record.rewriteRecordWithNewSchema(schema, config.getProps(), writeSchemaWithMetaFields);
} else {
rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
}

MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
HoodieRecord populatedRecord =
record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, config.getProps());

if (preserveMetadata) {
fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
} else {
fileWriter.writeWithMetadata(record.getKey(), rewriteRecord, writeSchemaWithMetaFields);
fileWriter.writeWithMetadata(record.getKey(), populatedRecord, writeSchemaWithMetaFields);
}
// update the new location of record, so we know where to find it next

// Update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
record.seal();

recordsWritten++;
insertRecordsWritten++;
} else {
@@ -374,20 +374,16 @@ public void write(HoodieRecord<T> oldRecord) {
}

protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata) throws IOException {
HoodieRecord rewriteRecord;
if (schemaOnReadEnabled) {
rewriteRecord = record.rewriteRecordWithNewSchema(schema, prop, writeSchemaWithMetaFields);
} else {
rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
}
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
// file holding this record even in cases when overall metadata is preserved
MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
HoodieRecord populatedRecord =
record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop);

if (shouldPreserveRecordMetadata) {
fileWriter.write(key.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
} else {
fileWriter.writeWithMetadata(key, rewriteRecord, writeSchemaWithMetaFields);
fileWriter.writeWithMetadata(key, populatedRecord, writeSchemaWithMetaFields);
}
}

@@ -95,7 +95,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
this.schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
this.recordMerger = config.getRecordMerger();
}

@@ -27,8 +27,6 @@
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.MappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -94,8 +92,8 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,

// In case Advanced Schema Evolution is enabled we might need to rewrite currently
// persisted records to adhere to an evolved schema
Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Schema>> schemaEvolutionTransformerOpt =
composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, table.getMetaClient());
Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt =
composeSchemaEvolutionTransformer(readerSchema, writerSchema, baseFile, writeConfig, table.getMetaClient());

// Check whether the writer schema is simply a projection of the file's one, ie
// - Its field-set is a proper subset (of the reader schema)
@@ -130,29 +128,27 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
(left, right) ->
left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
} else if (schemaEvolutionTransformerOpt.isPresent()) {
recordIterator = new MappingIterator<>(baseFileRecordIterator,
schemaEvolutionTransformerOpt.get().getLeft().apply(isPureProjection ? writerSchema : readerSchema));
recordSchema = schemaEvolutionTransformerOpt.get().getRight();
} else {
recordIterator = baseFileRecordIterator;
recordSchema = isPureProjection ? writerSchema : readerSchema;
}

boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);

wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
HoodieRecord newRecord;
if (schemaEvolutionTransformerOpt.isPresent()) {
newRecord = schemaEvolutionTransformerOpt.get().apply(record);
} else if (shouldRewriteInWriterSchema) {
newRecord = record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema);
} else {
newRecord = record;
}

// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into queue of QueueBasedExecutorFactory.
if (shouldRewriteInWriterSchema) {
try {
return record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema).copy();
} catch (IOException e) {
LOG.error("Error rewrite record with new schema", e);
throw new HoodieException(e);
}
} else {
return record.copy();
}
return isBufferingRecords ? newRecord.copy() : newRecord;
}, table.getPreExecuteRunnable());

wrapper.execute();
@@ -173,10 +169,11 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
}
}

private Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Schema>> composeSchemaEvolutionTransformer(Schema writerSchema,
HoodieBaseFile baseFile,
HoodieWriteConfig writeConfig,
HoodieTableMetaClient metaClient) {
private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTransformer(Schema recordSchema,
Schema writerSchema,
HoodieBaseFile baseFile,
HoodieWriteConfig writeConfig,
HoodieTableMetaClient metaClient) {
Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(writeConfig.getInternalSchema());
// TODO support bootstrap
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
@@ -214,18 +211,12 @@ private Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Sche
|| SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
if (needToReWriteRecord) {
Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
return Option.of(Pair.of(
(schema) -> (record) -> {
try {
return record.rewriteRecordWithNewSchema(
schema,
writeConfig.getProps(),
newWriterSchema, renameCols);
} catch (IOException e) {
LOG.error("Error rewrite record with new schema", e);
throw new HoodieException(e);
}
}, newWriterSchema));
return Option.of(record -> {
return record.rewriteRecordWithNewSchema(
recordSchema,
writeConfig.getProps(),
newWriterSchema, renameCols);
});
} else {
return Option.empty();
}
@@ -33,31 +33,47 @@

public class ExecutorFactory {

public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
Iterator<I> inputItr,
HoodieConsumer<O, E> consumer,
Function<I, O> transformFunction) {
return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
return create(config, inputItr, consumer, transformFunction, Functions.noop());
}

public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
Iterator<I> inputItr,
HoodieConsumer<O, E> consumer,
Function<I, O> transformFunction,
Runnable preExecuteRunnable) {
ExecutorType executorType = hoodieConfig.getExecutorType();

ExecutorType executorType = config.getExecutorType();
switch (executorType) {
case BOUNDED_IN_MEMORY:
return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
return new BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(), inputItr, consumer,
transformFunction, preExecuteRunnable);
case DISRUPTOR:
return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer,
transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
return new DisruptorExecutor<>(config.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer,
transformFunction, config.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
case SIMPLE:
return new SimpleExecutor<>(inputItr, consumer, transformFunction);
default:
throw new HoodieException("Unsupported Executor Type " + executorType);
}
}

/**
* Checks whether the configured {@link HoodieExecutor} buffers records (for example, by holding them
* in a queue)
*/
public static boolean isBufferingRecords(HoodieWriteConfig config) {
ExecutorType executorType = config.getExecutorType();
switch (executorType) {
case BOUNDED_IN_MEMORY:
case DISRUPTOR:
return true;
case SIMPLE:
return false;
default:
throw new HoodieException("Unsupported Executor Type " + executorType);
}
}
}