From 91d19b6101a935b34adbcebf683f0eb71b365890 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 7 Nov 2022 13:05:07 -0800 Subject: [PATCH 1/4] Update SshTunnel.java Migrate ssh exception -> config error exception --- .../main/java/io/airbyte/integrations/base/ssh/SshTunnel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java index 261699be2168..2426bb7dbc61 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java @@ -311,7 +311,7 @@ KeyPair getPrivateKeyPair() throws IOException, GeneralSecurityException { if (keyPairs != null && keyPairs.iterator().hasNext()) { return keyPairs.iterator().next(); } - throw new RuntimeException("Unable to load private key pairs, verify key pairs are properly inputted"); + throw new ConfigErrorException("Unable to load private key pairs, verify key pairs are properly inputted"); } private String validateKey() { From e496bf326501b945ad788070aca9679278e1e2af Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 7 Nov 2022 15:52:41 -0800 Subject: [PATCH 2/4] Unnest exception --- .../integrations/base/IntegrationRunner.java | 30 ++++++++++--- .../integrations/base/ssh/SshTunnel.java | 1 + .../base/IntegrationRunnerTest.java | 44 +++++++++++++++++++ 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index f23e913d784c..2d612adf8a06 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -153,14 +153,18 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); } } catch (final Exception e) { - final String displayMessage = getDisplayMessage(e); + // Many of the exceptions thrown are nested inside layers of RuntimeExceptions. We need to get the + // root exception to decide whether it corresponds to an exception that represents a configuration + // error. + final Throwable rootThrowable = getRootThrowable(e); + final String displayMessage = getDisplayMessage(rootThrowable); // If the source connector throws a config error, a trace message with the relevant message should // be surfaced. - if (isConfigError(e)) { + if (isConfigError(rootThrowable)) { AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage); } if (parsed.getCommand().equals(Command.CHECK)) { - // Currently, special handling is required for the SPEC case since the user display information in + // Currently, special handling is required for the CHECK case since the user display information in // the trace message is // not properly surfaced to the FE. In the future, we can remove this and just throw an exception. outputRecordCollector @@ -179,18 +183,30 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { LOGGER.info("Completed integration: {}", integration.getClass().getName()); } - private boolean isConfigError(final Exception e) { + private Throwable getRootThrowable(final Exception e) { + Throwable current = e; + while (current != null) { + if (isConfigError(current)) { + return current; + } else { + current = current.getCause(); + } + } + return current; + } + + private boolean isConfigError(final Throwable e) { return e instanceof ConfigErrorException || e instanceof ConnectionErrorException; } - private String getDisplayMessage(final Exception e) { + private String getDisplayMessage(final Throwable e) { if (e instanceof ConfigErrorException) { return ((ConfigErrorException) e).getDisplayMessage(); } else if (e instanceof ConnectionErrorException) { final ConnectionErrorException connEx = (ConnectionErrorException) e; - return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e); + return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx); } else { - return "Could not connect with provided configuration. Error: " + e.getMessage(); + return "Could not connect with provided configuration. Error: " + e.getMessage() != null ? e.getMessage() : ""; } } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java index 2426bb7dbc61..2b24d2babb35 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index 154481db5c59..73996174edd6 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.base; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -19,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterators; @@ -222,6 +225,47 @@ void testRead() throws Exception { verify(jsonSchemaValidator).validate(any(), any()); } + @Test + void testReadException() throws Exception { + final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath, + statePath); + final ConfigErrorException configErrorException = new ConfigErrorException("Invalid configuration"); + + when(cliParser.parse(ARGS)).thenReturn(intConfig); + when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenThrow(configErrorException); + + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(source.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + + final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + final Throwable throwable = catchThrowable(() -> new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS)); + + assertThat(throwable).isInstanceOf(ConfigErrorException.class); + verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE); + } + + @Test + void testCheckNestedException() throws Exception { + final IntegrationConfig intConfig = IntegrationConfig.check(configPath); + final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Invalid configuration"); + final ConfigErrorException configErrorException = new ConfigErrorException("Invalid configuration"); + final RuntimeException runtimeException = new RuntimeException(new RuntimeException(configErrorException)); + + when(cliParser.parse(ARGS)).thenReturn(intConfig); + when(source.check(CONFIG)).thenThrow(runtimeException); + + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(source.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS); + + verify(source).check(CONFIG); + verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output)); + verify(jsonSchemaValidator).validate(any(), any()); + } + @Test void testWrite() throws Exception { final IntegrationConfig intConfig = IntegrationConfig.write(configPath, configuredCatalogPath); From 9f28f16297bf71636af42029aa87d873289a93c7 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 7 Nov 2022 17:04:33 -0800 Subject: [PATCH 3/4] Address PR comments --- .../integrations/base/IntegrationRunner.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 2d612adf8a06..6b37e65cd37f 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -153,10 +153,10 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); } } catch (final Exception e) { - // Many of the exceptions thrown are nested inside layers of RuntimeExceptions. We need to get the - // root exception to decide whether it corresponds to an exception that represents a configuration - // error. - final Throwable rootThrowable = getRootThrowable(e); + // Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An attempt is made to + // find the root exception that corresponds to a configuration error. If that does not exist, we + // just return the original exception. + final Throwable rootThrowable = getRootConfigError(e); final String displayMessage = getDisplayMessage(rootThrowable); // If the source connector throws a config error, a trace message with the relevant message should // be surfaced. @@ -183,7 +183,11 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { LOGGER.info("Completed integration: {}", integration.getClass().getName()); } - private Throwable getRootThrowable(final Exception e) { + /** + * Returns the first instance of an exception associated with a configuration error (if it exists). + * Otherwise, the original exception is returned. + */ + private Throwable getRootConfigError(final Exception e) { Throwable current = e; while (current != null) { if (isConfigError(current)) { @@ -192,7 +196,7 @@ private Throwable getRootThrowable(final Exception e) { current = current.getCause(); } } - return current; + return e; } private boolean isConfigError(final Throwable e) { From 9071287b853f861fe30cd4329bec8b3c28646794 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 7 Nov 2022 17:17:09 -0800 Subject: [PATCH 4/4] Add test case --- .../base/IntegrationRunnerTest.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index 73996174edd6..5b9bd4cb183f 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -266,6 +266,26 @@ void testCheckNestedException() throws Exception { verify(jsonSchemaValidator).validate(any(), any()); } + @Test + void testCheckRuntimeException() throws Exception { + final IntegrationConfig intConfig = IntegrationConfig.check(configPath); + final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Runtime Error"); + final RuntimeException runtimeException = new RuntimeException("Runtime Error"); + + when(cliParser.parse(ARGS)).thenReturn(intConfig); + when(source.check(CONFIG)).thenThrow(runtimeException); + + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(source.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS); + + verify(source).check(CONFIG); + verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output)); + verify(jsonSchemaValidator).validate(any(), any()); + } + @Test void testWrite() throws Exception { final IntegrationConfig intConfig = IntegrationConfig.write(configPath, configuredCatalogPath);