diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java b/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java index 64486137d866..8ec652f10da8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java @@ -21,6 +21,11 @@ @Slf4j public class TemporalSdkInterceptor implements TraceInterceptor { + /** + * Trace resource name used to scope the filtering performed by this interceptor. + */ + static final String CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME = "ConnectionManagerWorkflowImpl.run"; + /** * Error message tag key name that contains the Temporal exit error message. */ @@ -73,7 +78,23 @@ boolean isExitTrace(final MutableSpan trace) { return trace.isError() && EXIT_ERROR_MESSAGE.equalsIgnoreCase(trace.getTags().getOrDefault(ERROR_MESSAGE_TAG_KEY, "").toString()) && - WORKFLOW_TRACE_OPERATION_NAME.equalsIgnoreCase(trace.getOperationName().toString()); + (safeEquals(trace.getOperationName(), WORKFLOW_TRACE_OPERATION_NAME) + || safeEquals(trace.getResourceName(), CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME)); + } + + /** + * Safely test if the provided {@link CharSequence} equals the provided expected string value. + * + * @param actual The {@link CharSequence} to test. + * @param expected The expected string value to be contained in the {@link CharSequence}. + * @return {@code true} if the strings are equal (ignoring case) or {@code false} otherwise. + */ + private boolean safeEquals(final CharSequence actual, final String expected) { + if (actual != null) { + return expected.equalsIgnoreCase(actual.toString()); + } else { + return false; + } } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java index 5565d63b31e5..811fef3bdd39 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java @@ -5,6 +5,7 @@ package io.airbyte.workers.tracing; import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; +import static io.airbyte.workers.tracing.TemporalSdkInterceptor.CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME; import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MESSAGE_TAG_KEY; import static io.airbyte.workers.tracing.TemporalSdkInterceptor.EXIT_ERROR_MESSAGE; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -33,18 +34,29 @@ void testOnTraceComplete() { otherError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); otherError.setTag("error.message", "some other error"); - final var temporalExitMsgError = new DummySpan(); - temporalExitMsgError.setError(true); - temporalExitMsgError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); - temporalExitMsgError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + final var temporalExitMsgOperationNameError = new DummySpan(); + temporalExitMsgOperationNameError.setError(true); + temporalExitMsgOperationNameError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + temporalExitMsgOperationNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + + final var temporalExitMsgResourceNameError = new DummySpan(); + temporalExitMsgResourceNameError.setError(true); + temporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); + temporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + + final var temporalExitMsgOtherOperationError = new DummySpan(); + temporalExitMsgOtherOperationError.setError(true); + temporalExitMsgOtherOperationError.setOperationName("OtherOperation"); + temporalExitMsgOtherOperationError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); final var temporalExitMsgOtherResourceError = new DummySpan(); temporalExitMsgOtherResourceError.setError(true); - temporalExitMsgOtherResourceError.setOperationName("OtherOperation"); + temporalExitMsgOtherResourceError.setResourceName("OtherResource"); temporalExitMsgOtherResourceError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); final var spans = List.of( - simple, noError, otherError, temporalExitMsgError, temporalExitMsgOtherResourceError); + simple, noError, otherError, temporalExitMsgOperationNameError, temporalExitMsgResourceNameError, temporalExitMsgOtherOperationError, + temporalExitMsgOtherResourceError); final var interceptor = new TemporalSdkInterceptor(); final var actual = interceptor.onTraceComplete(spans); @@ -53,7 +65,9 @@ void testOnTraceComplete() { assertFalse(simple.isError()); assertFalse(noError.isError()); assertTrue(otherError.isError()); - assertFalse(temporalExitMsgError.isError()); + assertFalse(temporalExitMsgOperationNameError.isError()); + assertFalse(temporalExitMsgResourceNameError.isError()); + assertTrue(temporalExitMsgOtherOperationError.isError()); assertTrue(temporalExitMsgOtherResourceError.isError()); } @@ -64,26 +78,47 @@ void testIsExitTrace() { assertEquals(false, interceptor.isExitTrace(null)); assertEquals(false, interceptor.isExitTrace(new DummySpan())); - final var temporalTrace = new DummySpan(); - temporalTrace.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); - assertEquals(false, interceptor.isExitTrace(temporalTrace)); - - final var temporalTraceWithError = new DummySpan(); - temporalTraceWithError.setError(true); - temporalTraceWithError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); - assertEquals(false, interceptor.isExitTrace(temporalTraceWithError)); - - final var temporalTraceWithExitError = new DummySpan(); - temporalTraceWithExitError.setError(true); - temporalTraceWithExitError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); - temporalTraceWithExitError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); - assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitError)); - - final var otherTemporalTraceWithExitError = new DummySpan(); - otherTemporalTraceWithExitError.setError(true); - otherTemporalTraceWithExitError.setOperationName("OtherOperation"); - otherTemporalTraceWithExitError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); - assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitError)); + final var temporalTraceWithOperationName = new DummySpan(); + temporalTraceWithOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + assertEquals(false, interceptor.isExitTrace(temporalTraceWithOperationName)); + + final var temporalTraceWithResourceName = new DummySpan(); + temporalTraceWithResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); + assertEquals(false, interceptor.isExitTrace(temporalTraceWithResourceName)); + + final var temporalTraceWithErrorAndOperationName = new DummySpan(); + temporalTraceWithErrorAndOperationName.setError(true); + temporalTraceWithErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndOperationName)); + + final var temporalTraceWithErrorAndResourceName = new DummySpan(); + temporalTraceWithErrorAndResourceName.setError(true); + temporalTraceWithErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); + assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndResourceName)); + + final var temporalTraceWithExitErrorAndOperationName = new DummySpan(); + temporalTraceWithExitErrorAndOperationName.setError(true); + temporalTraceWithExitErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + temporalTraceWithExitErrorAndOperationName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndOperationName)); + + final var temporalTraceWithExitErrorAndResourceName = new DummySpan(); + temporalTraceWithExitErrorAndResourceName.setError(true); + temporalTraceWithExitErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); + temporalTraceWithExitErrorAndResourceName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndResourceName)); + + final var otherTemporalTraceWithExitErrorAndOtherOperationName = new DummySpan(); + otherTemporalTraceWithExitErrorAndOtherOperationName.setError(true); + otherTemporalTraceWithExitErrorAndOtherOperationName.setOperationName("OtherOperation"); + otherTemporalTraceWithExitErrorAndOtherOperationName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherOperationName)); + + final var otherTemporalTraceWithExitErrorAndOtherResourceName = new DummySpan(); + otherTemporalTraceWithExitErrorAndOtherResourceName.setError(true); + otherTemporalTraceWithExitErrorAndOtherResourceName.setResourceName("OtherResource"); + otherTemporalTraceWithExitErrorAndOtherResourceName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherResourceName)); } @Test