Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉Destination-elasticsearch: added custom sertificate support #18177

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,43 @@

package io.airbyte.integrations.destination.elasticsearch;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticsearchStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private final ObjectMapper mapper = new ObjectMapper();
private static ElasticsearchContainer container;
private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:8.3.3";
private final ObjectMapper mapper = new ObjectMapper();

@BeforeAll
public static void beforeAll() {

container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1")
.withPassword("MagicWord");
container = new ElasticsearchContainer(IMAGE_NAME)
.withEnv("discovery.type", "single-node")
.withEnv("network.host", "0.0.0.0")
.withEnv("logger.org.elasticsearch", "INFO")
.withEnv("ingest.geoip.downloader.enabled", "false")
.withExposedPorts(9200)
.withPassword("s3cret");

container.start();
}
Expand Down Expand Up @@ -81,6 +94,22 @@ protected TestDataComparator getTestDataComparator() {
@Override
protected JsonNode getConfig() {

final JsonNode authConfig = Jsons.jsonNode(Map.of(
"method", "basic",
"username", "elastic",
"password", "s3cret"));

return Jsons.jsonNode(ImmutableMap.builder()
.put("endpoint", String.format("https://%s:%s", container.getHost(), container.getMappedPort(9200)))
.put("authenticationMethod", authConfig)
.put("certAsBytes", container.copyFileFromContainer(
"/usr/share/elasticsearch/config/certs/http_ca.crt",
InputStream::readAllBytes))
.build());
}

protected JsonNode getUnsecureConfig() {
etsybaev marked this conversation as resolved.
Show resolved Hide resolved

final JsonNode authConfig = Jsons.jsonNode(Map.of(
"method", "basic",
"username", "elastic",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
"description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.",
"default": true
},
"certAsBytes": {
"title": "CA certificate (Base64 encoded)",
"type": "string",
"description": "CA certificate for Elasticsearch server"
},
"authenticationMethod": {
"title": "Authentication Method",
"type": "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ConnectorConfiguration {

private String endpoint;
private boolean upsert;
private byte[] certAsBytes;
private AuthenticationMethod authenticationMethod = new AuthenticationMethod();

public ConnectorConfiguration() {}
Expand All @@ -34,6 +36,15 @@ public AuthenticationMethod getAuthenticationMethod() {
return this.authenticationMethod;
}

public byte[] getCertAsBytes() {
return certAsBytes;
}

public ConnectorConfiguration setCertAsBytes(byte[] certAsBytes) {
this.certAsBytes = certAsBytes;
return this;
}

public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}
Expand All @@ -48,24 +59,43 @@ public void setAuthenticationMethod(AuthenticationMethod authenticationMethod) {

@Override
public boolean equals(Object o) {
if (this == o)
if (this == o) {
return true;
if (o == null || getClass() != o.getClass())
}
if (o == null || getClass() != o.getClass()) {
return false;
}

ConnectorConfiguration that = (ConnectorConfiguration) o;
return upsert == that.upsert && Objects.equals(endpoint, that.endpoint) && Objects.equals(authenticationMethod, that.authenticationMethod);

if (upsert != that.upsert) {
return false;
}
if (endpoint != null ? !endpoint.equals(that.endpoint) : that.endpoint != null) {
return false;
}
if (!Arrays.equals(certAsBytes, that.certAsBytes)) {
return false;
}
return authenticationMethod != null ? authenticationMethod.equals(that.authenticationMethod)
etsybaev marked this conversation as resolved.
Show resolved Hide resolved
: that.authenticationMethod == null;
}

@Override
public int hashCode() {
return Objects.hash(endpoint, upsert, authenticationMethod);
int result = endpoint != null ? endpoint.hashCode() : 0;
etsybaev marked this conversation as resolved.
Show resolved Hide resolved
result = 31 * result + (upsert ? 1 : 0);
result = 31 * result + Arrays.hashCode(certAsBytes);
result = 31 * result + (authenticationMethod != null ? authenticationMethod.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ConnectorConfiguration{" +
"endpoint='" + endpoint + '\'' +
", upsert=" + upsert +
", certAsBytes=" + Arrays.toString(certAsBytes) +
", authenticationMethod=" + authenticationMethod +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -56,7 +57,17 @@ public ElasticsearchConnection(ConnectorConfiguration config) {

// Create the low-level client
httpHost = HttpHost.create(config.getEndpoint());
restClient = RestClient.builder(httpHost)
final RestClientBuilder builder = RestClient.builder(httpHost);

// Set custom user's certificate if provided
if (config.getCertAsBytes() != null && config.getCertAsBytes().length != 0){
etsybaev marked this conversation as resolved.
Show resolved Hide resolved
builder.setHttpClientConfigCallback(clientBuilder -> {
clientBuilder.setSSLContext(SslUtils.createContextFromCaCert(config.getCertAsBytes()));
return clientBuilder;
});
}

restClient = builder
.setDefaultHeaders(configureHeaders(config))
.setFailureListener(new FailureListener())
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.airbyte.integrations.destination.elasticsearch;

import java.io.ByteArrayInputStream;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import javax.net.ssl.SSLContext;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;

public class SslUtils {

public static SSLContext createContextFromCaCert(byte[] certAsBytes) {
etsybaev marked this conversation as resolved.
Show resolved Hide resolved
try {
CertificateFactory factory = CertificateFactory.getInstance("X.509");
Certificate trustedCa = factory.generateCertificate(
new ByteArrayInputStream(certAsBytes)
);
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
SSLContextBuilder sslContextBuilder =
SSLContexts.custom().loadTrustMaterial(trustStore, null);
return sslContextBuilder.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
"description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.",
"default": true
},
"certAsBytes": {
etsybaev marked this conversation as resolved.
Show resolved Hide resolved
"title": "CA certificate (Base64 encoded)",
"type": "string",
"description": "CA certificate for Elasticsearch server"
},
"authenticationMethod": {
"title": "Authentication Method",
"type": "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,28 @@
import java.util.List;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticsearchDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:8.3.3";
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchDestinationAcceptanceTest.class);

private ObjectMapper mapper = new ObjectMapper();
private static ElasticsearchContainer container;

@BeforeAll
public static void beforeAll() {
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1")
.withEnv("ES_JAVA_OPTS", "-Xms512m -Xms512m")
container = new ElasticsearchContainer(IMAGE_NAME)
.withEnv("discovery.type", "single-node")
.withEnv("network.host", "0.0.0.0")
.withEnv("logger.org.elasticsearch", "INFO")
.withEnv("ingest.geoip.downloader.enabled", "false")
.withEnv("xpack.security.enabled", "false")
.withPassword("s3cret")
.withExposedPorts(9200)
.withEnv("xpack.security.enabled", "false")
.withStartupTimeout(Duration.ofSeconds(60));
container.start();
}
Expand Down
3 changes: 2 additions & 1 deletion deps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ connectors-testcontainers-scylla = "1.16.2"
connectors-testcontainers-tidb = "1.16.3"
connectors-destination-testcontainers-clickhouse = "1.17.3"
connectors-destination-testcontainers-oracle-xe = "1.17.3"
connectors-destination-testcontainers-elasticsearch = "1.17.3"
connectors-source-testcontainers-clickhouse = "1.17.3"
platform-testcontainers = "1.17.3"

Expand Down Expand Up @@ -54,7 +55,7 @@ connectors-testcontainers = { module = "org.testcontainers:testcontainers", vers
connectors-testcontainers-cassandra = { module = "org.testcontainers:cassandra", version.ref = "connectors-testcontainers-cassandra" }
connectors-testcontainers-cockroachdb = { module = "org.testcontainers:cockroachdb", version.ref = "connectors-testcontainers" }
connectors-testcontainers-db2 = { module = "org.testcontainers:db2", version.ref = "connectors-testcontainers" }
connectors-testcontainers-elasticsearch = { module = "org.testcontainers:elasticsearch", version.ref = "connectors-testcontainers" }
connectors-testcontainers-elasticsearch = { module = "org.testcontainers:elasticsearch", version.ref = "connectors-destination-testcontainers-elasticsearch" }
connectors-testcontainers-jdbc = { module = "org.testcontainers:jdbc", version.ref = "connectors-testcontainers" }
connectors-testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "connectors-testcontainers" }
connectors-testcontainers-mariadb = { module = "org.testcontainers:mariadb", version.ref = "connectors-testcontainers-mariadb" }
Expand Down
8 changes: 8 additions & 0 deletions docs/integrations/destinations/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,18 @@ The connector should be enhanced to support variable batch sizes.
* Endpoint URL [ex. https://elasticsearch.savantly.net:9423]
* Username [optional] (basic auth)
* Password [optional] (basic auth)
* CA certificate (Base64 encoded) [optional]
* Api key ID [optional]
* Api key secret [optional]
* If authentication is used, the user should have permission to create an index if it doesn't exist, and/or be able to `create` documents

### CA certificate
Ca certificate may be fetched from the Elasticsearch server from /usr/share/elasticsearch/config/certs/http_ca.crt
Fetching example from dockerized Elasticsearch:
`docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .` where es01 is a container's name. For more details please visit https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html

Airbyte accepts only CA certificates encoded in Base64 format. You may use any convertors to encode it locally or
online service like https://www.base64encode.org. It's always better to use local encoders for sensitive data to prevent data leaks.

### Setup guide
Enter the endpoint URL, select authentication method, and whether to use 'upsert' method when indexing new documents.
Expand Down