Skip to content

Commit

Permalink
Additional worker APM tracing (#18427)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored Oct 25, 2022
1 parent 86cb666 commit b323ccd
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

package io.airbyte.workers.internal;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
Expand Down Expand Up @@ -98,6 +101,7 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker,
this.stateAggregator = stateAggregator;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void acceptFromSource(final AirbyteMessage message) {
logMessageAsJSON("source", message);
Expand All @@ -110,6 +114,7 @@ public void acceptFromSource(final AirbyteMessage message) {
}
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void acceptFromDestination(final AirbyteMessage message) {
logMessageAsJSON("destination", message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.workers.internal;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.protocol.models.AirbyteProtocolSchema;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.util.function.Predicate;
Expand All @@ -23,6 +26,7 @@ public AirbyteProtocolPredicate() {
schema = JsonSchemaValidator.getSchema(AirbyteProtocolSchema.PROTOCOL.getFile(), "AirbyteMessage");
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public boolean test(final JsonNode s) {
return jsonSchemaValidator.test(schema, s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

package io.airbyte.workers.internal;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -58,6 +61,7 @@ public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
this.streamFactory = streamFactory;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void start(final WorkerDestinationConfig destinationConfig, final Path jobRoot) throws IOException, WorkerException {
Preconditions.checkState(destinationProcess == null);
Expand All @@ -79,6 +83,7 @@ public void start(final WorkerDestinationConfig destinationConfig, final Path jo
.iterator();
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void accept(final AirbyteMessage message) throws IOException {
Preconditions.checkState(destinationProcess != null && !inputHasEnded.get());
Expand All @@ -87,6 +92,7 @@ public void accept(final AirbyteMessage message) throws IOException {
writer.newLine();
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void notifyEndOfInput() throws IOException {
Preconditions.checkState(destinationProcess != null && !inputHasEnded.get());
Expand All @@ -96,6 +102,7 @@ public void notifyEndOfInput() throws IOException {
inputHasEnded.set(true);
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void close() throws Exception {
if (destinationProcess == null) {
Expand All @@ -116,6 +123,7 @@ public void close() throws Exception {
}
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() throws Exception {
LOGGER.info("Attempting to cancel destination process...");
Expand All @@ -129,6 +137,7 @@ public void cancel() throws Exception {
}
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public boolean isFinished() {
Preconditions.checkState(destinationProcess != null);
Expand All @@ -139,6 +148,7 @@ public boolean isFinished() {
return !messageIterator.hasNext() && !destinationProcess.isAlive();
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public int getExitValue() {
Preconditions.checkState(destinationProcess != null, "Destination process is null, cannot retrieve exit value.");
Expand All @@ -151,6 +161,7 @@ public int getExitValue() {
return exitValue;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public Optional<AirbyteMessage> attemptRead() {
Preconditions.checkState(destinationProcess != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

package io.airbyte.workers.internal;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
Expand Down Expand Up @@ -63,6 +66,7 @@ public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {
this.heartbeatMonitor = heartbeatMonitor;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) throws Exception {
Preconditions.checkState(sourceProcess == null);
Expand All @@ -85,6 +89,7 @@ public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) thr
.iterator();
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public boolean isFinished() {
Preconditions.checkState(sourceProcess != null);
Expand All @@ -96,6 +101,7 @@ public boolean isFinished() {
return !messageIterator.hasNext() && !sourceProcess.isAlive();
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public int getExitValue() throws IllegalStateException {
Preconditions.checkState(sourceProcess != null, "Source process is null, cannot retrieve exit value.");
Expand All @@ -108,13 +114,15 @@ public int getExitValue() throws IllegalStateException {
return exitValue;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public Optional<AirbyteMessage> attemptRead() {
Preconditions.checkState(sourceProcess != null);

return Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void close() throws Exception {
if (sourceProcess == null) {
Expand All @@ -134,6 +142,7 @@ public void close() throws Exception {
}
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() throws Exception {
LOGGER.info("Attempting to cancel source process...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.workers.internal;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.metrics.lib.MetricClientFactory;
Expand Down Expand Up @@ -52,6 +55,7 @@ public DefaultAirbyteStreamFactory(final MdcScope.Builder containerLogMdcBuilder
this.containerLogMdcBuilder = containerLogMdcBuilder;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
final var metricClient = MetricClientFactory.getMetricClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.workers.internal;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.StateType;
Expand Down Expand Up @@ -50,6 +53,7 @@ public EmptyAirbyteSource(final boolean useStreamCapableState) {
this.useStreamCapableState = useStreamCapableState;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoot) throws Exception {

Expand Down Expand Up @@ -105,16 +109,19 @@ public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoo
}

// always finished. it has no data to send.
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public boolean isFinished() {
return hasEmittedState.get();
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public int getExitValue() {
return 0;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public Optional<AirbyteMessage> attemptRead() {
if (!isStarted) {
Expand All @@ -134,11 +141,13 @@ public Optional<AirbyteMessage> attemptRead() {
}
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void close() throws Exception {
// no op.
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() throws Exception {
// no op.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.workers.internal;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -72,6 +75,7 @@ public StateDeltaTracker(final long memoryLimitBytes) {
* @throws StateDeltaTrackerException thrown when the memory footprint of stateDeltas exceeds
* available capacity.
*/
@Trace(operationName = WORKER_OPERATION_NAME)
public void addState(final int stateHash, final Map<Short, Long> streamIndexToRecordCount) throws StateDeltaTrackerException {
synchronized (this) {
final int size = STATE_HASH_BYTES + (streamIndexToRecordCount.size() * BYTES_PER_STREAM);
Expand Down Expand Up @@ -104,6 +108,7 @@ public void addState(final int stateHash, final Map<Short, Long> streamIndexToRe
* @throws StateDeltaTrackerException thrown when committed counts can no longer be reliably
* computed.
*/
@Trace(operationName = WORKER_OPERATION_NAME)
public void commitStateHash(final int stateHash) throws StateDeltaTrackerException {
synchronized (this) {
if (capacityExceeded) {
Expand Down Expand Up @@ -139,6 +144,7 @@ public void commitStateHash(final int stateHash) throws StateDeltaTrackerExcepti
}
}

@Trace(operationName = WORKER_OPERATION_NAME)
public Map<Short, Long> getStreamToCommittedRecords() {
return streamToCommittedRecords;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

package io.airbyte.workers.internal;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import datadog.trace.api.Trace;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
Expand Down Expand Up @@ -74,6 +77,7 @@ public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProv
* If detectVersion is set to true, it will decide which protocol version to use from the content of
* the stream rather than the one passed from the constructor.
*/
@Trace(operationName = WORKER_OPERATION_NAME)
@SneakyThrows
@Override
public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
Expand Down Expand Up @@ -123,7 +127,7 @@ private Optional<Version> detectVersion(final BufferedReader bufferedReader) thr
}
bufferedReader.reset();
return Optional.empty();
} catch (IOException e) {
} catch (final IOException e) {
logger.warn(
"Protocol version detection failed, it is likely than the connector sent more than {}B without an complete SPEC message." +
" A SPEC message that is too long could be the root cause here.",
Expand Down Expand Up @@ -156,7 +160,7 @@ protected Stream<AirbyteMessage> toAirbyteMessage(final JsonNode json) {
try {
final io.airbyte.protocol.models.v0.AirbyteMessage message = migrator.upgrade(deserializer.deserialize(json));
return Stream.of(convert(message));
} catch (RuntimeException e) {
} catch (final RuntimeException e) {
logger.warn("Failed to upgrade a message from version {}: {}", protocolVersion, Jsons.serialize(json), e);
return Stream.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.workers.internal.state_aggregator;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteStateMessage;
Expand All @@ -14,11 +17,13 @@ class SingleStateAggregator implements StateAggregator {

AirbyteStateMessage state;

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void ingest(final AirbyteStateMessage stateMessage) {
state = stateMessage;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public State getAggregated() {
if (state.getType() == null || state.getType() == AirbyteStateType.LEGACY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.workers.internal.state_aggregator;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteStateMessage;
Expand All @@ -15,6 +18,7 @@ class StreamStateAggregator implements StateAggregator {

Map<StreamDescriptor, AirbyteStateMessage> aggregatedState = new HashMap<>();

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void ingest(final AirbyteStateMessage stateMessage) {
/**
Expand All @@ -27,6 +31,7 @@ public void ingest(final AirbyteStateMessage stateMessage) {
aggregatedState.put(stateMessage.getStream().getStreamDescriptor(), stateMessage);
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public State getAggregated() {

Expand Down
Loading

0 comments on commit b323ccd

Please sign in to comment.