Skip to content

Commit

Permalink
[FLINK-35894][pipeline-connector][es] Support for ElasticSearch 6, 7 …
Browse files Browse the repository at this point in the history
…versions

This closes #3495.
  • Loading branch information
proletarians authored and leonardBang committed Aug 12, 2024
1 parent ac14dba commit 8137f9d
Show file tree
Hide file tree
Showing 25 changed files with 1,374 additions and 518 deletions.
2 changes: 2 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,5 @@ doris-pipeline-connector:
- flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/**/*
starrocks-pipeline-connector:
- flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/**/*
elasticsearch-pipeline-connector:
- flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/**/*
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
Expand All @@ -20,31 +37,21 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<elasticsearch.version>8.12.1</elasticsearch.version>
<flink.version>1.18.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.binary.version>4.0</scala.binary.version>
<jackson.version>2.13.2</jackson.version>
<surefire.module.config>--add-opens=java.base/java.util=ALL-UNNAMED</surefire.module.config>
<testcontainers.version>1.16.0</testcontainers.version>
<flink.connector.elasticsearch.version>4.0-SNAPSHOT</flink.connector.elasticsearch.version>
<httpclient.version>4.5.13</httpclient.version>
<junit.jupiter.version>5.7.1</junit.jupiter.version>
<assertj.version>3.18.1</assertj.version>
<junit.version>4.13.2</junit.version>
<slf4j.version>1.7.32</slf4j.version>
<junit.platform.version>1.10.2</junit.platform.version>
<paimon.version>0.7.0-incubating</paimon.version>
<hadoop.version>2.8.5</hadoop.version>
<hive.version>2.3.9</hive.version>
<mockito.version>3.4.6</mockito.version>
<jakarta.json.version>2.0.2</jakarta.json.version>
</properties>

<dependencies>
<!-- Flink Dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
Expand Down Expand Up @@ -108,7 +115,27 @@
<scope>test</scope>
</dependency>

<!-- Elasticsearch Clients -->
<!-- Elasticsearch 6 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-elasticsearch6</artifactId>
<version>${flink.connector.elasticsearch.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-elasticsearch7</artifactId>
<version>${flink.connector.elasticsearch.version}</version>
</dependency>


<!-- Elasticsearch 8 Client -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
Expand Down Expand Up @@ -204,6 +231,16 @@
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class ElasticsearchSinkOptions implements Serializable {
private final long maxTimeInBufferMS;
private final long maxRecordSizeInBytes;
private final NetworkConfig networkConfig;
private final int version;
private final String username;
private final String password;

/** Constructor for ElasticsearchSinkOptions. */
public ElasticsearchSinkOptions(
Expand All @@ -43,14 +46,20 @@ public ElasticsearchSinkOptions(
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
NetworkConfig networkConfig) {
NetworkConfig networkConfig,
int version,
String username,
String password) {
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
this.maxTimeInBufferMS = maxTimeInBufferMS;
this.maxRecordSizeInBytes = maxRecordSizeInBytes;
this.networkConfig = networkConfig;
this.version = version;
this.username = username;
this.password = password;
}

/** @return the maximum batch size */
Expand Down Expand Up @@ -92,4 +101,16 @@ public NetworkConfig getNetworkConfig() {
public List<HttpHost> getHosts() {
return networkConfig.getHosts();
}

public int getVersion() {
return version;
}

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.elasticsearch.serializer;

import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.delete.DeleteRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.index.IndexRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.Requests;

import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

/**
* A utility class that creates Elasticsearch 6.x specific requests.
*
* <p>This class provides methods to create {@link IndexRequest} and {@link DeleteRequest} objects
* that are compatible with Elasticsearch 6.x based on the operations provided.
*/
public class Elasticsearch6RequestCreator {

private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* Creates an Elasticsearch 6 IndexRequest.
*
* @param operation IndexOperation object.
* @return IndexRequest object.
*/
public static IndexRequest createIndexRequest(IndexOperation<?> operation) {
// Convert the document to Map<String, Object>
Map<String, Object> documentMap =
objectMapper.convertValue(operation.document(), Map.class);

// Create and return IndexRequest, ensuring type field is set
return Requests.indexRequest()
.index(operation.index())
.type("_doc") // Assuming type is "_doc", adjust as necessary
.id(operation.id())
.source(documentMap);
}

/**
* Creates an Elasticsearch 6 DeleteRequest.
*
* @param operation DeleteOperation object.
* @return DeleteRequest object.
*/
public static DeleteRequest createDeleteRequest(DeleteOperation operation) {
String index = operation.index();
String id = operation.id();

// Create and return DeleteRequest, ensuring type field is set
return Requests.deleteRequest(index)
.type("_doc") // Assuming type is "_doc", adjust as necessary
.id(id);
}
}
Loading

0 comments on commit 8137f9d

Please sign in to comment.