
[FLINK-35894] Add Elasticsearch Sink Connector for Flink CDC Pipeline #3495

Merged: 2 commits merged into apache:master on Aug 12, 2024

Conversation

@proletarians (Contributor)

This commit introduces the Elasticsearch Sink Connector for Flink CDC Pipeline. It includes:

  • Configuration options for Elasticsearch sink
  • Serialization logic for Elasticsearch events
  • Data type conversion utilities
  • Elasticsearch sink implementation
  • Factory for creating Elasticsearch data sinks

These changes enable Flink CDC to efficiently stream data changes to Elasticsearch.

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connectors</artifactId>
<version>3.2-SNAPSHOT</version>

Please use <version>${revision}</version> like the other connectors.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>
<version>3.2-SNAPSHOT</version>

Use <version>${project.version}</version>.

import java.io.Serializable;
import java.util.List;

/** DorisDataSink Options reference {@link ElasticsearchSinkOptions}. */

This should say ElasticsearchDataSink, not DorisDataSink.

<jackson.version>2.13.2</jackson.version>
<surefire.module.config>--add-opens=java.base/java.util=ALL-UNNAMED</surefire.module.config>
<testcontainers.version>1.16.0</testcontainers.version>
</properties>

Some of these properties are already provided, so this can be simplified to:

    <properties>
        <elasticsearch.version>8.12.1</elasticsearch.version>
    </properties>


private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) {
List<HttpHost> hosts = parseHosts(cdcConfig.get(HOSTS));
NetworkConfig networkConfig = new NetworkConfig(hosts, null, null, null, null, null);

It seems that the username and password are not passed through.
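
For illustration, a hedged sketch of threading the credentials through; the NetworkConfig parameter positions for username and password are assumptions, not confirmed from this PR:

    // Hypothetical: assumes USERNAME/PASSWORD options exist and that NetworkConfig
    // takes the username and password as its second and third constructor arguments.
    String username = cdcConfig.get(USERNAME);
    String password = cdcConfig.get(PASSWORD);
    NetworkConfig networkConfig =
            new NetworkConfig(hosts, username, password, null, null, null);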

appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n

Please add an org.apache.flink.cdc.common.factories.Factory file for SPI, like the other connectors.
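
For reference, SPI registration is just a resource file listing the factory implementation; the exact factory class name below is an assumption about this connector's package layout:

    # src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
    org.apache.flink.cdc.connectors.elasticsearch.factory.ElasticsearchDataSinkFactory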

* A class representing a record with multiple fields of various types. Provides methods to access
* fields by position and type.
*/
public class RecordData {

I don't see this class used anywhere; is it necessary?

}
Schema updatedSchema =
SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent);
schemaMaps.put(tableId, updatedSchema);

If a column name is modified upstream, do we need to call the createSchemaIndexOperation method?


runJobWithEvents(events);

verifyInsertedData(tableId, "2", 2, 2.0, "value2");

Please add a test for processing all supported data types.

}

private void verifyInsertedData(
TableId tableId, String id, int expectedId, double expectedNumber, String expectedName)

We could pass a List<String> of column names and a List<Object> of expected values to simplify verification, as in the sketch below.
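
For illustration, a minimal sketch of the suggested helper; getDocumentSource is a hypothetical lookup that returns the indexed document as a Map:

    private void verifyInsertedData(
            TableId tableId, String id, List<String> columnNames, List<Object> expectedValues)
            throws Exception {
        // Hypothetical helper that fetches the document source for the given id.
        Map<String, Object> source = getDocumentSource(tableId, id);
        for (int i = 0; i < columnNames.size(); i++) {
            assertThat(source.get(columnNames.get(i))).isEqualTo(expectedValues.get(i));
        }
    }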

@lvyanquan (Contributor)

Thanks @proletarians for this contribution; I left some comments.

*
* @param <InputT> type of Operations
*/
public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, Operation> {

Do we support writing to ES 7 using this writer?

case CHAR:
case VARCHAR:
return recordData.getString(index);
default:
@lvyanquan (Jul 26, 2024)

Looks like the Decimal/Timestamp/Date types were missed; do you plan to support them?
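
For illustration, a hedged sketch of the missing branches; precision and scale are assumed to come from the column's DataType:

    case DATE:
        // DATE is stored as an int counting days since the epoch.
        return recordData.getInt(index);
    case TIMESTAMP_WITHOUT_TIME_ZONE:
        return recordData.getTimestamp(index, precision).toLocalDateTime().toString();
    case DECIMAL:
        return recordData.getDecimal(index, precision, scale).toBigDecimal();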

for (int i = 0; i < recordData.getArity(); i++) {
Column column = columns.get(i);
ColumnType columnType = ColumnType.valueOf(column.getType().getTypeRoot().name());
ElasticsearchRowConverter.SerializationConverter converter =

Can we cache the list of converters per TableId to avoid recreating converters every time we encounter a TableId we have already seen?
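
For illustration, a hedged sketch of such a cache; createConverter stands in for however the converter is actually built in this PR:

    // Build the converter list once per TableId and reuse it afterwards.
    private final Map<TableId, List<ElasticsearchRowConverter.SerializationConverter>>
            converterCache = new HashMap<>();

    private List<ElasticsearchRowConverter.SerializationConverter> getOrCreateConverters(
            TableId tableId, List<Column> columns) {
        return converterCache.computeIfAbsent(
                tableId,
                id ->
                        columns.stream()
                                .map(column -> createConverter(column.getType())) // hypothetical factory
                                .collect(Collectors.toList()));
    }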

schema.getColumns().stream()
.map(Column::asSummaryString)
.collect(Collectors.toList()));
schemaMap.put("primaryKeys", schema.primaryKeys());

Where did these keys come from? Are they something we customized?

ConfigOptions.key("index")
.stringType()
.noDefaultValue()
.withDescription("The Elasticsearch index name to write to.");

Actually, I don't see any place that uses this option.
What about writing all tables to this index when the config is set, and otherwise using the tableId as the index name, like the topic option in the Kafka connector?
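
For illustration, a minimal sketch of the suggested fallback; the option and accessor names are assumptions:

    // Use the configured index for all tables if set, otherwise fall back to the
    // table id, mirroring the topic option of the Kafka connector.
    String indexName = config.getOptional(INDEX).orElse(tableId.toString());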

.build();
}

private BulkOperationVariant applyDataChangeEvent(DataChangeEvent event)

It would be better to name this method createBulkOperationVariant.

"Primary key column not found: "
+ primaryKey));
int index = schema.getColumns().indexOf(column);
return getFieldValue(recordData, column.getType(), index);

This can be replaced with:

converterCache.get(TableId.tableId("your_tableId")).get(index).serialize(index, recordData);

Here you could pass the TableId associated with this RecordData.

if (schemaChangeEvent instanceof CreateTableEvent) {
Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema();
schemaMaps.put(tableId, schema);
return createSchemaIndexOperation(tableId, schema);

We should move this logic to ElasticsearchMetadataApplier, since more than one subtask may receive the SchemaChangeEvent.

new ArrayList<>();
for (Column column : schema.getColumns()) {
ColumnType columnType =
ColumnType.valueOf(column.getType().getTypeRoot().name());

This conversion will lead to precision loss, and it's unnecessary.

// Decimal type
case DECIMAL:
return (pos, data) -> {
DecimalData decimalData = data.getDecimal(pos, 17, 2);

We should get the precision and scale from the original data type.
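
For illustration, a hedged sketch that reads precision and scale from the column's DecimalType instead of hard-coding 17 and 2; the surrounding switch and the type variable are assumed:

    case DECIMAL:
        DecimalType decimalType = (DecimalType) type; // 'type' is the column's data type
        int precision = decimalType.getPrecision();
        int scale = decimalType.getScale();
        return (pos, data) -> data.getDecimal(pos, precision, scale).toBigDecimal();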

private final long maxTimeInBufferMS;
private final long maxRecordSizeInBytes;
private final NetworkConfig networkConfig;
private final int version; // 新增字段

Please avoid using Chinese in comments.


These comments still exist.

<slf4j.version>1.7.32</slf4j.version>
<junit.platform.version>1.10.2</junit.platform.version>
<paimon.version>0.7.0-incubating</paimon.version>
<hadoop.version>2.8.5</hadoop.version>

Unrelated version properties can be removed.

@lvyanquan (Aug 11, 2024)

        <paimon.version>0.7.0-incubating</paimon.version>
        <hadoop.version>2.8.5</hadoop.version>
        <hive.version>2.3.9</hive.version>

These dependencies are unnecessary and can be removed.

* various data types that can be used in database columns and are relevant for serialization and
* deserialization processes.
*/
public enum ColumnType {

This class is now unused and can be removed.

}
} catch (Exception e) {
// Handle the exception as needed, e.g., log the error
System.err.println("Failed to deserialize Operation: " + e.getMessage());

It's better to use a Logger for this.
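
For illustration, a minimal sketch using an SLF4J logger; the enclosing class name is assumed:

    private static final Logger LOG = LoggerFactory.getLogger(OperationSerializer.class);

    // in the catch block:
    } catch (Exception e) {
        LOG.error("Failed to deserialize Operation", e);
    }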

assertThat(response.getSource().get("extra_bool")).isEqualTo(expectedExtraBool);
}

private List<Event> createTestEvents(TableId tableId) {

Looks like createTestEvents/createTestEventsWithDelete/createTestEventsWithAddColumn are common methods across Elasticsearch6DataSinkITCaseTest/Elasticsearch7DataSinkITCaseTest/ElasticsearchDataSinkITCaseTest; we could extract them into a utility class or a parent class.

private final long maxTimeInBufferMS;
private final long maxRecordSizeInBytes;
private final NetworkConfig networkConfig;
private final int version; // 新增字段

These comments still exist.

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;

This class was not included in the PR.

new IllegalStateException(
"Primary key column not found: "
+ primaryKey));
int index = schema.getColumns().indexOf(column);

It's better to use a for loop to avoid traversing twice.
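
For illustration, a hedged sketch of a single pass over the columns; variable names follow the snippet above:

    // Find the primary key column and its index in one traversal.
    int index = -1;
    Column column = null;
    List<Column> columns = schema.getColumns();
    for (int i = 0; i < columns.size(); i++) {
        if (columns.get(i).getName().equals(primaryKey)) {
            column = columns.get(i);
            index = i;
            break;
        }
    }
    if (column == null) {
        throw new IllegalStateException("Primary key column not found: " + primaryKey);
    }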

@lvyanquan left a comment:

Thanks @proletarians for this contribution, LGTM.

@lvyanquan (Contributor)

Hi @leonardBang, could you please help trigger the CI check?

@yuxiqian commented (Aug 12, 2024)

Hi @proletarians,

The license check is failing because it detects some questionable licenses. It seems the jakarta.json/jakarta.json-api and org.eclipse.parsson/parsson libraries are released under the EPL 2.0 license (with an alternative GPL-2.0 license), which is marked as "Category B" and needs a manual check to verify compatibility with ASF policies [1].

After quickly checking the licenses declared by jakarta.json/jakarta.json-api and org.eclipse.parsson/parsson, I believe they can be legally packaged into the binary release.

You may apply the following patch to suppress the false alarm:

--- a/tools/ci/license_check.rb	(revision 44ea2a73d662b6adaae1c79beb3f1ab3e37c6278)
+++ b/tools/ci/license_check.rb	(date 1723425566467)
@@ -75,6 +75,9 @@
   'org.glassfish.jersey', # dual-licensed under GPL 2 and EPL 2.0
   'org.glassfish.hk2', # dual-licensed under GPL 2 and EPL 2.0
   'javax.ws.rs-api', # dual-licensed under GPL 2 and EPL 2.0
+  'jakarta.json-api', # dual-licensed under GPL 2 and EPL 2.0
+  'org.eclipse.parsson', # EPL 2.0
+  'org/eclipse/parsson', # EPL 2.0
   'jakarta.ws.rs' # dual-licensed under GPL 2 and EPL 2.0
 ].freeze
 
@@ -110,7 +113,7 @@
   Zip::File.open(jar_file) do |jar|
     jar.filter { |e| e.ftype == :file }
        .filter { |e| !File.basename(e.name).downcase.end_with?(*BINARY_FILE_EXTENSIONS) }
-       .filter { |e| !File.basename(e.name).downcase.start_with? 'license', 'dependencies' }
+       .filter { |e| !File.basename(e.name).downcase.start_with? 'license', 'dependencies', 'notice' }
        .filter { |e| EXCEPTION_PACKAGES.none? { |ex| e.name.include? ex } }
        .map do |e|
          content = e.get_input_stream.read.force_encoding('UTF-8')

[1] https://www.apache.org/legal/resolved.html

@yuxiqian left a comment:

Thanks for @proletarians' great work, I just left some comments.


Why explicitly specify a much older version of ryuk here?


Minor: please also declare this new connector in .github/labeler.yml so PRs are labeled correctly.


private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class);
private static final String ELASTICSEARCH_VERSION = "7.10.2";

Why does Elasticsearch6DataSinkITCaseTest run on ES 7.10.2?

@github-actions bot added the build label on Aug 12, 2024
@yuxiqian left a comment:

Thanks for @proletarians' rapid response!

@leonardBang (Contributor)

@proletarians Could you check the failed CI?

@leonardBang left a comment:

Thanks @proletarians for the nice work and @lvyanquan and @yuxiqian for the review work, +1

@leonardBang leonardBang merged commit 8137f9d into apache:master Aug 12, 2024
2 checks passed
qiaozongmi pushed a commit to qiaozongmi/flink-cdc that referenced this pull request Sep 23, 2024