Skip to content

Commit

Permalink
Trace errors in orchestrator workers (airbytehq#18381)
Browse files Browse the repository at this point in the history
* Trace errors in orchestrator workers

* Trace cancel method of workers

* Formatting

* Restore final modifier
  • Loading branch information
jdpgrailsdev authored and jhammarstedt committed Oct 31, 2022
1 parent 98f4b71 commit 39eb14f
Show file tree
Hide file tree
Showing 36 changed files with 260 additions and 134 deletions.
1 change: 1 addition & 0 deletions airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'org.apache.ant:ant:1.10.10'
implementation 'org.apache.commons:commons-text:1.10.0'
implementation libs.bundles.datadog

implementation project(':airbyte-api')
implementation project(':airbyte-commons-protocol')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.workers.Worker;
import io.airbyte.workers.exception.WorkerException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -40,10 +47,12 @@ public DbtTransformationWorker(final String jobId,
this.cancelled = new AtomicBoolean(false);
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) throws WorkerException {
final long startTime = System.currentTimeMillis();
LineGobbler.startSection("DBT TRANSFORMATION");
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot));

try (dbtTransformationRunner) {
LOGGER.info("Running dbt transformation.");
Expand All @@ -59,6 +68,7 @@ public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) thr
throw new WorkerException("DBT Transformation Failed.");
}
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException("Dbt Transformation Failed.", e);
}
if (cancelled.get()) {
Expand All @@ -72,14 +82,16 @@ public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) thr
return null;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() {
LOGGER.info("Cancelling Dbt Transformation runner...");
try {
cancelled.set(true);
dbtTransformationRunner.close();
} catch (final Exception e) {
e.printStackTrace();
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Unable to cancel Dbt Transformation runner.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
Expand All @@ -13,6 +17,7 @@
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -52,10 +57,11 @@ public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLaunche
this(integrationLauncher, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException {
LineGobbler.startSection("CHECK");

ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot));
try {
process = integrationLauncher.check(
jobRoot,
Expand Down Expand Up @@ -95,11 +101,13 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa
}

} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Unexpected error while checking connection: ", e);
throw new WorkerException("Unexpected error while getting checking connection.", e);
}
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() {
WorkerUtils.cancelProcess(process);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTOR_VERSION_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand All @@ -23,6 +30,7 @@
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -56,8 +64,10 @@ public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
this(configRepository, integrationLauncher, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException {
ApmTraceUtils.addTagsToTrace(generateTraceTags(discoverSchemaInput, jobRoot));
try {
process = integrationLauncher.discover(
jobRoot,
Expand Down Expand Up @@ -101,12 +111,32 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
String.format("Discover job subprocess finished with exit code %s", exitCode));
}
} catch (final WorkerException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw e;
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException("Error while discovering schema", e);
}
}

private Map<String, Object> generateTraceTags(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) {
final Map<String, Object> tags = new HashMap<>();

tags.put(JOB_ROOT_KEY, jobRoot);

if (discoverSchemaInput != null) {
if (discoverSchemaInput.getSourceId() != null) {
tags.put(SOURCE_ID_KEY, discoverSchemaInput.getSourceId());
}
if (discoverSchemaInput.getConnectorVersion() != null) {
tags.put(CONNECTOR_VERSION_KEY, discoverSchemaInput.getConnectorVersion());
}
}

return tags;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() {
WorkerUtils.cancelProcess(process);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConnectorSpecification;
Expand Down Expand Up @@ -47,8 +53,10 @@ public DefaultGetSpecWorker(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) throws WorkerException {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, config.getDockerImage()));
try {
process = integrationLauncher.spec(jobRoot);

Expand Down Expand Up @@ -90,6 +98,7 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot)

}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() {
WorkerUtils.cancelProcess(process);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.FailureReason;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
Expand All @@ -19,6 +25,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -50,10 +57,13 @@ public DefaultNormalizationWorker(final String jobId,
this.cancelled = new AtomicBoolean(false);
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public NormalizationSummary run(final NormalizationInput input, final Path jobRoot) throws WorkerException {
final long startTime = System.currentTimeMillis();

ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot));

try (normalizationRunner) {
LineGobbler.startSection("DEFAULT NORMALIZATION");
normalizationRunner.start();
Expand All @@ -69,6 +79,7 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
buildFailureReasonsAndSetFailure();
}
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
buildFailureReasonsAndSetFailure();
}

Expand Down Expand Up @@ -105,14 +116,16 @@ private void buildFailureReasonsAndSetFailure() {
failed = true;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() {
LOGGER.info("Cancelling normalization runner...");
try {
cancelled.set(true);
normalizationRunner.close();
} catch (final Exception e) {
e.printStackTrace();
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Unable to cancel normalization runner.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Trace;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.FailureReason;
import io.airbyte.config.ReplicationAttemptSummary;
Expand All @@ -16,6 +22,7 @@
import io.airbyte.config.SyncStats;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -120,6 +127,7 @@ public DefaultReplicationWorker(final String jobId,
* @return output of the replication attempt (including state)
* @throws WorkerException
*/
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt);
Expand All @@ -146,6 +154,8 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path

final Map<String, String> mdc = MDC.getCopyOfContextMap();

ApmTraceUtils.addTagsToTrace(generateTraceTags(destinationConfig, jobRoot));

// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
try (destination; source) {
Expand Down Expand Up @@ -195,6 +205,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path

} catch (final Exception e) {
hasFailed.set(true);
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Sync worker failed.", e);
} finally {
executors.shutdownNow();
Expand Down Expand Up @@ -321,6 +332,7 @@ else if (hasFailed.get()) {
LineGobbler.endSection("REPLICATION");
return output;
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException("Sync failed", e);
}

Expand Down Expand Up @@ -489,33 +501,52 @@ private static Runnable getDestinationOutputRunnable(final AirbyteDestination de
};
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() {
// Resources are closed in the opposite order they are declared.
LOGGER.info("Cancelling replication worker...");
try {
executors.awaitTermination(10, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
e.printStackTrace();
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Unable to cancel due to interruption.", e);
}
cancelled.set(true);

LOGGER.info("Cancelling destination...");
try {
destination.cancel();
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.info("Error cancelling destination: ", e);
}

LOGGER.info("Cancelling source...");
try {
source.cancel();
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.info("Error cancelling source: ", e);
}

}

private Map<String, Object> generateTraceTags(final WorkerDestinationConfig destinationConfig, final Path jobRoot) {
final Map<String, Object> tags = new HashMap<>();

tags.put(JOB_ID_KEY, jobId);
tags.put(JOB_ROOT_KEY, jobRoot);

if (destinationConfig != null) {
if (destinationConfig.getConnectionId() != null) {
tags.put(CONNECTION_ID_KEY, destinationConfig.getConnectionId());
}
}

return tags;
}

private static class SourceException extends RuntimeException {

SourceException(final String message) {
Expand Down
Loading

0 comments on commit 39eb14f

Please sign in to comment.