Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉Source Clickhouse: added option to connect via SSH tunnel (aka Bastion server) #7327

Merged
merged 16 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "bad83517-5e54-4a3d-9b53-63e85fbd4d7c",
"name": "ClickHouse",
"dockerRepository": "airbyte/source-clickhouse",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/clickhouse"
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
- name: ClickHouse
sourceDefinitionId: bad83517-5e54-4a3d-9b53-63e85fbd4d7c
dockerRepository: airbyte/source-clickhouse
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/clickhouse
sourceType: database
- name: Close.com
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, final
}

public ImmutableMap.Builder<Object, Object> getBasicDbConfigBuider(final JdbcDatabaseContainer<?> db) {
return getBasicDbConfigBuider(db, db.getDatabaseName());
}

public ImmutableMap.Builder<Object, Object> getBasicDbConfigBuider(final JdbcDatabaseContainer<?> db, final String schemaName) {
return ImmutableMap.builder()
.put("host", Objects.requireNonNull(db.getContainerInfo().getNetworkSettings()
.getNetworks()
Expand All @@ -62,7 +66,7 @@ public ImmutableMap.Builder<Object, Object> getBasicDbConfigBuider(final JdbcDat
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("port", db.getExposedPorts().get(0))
.put("database", db.getDatabaseName())
.put("database", schemaName)
.put("ssl", false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-clickhouse-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.clickhouse.ClickHouseSource;
import io.airbyte.integrations.source.clickhouse.ClickHouseStrictEncryptSource;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
Expand Down Expand Up @@ -138,8 +139,8 @@ public Source getSource() {
void testSpec() throws Exception {
final ConnectorSpecification actual = source.spec();
final ConnectorSpecification expected =
Jsons.deserialize(MoreResources.readResource("expected_spec.json"),
ConnectorSpecification.class);
SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_spec.json"),
ConnectorSpecification.class));
assertEquals(expected, actual);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-clickhouse
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.db.jdbc.NoOpJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
Expand Down Expand Up @@ -66,7 +67,7 @@ protected Map<String, List<String>> discoverPrimaryKeys(final JdbcDatabase datab
public static final String DRIVER_CLASS = "ru.yandex.clickhouse.ClickHouseDriver";

public static Source getWrappedSource() {
return new ClickHouseSource();
return new SshWrappedSource(new ClickHouseSource(), List.of("host"), List.of("port"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.source.clickhouse.ClickHouseSource;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.jooq.SQLDialect;
import org.testcontainers.containers.ClickHouseContainer;

public abstract class AbstractSshClickHouseSourceAcceptanceTest extends SourceAcceptanceTest {

private ClickHouseContainer db;
private final SshBastionContainer bastion = new SshBastionContainer();
private static JsonNode config;
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String SCHEMA_NAME = "default";

public abstract SshTunnel.TunnelMethod getTunnelMethod();

@Override
protected String getImageName() {
return "airbyte/source-clickhouse:dev";
}

@Override
protected ConnectorSpecification getSpec() throws Exception {
return SshHelpers.getSpecAndInjectSsh();
}

@Override
protected JsonNode getConfig() {
return config;
}

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", config.get("database").asText(), STREAM_NAME),
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", config.get("database").asText(), STREAM_NAME2),
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

@Override
protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected List<String> getRegexTests() {
return Collections.emptyList();
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
startTestContainers();
config = bastion.getTunnelConfig(getTunnelMethod(), bastion.getBasicDbConfigBuider(db, "default"));
populateDatabaseTestData();

}
private void startTestContainers() {
bastion.initAndStartBastion();
initAndStartJdbcContainer();
}

private void initAndStartJdbcContainer() {
db = (ClickHouseContainer) new ClickHouseContainer("yandex/clickhouse-server:21.8.8.29-alpine").withNetwork(bastion.getNetWork());
db.start();
}

private static void populateDatabaseTestData() throws Exception {
final JdbcDatabase database = Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:clickhouse://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()),
ClickHouseSource.DRIVER_CLASS);

final String table1 = JdbcUtils.getFullyQualifiedTableName(SCHEMA_NAME, STREAM_NAME);
final String createTable1 =
String.format("CREATE TABLE IF NOT EXISTS %s (id INTEGER, name VARCHAR(200)) ENGINE = TinyLog \n", table1);
final String table2 = JdbcUtils.getFullyQualifiedTableName(SCHEMA_NAME, STREAM_NAME2);
final String createTable2 =
String.format("CREATE TABLE IF NOT EXISTS %s (id INTEGER, name VARCHAR(200)) ENGINE = TinyLog \n", table2);
database.execute(connection -> {
connection.createStatement().execute(createTable1);
connection.createStatement().execute(createTable2);
});

final String insertTestData = String.format("INSERT INTO %s (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');\n", table1);
final String insertTestData2 = String.format("INSERT INTO %s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');\n", table2);
database.execute(connection -> {
connection.createStatement().execute(insertTestData);
connection.createStatement().execute(insertTestData2);
});

database.close();
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
bastion.stopAndCloseContainers(db);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.clickhouse.ClickHouseSource;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
Expand Down Expand Up @@ -43,7 +44,7 @@ protected String getImageName() {

@Override
protected ConnectorSpecification getSpec() throws Exception {
return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
return SshHelpers.getSpecAndInjectSsh();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import io.airbyte.integrations.base.ssh.SshTunnel;

public class SshKeyClickhouseSourceAcceptanceTest extends
AbstractSshClickHouseSourceAcceptanceTest {

@Override
public SshTunnel.TunnelMethod getTunnelMethod() {
return SshTunnel.TunnelMethod.SSH_KEY_AUTH;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import io.airbyte.integrations.base.ssh.SshTunnel;

public class SshPasswordClickhouseSourceAcceptanceTest extends
AbstractSshClickHouseSourceAcceptanceTest {

@Override
public SshTunnel.TunnelMethod getTunnelMethod() {
return SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH;
}

}
25 changes: 23 additions & 2 deletions docs/integrations/sources/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ The ClickHouse source does not alter the schema present in your warehouse. Depen
| Incremental Sync | Yes | |
| Replicate Incremental Deletes | Coming soon | |
| Logical Replication \(WAL\) | Coming soon | |
| SSL Support | No | |
| SSH Tunnel Connection | Coming soon | |
| SSL Support | Yes | |
| SSH Tunnel Connection | Yes | |
| Namespaces | Yes | Enabled by default |

## Getting started
Expand Down Expand Up @@ -55,10 +55,30 @@ You can limit this grant down to specific tables instead of the whole database.

Your database user should now be ready for use with Airbyte.

## Connection via SSH Tunnel

Airbyte has the ability to connect to a Clickhouse instance via an SSH Tunnel. The reason you might want to do this because it is not possible \(or against security policy\) to connect to the database directly \(e.g. it does not have a public IP address\).

When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server \(a.k.a. a bastion sever\) that _does_ have direct access to the database. Airbyte connects to the bastion and then asks the bastion to connect directly to the server.

Using this feature requires additional configuration, when creating the source. We will talk through what each piece of configuration means.

1. Configure all fields for the source as you normally would, except `SSH Tunnel Method`.
2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`.
1. Choose `Key Authentication` if you will be using an RSA private key as your secret for establishing the SSH Tunnel \(see below for more information on generating this key\).
2. Choose `Password Authentication` if you will be using a password as your secret for establishing the SSH Tunnel.
3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will connect to. This should be a hostname or an IP Address.
4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. The default port for SSH connections is `22`, so unless you have explicitly changed something, go with the default.
5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion server. This is NOT the Clickhouse username.
6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the password of the User from the previous step. If you are using `SSH Key Authentication` leave this blank. Again, this is not the Clickhouse password, but the password for the OS-user that Airbyte is using to perform commands on the bastion.
7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending with `-----END RSA PRIVATE KEY-----`.


## Changelog

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.4 | 20.10.2021 | [\#7327](https://github.com/airbytehq/airbyte/pull/7327) | Added support for connection via SSH tunnel(aka Bastion server). |
| 0.1.3 | 20.10.2021 | [\#7127](https://github.com/airbytehq/airbyte/pull/7127) | Added SSL connections support. |
| 0.1.2 | 13.08.2021 | [\#4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator. |

Expand All @@ -67,4 +87,5 @@ Your database user should now be ready for use with Airbyte.

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.1 | 20.10.2021 | [\#7327](https://github.com/airbytehq/airbyte/pull/7327) | Added support for connection via SSH tunnel(aka Bastion server). |
| 0.1.0 | 20.10.2021 | [\#7127](https://github.com/airbytehq/airbyte/pull/7127) | Added source-clickhouse-strict-encrypt that supports SSL connections only. |