diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index f23e913d784c..6b37e65cd37f 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -153,14 +153,18 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); } } catch (final Exception e) { - final String displayMessage = getDisplayMessage(e); + // Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An attempt is made to + // find the root exception that corresponds to a configuration error. If that does not exist, we + // just return the original exception. + final Throwable rootThrowable = getRootConfigError(e); + final String displayMessage = getDisplayMessage(rootThrowable); // If the source connector throws a config error, a trace message with the relevant message should // be surfaced. - if (isConfigError(e)) { + if (isConfigError(rootThrowable)) { AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage); } if (parsed.getCommand().equals(Command.CHECK)) { - // Currently, special handling is required for the SPEC case since the user display information in + // Currently, special handling is required for the CHECK case since the user display information in // the trace message is // not properly surfaced to the FE. In the future, we can remove this and just throw an exception. outputRecordCollector @@ -179,18 +183,34 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { LOGGER.info("Completed integration: {}", integration.getClass().getName()); } - private boolean isConfigError(final Exception e) { + /** + * Returns the first instance of an exception associated with a configuration error (if it exists). + * Otherwise, the original exception is returned. + */ + private Throwable getRootConfigError(final Exception e) { + Throwable current = e; + while (current != null) { + if (isConfigError(current)) { + return current; + } else { + current = current.getCause(); + } + } + return e; + } + + private boolean isConfigError(final Throwable e) { return e instanceof ConfigErrorException || e instanceof ConnectionErrorException; } - private String getDisplayMessage(final Exception e) { + private String getDisplayMessage(final Throwable e) { if (e instanceof ConfigErrorException) { return ((ConfigErrorException) e).getDisplayMessage(); } else if (e instanceof ConnectionErrorException) { final ConnectionErrorException connEx = (ConnectionErrorException) e; - return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e); + return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx); } else { - return "Could not connect with provided configuration. Error: " + e.getMessage(); + return "Could not connect with provided configuration. Error: " + e.getMessage() != null ? e.getMessage() : ""; } } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java index 261699be2168..2b24d2babb35 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; @@ -311,7 +312,7 @@ KeyPair getPrivateKeyPair() throws IOException, GeneralSecurityException { if (keyPairs != null && keyPairs.iterator().hasNext()) { return keyPairs.iterator().next(); } - throw new RuntimeException("Unable to load private key pairs, verify key pairs are properly inputted"); + throw new ConfigErrorException("Unable to load private key pairs, verify key pairs are properly inputted"); } private String validateKey() { diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index 154481db5c59..5b9bd4cb183f 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.base; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -19,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterators; @@ -222,6 +225,67 @@ void testRead() throws Exception { verify(jsonSchemaValidator).validate(any(), any()); } + @Test + void testReadException() throws Exception { + final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath, + statePath); + final ConfigErrorException configErrorException = new ConfigErrorException("Invalid configuration"); + + when(cliParser.parse(ARGS)).thenReturn(intConfig); + when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenThrow(configErrorException); + + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(source.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + + final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + final Throwable throwable = catchThrowable(() -> new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS)); + + assertThat(throwable).isInstanceOf(ConfigErrorException.class); + verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE); + } + + @Test + void testCheckNestedException() throws Exception { + final IntegrationConfig intConfig = IntegrationConfig.check(configPath); + final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Invalid configuration"); + final ConfigErrorException configErrorException = new ConfigErrorException("Invalid configuration"); + final RuntimeException runtimeException = new RuntimeException(new RuntimeException(configErrorException)); + + when(cliParser.parse(ARGS)).thenReturn(intConfig); + when(source.check(CONFIG)).thenThrow(runtimeException); + + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(source.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS); + + verify(source).check(CONFIG); + verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output)); + verify(jsonSchemaValidator).validate(any(), any()); + } + + @Test + void testCheckRuntimeException() throws Exception { + final IntegrationConfig intConfig = IntegrationConfig.check(configPath); + final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Runtime Error"); + final RuntimeException runtimeException = new RuntimeException("Runtime Error"); + + when(cliParser.parse(ARGS)).thenReturn(intConfig); + when(source.check(CONFIG)).thenThrow(runtimeException); + + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(source.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS); + + verify(source).check(CONFIG); + verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output)); + verify(jsonSchemaValidator).validate(any(), any()); + } + @Test void testWrite() throws Exception { final IntegrationConfig intConfig = IntegrationConfig.write(configPath, configuredCatalogPath);