[MINOR] Misc fixes in deltastreamer (apache#10067)
nsivabalan committed Nov 23, 2023
1 parent 2793004 commit 6b91cfb
Showing 3 changed files with 34 additions and 15 deletions.
@@ -751,6 +751,8 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
         while (!isShutdownRequested()) {
           try {
             long start = System.currentTimeMillis();
+            // Send a heartbeat metrics event to track the active ingestion job for this table.
+            streamSync.getMetrics().updateStreamerHeartbeatTimestamp(start);
             // check if deltastreamer needs to update the configuration before the sync
             if (configurationHotUpdateStrategyOpt.isPresent()) {
               Option<TypedProperties> newProps = configurationHotUpdateStrategyOpt.get().updateProperties(props);
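
The heartbeat added above is just a timestamp gauge refreshed once per sync round, so an external monitor can flag the ingestion job as stalled when the value stops advancing. A minimal consumer-side sketch (class name and threshold are illustrative assumptions, not part of this commit):

import java.util.concurrent.TimeUnit;

// Illustrative monitor-side check (not part of this commit): treat the job as
// stalled when the published heartbeat timestamp stops advancing.
public class HeartbeatMonitor {
  // Assumed alerting threshold; a real deployment would tune this to the sync interval.
  private static final long STALE_AFTER_MILLIS = TimeUnit.MINUTES.toMillis(5);

  /** Returns true if the last heartbeat is older than the staleness threshold. */
  public static boolean isStalled(long lastHeartbeatEpochMillis) {
    return System.currentTimeMillis() - lastHeartbeatEpochMillis > STALE_AFTER_MILLIS;
  }

  public static void main(String[] args) {
    long tenMinutesAgo = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(10);
    System.out.println(isStalled(tenMinutesAgo)); // true: heartbeat is stale
  }
}
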
@@ -229,8 +229,11 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
         // configured via this option. The column is then used to trigger error events.
         StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)
             .add(new StructField(ERROR_TABLE_CURRUPT_RECORD_COL_NAME, DataTypes.StringType, true, Metadata.empty()));
+        StructType nullableStruct = dataType.asNullable();
         Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> source.getSparkSession().read()
-            .option("columnNameOfCorruptRecord", ERROR_TABLE_CURRUPT_RECORD_COL_NAME).schema(dataType.asNullable())
+            .option("columnNameOfCorruptRecord", ERROR_TABLE_CURRUPT_RECORD_COL_NAME)
+            .schema(nullableStruct)
+            .option("mode", "PERMISSIVE")
             .json(rdd));
         Option<Dataset<Row>> eventsDataset = processErrorEvents(dataset,
             ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE);
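
Spark's PERMISSIVE read mode keeps malformed JSON records instead of failing the batch: each bad line lands whole in the configured corrupt-record column, which must be present in the schema and nullable (hence the asNullable() call above). A standalone sketch of the same pattern outside Hudi:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class PermissiveJsonDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]").appName("permissive-json-demo").getOrCreate();

    // The corrupt-record column must exist in the schema, and fields should be
    // nullable so Spark can null them out when a record fails to parse.
    StructType schema = new StructType()
        .add("id", DataTypes.LongType, true)
        .add("name", DataTypes.StringType, true)
        .add("_corrupt_record", DataTypes.StringType, true);

    List<String> lines = Arrays.asList(
        "{\"id\": 1, \"name\": \"ok\"}",
        "{\"id\": oops, not json");  // second line is malformed on purpose

    Dataset<Row> df = spark.read()
        .option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", "_corrupt_record")
        .schema(schema)
        .json(spark.createDataset(lines, Encoders.STRING()));

    // The malformed line survives intact in _corrupt_record; other columns are null.
    df.show(false);
    spark.stop();
  }
}
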
@@ -160,6 +160,7 @@ public class StreamSync implements Serializable, Closeable {

   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class);
+  private static final String NULL_PLACEHOLDER = "[null]";
 
   /**
    * Delta Sync Config.
@@ -419,14 +420,19 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
     } else {
       Schema newSourceSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getSourceSchema();
       Schema newTargetSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getTargetSchema();
-      if (!(processedSchema.isSchemaPresent(newSourceSchema))
-          || !(processedSchema.isSchemaPresent(newTargetSchema))) {
-        LOG.info("Seeing new schema. Source :" + newSourceSchema.toString(true)
-            + ", Target :" + newTargetSchema.toString(true));
+      if ((newSourceSchema != null && !processedSchema.isSchemaPresent(newSourceSchema))
+          || (newTargetSchema != null && !processedSchema.isSchemaPresent(newTargetSchema))) {
+        String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER : newSourceSchema.toString(true);
+        String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER : newTargetSchema.toString(true);
+        LOG.info("Seeing new schema. Source: {}, Target: {}", sourceStr, targetStr);
         // We need to recreate write client with new schema and register them.
         reInitWriteClient(newSourceSchema, newTargetSchema, recordsFromSource);
-        processedSchema.addSchema(newSourceSchema);
-        processedSchema.addSchema(newTargetSchema);
+        if (newSourceSchema != null) {
+          processedSchema.addSchema(newSourceSchema);
+        }
+        if (newTargetSchema != null) {
+          processedSchema.addSchema(newTargetSchema);
+        }
       }
     }
 
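These guards matter because a schema provider can legitimately return null for either schema; the old code could hit a NullPointerException in toString(true) and could record a null schema as processed. A stripped-down sketch of the bookkeeping idea (plain strings instead of Avro Schema objects; all names hypothetical):

import java.util.HashSet;
import java.util.Set;

// Stand-in for the processedSchema bookkeeping above: only a non-null,
// previously unseen schema should trigger the expensive write-client re-init.
public class SchemaTracker {
  private final Set<String> seen = new HashSet<>();

  /** True when the schema is non-null and not yet processed. */
  public boolean isNew(String schema) {
    return schema != null && !seen.contains(schema);
  }

  public void markProcessed(String schema) {
    if (schema != null) {
      seen.add(schema);
    }
  }

  public static void main(String[] args) {
    SchemaTracker tracker = new SchemaTracker();
    System.out.println(tracker.isNew(null));  // false: null never triggers re-init
    System.out.println(tracker.isNew("s1"));  // true: first sighting
    tracker.markProcessed("s1");
    System.out.println(tracker.isNew("s1"));  // false: already processed
  }
}
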
@@ -575,7 +581,8 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
             ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
 
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
+      if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null
+          && this.userProvidedSchemaProvider.getTargetSchema() != InputBatch.NULL_SCHEMA) {
         if (useRowWriter) {
           inputBatchForWriter = new InputBatch(transformed, checkpointStr, this.userProvidedSchemaProvider);
         } else {
@@ -982,6 +989,7 @@ public void runMetaSync() {
LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
}
if (cfg.enableMetaSync) {
LOG.debug("[MetaSync] Starting sync");
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, hoodieSparkContext.hadoopConfiguration());

TypedProperties metaProps = new TypedProperties();
@@ -995,14 +1003,19 @@
       Map<String, HoodieException> failedMetaSyncs = new HashMap<>();
       for (String impl : syncClientToolClasses) {
         Timer.Context syncContext = metrics.getMetaSyncTimerContext();
+        boolean success = false;
         try {
           SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs, cfg.targetBasePath, cfg.baseFileFormat);
+          success = true;
         } catch (HoodieMetaSyncException e) {
-          LOG.warn("SyncTool class " + impl.trim() + " failed with exception", e);
+          LOG.error("SyncTool class {} failed with exception", impl.trim(), e);
           failedMetaSyncs.put(impl, e);
         }
         long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
         metrics.updateStreamerMetaSyncMetrics(getSyncClassShortName(impl), metaSyncTimeMs);
+        if (success) {
+          LOG.info("[MetaSync] SyncTool class {} completed successfully and took {} ms", impl.trim(), metaSyncTimeMs);
+        }
       }
       if (!failedMetaSyncs.isEmpty()) {
         throw getHoodieMetaSyncException(failedMetaSyncs);
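
The logging here relies on SLF4J's parameterized style: {} placeholders are filled positionally, and a trailing Throwable is logged with its full stack trace rather than formatted into the message. A minimal self-contained demonstration:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// SLF4J substitutes {} placeholders positionally; a trailing Throwable argument
// is not formatted into the message but is logged with its stack trace.
public class LoggingDemo {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingDemo.class);

  public static void main(String[] args) {
    String impl = "org.apache.hudi.hive.HiveSyncTool"; // example sync tool class name
    long tookMs = 1234L;
    LOG.info("[MetaSync] SyncTool class {} completed successfully and took {} ms", impl, tookMs);
    try {
      throw new IllegalStateException("simulated sync failure");
    } catch (IllegalStateException e) {
      // The exception is the final argument, so the stack trace is attached.
      LOG.error("SyncTool class {} failed with exception", impl, e);
    }
  }
}
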
@@ -1174,13 +1187,14 @@ private void registerAvroSchemas(SchemaProvider schemaProvider) {
    */
   private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) {
     // register the schemas, so that shuffle does not serialize the full schemas
-    if (null != sourceSchema) {
-      List<Schema> schemas = new ArrayList<>();
+    List<Schema> schemas = new ArrayList<>();
+    if (sourceSchema != null) {
       schemas.add(sourceSchema);
-      if (targetSchema != null) {
-        schemas.add(targetSchema);
-      }
-
+    }
+    if (targetSchema != null) {
+      schemas.add(targetSchema);
+    }
+    if (!schemas.isEmpty()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Registering Schema: " + schemas);
       }
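
The reshaped method also fixes a quiet asymmetry: with a null source schema but a non-null target schema, the old nesting skipped registration entirely. A before/after sketch of just that control flow (plain strings stand in for Avro schemas; class and method names are illustrative):

import java.util.ArrayList;
import java.util.List;

public class RegisterShapeDemo {
  // Old shape: everything nested under the source-schema null check.
  static List<String> oldShape(String source, String target) {
    List<String> schemas = new ArrayList<>();
    if (source != null) {
      schemas.add(source);
      if (target != null) {
        schemas.add(target);
      }
    }
    return schemas;
  }

  // New shape: each schema is guarded independently.
  static List<String> newShape(String source, String target) {
    List<String> schemas = new ArrayList<>();
    if (source != null) {
      schemas.add(source);
    }
    if (target != null) {
      schemas.add(target);
    }
    return schemas;
  }

  public static void main(String[] args) {
    // With a null source, the old shape silently dropped the target schema.
    System.out.println(oldShape(null, "target")); // []
    System.out.println(newShape(null, "target")); // [target]
  }
}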