Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add additional sync timing information #17643

Merged
merged 14 commits into from
Oct 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
final WorkerDestinationConfig destinationConfig = WorkerUtils.syncToWorkerDestinationConfig(syncInput);
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));

final ThreadedTimeHolder timeHolder = new ThreadedTimeHolder();
final long startTime = System.currentTimeMillis();
final long replicationStartTime = System.currentTimeMillis();
long sourceReadStartTime = -1;
long destinationWriteStartTime = -1;

final AtomicReference<FailureReason> replicationRunnableFailureRef = new AtomicReference<>();
final AtomicReference<FailureReason> destinationRunnableFailureRef = new AtomicReference<>();

Expand All @@ -146,12 +151,14 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
// closed first (which is what we want).
try (destination; source) {
destination.start(destinationConfig, jobRoot);
sourceReadStartTime = System.currentTimeMillis();
source.start(sourceConfig, jobRoot);
destinationWriteStartTime = System.currentTimeMillis();

// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
// thrown
final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc),
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc, timeHolder),
executors).whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof DestinationException) {
Expand All @@ -163,7 +170,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
});

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter),
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeHolder),
executors).whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof SourceException) {
Expand Down Expand Up @@ -204,6 +211,8 @@ else if (hasFailed.get()) {
outputStatus = ReplicationStatus.COMPLETED;
}

final long replicationEndTime = System.currentTimeMillis();

final SyncStats totalSyncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
Expand All @@ -212,7 +221,13 @@ else if (hasFailed.get()) {
.withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
.withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage())
.withMaxSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null));
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withReplicationStartTime(replicationStartTime)
.withReplicationEndTime(replicationEndTime > 0 ? replicationEndTime : null)
.withSourceReadStartTime(sourceReadStartTime > 0 ? sourceReadStartTime : null)
.withSourceReadEndTime(timeHolder.getSourceReadEndTime() > 0 ? timeHolder.getSourceReadEndTime() : null)
.withDestinationWriteStartTime(destinationWriteStartTime > 0 ? destinationWriteStartTime : null)
.withDestinationWriteEndTime(timeHolder.getDestinationWriteEndTime() > 0 ? timeHolder.getDestinationWriteEndTime() : null);

if (outputStatus == ReplicationStatus.COMPLETED) {
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
Expand Down Expand Up @@ -310,6 +325,29 @@ else if (hasFailed.get()) {

}

private class ThreadedTimeHolder {
evantahler marked this conversation as resolved.
Show resolved Hide resolved
evantahler marked this conversation as resolved.
Show resolved Hide resolved

private long sourceReadEndTime = -1;
private long destinationWriteEndTime = -1;
evantahler marked this conversation as resolved.
Show resolved Hide resolved

public synchronized void trackSourceReadEndTime() {
this.sourceReadEndTime = System.currentTimeMillis();
}

public synchronized void trackDestinationWriteEndTime() {
this.destinationWriteEndTime = System.currentTimeMillis();
}

public synchronized long getSourceReadEndTime() {
return this.sourceReadEndTime;
}

public synchronized long getDestinationWriteEndTime() {
davinchia marked this conversation as resolved.
Show resolved Hide resolved
return this.destinationWriteEndTime;
}

}

@SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
private static Runnable getReplicationRunnable(final AirbyteSource source,
final AirbyteDestination destination,
Expand All @@ -318,7 +356,8 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
final MessageTracker messageTracker,
final Map<String, String> mdc,
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter) {
final WorkerMetricReporter metricReporter,
final ThreadedTimeHolder timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -362,6 +401,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
}
}
}
timeHolder.trackSourceReadEndTime();
LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted()));
if (!validationErrors.isEmpty()) {
validationErrors.forEach((stream, errorPair) -> {
Expand Down Expand Up @@ -431,7 +471,8 @@ private static void validateSchema(final RecordSchemaValidator recordSchemaValid
private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker messageTracker,
final Map<String, String> mdc) {
final Map<String, String> mdc,
final ThreadedTimeHolder timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Destination output thread started.");
evantahler marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -448,6 +489,7 @@ private static Runnable getDestinationOutputRunnable(final AirbyteDestination de
messageTracker.acceptFromDestination(messageOptional.get());
}
}
timeHolder.trackDestinationWriteEndTime();
if (!cancelled.get() && destination.getExitValue() != 0) {
throw new DestinationException("Destination process exited with non-zero exit code " + destination.getExitValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,11 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
Jsons.jsonNode(actual));
assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ","));

// remove times so we can do the rest of the object <> object comparison.
actual.getReplicationAttemptSummary().withStartTime(null);
actual.getReplicationAttemptSummary().withEndTime(null);
// remove times, so we can do the rest of the object <> object comparison.
actual.getReplicationAttemptSummary().withStartTime(null).withEndTime(null).getTotalStats().withReplicationStartTime(null)
.withReplicationEndTime(null)
.withSourceReadStartTime(null).withSourceReadEndTime(null)
.withDestinationWriteStartTime(null).withDestinationWriteEndTime(null);

assertEquals(replicationOutput, actual);
}
Expand Down Expand Up @@ -631,7 +633,9 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
.withDestinationStateMessagesEmitted(null)));

assertNotNull(actual);
assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats());
// null out timing stats for assertion matching
assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats().withReplicationStartTime(null).withReplicationEndTime(null)
.withSourceReadStartTime(null).withSourceReadEndTime(null).withDestinationWriteStartTime(null).withDestinationWriteEndTime(null));
assertEquals(expectedStreamStats, actual.getReplicationAttemptSummary().getStreamStats());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,21 @@ properties:
type: integer
meanSecondsBetweenStateMessageEmittedandCommitted:
type: integer
replicationStartTime:
evantahler marked this conversation as resolved.
Show resolved Hide resolved
type: integer
description: The start of the replication activity
replicationEndTime:
type: integer
description: The end of the replication activity
sourceReadStartTime:
type: integer
description: The boot time of the source container/pod
sourceReadEndTime:
type: integer
description: The exit time of the source container/pod
destinationWriteStartTime:
type: integer
description: The boot time of the destination container/pod
destinationWriteEndTime:
type: integer
description: The exit time of the destination container/pod
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobOutput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.ScheduleData;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.persistence.job.models.Attempt;
import io.airbyte.persistence.job.models.Job;
Expand Down Expand Up @@ -110,6 +112,9 @@ public static Map<String, Object> generateJobAttemptMetadata(final Job job) {
final JobOutput jobOutput = lastAttempt.getOutput().get();
if (jobOutput.getSync() != null) {
final StandardSyncSummary syncSummary = jobOutput.getSync().getStandardSyncSummary();
final SyncStats totalStats = syncSummary.getTotalStats();
evantahler marked this conversation as resolved.
Show resolved Hide resolved
final NormalizationSummary normalizationSummary = jobOutput.getSync().getNormalizationSummary();

if (syncSummary.getStartTime() != null)
metadata.put("sync_start_time", syncSummary.getStartTime());
if (syncSummary.getEndTime() != null && syncSummary.getStartTime() != null)
Expand All @@ -118,22 +123,42 @@ public static Map<String, Object> generateJobAttemptMetadata(final Job job) {
metadata.put("volume_mb", syncSummary.getBytesSynced());
if (syncSummary.getRecordsSynced() != null)
metadata.put("volume_rows", syncSummary.getRecordsSynced());
if (syncSummary.getTotalStats().getSourceStateMessagesEmitted() != null)
if (totalStats.getSourceStateMessagesEmitted() != null)
evantahler marked this conversation as resolved.
Show resolved Hide resolved
metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted());
if (syncSummary.getTotalStats().getDestinationStateMessagesEmitted() != null)
if (totalStats.getDestinationStateMessagesEmitted() != null)
metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted());
if (syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted() != null)
if (totalStats.getMaxSecondsBeforeSourceStateMessageEmitted() != null)
metadata.put("max_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted());
if (syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted() != null)
totalStats.getMaxSecondsBeforeSourceStateMessageEmitted());
if (totalStats.getMeanSecondsBeforeSourceStateMessageEmitted() != null)
metadata.put("mean_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted());
if (syncSummary.getTotalStats().getMaxSecondsBetweenStateMessageEmittedandCommitted() != null)
totalStats.getMeanSecondsBeforeSourceStateMessageEmitted());
if (totalStats.getMaxSecondsBetweenStateMessageEmittedandCommitted() != null)
metadata.put("max_seconds_between_state_message_emit_and_commit",
syncSummary.getTotalStats().getMaxSecondsBetweenStateMessageEmittedandCommitted());
if (syncSummary.getTotalStats().getMeanSecondsBetweenStateMessageEmittedandCommitted() != null)
totalStats.getMaxSecondsBetweenStateMessageEmittedandCommitted());
if (totalStats.getMeanSecondsBetweenStateMessageEmittedandCommitted() != null)
metadata.put("mean_seconds_between_state_message_emit_and_commit",
syncSummary.getTotalStats().getMeanSecondsBetweenStateMessageEmittedandCommitted());
totalStats.getMeanSecondsBetweenStateMessageEmittedandCommitted());

if (totalStats.getReplicationStartTime() != null)
metadata.put("replication_start_time", totalStats.getReplicationStartTime());
if (totalStats.getReplicationEndTime() != null)
metadata.put("replication_end_time", totalStats.getReplicationEndTime());
if (totalStats.getSourceReadStartTime() != null)
metadata.put("source_read_start_time", totalStats.getSourceReadStartTime());
if (totalStats.getSourceReadEndTime() != null)
metadata.put("source_read_end_time", totalStats.getSourceReadEndTime());
if (totalStats.getDestinationWriteStartTime() != null)
metadata.put("destination_write_start_time", totalStats.getDestinationWriteStartTime());
if (totalStats.getDestinationWriteEndTime() != null)
metadata.put("destination_write_end_time", totalStats.getDestinationWriteEndTime());

if (normalizationSummary != null) {
if (normalizationSummary.getStartTime() != null)
metadata.put("normalization_start_time", normalizationSummary.getStartTime());
if (normalizationSummary.getEndTime() != null)
metadata.put("normalization_end_time", normalizationSummary.getEndTime());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Metadata;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.Schedule;
import io.airbyte.config.Schedule.TimeUnit;
import io.airbyte.config.StandardCheckConnectionOutput;
Expand Down Expand Up @@ -124,6 +125,14 @@ class JobTrackerTest {
.put("mean_seconds_before_source_state_message_emitted", 4L)
.put("max_seconds_between_state_message_emit_and_commit", 7L)
.put("mean_seconds_between_state_message_emit_and_commit", 6L)
.put("replication_start_time", 7L)
.put("replication_end_time", 8L)
.put("source_read_start_time", 9L)
.put("source_read_end_time", 10L)
.put("destination_write_start_time", 11L)
.put("destination_write_end_time", 12L)
.put("normalization_start_time", 13L)
.put("normalization_end_time", 14L)
.build();
private static final ImmutableMap<String, Object> SYNC_CONFIG_METADATA = ImmutableMap.<String, Object>builder()
.put(JobTracker.CONFIG + ".source.key", JobTracker.SET)
Expand Down Expand Up @@ -566,13 +575,15 @@ private Attempt getAttemptMock() {
final JobOutput jobOutput = mock(JobOutput.class);
final StandardSyncOutput syncOutput = mock(StandardSyncOutput.class);
final StandardSyncSummary syncSummary = mock(StandardSyncSummary.class);
final NormalizationSummary normalizationSummary = mock(NormalizationSummary.class);
final SyncStats syncStats = mock(SyncStats.class);

when(syncSummary.getStartTime()).thenReturn(SYNC_START_TIME);
when(syncSummary.getEndTime()).thenReturn(SYNC_END_TIME);
when(syncSummary.getBytesSynced()).thenReturn(SYNC_BYTES_SYNC);
when(syncSummary.getRecordsSynced()).thenReturn(SYNC_RECORDS_SYNC);
when(syncOutput.getStandardSyncSummary()).thenReturn(syncSummary);
when(syncOutput.getNormalizationSummary()).thenReturn(normalizationSummary);
when(syncSummary.getTotalStats()).thenReturn(syncStats);
when(jobOutput.getSync()).thenReturn(syncOutput);
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
Expand All @@ -582,6 +593,15 @@ private Attempt getAttemptMock() {
when(syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()).thenReturn(4L);
when(syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()).thenReturn(7L);
when(syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()).thenReturn(6L);
when(syncStats.getReplicationStartTime()).thenReturn(7L);
when(syncStats.getReplicationEndTime()).thenReturn(8L);
when(syncStats.getSourceReadStartTime()).thenReturn(9L);
when(syncStats.getSourceReadEndTime()).thenReturn(10L);
when(syncStats.getDestinationWriteStartTime()).thenReturn(11L);
when(syncStats.getDestinationWriteEndTime()).thenReturn(12L);
when(normalizationSummary.getStartTime()).thenReturn(13L);
when(normalizationSummary.getEndTime()).thenReturn(14L);

return attempt;
}

Expand Down
Loading