diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/LauncherWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/LauncherWorker.java index 03c9bbdefc71..c433f922ce50 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/LauncherWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/LauncherWorker.java @@ -4,12 +4,20 @@ package io.airbyte.workers.sync; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.PROCESS_EXIT_VALUE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.google.common.base.Stopwatch; +import datadog.trace.api.Trace; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.commons.temporal.sync.OrchestratorConstants; import io.airbyte.config.ResourceRequirements; +import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; import io.airbyte.workers.Worker; @@ -91,6 +99,7 @@ public LauncherWorker(final UUID connectionId, this.temporalUtils = temporalUtils; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException { final AtomicBoolean isCanceled = new AtomicBoolean(false); @@ -132,6 +141,8 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException podName, mainContainerInfo); + ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobRunConfig.getJobId(), JOB_ROOT_KEY, jobRoot)); + // Use the configuration to create the process. process = new AsyncOrchestratorPodProcess( kubePodInfo, @@ -165,6 +176,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException fileMap, portMap); } catch (final KubernetesClientException e) { + ApmTraceUtils.addExceptionToTrace(e); throw new WorkerException( "Failed to create pod " + podName + ", pre-existing pod exists which didn't advance out of the NOT_STARTED state.", e); } @@ -174,18 +186,24 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException process.waitFor(); if (cancelled.get()) { - throw new CancellationException(); + final CancellationException e = new CancellationException(); + ApmTraceUtils.addExceptionToTrace(e); + throw e; } final int asyncProcessExitValue = process.exitValue(); if (asyncProcessExitValue != 0) { - throw new WorkerException("Orchestrator process exited with non-zero exit code: " + asyncProcessExitValue); + final WorkerException e = new WorkerException("Orchestrator process exited with non-zero exit code: " + asyncProcessExitValue); + ApmTraceUtils.addTagsToTrace(Map.of(PROCESS_EXIT_VALUE_KEY, asyncProcessExitValue)); + ApmTraceUtils.addExceptionToTrace(e); + throw e; } final var output = process.getOutput(); return output.map(s -> Jsons.deserialize(s, outputClass)).orElse(null); } catch (final Exception e) { + ApmTraceUtils.addExceptionToTrace(e); if (cancelled.get()) { try { log.info("Destroying process due to cancellation."); @@ -231,7 +249,9 @@ private void killRunningPodsForConnection() { if (runningPods.isEmpty()) { log.info("Successfully deleted all running pods for the connection!"); } else { - throw new RuntimeException("Unable to delete pods: " + getPodNames(runningPods).toString()); + final RuntimeException e = new RuntimeException("Unable to delete pods: " + getPodNames(runningPods).toString()); + ApmTraceUtils.addExceptionToTrace(e); + throw e; } } @@ -250,6 +270,7 @@ private List getNonTerminalPodsWithLabels() { .collect(Collectors.toList()); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() { cancelled.set(true); diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java index c636a9d935d6..4fa0cf52e6f6 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java @@ -68,6 +68,11 @@ public static final class Tags { */ public static final String JOB_ROOT_KEY = "job_root"; + /** + * Name of the APM trace tag that holds the process exit value associated with the trace. + */ + public static final String PROCESS_EXIT_VALUE_KEY = "process.exit_value"; + /** * Name of the APM trace tag that holds the source Docker image value associated with the trace. */