Skip to content

Commit

Permalink
🐛Destination-elasticsearch: enforce ssl connection on cloud (#18341)
Browse files Browse the repository at this point in the history
* [16992] Destination-elasticsearch: enforce ssl connection on cloud
  • Loading branch information
etsybaev authored Oct 26, 2022
1 parent 0423007 commit 77e4a51
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
- destinationDefinitionId: 68f351a7-2745-4bef-ad7f-996b8e51bb8c
name: ElasticSearch
dockerRepository: airbyte/destination-elasticsearch
dockerImageTag: 0.1.5
dockerImageTag: 0.1.6
documentationUrl: https://docs.airbyte.com/integrations/destinations/elasticsearch
icon: elasticsearch.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-elasticsearch:0.1.5"
- dockerImage: "airbyte/destination-elasticsearch:0.1.6"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/elasticsearch"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-elasticsearch-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
testImplementation libs.connectors.testcontainers.elasticsearch
integrationTestJavaImplementation libs.connectors.testcontainers.elasticsearch

integrationTestJavaImplementation project(':airbyte-commons-worker')
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-elasticsearch')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,29 @@

package io.airbyte.integrations.destination.elasticsearch;

import static co.elastic.clients.elasticsearch.watcher.Input.HTTP;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.net.URL;
import java.util.Objects;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchStrictEncryptDestination extends SpecModifyingDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchStrictEncryptDestination.class);
private final ObjectMapper mapper = new ObjectMapper();
private static final String NON_EMPTY_URL_ERR_MSG = "Server Endpoint is a required field";
private static final String NON_SECURE_URL_ERR_MSG = "Server Endpoint requires HTTPS";

public ElasticsearchStrictEncryptDestination() {
super(new ElasticsearchDestination());
Expand All @@ -38,4 +48,27 @@ public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) th
return spec;
}

@Override
public AirbyteConnectionStatus check(JsonNode config) throws Exception {

final ConnectorConfiguration configObject = convertConfig(config);
if (Objects.isNull(configObject.getEndpoint())) {
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(NON_EMPTY_URL_ERR_MSG);
}

if (new URL(configObject.getEndpoint()).getProtocol().equals(HTTP)) {
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(NON_SECURE_URL_ERR_MSG);
}

return super.check(config);
}

private ConnectorConfiguration convertConfig(JsonNode config) {
return mapper.convertValue(config, ConnectorConfiguration.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

package io.airbyte.integrations.destination.elasticsearch;

import static org.junit.Assert.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
Expand All @@ -18,6 +21,7 @@
import java.util.Map;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticsearchStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {
Expand Down Expand Up @@ -88,21 +92,29 @@ protected TestDataComparator getTestDataComparator() {

@Override
protected JsonNode getConfig() {

final JsonNode authConfig = Jsons.jsonNode(Map.of(
"method", "basic",
"username", "elastic",
"password", "s3cret"));

return Jsons.jsonNode(ImmutableMap.builder()
.put("endpoint", String.format("https://%s:%s", container.getHost(), container.getMappedPort(9200)))
.put("authenticationMethod", authConfig)
.put("authenticationMethod", getAuthConfig())
.put("ca_certificate", new String(container.copyFileFromContainer(
"/usr/share/elasticsearch/config/certs/http_ca.crt",
InputStream::readAllBytes), StandardCharsets.UTF_8))
.build());
}

protected JsonNode getUnsecureConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200)))
.put("authenticationMethod", getAuthConfig())
.build());
}

protected JsonNode getAuthConfig() {
return Jsons.jsonNode(Map.of(
"method", "basic",
"username", "elastic",
"password", "s3cret"));
}

@Override
protected JsonNode getFailCheckConfig() {
// should result in a failed connection check
Expand Down Expand Up @@ -135,4 +147,9 @@ protected void tearDown(DestinationAcceptanceTest.TestDestinationEnv testEnv) {
connection.allIndices().forEach(connection::deleteIndexIfPresent);
}

@Test
public void testCheckConnectionInvalidHttpProtocol() throws Exception {
assertEquals(Status.FAILED, runCheck(getUnsecureConfig()).getStatus());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-elasticsearch
1 change: 1 addition & 0 deletions docs/integrations/destinations/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ Using this feature requires additional configuration, when creating the source.

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.6 | 2022-10-26 | [18341](https://github.com/airbytehq/airbyte/pull/18341) | enforce ssl connection on cloud |
| 0.1.5 | 2022-10-24 | [18177](https://github.com/airbytehq/airbyte/pull/18177) | add custom CA certificate processing |
| 0.1.4 | 2022-10-14 | [17805](https://github.com/airbytehq/airbyte/pull/17805) | add SSH Tunneling |
| 0.1.3 | 2022-05-30 | [14640](https://github.com/airbytehq/airbyte/pull/14640) | Include lifecycle management |
Expand Down

0 comments on commit 77e4a51

Please sign in to comment.