From b323ccd66addd73b0a8f78599dd8dd56b33a58f1 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Tue, 25 Oct 2022 11:45:12 -0400 Subject: [PATCH] Additional worker APM tracing (#18427) --- .../workers/internal/AirbyteMessageTracker.java | 5 +++++ .../internal/AirbyteProtocolPredicate.java | 4 ++++ .../internal/DefaultAirbyteDestination.java | 11 +++++++++++ .../workers/internal/DefaultAirbyteSource.java | 9 +++++++++ .../internal/DefaultAirbyteStreamFactory.java | 4 ++++ .../workers/internal/EmptyAirbyteSource.java | 9 +++++++++ .../workers/internal/StateDeltaTracker.java | 6 ++++++ .../internal/VersionedAirbyteStreamFactory.java | 8 ++++++-- .../state_aggregator/SingleStateAggregator.java | 5 +++++ .../state_aggregator/StreamStateAggregator.java | 5 +++++ .../process/AirbyteIntegrationLauncher.java | 17 +++++++++++++++++ 11 files changed, 81 insertions(+), 2 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index 259d8b3f192c..b6a03567c017 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -4,12 +4,15 @@ package io.airbyte.workers.internal; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; +import datadog.trace.api.Trace; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.config.FailureReason; @@ -98,6 +101,7 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, this.stateAggregator = stateAggregator; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void acceptFromSource(final AirbyteMessage message) { logMessageAsJSON("source", message); @@ -110,6 +114,7 @@ public void acceptFromSource(final AirbyteMessage message) { } } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void acceptFromDestination(final AirbyteMessage message) { logMessageAsJSON("destination", message); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteProtocolPredicate.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteProtocolPredicate.java index bd61e714b670..d07104b88994 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteProtocolPredicate.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteProtocolPredicate.java @@ -4,7 +4,10 @@ package io.airbyte.workers.internal; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.fasterxml.jackson.databind.JsonNode; +import datadog.trace.api.Trace; import io.airbyte.protocol.models.AirbyteProtocolSchema; import io.airbyte.validation.json.JsonSchemaValidator; import java.util.function.Predicate; @@ -23,6 +26,7 @@ public AirbyteProtocolPredicate() { schema = JsonSchemaValidator.getSchema(AirbyteProtocolSchema.PROTOCOL.getFile(), "AirbyteMessage"); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public boolean test(final JsonNode s) { return jsonSchemaValidator.test(schema, s); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java index b866d9dbf740..8520fbbfa4ca 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java @@ -4,8 +4,11 @@ package io.airbyte.workers.internal; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.google.common.base.Charsets; import com.google.common.base.Preconditions; +import datadog.trace.api.Trace; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; @@ -58,6 +61,7 @@ public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher, this.streamFactory = streamFactory; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void start(final WorkerDestinationConfig destinationConfig, final Path jobRoot) throws IOException, WorkerException { Preconditions.checkState(destinationProcess == null); @@ -79,6 +83,7 @@ public void start(final WorkerDestinationConfig destinationConfig, final Path jo .iterator(); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void accept(final AirbyteMessage message) throws IOException { Preconditions.checkState(destinationProcess != null && !inputHasEnded.get()); @@ -87,6 +92,7 @@ public void accept(final AirbyteMessage message) throws IOException { writer.newLine(); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void notifyEndOfInput() throws IOException { Preconditions.checkState(destinationProcess != null && !inputHasEnded.get()); @@ -96,6 +102,7 @@ public void notifyEndOfInput() throws IOException { inputHasEnded.set(true); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void close() throws Exception { if (destinationProcess == null) { @@ -116,6 +123,7 @@ public void close() throws Exception { } } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() throws Exception { LOGGER.info("Attempting to cancel destination process..."); @@ -129,6 +137,7 @@ public void cancel() throws Exception { } } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public boolean isFinished() { Preconditions.checkState(destinationProcess != null); @@ -139,6 +148,7 @@ public boolean isFinished() { return !messageIterator.hasNext() && !destinationProcess.isAlive(); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public int getExitValue() { Preconditions.checkState(destinationProcess != null, "Destination process is null, cannot retrieve exit value."); @@ -151,6 +161,7 @@ public int getExitValue() { return exitValue; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Optional attemptRead() { Preconditions.checkState(destinationProcess != null); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java index 8a94d3a05599..1f7767a0ae97 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java @@ -4,8 +4,11 @@ package io.airbyte.workers.internal; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import datadog.trace.api.Trace; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; @@ -63,6 +66,7 @@ public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) { this.heartbeatMonitor = heartbeatMonitor; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) throws Exception { Preconditions.checkState(sourceProcess == null); @@ -85,6 +89,7 @@ public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) thr .iterator(); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public boolean isFinished() { Preconditions.checkState(sourceProcess != null); @@ -96,6 +101,7 @@ public boolean isFinished() { return !messageIterator.hasNext() && !sourceProcess.isAlive(); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public int getExitValue() throws IllegalStateException { Preconditions.checkState(sourceProcess != null, "Source process is null, cannot retrieve exit value."); @@ -108,6 +114,7 @@ public int getExitValue() throws IllegalStateException { return exitValue; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Optional attemptRead() { Preconditions.checkState(sourceProcess != null); @@ -115,6 +122,7 @@ public Optional attemptRead() { return Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void close() throws Exception { if (sourceProcess == null) { @@ -134,6 +142,7 @@ public void close() throws Exception { } } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() throws Exception { LOGGER.info("Attempting to cancel source process..."); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.java index c8bd56efa0ab..21ae3f239313 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.java @@ -4,7 +4,10 @@ package io.airbyte.workers.internal; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.fasterxml.jackson.databind.JsonNode; +import datadog.trace.api.Trace; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.MdcScope; import io.airbyte.metrics.lib.MetricClientFactory; @@ -52,6 +55,7 @@ public DefaultAirbyteStreamFactory(final MdcScope.Builder containerLogMdcBuilder this.containerLogMdcBuilder = containerLogMdcBuilder; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Stream create(final BufferedReader bufferedReader) { final var metricClient = MetricClientFactory.getMetricClient(); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java index 9bbeb99be12a..ad46f140cf75 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java @@ -4,6 +4,9 @@ package io.airbyte.workers.internal; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + +import datadog.trace.api.Trace; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ResetSourceConfiguration; import io.airbyte.config.StateType; @@ -50,6 +53,7 @@ public EmptyAirbyteSource(final boolean useStreamCapableState) { this.useStreamCapableState = useStreamCapableState; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoot) throws Exception { @@ -105,16 +109,19 @@ public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoo } // always finished. it has no data to send. + @Trace(operationName = WORKER_OPERATION_NAME) @Override public boolean isFinished() { return hasEmittedState.get(); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public int getExitValue() { return 0; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Optional attemptRead() { if (!isStarted) { @@ -134,11 +141,13 @@ public Optional attemptRead() { } } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void close() throws Exception { // no op. } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() throws Exception { // no op. diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/StateDeltaTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/StateDeltaTracker.java index 480b4678f076..beab1e0b63e1 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/StateDeltaTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/StateDeltaTracker.java @@ -4,7 +4,10 @@ package io.airbyte.workers.internal; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.google.common.annotations.VisibleForTesting; +import datadog.trace.api.Trace; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -72,6 +75,7 @@ public StateDeltaTracker(final long memoryLimitBytes) { * @throws StateDeltaTrackerException thrown when the memory footprint of stateDeltas exceeds * available capacity. */ + @Trace(operationName = WORKER_OPERATION_NAME) public void addState(final int stateHash, final Map streamIndexToRecordCount) throws StateDeltaTrackerException { synchronized (this) { final int size = STATE_HASH_BYTES + (streamIndexToRecordCount.size() * BYTES_PER_STREAM); @@ -104,6 +108,7 @@ public void addState(final int stateHash, final Map streamIndexToRe * @throws StateDeltaTrackerException thrown when committed counts can no longer be reliably * computed. */ + @Trace(operationName = WORKER_OPERATION_NAME) public void commitStateHash(final int stateHash) throws StateDeltaTrackerException { synchronized (this) { if (capacityExceeded) { @@ -139,6 +144,7 @@ public void commitStateHash(final int stateHash) throws StateDeltaTrackerExcepti } } + @Trace(operationName = WORKER_OPERATION_NAME) public Map getStreamToCommittedRecords() { return streamToCommittedRecords; } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java index b49190dc8658..65a2cd461fe1 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java @@ -4,8 +4,11 @@ package io.airbyte.workers.internal; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; +import datadog.trace.api.Trace; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.MdcScope; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; @@ -74,6 +77,7 @@ public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProv * If detectVersion is set to true, it will decide which protocol version to use from the content of * the stream rather than the one passed from the constructor. */ + @Trace(operationName = WORKER_OPERATION_NAME) @SneakyThrows @Override public Stream create(final BufferedReader bufferedReader) { @@ -123,7 +127,7 @@ private Optional detectVersion(final BufferedReader bufferedReader) thr } bufferedReader.reset(); return Optional.empty(); - } catch (IOException e) { + } catch (final IOException e) { logger.warn( "Protocol version detection failed, it is likely than the connector sent more than {}B without an complete SPEC message." + " A SPEC message that is too long could be the root cause here.", @@ -156,7 +160,7 @@ protected Stream toAirbyteMessage(final JsonNode json) { try { final io.airbyte.protocol.models.v0.AirbyteMessage message = migrator.upgrade(deserializer.deserialize(json)); return Stream.of(convert(message)); - } catch (RuntimeException e) { + } catch (final RuntimeException e) { logger.warn("Failed to upgrade a message from version {}: {}", protocolVersion, Jsons.serialize(json), e); return Stream.empty(); } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java index 0cfe422ea1f7..7eaca78a662a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java @@ -4,6 +4,9 @@ package io.airbyte.workers.internal.state_aggregator; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + +import datadog.trace.api.Trace; import io.airbyte.commons.json.Jsons; import io.airbyte.config.State; import io.airbyte.protocol.models.AirbyteStateMessage; @@ -14,11 +17,13 @@ class SingleStateAggregator implements StateAggregator { AirbyteStateMessage state; + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void ingest(final AirbyteStateMessage stateMessage) { state = stateMessage; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public State getAggregated() { if (state.getType() == null || state.getType() == AirbyteStateType.LEGACY) { diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java index 4d3247b2549d..dfe046eaf2c9 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java @@ -4,6 +4,9 @@ package io.airbyte.workers.internal.state_aggregator; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + +import datadog.trace.api.Trace; import io.airbyte.commons.json.Jsons; import io.airbyte.config.State; import io.airbyte.protocol.models.AirbyteStateMessage; @@ -15,6 +18,7 @@ class StreamStateAggregator implements StateAggregator { Map aggregatedState = new HashMap<>(); + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void ingest(final AirbyteStateMessage stateMessage) { /** @@ -27,6 +31,7 @@ public void ingest(final AirbyteStateMessage stateMessage) { aggregatedState.put(stateMessage.getStream().getStreamDescriptor(), stateMessage); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public State getAggregated() { diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 405293663946..7f8542f9b116 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -4,13 +4,20 @@ package io.airbyte.workers.process; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import datadog.trace.api.Trace; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.WorkerEnvConstants; +import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.workers.exception.WorkerException; import java.nio.file.Path; import java.util.Collections; @@ -63,8 +70,10 @@ public AirbyteIntegrationLauncher(final String jobId, this.featureFlags = new EnvVariableFeatureFlags(); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Process spec(final Path jobRoot) throws WorkerException { + ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, imageName)); return processFactory.create( SPEC_JOB, jobId, @@ -81,8 +90,10 @@ public Process spec(final Path jobRoot) throws WorkerException { "spec"); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Process check(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException { + ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, imageName)); return processFactory.create( CHECK_JOB, jobId, @@ -100,8 +111,10 @@ public Process check(final Path jobRoot, final String configFilename, final Stri CONFIG, configFilename); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Process discover(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException { + ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, imageName)); return processFactory.create( DISCOVER_JOB, jobId, @@ -119,6 +132,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S CONFIG, configFilename); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Process read(final Path jobRoot, final String configFilename, @@ -128,6 +142,7 @@ public Process read(final Path jobRoot, final String stateFilename, final String stateContents) throws WorkerException { + ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, imageName)); final List arguments = Lists.newArrayList( "read", CONFIG, configFilename, @@ -161,6 +176,7 @@ public Process read(final Path jobRoot, arguments.toArray(new String[arguments.size()])); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Process write(final Path jobRoot, final String configFilename, @@ -168,6 +184,7 @@ public Process write(final Path jobRoot, final String catalogFilename, final String catalogContents) throws WorkerException { + ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, imageName)); final Map files = ImmutableMap.of( configFilename, configContents, catalogFilename, catalogContents);