From fad7ff2fdb2afd1d0ed03d0fc74e9fa33bcf4dde Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Thu, 3 Nov 2022 12:02:04 -0400 Subject: [PATCH] Record replication stats as part of trace (#18909) * Record replication stats as part of trace * PR feedback --- .../metrics/lib/ApmTraceConstants.java | 17 ++++++++++ .../metrics/lib/OssMetricsRegistry.java | 8 ++++- .../sync/ReplicationActivityImpl.java | 32 ++++++++++++++++++- 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java index 4fa0cf52e6f6..ae0cd6bd2c5b 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java @@ -73,6 +73,23 @@ public static final class Tags { */ public static final String PROCESS_EXIT_VALUE_KEY = "process.exit_value"; + /** + * Name of the APM trace tag that holds the replication bytes synced value associated with the + * trace. + */ + public static final String REPLICATION_BYTES_SYNCED_KEY = "replication.bytes_synced"; + + /** + * Name of the APM trace tag that holds the replication records synced value associated with the + * trace. + */ + public static final String REPLICATION_RECORDS_SYNCED_KEY = "replication.records_synced"; + + /** + * Name of the APM trace tag that holds the replication status value associated with the trace. + */ + public static final String REPLICATION_STATUS_KEY = "replication.status"; + /** * Name of the APM trace tag that holds the source Docker image value associated with the trace. 
*/ diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java index 6962b7c56c01..8ce51860395e 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java @@ -128,7 +128,13 @@ public enum OssMetricsRegistry implements MetricsRegistry { "count of the number of successful workflow syncs."), TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER, "temporal_workflow_failure", - "count of the number of workflow failures"); + "count of the number of workflow failures"), + REPLICATION_BYTES_SYNCED(MetricEmittingApps.WORKER, + "replication_bytes_synced", + "number of bytes synced during replication"), + REPLICATION_RECORDS_SYNCED(MetricEmittingApps.WORKER, + "replication_records_synced", + "number of records synced during replication"); private final MetricEmittingApp application; private final String metricName; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 7aaed5e278b7..2c7369b4d858 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -7,6 +7,9 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.REPLICATION_BYTES_SYNCED_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.REPLICATION_RECORDS_SYNCED_KEY; +import static 
io.airbyte.metrics.lib.ApmTraceConstants.Tags.REPLICATION_STATUS_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY; import datadog.trace.api.Trace; @@ -36,6 +39,7 @@ import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.MetricClientFactory; import io.airbyte.metrics.lib.MetricEmittingApps; +import io.airbyte.metrics.lib.OssMetricsRegistry; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; @@ -59,11 +63,13 @@ import io.airbyte.workers.sync.ReplicationLauncherWorker; import io.airbyte.workers.temporal.TemporalAttemptExecution; import io.micronaut.context.annotation.Value; +import io.micronaut.core.util.CollectionUtils; import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; import jakarta.inject.Named; import jakarta.inject.Singleton; import java.nio.file.Path; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -185,7 +191,7 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig, final String standardSyncOutputString = standardSyncOutput.toString(); LOGGER.info("sync summary: {}", standardSyncOutputString); if (standardSyncOutputString.length() > MAX_TEMPORAL_MESSAGE_SIZE) { - LOGGER.error("Sync ouput exceeds the max temporal message size of {}, actual is {}.", MAX_TEMPORAL_MESSAGE_SIZE, + LOGGER.error("Sync output exceeds the max temporal message size of {}, actual is {}.", MAX_TEMPORAL_MESSAGE_SIZE, standardSyncOutputString.length()); } else { LOGGER.info("Sync summary length: {}", standardSyncOutputString.length()); @@ -201,6 +207,8 @@ private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutpu final StandardSyncSummary syncSummary = new StandardSyncSummary(); final ReplicationAttemptSummary replicationSummary = output.getReplicationAttemptSummary(); + 
traceReplicationSummary(replicationSummary); + syncSummary.setBytesSynced(replicationSummary.getBytesSynced()); syncSummary.setRecordsSynced(replicationSummary.getRecordsSynced()); syncSummary.setStartTime(replicationSummary.getStartTime()); @@ -217,6 +225,28 @@ private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutpu return standardSyncOutput; } + private static void traceReplicationSummary(final ReplicationAttemptSummary replicationSummary) { + if (replicationSummary == null) { + return; + } + + final Map<String, Object> tags = new HashMap<>(); + if (replicationSummary.getBytesSynced() != null) { + tags.put(REPLICATION_BYTES_SYNCED_KEY, replicationSummary.getBytesSynced()); + MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_BYTES_SYNCED, replicationSummary.getBytesSynced()); + } + if (replicationSummary.getRecordsSynced() != null) { + tags.put(REPLICATION_RECORDS_SYNCED_KEY, replicationSummary.getRecordsSynced()); + MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_RECORDS_SYNCED, replicationSummary.getRecordsSynced()); + } + if (replicationSummary.getStatus() != null) { + tags.put(REPLICATION_STATUS_KEY, replicationSummary.getStatus().value()); + } + if (!tags.isEmpty()) { + ApmTraceUtils.addTagsToTrace(tags); + } + } + private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception> getLegacyWorkerFactory(final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig,