Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional sync timing information #17643

Merged
merged 14 commits into from
Oct 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.helper.ThreadedTimeTracker;
import io.airbyte.workers.internal.AirbyteDestination;
import io.airbyte.workers.internal.AirbyteMapper;
import io.airbyte.workers.internal.AirbyteSource;
Expand Down Expand Up @@ -129,7 +130,10 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
final WorkerDestinationConfig destinationConfig = WorkerUtils.syncToWorkerDestinationConfig(syncInput);
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));

final ThreadedTimeTracker timeTracker = new ThreadedTimeTracker();
final long startTime = System.currentTimeMillis();
timeTracker.trackReplicationStartTime();

final AtomicReference<FailureReason> replicationRunnableFailureRef = new AtomicReference<>();
final AtomicReference<FailureReason> destinationRunnableFailureRef = new AtomicReference<>();

Expand All @@ -146,12 +150,14 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
// closed first (which is what we want).
try (destination; source) {
destination.start(destinationConfig, jobRoot);
timeTracker.trackSourceReadStartTime();
source.start(sourceConfig, jobRoot);
timeTracker.trackDestinationWriteStartTime();

// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
// thrown
final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc),
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc, timeTracker),
executors).whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof DestinationException) {
Expand All @@ -163,7 +169,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
});

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter),
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeTracker),
executors).whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof SourceException) {
Expand Down Expand Up @@ -204,6 +210,8 @@ else if (hasFailed.get()) {
outputStatus = ReplicationStatus.COMPLETED;
}

timeTracker.trackReplicationEndTime();

final SyncStats totalSyncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
Expand All @@ -212,7 +220,13 @@ else if (hasFailed.get()) {
.withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
.withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage())
.withMaxSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null));
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withReplicationStartTime(timeTracker.getReplicationStartTime())
.withReplicationEndTime(timeTracker.getReplicationEndTime())
.withSourceReadStartTime(timeTracker.getSourceReadStartTime())
.withSourceReadEndTime(timeTracker.getSourceReadEndTime())
.withDestinationWriteStartTime(timeTracker.getDestinationWriteStartTime())
.withDestinationWriteEndTime(timeTracker.getDestinationWriteEndTime());

if (outputStatus == ReplicationStatus.COMPLETED) {
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
Expand Down Expand Up @@ -318,7 +332,8 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
final MessageTracker messageTracker,
final Map<String, String> mdc,
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter) {
final WorkerMetricReporter metricReporter,
final ThreadedTimeTracker timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -362,6 +377,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
}
}
}
timeHolder.trackSourceReadEndTime();
LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted()));
if (!validationErrors.isEmpty()) {
validationErrors.forEach((stream, errorPair) -> {
Expand Down Expand Up @@ -431,7 +447,8 @@ private static void validateSchema(final RecordSchemaValidator recordSchemaValid
private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker messageTracker,
final Map<String, String> mdc) {
final Map<String, String> mdc,
final ThreadedTimeTracker timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Destination output thread started.");
evantahler marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -448,6 +465,7 @@ private static Runnable getDestinationOutputRunnable(final AirbyteDestination de
messageTracker.acceptFromDestination(messageOptional.get());
}
}
timeHolder.trackDestinationWriteEndTime();
if (!cancelled.get() && destination.getExitValue() != 0) {
throw new DestinationException("Destination process exited with non-zero exit code " + destination.getExitValue());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;

/**
 * Records wall-clock timestamps for the phases of a sync: the overall replication, the source
 * read, and the destination write.
 *
 * <p>
 * A single tracker is shared by multiple threads (source, destination, and worker), so every
 * mutator and accessor is synchronized on the instance.
 *
 * <p>
 * All values are epoch milliseconds taken from {@link System#currentTimeMillis()}. A boundary that
 * has not been tracked yet reads as {@code 0}.
 */
public class ThreadedTimeTracker {

  private long replicationStartMillis;
  private long replicationEndMillis;
  private long sourceReadStartMillis;
  private long sourceReadEndMillis;
  private long destinationWriteStartMillis;
  private long destinationWriteEndMillis;

  /** Single place where the current time is sampled. */
  private static long now() {
    return System.currentTimeMillis();
  }

  // ---- replication ----

  /** Stamps the replication start with the current time. */
  public synchronized void trackReplicationStartTime() {
    replicationStartMillis = now();
  }

  /** @return the recorded replication start, or 0 if it was never tracked */
  public synchronized long getReplicationStartTime() {
    return replicationStartMillis;
  }

  /** Stamps the replication end with the current time. */
  public synchronized void trackReplicationEndTime() {
    replicationEndMillis = now();
  }

  /** @return the recorded replication end, or 0 if it was never tracked */
  public synchronized long getReplicationEndTime() {
    return replicationEndMillis;
  }

  // ---- source read ----

  /** Stamps the source read start with the current time. */
  public synchronized void trackSourceReadStartTime() {
    sourceReadStartMillis = now();
  }

  /** @return the recorded source read start, or 0 if it was never tracked */
  public synchronized long getSourceReadStartTime() {
    return sourceReadStartMillis;
  }

  /** Stamps the source read end with the current time. */
  public synchronized void trackSourceReadEndTime() {
    sourceReadEndMillis = now();
  }

  /** @return the recorded source read end, or 0 if it was never tracked */
  public synchronized long getSourceReadEndTime() {
    return sourceReadEndMillis;
  }

  // ---- destination write ----

  /** Stamps the destination write start with the current time. */
  public synchronized void trackDestinationWriteStartTime() {
    destinationWriteStartMillis = now();
  }

  /** @return the recorded destination write start, or 0 if it was never tracked */
  public synchronized long getDestinationWriteStartTime() {
    return destinationWriteStartMillis;
  }

  /** Stamps the destination write end with the current time. */
  public synchronized void trackDestinationWriteEndTime() {
    destinationWriteEndMillis = now();
  }

  /** @return the recorded destination write end, or 0 if it was never tracked */
  public synchronized long getDestinationWriteEndTime() {
    return destinationWriteEndMillis;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,11 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
Jsons.jsonNode(actual));
assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ","));

// remove times so we can do the rest of the object <> object comparison.
actual.getReplicationAttemptSummary().withStartTime(null);
actual.getReplicationAttemptSummary().withEndTime(null);
// remove times, so we can do the rest of the object <> object comparison.
actual.getReplicationAttemptSummary().withStartTime(null).withEndTime(null).getTotalStats().withReplicationStartTime(null)
.withReplicationEndTime(null)
.withSourceReadStartTime(null).withSourceReadEndTime(null)
.withDestinationWriteStartTime(null).withDestinationWriteEndTime(null);

assertEquals(replicationOutput, actual);
}
Expand Down Expand Up @@ -631,7 +633,9 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
.withDestinationStateMessagesEmitted(null)));

assertNotNull(actual);
assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats());
// null out timing stats for assertion matching
assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats().withReplicationStartTime(null).withReplicationEndTime(null)
.withSourceReadStartTime(null).withSourceReadEndTime(null).withDestinationWriteStartTime(null).withDestinationWriteEndTime(null));
assertEquals(expectedStreamStats, actual.getReplicationAttemptSummary().getStreamStats());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,21 @@ properties:
type: integer
meanSecondsBetweenStateMessageEmittedandCommitted:
type: integer
replicationStartTime:
evantahler marked this conversation as resolved.
Show resolved Hide resolved
type: integer
description: The start of the replication activity
replicationEndTime:
type: integer
description: The end of the replication activity
sourceReadStartTime:
type: integer
description: The boot time of the source container/pod
sourceReadEndTime:
type: integer
description: The exit time of the source container/pod
destinationWriteStartTime:
type: integer
description: The boot time of the destination container/pod
destinationWriteEndTime:
type: integer
description: The exit time of the destination container/pod
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobOutput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.ScheduleData;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.persistence.job.models.Attempt;
import io.airbyte.persistence.job.models.Job;
Expand Down Expand Up @@ -110,6 +112,9 @@ public static Map<String, Object> generateJobAttemptMetadata(final Job job) {
final JobOutput jobOutput = lastAttempt.getOutput().get();
if (jobOutput.getSync() != null) {
final StandardSyncSummary syncSummary = jobOutput.getSync().getStandardSyncSummary();
final SyncStats totalStats = syncSummary.getTotalStats();
evantahler marked this conversation as resolved.
Show resolved Hide resolved
final NormalizationSummary normalizationSummary = jobOutput.getSync().getNormalizationSummary();

if (syncSummary.getStartTime() != null)
metadata.put("sync_start_time", syncSummary.getStartTime());
if (syncSummary.getEndTime() != null && syncSummary.getStartTime() != null)
Expand All @@ -118,22 +123,42 @@ public static Map<String, Object> generateJobAttemptMetadata(final Job job) {
metadata.put("volume_mb", syncSummary.getBytesSynced());
if (syncSummary.getRecordsSynced() != null)
metadata.put("volume_rows", syncSummary.getRecordsSynced());
if (syncSummary.getTotalStats().getSourceStateMessagesEmitted() != null)
if (totalStats.getSourceStateMessagesEmitted() != null)
evantahler marked this conversation as resolved.
Show resolved Hide resolved
metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted());
if (syncSummary.getTotalStats().getDestinationStateMessagesEmitted() != null)
if (totalStats.getDestinationStateMessagesEmitted() != null)
metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted());
if (syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted() != null)
if (totalStats.getMaxSecondsBeforeSourceStateMessageEmitted() != null)
metadata.put("max_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted());
if (syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted() != null)
totalStats.getMaxSecondsBeforeSourceStateMessageEmitted());
if (totalStats.getMeanSecondsBeforeSourceStateMessageEmitted() != null)
metadata.put("mean_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted());
if (syncSummary.getTotalStats().getMaxSecondsBetweenStateMessageEmittedandCommitted() != null)
totalStats.getMeanSecondsBeforeSourceStateMessageEmitted());
if (totalStats.getMaxSecondsBetweenStateMessageEmittedandCommitted() != null)
metadata.put("max_seconds_between_state_message_emit_and_commit",
syncSummary.getTotalStats().getMaxSecondsBetweenStateMessageEmittedandCommitted());
if (syncSummary.getTotalStats().getMeanSecondsBetweenStateMessageEmittedandCommitted() != null)
totalStats.getMaxSecondsBetweenStateMessageEmittedandCommitted());
if (totalStats.getMeanSecondsBetweenStateMessageEmittedandCommitted() != null)
metadata.put("mean_seconds_between_state_message_emit_and_commit",
syncSummary.getTotalStats().getMeanSecondsBetweenStateMessageEmittedandCommitted());
totalStats.getMeanSecondsBetweenStateMessageEmittedandCommitted());

if (totalStats.getReplicationStartTime() != null)
metadata.put("replication_start_time", totalStats.getReplicationStartTime());
if (totalStats.getReplicationEndTime() != null)
metadata.put("replication_end_time", totalStats.getReplicationEndTime());
if (totalStats.getSourceReadStartTime() != null)
metadata.put("source_read_start_time", totalStats.getSourceReadStartTime());
if (totalStats.getSourceReadEndTime() != null)
metadata.put("source_read_end_time", totalStats.getSourceReadEndTime());
if (totalStats.getDestinationWriteStartTime() != null)
metadata.put("destination_write_start_time", totalStats.getDestinationWriteStartTime());
if (totalStats.getDestinationWriteEndTime() != null)
metadata.put("destination_write_end_time", totalStats.getDestinationWriteEndTime());

if (normalizationSummary != null) {
if (normalizationSummary.getStartTime() != null)
metadata.put("normalization_start_time", normalizationSummary.getStartTime());
if (normalizationSummary.getEndTime() != null)
metadata.put("normalization_end_time", normalizationSummary.getEndTime());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Metadata;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.Schedule;
import io.airbyte.config.Schedule.TimeUnit;
import io.airbyte.config.StandardCheckConnectionOutput;
Expand Down Expand Up @@ -124,6 +125,14 @@ class JobTrackerTest {
.put("mean_seconds_before_source_state_message_emitted", 4L)
.put("max_seconds_between_state_message_emit_and_commit", 7L)
.put("mean_seconds_between_state_message_emit_and_commit", 6L)
.put("replication_start_time", 7L)
.put("replication_end_time", 8L)
.put("source_read_start_time", 9L)
.put("source_read_end_time", 10L)
.put("destination_write_start_time", 11L)
.put("destination_write_end_time", 12L)
.put("normalization_start_time", 13L)
.put("normalization_end_time", 14L)
.build();
private static final ImmutableMap<String, Object> SYNC_CONFIG_METADATA = ImmutableMap.<String, Object>builder()
.put(JobTracker.CONFIG + ".source.key", JobTracker.SET)
Expand Down Expand Up @@ -566,13 +575,15 @@ private Attempt getAttemptMock() {
final JobOutput jobOutput = mock(JobOutput.class);
final StandardSyncOutput syncOutput = mock(StandardSyncOutput.class);
final StandardSyncSummary syncSummary = mock(StandardSyncSummary.class);
final NormalizationSummary normalizationSummary = mock(NormalizationSummary.class);
final SyncStats syncStats = mock(SyncStats.class);

when(syncSummary.getStartTime()).thenReturn(SYNC_START_TIME);
when(syncSummary.getEndTime()).thenReturn(SYNC_END_TIME);
when(syncSummary.getBytesSynced()).thenReturn(SYNC_BYTES_SYNC);
when(syncSummary.getRecordsSynced()).thenReturn(SYNC_RECORDS_SYNC);
when(syncOutput.getStandardSyncSummary()).thenReturn(syncSummary);
when(syncOutput.getNormalizationSummary()).thenReturn(normalizationSummary);
when(syncSummary.getTotalStats()).thenReturn(syncStats);
when(jobOutput.getSync()).thenReturn(syncOutput);
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
Expand All @@ -582,6 +593,15 @@ private Attempt getAttemptMock() {
when(syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()).thenReturn(4L);
when(syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()).thenReturn(7L);
when(syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()).thenReturn(6L);
when(syncStats.getReplicationStartTime()).thenReturn(7L);
when(syncStats.getReplicationEndTime()).thenReturn(8L);
when(syncStats.getSourceReadStartTime()).thenReturn(9L);
when(syncStats.getSourceReadEndTime()).thenReturn(10L);
when(syncStats.getDestinationWriteStartTime()).thenReturn(11L);
when(syncStats.getDestinationWriteEndTime()).thenReturn(12L);
when(normalizationSummary.getStartTime()).thenReturn(13L);
when(normalizationSummary.getEndTime()).thenReturn(14L);

return attempt;
}

Expand Down
Loading