
Commit

Reduce unnecessary is_empty rdd calls in StreamSync
harsh1231 committed Nov 22, 2023
1 parent cda9dbc commit c8c49d5
Showing 4 changed files with 61 additions and 90 deletions.
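
The common thread in the hunks below: StreamSync#readFromSource previously returned Pair<InputBatch, Boolean>, with the Boolean produced by an up-front isEmpty() on the source RDD or Dataset, an action that costs a Spark job of its own. After this commit it returns the InputBatch alone; callers carry the records as Option<JavaRDD<HoodieRecord>> and substitute an empty RDD via orElse() only where one is actually needed. A minimal stand-alone sketch of that shape in plain Java, with java.util.Optional and a List standing in for the RDD (illustrative only, not code from this commit):

    import java.util.List;
    import java.util.Optional;

    // Minimal sketch of the "after" shape (plain Java; a List stands in for JavaRDD<HoodieRecord>).
    // The "before" shape returned Pair.of(batch, batch.isEmpty()), paying for the emptiness check eagerly.
    class ReadFromSourceSketch {

      // Paraphrase of readFromSource: hand back only the batch, possibly absent.
      static Optional<List<String>> readFromSource(boolean hasNewData) {
        return hasNewData ? Optional.of(List.of("r1", "r2")) : Optional.empty();
      }

      public static void main(String[] args) {
        Optional<List<String>> batchOpt = readFromSource(true);
        // Consumers substitute an empty collection instead of asking "is it empty?" up front,
        // mirroring inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD()) in the diff.
        List<String> records = batchOpt.orElse(List.of());
        System.out.println("records to write: " + records.size());
      }
    }
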
@@ -87,8 +87,8 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() t
.setBasePath(service.getCfg().targetBasePath)
.build();
String instantTime = InProcessTimeGenerator.createNewInstantTime();
InputBatch inputBatch = service.readFromSource(instantTime, metaClient).getLeft();
return Pair.of(inputBatch.getSchemaProvider(), Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<HoodieRecord>) inputBatch.getBatch().get()));
Pair<SchemaProvider, Pair<String, Option<JavaRDD<HoodieRecord>>>> pair = service.readFromSource(instantTime);
return Pair.of(pair.getLeft(), Pair.of(pair.getRight().getLeft(), pair.getRight().getRight().orElse(jssc.emptyRDD())));
}

public StreamSync getDeltaSync() {
@@ -64,7 +64,7 @@ public class SparkSampleWritesUtils

private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);

public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig) {
public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig) {
if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
LOG.debug("Skip overwriting record size estimate as it's disabled.");
return Option.empty();
@@ -76,7 +76,7 @@ public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(Jav
}
try {
String instantTime = getInstantFromTemporalAccessor(Instant.now().atZone(ZoneId.systemDefault()));
Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
Pair<Boolean, String> result = doSampleWrites(jsc, recordsOpt, writeConfig, instantTime);
if (result.getLeft()) {
long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
LOG.info("Overwriting record size estimate to " + avgSize);
@@ -90,7 +90,7 @@ public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(Jav
return Option.empty();
}

private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime)
private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig, String instantTime)
throws IOException {
final String sampleWritesBasePath = getSampleWritesBasePath(jsc, writeConfig, instantTime);
HoodieTableMetaClient.withPropertyBuilder()
@@ -109,25 +109,31 @@ private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRD
.withAutoCommit(true)
.withPath(sampleWritesBasePath)
.build();
Pair<Boolean, String> emptyRes = Pair.of(false, null);
try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
List<HoodieRecord> samples = records.coalesce(1).take(size);
sampleWriteClient.startCommitWithTime(instantTime);
JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Printing out the top 100 errors");
writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
LOG.trace("Global error :", ws.getGlobalError());
ws.getErrors().forEach((key, throwable) ->
LOG.trace(String.format("Error for key: %s", key), throwable));
});
return recordsOpt.map(records -> {
List<HoodieRecord> samples = records.coalesce(1).take(size);
if (samples.isEmpty()) {
return emptyRes;
}
return Pair.of(false, null);
} else {
return Pair.of(true, sampleWritesBasePath);
}
sampleWriteClient.startCommitWithTime(instantTime);
JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Printing out the top 100 errors");
writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
LOG.trace("Global error :", ws.getGlobalError());
ws.getErrors().forEach((key, throwable) ->
LOG.trace(String.format("Error for key: %s", key), throwable));
});
}
return emptyRes;
} else {
return Pair.of(true, sampleWritesBasePath);
}
}).orElse(emptyRes);
}
}
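
A compact paraphrase of the rewritten doSampleWrites above (hypothetical names, plain Java, no Spark): wrapping the input in Option lets an absent batch short-circuit through orElse(emptyRes), and the only emptiness check left is on the small List produced by take(size), a local call rather than a job over the whole RDD.

    import java.util.List;
    import java.util.Optional;

    // Sketch of the recordsOpt.map(...).orElse(emptyRes) shape; names and paths are hypothetical.
    class SampleWritesSketch {

      static String doSampleWrites(Optional<List<String>> recordsOpt, int sampleSize) {
        String emptyRes = null; // corresponds to Pair.of(false, null) above
        return recordsOpt.map(records -> {
          // take(size) equivalent: isEmpty() here inspects a small local List,
          // not the full distributed dataset.
          List<String> samples = records.subList(0, Math.min(sampleSize, records.size()));
          if (samples.isEmpty()) {
            return emptyRes;
          }
          return "/tmp/sample-writes-base-path"; // hypothetical sampleWritesBasePath
        }).orElse(emptyRes);
      }

      public static void main(String[] args) {
        System.out.println(doSampleWrites(Optional.empty(), 10));                  // null: nothing to sample
        System.out.println(doSampleWrites(Optional.of(List.of("r1", "r2")), 10));  // a base path
      }
    }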

@@ -402,32 +402,26 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
.build();
String instantTime = metaClient.createNewInstantTime();

Pair<InputBatch,Boolean> inputBatchIsEmptyPair = readFromSource(instantTime, metaClient);
InputBatch inputBatch = readFromSource(instantTime, metaClient);

if (inputBatchIsEmptyPair != null) {
final JavaRDD<HoodieRecord> recordsFromSource;
if (useRowWriter) {
recordsFromSource = hoodieSparkContext.emptyRDD();
} else {
recordsFromSource = (JavaRDD<HoodieRecord>) inputBatchIsEmptyPair.getKey().getBatch().get();
}
if (inputBatch != null) {

// this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
// compactor
if (writeClient == null) {
this.schemaProvider = inputBatchIsEmptyPair.getKey().getSchemaProvider();
this.schemaProvider = inputBatch.getSchemaProvider();
// Setup HoodieWriteClient and compaction now that we decided on schema
setupWriteClient(recordsFromSource);
setupWriteClient(inputBatch.getBatch());
} else {
Schema newSourceSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getSourceSchema();
Schema newTargetSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getTargetSchema();
Schema newSourceSchema = inputBatch.getSchemaProvider().getSourceSchema();
Schema newTargetSchema = inputBatch.getSchemaProvider().getTargetSchema();
if ((newSourceSchema != null && !processedSchema.isSchemaPresent(newSourceSchema))
|| (newTargetSchema != null && !processedSchema.isSchemaPresent(newTargetSchema))) {
String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER : newSourceSchema.toString(true);
String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER : newTargetSchema.toString(true);
LOG.info("Seeing new schema. Source: {0}, Target: {1}", sourceStr, targetStr);
// We need to recreate write client with new schema and register them.
reInitWriteClient(newSourceSchema, newTargetSchema, recordsFromSource);
reInitWriteClient(newSourceSchema, newTargetSchema, inputBatch.getBatch());
if (newSourceSchema != null) {
processedSchema.addSchema(newSourceSchema);
}
@@ -454,7 +448,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
}
}

result = writeToSinkAndDoMetaSync(instantTime, inputBatchIsEmptyPair.getKey(), inputBatchIsEmptyPair.getValue(), metrics, overallTimerContext);
result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics, overallTimerContext);
}

metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
@@ -484,7 +478,7 @@ private Option<String> getLastPendingCompactionInstant(Option<HoodieTimeline> co
* @throws Exception in case of any Exception
*/

public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
public InputBatch readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
// Retrieve the previous round checkpoints, if any
Option<String> resumeCheckpointStr = Option.empty();
if (commitsTimelineOpt.isPresent()) {
@@ -499,7 +493,7 @@ public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableM

int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
int curRetryCount = 0;
Pair<InputBatch, Boolean> sourceDataToSync = null;
InputBatch sourceDataToSync = null;
while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
try {
sourceDataToSync = fetchFromSourceAndPrepareRecords(resumeCheckpointStr, instantTime, metaClient);
@@ -519,7 +513,7 @@ public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableM
return sourceDataToSync;
}

private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime,
private InputBatch fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime,
HoodieTableMetaClient metaClient) {
HoodieRecordType recordType = createRecordMerger(props).getRecordType();
if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
@@ -544,17 +538,14 @@ private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<String

// handle empty batch with change in checkpoint
hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
Pair<InputBatch, Boolean> preparedInputBatchIsEmptyPair = handleEmptyBatch(useRowWriter, inputBatch, checkpointStr, schemaProvider);
if (preparedInputBatchIsEmptyPair.getValue()) { // return if empty batch
return preparedInputBatchIsEmptyPair;
}


if (useRowWriter) { // no additional processing required for row writer.
return Pair.of(inputBatch, false);
return inputBatch;
} else {
JavaRDD<HoodieRecord> records = HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), schemaProvider,
recordType, autoGenerateRecordKeys, instantTime);
return Pair.of(new InputBatch(Option.of(records), checkpointStr, schemaProvider), false);
return new InputBatch(Option.of(records), checkpointStr, schemaProvider);
}
}

@@ -652,33 +643,6 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
}
}

/**
* Handles empty batch from input.
* @param useRowWriter true if row write code path.
* @param inputBatch {@link InputBatch} instance to use.
* @param checkpointForNextBatch checkpiont to use for next batch.
* @param schemaProvider {@link SchemaProvider} instance of interest.
* @return a Pair of InputBatch and boolean. boolean value is set to true on empty batch.
*/
private Pair<InputBatch, Boolean> handleEmptyBatch(boolean useRowWriter, InputBatch inputBatch,
String checkpointForNextBatch, SchemaProvider schemaProvider) {
hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
if (useRowWriter) {
Option<Dataset<Row>> rowDatasetOptional = inputBatch.getBatch();
if ((!rowDatasetOptional.isPresent()) || (rowDatasetOptional.get().isEmpty())) {
LOG.info("No new data, perform empty commit.");
return Pair.of(new InputBatch<>(Option.of(sparkSession.emptyDataFrame()), checkpointForNextBatch, schemaProvider), true);
}
} else {
Option<JavaRDD<GenericRecord>> avroRDDOptional = inputBatch.getBatch();
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
LOG.info("No new data, perform empty commit.");
return Pair.of(new InputBatch(Option.of(hoodieSparkContext.emptyRDD()), checkpointForNextBatch, schemaProvider), true);
}
}
return Pair.of(inputBatch, false);
}
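
The deleted handleEmptyBatch above is where the redundant work lived: isEmpty() on a JavaRDD is an action (in Spark it is implemented roughly as take(1)), and Dataset.isEmpty() likewise runs a job, so every sync paid for an extra round trip just to decide whether to make an empty commit. A small demonstration of that cost, assuming spark-core on the classpath and a local master (illustrative only, not Hudi code):

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Shows that isEmpty() is an action: it triggers its own job, which is the cost this commit removes.
    public class IsEmptyCostSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("isEmpty-sketch").setMaster("local[1]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3));
          boolean empty = rdd.isEmpty();                 // job #1, roughly take(1) under the hood
          long written = rdd.filter(x -> x > 0).count(); // job #2; a count like this already answers "was anything written?"
          System.out.println("empty=" + empty + ", written=" + written);
        }
      }
    }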

/**
* Apply schema reconcile and schema evolution rules(schema on read) and generate new target schema provider.
*
@@ -801,24 +765,25 @@ private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema writerSchema) {
*
* @param instantTime instant time to use for ingest.
* @param inputBatch input batch that contains the records, checkpoint, and schema provider
* @param inputIsEmpty true if input batch is empty.
* @param metrics Metrics
* @param overallTimerContext Timer Context
* @return Option Compaction instant if one is scheduled
*/
private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, boolean inputIsEmpty,
private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
HoodieIngestionMetrics metrics,
Timer.Context overallTimerContext) {
Option<String> scheduledCompactionInstant = Option.empty();
// write to hudi and fetch result
Pair<WriteClientWriteResult, Boolean> writeClientWriteResultIsEmptyPair = writeToSink(inputBatch, instantTime, inputIsEmpty);
JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResultIsEmptyPair.getKey().getWriteStatusRDD();
Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResultIsEmptyPair.getKey().getPartitionToReplacedFileIds();
boolean isEmpty = writeClientWriteResultIsEmptyPair.getRight();
WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();

// process write status
long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
long totalSuccessfulRecords = totalRecords - totalErrorRecords;
LOG.info(String.format("instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d",
instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords));
boolean hasErrors = totalErrorRecords > 0;
if (!hasErrors || cfg.commitOnErrors) {
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
@@ -863,8 +828,10 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri
scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
}

if (!isEmpty || cfg.forceEmptyMetaSync) {
if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
runMetaSync();
} else {
LOG.info(String.format("Not running metaSync totalSuccessfulRecords=%d", totalSuccessfulRecords));
}
} else {
LOG.info("Commit " + instantTime + " failed!");
@@ -924,22 +891,20 @@ private String startCommit(String instantTime, boolean retryEnabled) {
throw lastException;
}

private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch inputBatch, String instantTime, boolean inputIsEmpty) {
private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instantTime) {
WriteClientWriteResult writeClientWriteResult = null;
instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
boolean isEmpty = inputIsEmpty;

if (useRowWriter) {
Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().get();
Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
} else {
JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().get();
JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
// filter dupes if needed
if (cfg.filterDupes) {
records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), records, writeClient.getConfig());
isEmpty = records.isEmpty();
}

HoodieWriteResult writeResult = null;
@@ -973,7 +938,7 @@ private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch inputBatch,
throw new HoodieStreamerException("Unknown operation : " + cfg.operation);
}
}
return Pair.of(writeClientWriteResult, isEmpty);
return writeClientWriteResult;
}

private String getSyncClassShortName(String syncClassName) {
@@ -1028,23 +993,23 @@ public void runMetaSync() {
* SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
* this constraint.
*/
private void setupWriteClient(JavaRDD<HoodieRecord> records) throws IOException {
private void setupWriteClient(Option<JavaRDD<HoodieRecord>> recordsOpt) throws IOException {
if ((null != schemaProvider)) {
Schema sourceSchema = schemaProvider.getSourceSchema();
Schema targetSchema = schemaProvider.getTargetSchema();
reInitWriteClient(sourceSchema, targetSchema, records);
reInitWriteClient(sourceSchema, targetSchema, recordsOpt);
}
}

private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD<HoodieRecord> records) throws IOException {
private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, Option<JavaRDD<HoodieRecord>> recordsOpt) throws IOException {
LOG.info("Setting up new Hoodie Write Client");
if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
targetSchema = HoodieAvroUtils.removeFields(targetSchema, HoodieStreamerUtils.getPartitionColumns(props));
}
registerAvroSchemas(sourceSchema, targetSchema);
final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
.getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), records, initialWriteConfig)
.getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), recordsOpt, initialWriteConfig)
.orElse(initialWriteConfig);

if (writeConfig.isEmbeddedTimelineServerEnabled()) {
(The diff for the remaining changed file did not load and is not shown here.)
