Skip to content

Commit

Permalink
close ssh in case of exception during check in Postgres connector (#1…
Browse files Browse the repository at this point in the history
…0620)

* close ssh in case of exception

* remove unwanted change

* remove comment

* format

* do not close scanner

* fix semi-colon

* format
  • Loading branch information
subodh1810 authored Feb 28, 2022
1 parent 12ddcdf commit f71754d
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig
validateConfig(integration.spec().getConnectionSpecification(), config, "READ");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
try (messageIterator) {
try (final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null))) {
AirbyteSentry.executeWithTracing("ReadSource", () -> messageIterator.forEachRemaining(outputRecordCollector::accept));
}
}
Expand All @@ -145,8 +144,9 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector);
AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer));
try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer));
}
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
}
Expand All @@ -159,16 +159,14 @@ static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exc
// use a Scanner that only processes new line characters to strictly abide with the
// https://jsonlines.org/ standard
final Scanner input = new Scanner(System.in).useDelimiter("[\r\n]+");
try (consumer) {
consumer.start();
while (input.hasNext()) {
final String inputString = input.next();
final Optional<AirbyteMessage> messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (messageOptional.isPresent()) {
consumer.accept(messageOptional.get());
} else {
LOGGER.error("Received invalid message: " + inputString);
}
consumer.start();
while (input.hasNext()) {
final String inputString = input.next();
final Optional<AirbyteMessage> messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (messageOptional.isPresent()) {
consumer.accept(messageOptional.get());
} else {
LOGGER.error("Received invalid message: " + inputString);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SshWrappedSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(SshWrappedSource.class);
private final Source delegate;
private final List<String> hostKey;
private final List<String> portKey;
Expand Down Expand Up @@ -46,7 +49,15 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state)
throws Exception {
final SshTunnel tunnel = SshTunnel.getInstance(config, hostKey, portKey);
return AutoCloseableIterators.appendOnClose(delegate.read(tunnel.getConfigInTunnel(), catalog, state), tunnel::close);
final AutoCloseableIterator<AirbyteMessage> delegateRead;
try {
delegateRead = delegate.read(tunnel.getConfigInTunnel(), catalog, state);
} catch (final Exception e) {
LOGGER.error("Exception occurred while getting the delegate read iterator, closing SSH tunnel", e);
tunnel.close();
throw e;
}
return AutoCloseableIterators.appendOnClose(delegateRead, tunnel::close);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,13 @@ void testDestinationConsumerLifecycleSuccess() throws Exception {
+ Jsons.serialize(message2) + "\n"
+ Jsons.serialize(stateMessage)).getBytes()));

final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class);
IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock);

final InOrder inOrder = inOrder(airbyteMessageConsumerMock);
inOrder.verify(airbyteMessageConsumerMock).accept(message1);
inOrder.verify(airbyteMessageConsumerMock).accept(message2);
inOrder.verify(airbyteMessageConsumerMock).accept(stateMessage);
inOrder.verify(airbyteMessageConsumerMock).close();
try (final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class)) {
IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock);
final InOrder inOrder = inOrder(airbyteMessageConsumerMock);
inOrder.verify(airbyteMessageConsumerMock).accept(message1);
inOrder.verify(airbyteMessageConsumerMock).accept(message2);
inOrder.verify(airbyteMessageConsumerMock).accept(stateMessage);
}
}

@Test
Expand All @@ -267,15 +266,13 @@ void testDestinationConsumerLifecycleFailure() throws Exception {
.withEmittedAt(EMITTED_AT));
System.setIn(new ByteArrayInputStream((Jsons.serialize(message1) + "\n" + Jsons.serialize(message2)).getBytes()));

final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class);
doThrow(new IOException("error")).when(airbyteMessageConsumerMock).accept(message1);

assertThrows(IOException.class, () -> IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock));

final InOrder inOrder = inOrder(airbyteMessageConsumerMock);
inOrder.verify(airbyteMessageConsumerMock).accept(message1);
inOrder.verify(airbyteMessageConsumerMock).close();
inOrder.verifyNoMoreInteractions();
try (final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class)) {
doThrow(new IOException("error")).when(airbyteMessageConsumerMock).accept(message1);
assertThrows(IOException.class, () -> IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock));
final InOrder inOrder = inOrder(airbyteMessageConsumerMock);
inOrder.verify(airbyteMessageConsumerMock).accept(message1);
inOrder.verifyNoMoreInteractions();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,18 @@ public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
final JsonNode jdbcConfig = toDatabaseConfig(config);

final JdbcDatabase database = Databases.createJdbcDatabase(
jdbcConfig.get("username").asText(),
jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
jdbcConfig.get("jdbc_url").asText(),
driverClass,
jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null,
sourceOperations);
jdbcConfig.get("username").asText(),
jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
jdbcConfig.get("jdbc_url").asText(),
driverClass,
jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null,
sourceOperations);

quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);

return new CockroachJdbcDatabase(database, sourceOperations);
}

private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges(JdbcDatabase database) {
return connection -> {
final PreparedStatement ps = connection.prepareStatement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;

import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
Expand All @@ -22,17 +18,15 @@
import java.util.stream.Stream;

/**
* This implementation uses non-streamed queries to CockroachDB. CockroachDB
* does not currently support multiple active pgwire portals on the same session,
* which makes it impossible to replicate tables that have over ~1000 rows
* using StreamingJdbcDatabase. See: https://go.crdb.dev/issue-v/40195/v21.2
* and in particular, the comment:
* https://github.com/cockroachdb/cockroach/issues/40195?version=v21.2#issuecomment-870570351
* The same situation as kafka-connect applies to StreamingJdbcDatabase
* This implementation uses non-streamed queries to CockroachDB. CockroachDB does not currently
* support multiple active pgwire portals on the same session, which makes it impossible to
* replicate tables that have over ~1000 rows using StreamingJdbcDatabase. See:
* https://go.crdb.dev/issue-v/40195/v21.2 and in particular, the comment:
* https://github.com/cockroachdb/cockroach/issues/40195?version=v21.2#issuecomment-870570351 The
* same situation as kafka-connect applies to StreamingJdbcDatabase
*/
public class CockroachJdbcDatabase
extends JdbcDatabase
{
extends JdbcDatabase {

private final JdbcDatabase database;

Expand Down

0 comments on commit f71754d

Please sign in to comment.