-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[3.0][cdc-common] Introduce Event related APIs for Flink CDC 3.0
This closes #2624.
- Loading branch information
1 parent
82a652c
commit c803f5d
Showing
7 changed files
with
349 additions
and
0 deletions.
There are no files selected for viewing
30 changes: 30 additions & 0 deletions
30
flink-cdc-common/src/main/java/com/ververica/cdc/common/event/ChangeEvent.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright 2023 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.ververica.cdc.common.event; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
|
||
/** | ||
* Class {@code ChangeEvent} represents the change events of external systems, including {@link | ||
* DataChangeEvent} and {@link SchemaChangeEvent}. | ||
*/ | ||
@PublicEvolving | ||
public interface ChangeEvent extends Event { | ||
|
||
/** Describes the database table corresponding to the occurrence of a change event. */ | ||
TableId tableId(); | ||
} |
41 changes: 41 additions & 0 deletions
41
flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Copyright 2023 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.ververica.cdc.common.event; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* Class {@code DataChangeEvent} represents the data change events of external systems, such as | ||
* INSERT, UPDATE, DELETE and so on. | ||
*/ | ||
@PublicEvolving | ||
public interface DataChangeEvent extends ChangeEvent { | ||
|
||
/** Describes the record of data before change. */ | ||
DataRecord before(); | ||
|
||
/** Describes the record of data after change. */ | ||
DataRecord after(); | ||
|
||
/** Describes the operation type of the change event. e.g. INSERT, UPDATE, REPLACE, DELETE. */ | ||
OperationType op(); | ||
|
||
/** Optional, describes the metadata of the change event. e.g. MySQL binlog file name, pos. */ | ||
Map<String, String> meta(); | ||
} |
63 changes: 63 additions & 0 deletions
63
flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataRecord.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Copyright 2023 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.ververica.cdc.common.event; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
|
||
/** Class {@code DataRecord} describes the data of changed record in the external system. */ | ||
@PublicEvolving | ||
public interface DataRecord { | ||
|
||
/** Returns the number of fields in this row. */ | ||
int getArity(); | ||
|
||
// ------------------------------------------------------------------------------------------ | ||
// Read-only accessor methods | ||
// ------------------------------------------------------------------------------------------ | ||
|
||
/** Returns true if the field is null at the given position. */ | ||
boolean isNullAt(int pos); | ||
|
||
/** Returns the boolean value at the given position. */ | ||
boolean getBoolean(int pos); | ||
|
||
/** Returns the byte value at the given position. */ | ||
byte getByte(int pos); | ||
|
||
/** Returns the short value at the given position. */ | ||
short getShort(int pos); | ||
|
||
/** Returns the integer value at the given position. */ | ||
int getInt(int pos); | ||
|
||
/** Returns the long value at the given position. */ | ||
long getLong(int pos); | ||
|
||
/** Returns the float value at the given position. */ | ||
float getFloat(int pos); | ||
|
||
/** Returns the double value at the given position. */ | ||
double getDouble(int pos); | ||
|
||
/** Returns the string value at the given position. */ | ||
String getString(int pos); | ||
|
||
/** Returns the binary value at the given position. */ | ||
byte[] getBinary(int pos); | ||
|
||
// TODO: add more methods for other types | ||
} |
26 changes: 26 additions & 0 deletions
26
flink-cdc-common/src/main/java/com/ververica/cdc/common/event/Event.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright 2023 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.ververica.cdc.common.event; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
|
||
/** | ||
* Class {@code Event} is the super interface defines the events of external systems flowing into | ||
* Flink CDC. | ||
*/ | ||
@PublicEvolving | ||
public interface Event {} |
30 changes: 30 additions & 0 deletions
30
flink-cdc-common/src/main/java/com/ververica/cdc/common/event/OperationType.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright 2023 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.ververica.cdc.common.event; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
|
||
/** | ||
* Enum {@code OperationType} describes the type of operation that the data change event reports. | ||
*/ | ||
@PublicEvolving | ||
public enum OperationType { | ||
INSERT, | ||
UPDATE, | ||
REPLACE, | ||
DELETE | ||
} |
26 changes: 26 additions & 0 deletions
26
flink-cdc-common/src/main/java/com/ververica/cdc/common/event/SchemaChangeEvent.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright 2023 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.ververica.cdc.common.event; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
|
||
/** | ||
* Class {@code SchemaChangeEvent} represents the changes in the table structure of the external | ||
* system, such as CREATE, DROP, RENAME and so on. | ||
*/ | ||
@PublicEvolving | ||
public interface SchemaChangeEvent extends ChangeEvent {} |
133 changes: 133 additions & 0 deletions
133
flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
/* | ||
* Copyright 2023 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.ververica.cdc.common.event; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
import java.util.Objects; | ||
|
||
/** | ||
* The unique identifier used to represent the path of external table or external data collection, | ||
* all external system data collection could map to a dedicated {@code TableId}. | ||
* | ||
* <ul> | ||
* <li>{@code TableId} contains at most three parts, it will be treated as namespace, schema name | ||
* and table name. | ||
* <li>{@code TableId} could contain two parts, it will be treated as schema name and table name. | ||
* <li>{@code TableId} could contain only one part, it will be treated as table name. | ||
* </ul> | ||
* | ||
* <p>Connectors need to establish the mapping between {@code TableId} and external data collection | ||
* object path. For example, | ||
* | ||
* <ul> | ||
* <li>The mapping relationship for Oracle is: (database, schema, table). | ||
* <li>The mapping relationship for MySQL or Doris is: (database, table). | ||
* <li>The mapping relationship for Kafka is: (topic). | ||
* </ul> | ||
*/ | ||
@PublicEvolving | ||
public class TableId { | ||
|
||
@Nullable private final String namespace; | ||
@Nullable private final String schemaName; | ||
private final String tableName; | ||
|
||
private TableId(@Nullable String namespace, @Nullable String schemaName, String tableName) { | ||
this.namespace = namespace; | ||
this.schemaName = schemaName; | ||
this.tableName = Objects.requireNonNull(tableName); | ||
} | ||
|
||
/** The mapping relationship for external systems. e.g. Oracle (database, schema, table). */ | ||
public static TableId tableId(String namespace, String schemaName, String tableName) { | ||
return new TableId( | ||
Objects.requireNonNull(namespace), Objects.requireNonNull(schemaName), tableName); | ||
} | ||
|
||
/** The mapping relationship for external systems. e.g. MySQL (database, table). */ | ||
public static TableId tableId(String schemaName, String tableName) { | ||
return new TableId(null, Objects.requireNonNull(schemaName), tableName); | ||
} | ||
|
||
/** The mapping relationship for external systems. e.g. Kafka (topic). */ | ||
public static TableId tableId(String tableName) { | ||
return new TableId(null, null, tableName); | ||
} | ||
|
||
public static TableId parse(String tableId) { | ||
String[] parts = Objects.requireNonNull(tableId).split("\\."); | ||
if (parts.length == 3) { | ||
return tableId(parts[0], parts[1], parts[2]); | ||
} else if (parts.length == 2) { | ||
return tableId(parts[0], parts[1]); | ||
} else if (parts.length == 1) { | ||
return tableId(parts[0]); | ||
} | ||
throw new IllegalArgumentException("Invalid tableId: " + tableId); | ||
} | ||
|
||
public String identifier() { | ||
if (namespace == null || namespace.isEmpty()) { | ||
if (schemaName == null || schemaName.isEmpty()) { | ||
return tableName; | ||
} | ||
return schemaName + "." + tableName; | ||
} | ||
return namespace + "." + schemaName + "." + tableName; | ||
} | ||
|
||
@Nullable | ||
public String getNamespace() { | ||
return namespace; | ||
} | ||
|
||
@Nullable | ||
public String getSchemaName() { | ||
return schemaName; | ||
} | ||
|
||
public String getTableName() { | ||
return tableName; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
TableId that = (TableId) o; | ||
return Objects.equals(namespace, that.namespace) | ||
&& Objects.equals(schemaName, that.schemaName) | ||
&& Objects.equals(tableName, that.tableName); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(namespace, schemaName, tableName); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return identifier(); | ||
} | ||
} |