Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace launcher worker run method #18441

Merged
merged 3 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@

package io.airbyte.workers.sync;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.PROCESS_EXIT_VALUE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.google.common.base.Stopwatch;
import datadog.trace.api.Trace;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.sync.OrchestratorConstants;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.Worker;
Expand Down Expand Up @@ -91,6 +99,7 @@ public LauncherWorker(final UUID connectionId,
this.temporalUtils = temporalUtils;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException {
final AtomicBoolean isCanceled = new AtomicBoolean(false);
Expand Down Expand Up @@ -132,6 +141,8 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
podName,
mainContainerInfo);

ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobRunConfig.getJobId(), JOB_ROOT_KEY, jobRoot));

// Use the configuration to create the process.
process = new AsyncOrchestratorPodProcess(
kubePodInfo,
Expand Down Expand Up @@ -165,6 +176,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
fileMap,
portMap);
} catch (final KubernetesClientException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException(
"Failed to create pod " + podName + ", pre-existing pod exists which didn't advance out of the NOT_STARTED state.", e);
}
Expand All @@ -174,18 +186,24 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
process.waitFor();

if (cancelled.get()) {
throw new CancellationException();
final CancellationException e = new CancellationException();
ApmTraceUtils.addExceptionToTrace(e);
throw e;
}

final int asyncProcessExitValue = process.exitValue();
if (asyncProcessExitValue != 0) {
throw new WorkerException("Orchestrator process exited with non-zero exit code: " + asyncProcessExitValue);
final WorkerException e = new WorkerException("Orchestrator process exited with non-zero exit code: " + asyncProcessExitValue);
ApmTraceUtils.addTagsToTrace(Map.of(PROCESS_EXIT_VALUE_KEY, asyncProcessExitValue));
ApmTraceUtils.addExceptionToTrace(e);
throw e;
}

final var output = process.getOutput();

return output.map(s -> Jsons.deserialize(s, outputClass)).orElse(null);
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to consider this an exception? It looks like if someone cancels a sync we get into here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alovew Maybe? I'm starting from the position of let's record them all and see what makes sense. Right now, we are not seeing anything in DataDog which is a bit suspicious. I do suspect that once we get this into production we will see some errors that are not really exception and can be removed.

if (cancelled.get()) {
try {
log.info("Destroying process due to cancellation.");
Expand Down Expand Up @@ -231,7 +249,9 @@ private void killRunningPodsForConnection() {
if (runningPods.isEmpty()) {
log.info("Successfully deleted all running pods for the connection!");
} else {
throw new RuntimeException("Unable to delete pods: " + getPodNames(runningPods).toString());
final RuntimeException e = new RuntimeException("Unable to delete pods: " + getPodNames(runningPods).toString());
ApmTraceUtils.addExceptionToTrace(e);
throw e;
}
}

Expand All @@ -250,6 +270,7 @@ private List<Pod> getNonTerminalPodsWithLabels() {
.collect(Collectors.toList());
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() {
cancelled.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public static final class Tags {
*/
public static final String JOB_ROOT_KEY = "job_root";

/**
* Name of the APM trace tag that holds the process exit value associated with the trace.
*/
public static final String PROCESS_EXIT_VALUE_KEY = "process.exit_value";

/**
* Name of the APM trace tag that holds the source Docker image value associated with the trace.
*/
Expand Down