[cdc-pipeline-connector][doris] Introduce Doris cdc pipeline DataSink #2729
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connectors-doris/pom.xml
...ne-connectors-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSink.java
private void applyAddColumnEvent(AddColumnEvent event) throws IOException, IllegalArgumentException {
    TableId tableId = event.tableId();
    List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
    for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
There are several kinds of ColumnPosition. Does this code handle only the LAST type?
Yes, because Doris does not support adding a value column in the middle of the Key columns.
In addition, data is imported in JSON format by default, so the order of the columns does not affect data quality, and the value column can simply be appended at the end.
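To make the reply above concrete, here is a minimal sketch of the "always append" approach. It is not the connector's actual code: `Position`, `AddedColumn`, and `buildAddColumnDdl` are hypothetical names introduced for illustration, and the sketch assumes that a Doris `ALTER TABLE ... ADD COLUMN` statement with no position clause places the new column at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AppendOnlyAddColumn {

    /** Hypothetical stand-in for the position kinds an add-column event may carry. */
    enum Position { FIRST, LAST, BEFORE, AFTER }

    /** Hypothetical, simplified description of one added column. */
    static final class AddedColumn {
        final String name;
        final String dorisType;
        final Position position;

        AddedColumn(String name, String dorisType, Position position) {
            this.name = name;
            this.dorisType = dorisType;
            this.position = position;
        }
    }

    /** Builds one ADD COLUMN statement per column, deliberately ignoring the requested position. */
    static List<String> buildAddColumnDdl(String database, String table, List<AddedColumn> columns) {
        List<String> statements = new ArrayList<>();
        for (AddedColumn col : columns) {
            // No FIRST/AFTER clause is emitted: every column is treated as LAST,
            // which is safe when rows are loaded as JSON and matched by column name.
            statements.add(String.format(
                    "ALTER TABLE `%s`.`%s` ADD COLUMN `%s` %s",
                    database, table, col.name, col.dorisType));
        }
        return statements;
    }

    public static void main(String[] args) {
        List<AddedColumn> added = Arrays.asList(
                new AddedColumn("age", "INT", Position.AFTER),
                new AddedColumn("note", "STRING", Position.LAST));
        for (String ddl : buildAddColumnDdl("demo_db", "users", added)) {
            System.out.println(ddl);
        }
    }
}
```

Because JSON rows are keyed by column name, the physical column order in the target table does not have to match the source, which is why the position can be dropped here.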
...ectors-doris/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory
...ectors-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSinkOptions.java
...ectors-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java
...ne-connectors-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDatabase.java
# Conflicts: # flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java
Addressed the comments. PTAL @lvyanquan
@JNSimba Run 'mvn spotless:apply' to fix these violations.
flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericStringData.java
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
...tor-doris/src/main/java/com/ververica/cdc/connectors/doris/factory/DorisDataSinkFactory.java
/** A serializer for Event to Tuple2<String, byte[]>. */
public class DorisEventSerializer implements DorisRecordSerializer<Event> {
    private ObjectMapper objectMapper = new ObjectMapper();
    private Map<TableId, Schema> schemaMaps = new HashMap<>();
One troublesome problem is that we need to keep schemaMaps in State so that it can be recovered after a failure. That means adding a subclass of DorisWriter or DorisBatchWriter that overrides the snapshotState method.
What do you think?
Sorry, please disregard this comment. As discussed before, CreateTableEvent will always be sent before DataChangeEvent, so we don't need to consider this situation.
Yes, as I understand it, CreateTableEvent is resent even after a restart.
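The conclusion of this thread can be sketched as follows. This is only an illustration under simplified assumptions, not the real `DorisEventSerializer`: the event classes below are hypothetical stand-ins that use a plain `String` table id and a list of column names in place of `TableId` and `Schema`. The point is that an in-memory map is enough as long as a `CreateTableEvent` always arrives before the first `DataChangeEvent` of a table, including after a restart.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaCacheSketch {

    /** Hypothetical, simplified event types (not the flink-cdc classes). */
    interface Event { String tableId(); }

    static final class CreateTableEvent implements Event {
        final String tableId;
        final List<String> columnNames;
        CreateTableEvent(String tableId, List<String> columnNames) {
            this.tableId = tableId;
            this.columnNames = columnNames;
        }
        public String tableId() { return tableId; }
    }

    static final class DataChangeEvent implements Event {
        final String tableId;
        final List<Object> values;
        DataChangeEvent(String tableId, List<Object> values) {
            this.tableId = tableId;
            this.values = values;
        }
        public String tableId() { return tableId; }
    }

    // Plain in-memory cache; it is rebuilt from CreateTableEvents instead of checkpointed state.
    private final Map<String, List<String>> schemaMaps = new HashMap<>();

    /** Returns a name-to-value row for data changes, or null for schema events. */
    Map<String, Object> handle(Event event) {
        if (event instanceof CreateTableEvent) {
            CreateTableEvent create = (CreateTableEvent) event;
            schemaMaps.put(create.tableId(), create.columnNames);
            return null;
        }
        DataChangeEvent change = (DataChangeEvent) event;
        List<String> columns = schemaMaps.get(change.tableId());
        if (columns == null) {
            // Would only happen if the ordering guarantee discussed above were violated.
            throw new IllegalStateException("No schema cached for table " + change.tableId());
        }
        if (columns.size() != change.values.size()) {
            throw new IllegalStateException("Column count mismatch for table " + change.tableId());
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), change.values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        // A fresh instance models a restarted writer with an empty cache.
        SchemaCacheSketch restarted = new SchemaCacheSketch();
        restarted.handle(new CreateTableEvent("db.users", Arrays.asList("id", "name")));
        System.out.println(restarted.handle(
                new DataChangeEvent("db.users", Arrays.<Object>asList(1, "alice"))));
    }
}
```

If the ordering guarantee did not hold, the lookup would fail on the first data change after recovery, which is the case the original comment proposed to cover by snapshotting the map.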
# Conflicts: # flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericStringData.java
flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/RecordDataUtils.java
# Conflicts: # flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java
...nector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisEventSerializer.java
# Conflicts: # flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java
Addressed the comments. PTAL @lvyanquan
Thanks for your contribution. Overall it looks good to me. I left some comments; could you also clean up the commit messages?
flink-cdc-common/src/main/java/com/ververica/cdc/common/types/utils/DataTypeUtils.java
...nector-doris/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory
...tor-doris/src/main/java/com/ververica/cdc/connectors/doris/factory/DorisDataSinkFactory.java
...nector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSinkOptions.java
...nector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java
Addressed the comments. PTAL @lvyanquan
...nector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSinkOptions.java
Addressed the comments. PTAL @lvyanquan
Resolved by 4abd86a
This closes #2646
Add a DorisDataSink that implements the DataSink interface to build a pipeline.