Skip to content

Commit

Permalink
Filter DestroyWorkflowThreadError from trace (#18907)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored Nov 3, 2022
1 parent 7693a30 commit fcf4758
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
@Slf4j
public class TemporalSdkInterceptor implements TraceInterceptor {

/**
* Trace resource name used to scope the filtering performed by this interceptor.
*/
static final String CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME = "ConnectionManagerWorkflowImpl.run";

/**
* Error message tag key name that contains the Temporal exit error message.
*/
Expand Down Expand Up @@ -73,7 +78,23 @@ boolean isExitTrace(final MutableSpan trace) {

return trace.isError() &&
EXIT_ERROR_MESSAGE.equalsIgnoreCase(trace.getTags().getOrDefault(ERROR_MESSAGE_TAG_KEY, "").toString()) &&
WORKFLOW_TRACE_OPERATION_NAME.equalsIgnoreCase(trace.getOperationName().toString());
(safeEquals(trace.getOperationName(), WORKFLOW_TRACE_OPERATION_NAME)
|| safeEquals(trace.getResourceName(), CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME));
}

/**
* Safely test if the provided {@link CharSequence} equals the provided expected string value.
*
* @param actual The {@link CharSequence} to test.
* @param expected The expected string value to be contained in the {@link CharSequence}.
* @return {@code true} if the strings are equal (ignoring case) or {@code false} otherwise.
*/
private boolean safeEquals(final CharSequence actual, final String expected) {
if (actual != null) {
return expected.equalsIgnoreCase(actual.toString());
} else {
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.tracing;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MESSAGE_TAG_KEY;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.EXIT_ERROR_MESSAGE;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -33,18 +34,29 @@ void testOnTraceComplete() {
otherError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
otherError.setTag("error.message", "some other error");

final var temporalExitMsgError = new DummySpan();
temporalExitMsgError.setError(true);
temporalExitMsgError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
temporalExitMsgError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
final var temporalExitMsgOperationNameError = new DummySpan();
temporalExitMsgOperationNameError.setError(true);
temporalExitMsgOperationNameError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
temporalExitMsgOperationNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);

final var temporalExitMsgResourceNameError = new DummySpan();
temporalExitMsgResourceNameError.setError(true);
temporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
temporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);

final var temporalExitMsgOtherOperationError = new DummySpan();
temporalExitMsgOtherOperationError.setError(true);
temporalExitMsgOtherOperationError.setOperationName("OtherOperation");
temporalExitMsgOtherOperationError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);

final var temporalExitMsgOtherResourceError = new DummySpan();
temporalExitMsgOtherResourceError.setError(true);
temporalExitMsgOtherResourceError.setOperationName("OtherOperation");
temporalExitMsgOtherResourceError.setResourceName("OtherResource");
temporalExitMsgOtherResourceError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);

final var spans = List.of(
simple, noError, otherError, temporalExitMsgError, temporalExitMsgOtherResourceError);
simple, noError, otherError, temporalExitMsgOperationNameError, temporalExitMsgResourceNameError, temporalExitMsgOtherOperationError,
temporalExitMsgOtherResourceError);

final var interceptor = new TemporalSdkInterceptor();
final var actual = interceptor.onTraceComplete(spans);
Expand All @@ -53,7 +65,9 @@ void testOnTraceComplete() {
assertFalse(simple.isError());
assertFalse(noError.isError());
assertTrue(otherError.isError());
assertFalse(temporalExitMsgError.isError());
assertFalse(temporalExitMsgOperationNameError.isError());
assertFalse(temporalExitMsgResourceNameError.isError());
assertTrue(temporalExitMsgOtherOperationError.isError());
assertTrue(temporalExitMsgOtherResourceError.isError());
}

Expand All @@ -64,26 +78,47 @@ void testIsExitTrace() {
assertEquals(false, interceptor.isExitTrace(null));
assertEquals(false, interceptor.isExitTrace(new DummySpan()));

final var temporalTrace = new DummySpan();
temporalTrace.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTrace));

final var temporalTraceWithError = new DummySpan();
temporalTraceWithError.setError(true);
temporalTraceWithError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithError));

final var temporalTraceWithExitError = new DummySpan();
temporalTraceWithExitError.setError(true);
temporalTraceWithExitError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
temporalTraceWithExitError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitError));

final var otherTemporalTraceWithExitError = new DummySpan();
otherTemporalTraceWithExitError.setError(true);
otherTemporalTraceWithExitError.setOperationName("OtherOperation");
otherTemporalTraceWithExitError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitError));
final var temporalTraceWithOperationName = new DummySpan();
temporalTraceWithOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithOperationName));

final var temporalTraceWithResourceName = new DummySpan();
temporalTraceWithResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithResourceName));

final var temporalTraceWithErrorAndOperationName = new DummySpan();
temporalTraceWithErrorAndOperationName.setError(true);
temporalTraceWithErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndOperationName));

final var temporalTraceWithErrorAndResourceName = new DummySpan();
temporalTraceWithErrorAndResourceName.setError(true);
temporalTraceWithErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndResourceName));

final var temporalTraceWithExitErrorAndOperationName = new DummySpan();
temporalTraceWithExitErrorAndOperationName.setError(true);
temporalTraceWithExitErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
temporalTraceWithExitErrorAndOperationName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndOperationName));

final var temporalTraceWithExitErrorAndResourceName = new DummySpan();
temporalTraceWithExitErrorAndResourceName.setError(true);
temporalTraceWithExitErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
temporalTraceWithExitErrorAndResourceName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndResourceName));

final var otherTemporalTraceWithExitErrorAndOtherOperationName = new DummySpan();
otherTemporalTraceWithExitErrorAndOtherOperationName.setError(true);
otherTemporalTraceWithExitErrorAndOtherOperationName.setOperationName("OtherOperation");
otherTemporalTraceWithExitErrorAndOtherOperationName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherOperationName));

final var otherTemporalTraceWithExitErrorAndOtherResourceName = new DummySpan();
otherTemporalTraceWithExitErrorAndOtherResourceName.setError(true);
otherTemporalTraceWithExitErrorAndOtherResourceName.setResourceName("OtherResource");
otherTemporalTraceWithExitErrorAndOtherResourceName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherResourceName));
}

@Test
Expand Down

0 comments on commit fcf4758

Please sign in to comment.