Skip to content

Commit

Permalink
Record replication stats as part of trace (#18909)
Browse files Browse the repository at this point in the history
* Record replication stats as part of trace

* PR feedback
  • Loading branch information
jdpgrailsdev authored Nov 3, 2022
1 parent fcf4758 commit fad7ff2
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,23 @@ public static final class Tags {
*/
public static final String PROCESS_EXIT_VALUE_KEY = "process.exit_value";

/**
* Name of the APM trace tag that holds the replication bytes synced value associated with the
* trace.
*/
public static final String REPLICATION_BYTES_SYNCED_KEY = "replication.bytes_synced";

/**
* Name of the APM trace tag that holds the replication records synced value associated with the
* trace.
*/
public static final String REPLICATION_RECORDS_SYNCED_KEY = "replication.records_synced";

/**
* Name of the APM trace tag that holds the replication status value associated with the trace.
*/
public static final String REPLICATION_STATUS_KEY = "replication.status";

/**
* Name of the APM trace tag that holds the source Docker image value associated with the trace.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,13 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"count of the number of successful workflow syncs."),
TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER,
"temporal_workflow_failure",
"count of the number of workflow failures");
"count of the number of workflow failures"),
REPLICATION_BYTES_SYNCED(MetricEmittingApps.WORKER,
"replication_bytes_synced",
"number of bytes synced during replication"),
REPLICATION_RECORDS_SYNCED(MetricEmittingApps.WORKER,
"replication_records_synced",
"number of records synced during replication");

private final MetricEmittingApp application;
private final String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.REPLICATION_BYTES_SYNCED_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.REPLICATION_RECORDS_SYNCED_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.REPLICATION_STATUS_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY;

import datadog.trace.api.Trace;
Expand Down Expand Up @@ -36,6 +39,7 @@
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
Expand All @@ -59,11 +63,13 @@
import io.airbyte.workers.sync.ReplicationLauncherWorker;
import io.airbyte.workers.temporal.TemporalAttemptExecution;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.util.CollectionUtils;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -185,7 +191,7 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,
final String standardSyncOutputString = standardSyncOutput.toString();
LOGGER.info("sync summary: {}", standardSyncOutputString);
if (standardSyncOutputString.length() > MAX_TEMPORAL_MESSAGE_SIZE) {
LOGGER.error("Sync ouput exceeds the max temporal message size of {}, actual is {}.", MAX_TEMPORAL_MESSAGE_SIZE,
LOGGER.error("Sync output exceeds the max temporal message size of {}, actual is {}.", MAX_TEMPORAL_MESSAGE_SIZE,
standardSyncOutputString.length());
} else {
LOGGER.info("Sync summary length: {}", standardSyncOutputString.length());
Expand All @@ -201,6 +207,8 @@ private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutpu
final StandardSyncSummary syncSummary = new StandardSyncSummary();
final ReplicationAttemptSummary replicationSummary = output.getReplicationAttemptSummary();

traceReplicationSummary(replicationSummary);

syncSummary.setBytesSynced(replicationSummary.getBytesSynced());
syncSummary.setRecordsSynced(replicationSummary.getRecordsSynced());
syncSummary.setStartTime(replicationSummary.getStartTime());
Expand All @@ -217,6 +225,28 @@ private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutpu
return standardSyncOutput;
}

private static void traceReplicationSummary(final ReplicationAttemptSummary replicationSummary) {
if (replicationSummary == null) {
return;
}

final Map<String, Object> tags = new HashMap<>();
if (replicationSummary.getBytesSynced() != null) {
tags.put(REPLICATION_BYTES_SYNCED_KEY, replicationSummary.getBytesSynced());
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_BYTES_SYNCED, replicationSummary.getBytesSynced());
}
if (replicationSummary.getRecordsSynced() != null) {
tags.put(REPLICATION_RECORDS_SYNCED_KEY, replicationSummary.getRecordsSynced());
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_RECORDS_SYNCED, replicationSummary.getRecordsSynced());
}
if (replicationSummary.getStatus() != null) {
tags.put(REPLICATION_STATUS_KEY, replicationSummary.getStatus().value());
}
if (!tags.isEmpty()) {
ApmTraceUtils.addTagsToTrace(tags);
}
}

private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception> getLegacyWorkerFactory(final IntegrationLauncherConfig sourceLauncherConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig,
Expand Down

0 comments on commit fad7ff2

Please sign in to comment.