Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate ssh exception -> config error exception #19094

Merged
merged 6 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,18 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
}
} catch (final Exception e) {
final String displayMessage = getDisplayMessage(e);
// Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An attempt is made to
// find the root exception that corresponds to a configuration error. If that does not exist, we
// just return the original exception.
final Throwable rootThrowable = getRootConfigError(e);
final String displayMessage = getDisplayMessage(rootThrowable);
// If the source connector throws a config error, a trace message with the relevant message should
// be surfaced.
if (isConfigError(e)) {
if (isConfigError(rootThrowable)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage);
}
if (parsed.getCommand().equals(Command.CHECK)) {
// Currently, special handling is required for the SPEC case since the user display information in
// Currently, special handling is required for the CHECK case since the user display information in
// the trace message is
// not properly surfaced to the FE. In the future, we can remove this and just throw an exception.
outputRecordCollector
Expand All @@ -179,18 +183,34 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

private boolean isConfigError(final Exception e) {
/**
* Returns the first instance of an exception associated with a configuration error (if it exists).
* Otherwise, the original exception is returned.
*/
private Throwable getRootConfigError(final Exception e) {
Throwable current = e;
while (current != null) {
if (isConfigError(current)) {
return current;
} else {
current = current.getCause();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like peeling an onion..

Copy link
Contributor

@ryankfu ryankfu Nov 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good find! Unfortunately, Throwables.getRootCause gets the root exception. Here, we actually want the root exception that is a possible configexception. The nuance here is that we could catch some SQLException, repackage it as a ConfigErrorException which then is repackaged up the stack as a RuntimeException.

We actually want the ConfigErrorException, so I think we have to manually unnest this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, then can we add a javadoc comment that indicates this looks for the first instance of a ConfigErrorException? Also curious what happens if the Exception does not contain a ConfigErrorException, it seems would it be returning null, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh good catch :(
Will just return the original exception if there is no config error. Also updated the comments

}
}
return e;
}

private boolean isConfigError(final Throwable e) {
return e instanceof ConfigErrorException || e instanceof ConnectionErrorException;
}

private String getDisplayMessage(final Exception e) {
private String getDisplayMessage(final Throwable e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e);
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx);
} else {
return "Could not connect with provided configuration. Error: " + e.getMessage();
return "Could not connect with provided configuration. Error: " + e.getMessage() != null ? e.getMessage() : "";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -311,7 +312,7 @@ KeyPair getPrivateKeyPair() throws IOException, GeneralSecurityException {
if (keyPairs != null && keyPairs.iterator().hasNext()) {
return keyPairs.iterator().next();
}
throw new RuntimeException("Unable to load private key pairs, verify key pairs are properly inputted");
throw new ConfigErrorException("Unable to load private key pairs, verify key pairs are properly inputted");
}

private String validateKey() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.base;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -19,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -222,6 +225,67 @@ void testRead() throws Exception {
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testReadException() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath,
statePath);
final ConfigErrorException configErrorException = new ConfigErrorException("Invalid configuration");

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenThrow(configErrorException);

final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
final Throwable throwable = catchThrowable(() -> new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS));

assertThat(throwable).isInstanceOf(ConfigErrorException.class);
verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE);
}

@Test
void testCheckNestedException() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.check(configPath);
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Invalid configuration");
final ConfigErrorException configErrorException = new ConfigErrorException("Invalid configuration");
final RuntimeException runtimeException = new RuntimeException(new RuntimeException(configErrorException));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.check(CONFIG)).thenThrow(runtimeException);

final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);
final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testCheckRuntimeException() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.check(configPath);
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Runtime Error");
final RuntimeException runtimeException = new RuntimeException("Runtime Error");

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.check(CONFIG)).thenThrow(runtimeException);

final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);
final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testWrite() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.write(configPath, configuredCatalogPath);
Expand Down