Skip to content

Commit

Permalink
Trace launcher worker run method (airbytehq#18441)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored and jhammarstedt committed Oct 31, 2022
1 parent 81f6038 commit 5fcc634
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@

package io.airbyte.workers.sync;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.PROCESS_EXIT_VALUE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.google.common.base.Stopwatch;
import datadog.trace.api.Trace;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.sync.OrchestratorConstants;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.Worker;
Expand Down Expand Up @@ -91,6 +99,7 @@ public LauncherWorker(final UUID connectionId,
this.temporalUtils = temporalUtils;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException {
final AtomicBoolean isCanceled = new AtomicBoolean(false);
Expand Down Expand Up @@ -132,6 +141,8 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
podName,
mainContainerInfo);

ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobRunConfig.getJobId(), JOB_ROOT_KEY, jobRoot));

// Use the configuration to create the process.
process = new AsyncOrchestratorPodProcess(
kubePodInfo,
Expand Down Expand Up @@ -165,6 +176,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
fileMap,
portMap);
} catch (final KubernetesClientException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException(
"Failed to create pod " + podName + ", pre-existing pod exists which didn't advance out of the NOT_STARTED state.", e);
}
Expand All @@ -174,18 +186,24 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
process.waitFor();

if (cancelled.get()) {
throw new CancellationException();
final CancellationException e = new CancellationException();
ApmTraceUtils.addExceptionToTrace(e);
throw e;
}

final int asyncProcessExitValue = process.exitValue();
if (asyncProcessExitValue != 0) {
throw new WorkerException("Orchestrator process exited with non-zero exit code: " + asyncProcessExitValue);
final WorkerException e = new WorkerException("Orchestrator process exited with non-zero exit code: " + asyncProcessExitValue);
ApmTraceUtils.addTagsToTrace(Map.of(PROCESS_EXIT_VALUE_KEY, asyncProcessExitValue));
ApmTraceUtils.addExceptionToTrace(e);
throw e;
}

final var output = process.getOutput();

return output.map(s -> Jsons.deserialize(s, outputClass)).orElse(null);
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
if (cancelled.get()) {
try {
log.info("Destroying process due to cancellation.");
Expand Down Expand Up @@ -231,7 +249,9 @@ private void killRunningPodsForConnection() {
if (runningPods.isEmpty()) {
log.info("Successfully deleted all running pods for the connection!");
} else {
throw new RuntimeException("Unable to delete pods: " + getPodNames(runningPods).toString());
final RuntimeException e = new RuntimeException("Unable to delete pods: " + getPodNames(runningPods).toString());
ApmTraceUtils.addExceptionToTrace(e);
throw e;
}
}

Expand All @@ -250,6 +270,7 @@ private List<Pod> getNonTerminalPodsWithLabels() {
.collect(Collectors.toList());
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() {
cancelled.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public static final class Tags {
*/
public static final String JOB_ROOT_KEY = "job_root";

/**
* Name of the APM trace tag that holds the process exit value associated with the trace.
*/
public static final String PROCESS_EXIT_VALUE_KEY = "process.exit_value";

/**
* Name of the APM trace tag that holds the source Docker image value associated with the trace.
*/
Expand Down

0 comments on commit 5fcc634

Please sign in to comment.