Skip to content

Commit

Permalink
[cdc-connector][postgres] Apply DBZ-5398 postgres connector to handle…
Browse files Browse the repository at this point in the history
… functional unique index (apache#2842)

This closes apache#2710.
  • Loading branch information
loserwang1024 authored and joyCurry30 committed Mar 22, 2024
1 parent ff002b5 commit 052e7b8
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

/**
* {@link JdbcConnection} connection extension used for connecting to Postgres instances.
Expand All @@ -61,6 +62,9 @@
* ConnectionFactory
* <li>override connection() to return an unwrapped PgConnection (otherwise, it will complain
* about HikariProxyConnection cannot be cast to class org.postgresql.core.BaseConnection)
* <li>override isTableUniqueIndexIncluded: Copied DBZ-5398 from Debezium 2.0.0.Final to fix
* https://github.com/ververica/flink-cdc-connectors/issues/2710. Remove this comment
* after bumping debezium version to 2.0.0.Final.
* </ul>
*/
public class PostgresConnection extends JdbcConnection {
Expand All @@ -72,6 +76,10 @@ public class PostgresConnection extends JdbcConnection {
public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat";
public static final String CONNECTION_GENERAL = "Debezium General";

// Recognizes an index "column" that is really a function call: an optional leading "(",
// a name built from letters, digits, '_' or '.', and then a parenthesized (possibly empty)
// argument list, for example a string shaped like "coalesce(a, b)".
private static final Pattern FUNCTION_DEFAULT_PATTERN =
Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)");
// Recognizes an index "column" that is really a parenthesized expression: one or more "(",
// operands separated by a character from the bracketed class, and a closing ")".
// NOTE(review): inside the character class, " - " parses as the range ' '..' ', so a literal
// '-' does not appear to be a member while the space character is -- confirm this is intended.
private static final Pattern EXPRESSION_DEFAULT_PATTERN =
Pattern.compile("\\(+(?:.+(?:[+ - * / < > = ~ ! @ # % ^ & | ` ?] ?.+)+)+\\)");
private static Logger LOGGER = LoggerFactory.getLogger(PostgresConnection.class);

private static final String URL_PATTERN =
Expand Down Expand Up @@ -830,6 +838,15 @@ protected boolean isTableType(String tableType) {
return "TABLE".equals(tableType) || "PARTITIONED TABLE".equals(tableType);
}

/**
 * Tells whether a column reported for a unique index should be kept as part of that index.
 *
 * <p>A column is rejected when its reported name is {@code null}, or when the name fully
 * matches either the function-call pattern or the expression pattern (i.e. the index entry is
 * a functional/expression entry rather than a plain column).
 *
 * @param indexName name of the unique index; not consulted by this implementation
 * @param columnName column name reported for the index entry, may be {@code null}
 * @return {@code true} only for a non-null name that matches neither pattern
 */
@Override
protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
    // No name reported: nothing that could be treated as a plain column.
    if (columnName == null) {
        return false;
    }
    // Function-style entries are checked first; the expression pattern is only
    // evaluated when the function pattern did not match.
    if (FUNCTION_DEFAULT_PATTERN.matcher(columnName).matches()) {
        return false;
    }
    return !EXPRESSION_DEFAULT_PATTERN.matcher(columnName).matches();
}

@FunctionalInterface
public interface PostgresValueConverterBuilder {
PostgresValueConverter build(TypeRegistry registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;

import com.ververica.cdc.connectors.postgres.source.PostgresConnectionPoolFactory;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
Expand All @@ -41,7 +42,9 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -219,6 +222,16 @@ protected PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
return postgresSourceConfigFactory;
}

/**
 * Pulls at most {@code size} rows from the iterator and returns their string forms.
 *
 * <p>Reading stops as soon as {@code size} rows were taken or the iterator reports that no
 * further element is available, whichever happens first.
 *
 * @param iter source of rows
 * @param size maximum number of rows to take; also used as the initial list capacity
 * @return the {@code toString()} of each consumed row, in iteration order
 */
public static List<String> fetchRows(Iterator<Row> iter, int size) {
    final List<String> collected = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        collected.add(iter.next().toString());
    }
    return collected;
}

public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,16 +895,6 @@ private static List<String> fetchRowData(
return rows.stream().map(stringifier).collect(Collectors.toList());
}

/**
 * Reads up to {@code size} rows from {@code iter} and converts each one to its string form.
 *
 * @param iter source of rows
 * @param size upper bound on the number of rows to read; also the initial list capacity
 * @return string representations of the rows that were read, in iteration order
 */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    final List<String> result = new ArrayList<>(size);
    int fetched = 0;
    // The count is checked before hasNext() so the iterator is not probed once enough
    // rows have been read.
    while (fetched < size && iter.hasNext()) {
        final Row next = iter.next();
        result.add(next.toString());
        fetched++;
    }
    return result;
}

/**
* Make some changes on the specified customer table. Changelog in string could be accessed by
* {@link #firstPartStreamEvents}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;

import com.ververica.cdc.connectors.postgres.PostgresTestBase;
Expand All @@ -35,6 +38,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -707,4 +711,69 @@ public void testUpsertMode() throws Exception {

result.getJobClient().get().cancel().get();
}

/**
 * Reads {@code indexes.functional_unique_index} through the postgres-cdc connector and checks
 * the emitted changelog after a single UPDATE.
 *
 * <p>When {@code parallelismSnapshot} is {@code false} the snapshot row {@code +I[1, a]} is
 * expected first; in every mode the UPDATE must then surface as {@code -U[1, a]} followed by
 * {@code +U[1, null]}.
 *
 * <p>The test temporarily flips the JVM-wide {@code RowUtils.USE_LEGACY_TO_STRING} flag; the
 * flag is restored in a {@code finally} block so that a failure here cannot change how rows
 * are printed in tests that run afterwards.
 *
 * @throws Exception if table initialization, job submission, the JDBC update or result
 *     collection fails
 */
@Test
public void testUniqueIndexIncludingFunction() throws Exception {
    // Clear the influence of usesLegacyRows, which sets USE_LEGACY_TO_STRING = true.
    // In this test, print +I, -U, +U to see the change kinds more clearly.
    RowUtils.USE_LEGACY_TO_STRING = false;
    try {
        initializePostgresTable(POSTGRES_CONTAINER, "index_type_test");

        String sourceDDL =
                String.format(
                        "CREATE TABLE functional_unique_index ("
                                + " id INTEGER NOT NULL,"
                                + " char_c STRING"
                                + ") WITH ("
                                + " 'connector' = 'postgres-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'schema-name' = '%s',"
                                + " 'table-name' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = '%s',"
                                // In the snapshot phase of incremental snapshot mode, a table
                                // without a primary key is not allowed for now. Thus, when
                                // scan.incremental.snapshot.enabled = true, use the
                                // 'latest-offset' startup mode.
                                + (parallelismSnapshot
                                        ? " 'scan.startup.mode' = 'latest-offset',"
                                        : "")
                                + " 'slot.name' = '%s'"
                                + ")",
                        POSTGRES_CONTAINER.getHost(),
                        POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
                        POSTGRES_CONTAINER.getUsername(),
                        POSTGRES_CONTAINER.getPassword(),
                        POSTGRES_CONTAINER.getDatabaseName(),
                        "indexes",
                        "functional_unique_index",
                        parallelismSnapshot,
                        getSlotName());
        tEnv.executeSql(sourceDDL);

        // async submit job
        TableResult tableResult = tEnv.executeSql("SELECT * FROM functional_unique_index");
        List<String> expected = new ArrayList<>();
        if (!parallelismSnapshot) {
            // Only the non-incremental path takes a snapshot here, so only it sees the
            // pre-existing row.
            expected.add("+I[1, a]");
        }

        // wait a bit to make sure the replication slot is ready
        Thread.sleep(5000L);

        // generate WAL
        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE indexes.functional_unique_index SET char_c=NULL WHERE id=1;");
        }

        expected.addAll(Arrays.asList("-U[1, a]", "+U[1, null]"));
        CloseableIterator<Row> iterator = tableResult.collect();
        assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size()));
        tableResult.getJobClient().get().cancel().get();
    } finally {
        // Restore the legacy formatting flag even when the test fails; it is static state
        // shared with the other tests in this JVM.
        RowUtils.USE_LEGACY_TO_STRING = true;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ INSERT INTO full_types
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
'2020-07-17', '18:00:22', 500, 'SRID=3187;POINT(174.9479 -36.7208)'::geometry,
'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Copyright 2023 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: key_type_test
-- ----------------------------------------------------------------------------------------------------------------
-- Generate a number of tables to cover as many of the PG index situations (primary key, unique index) as possible
-- Start from a clean "indexes" schema and make it the default schema for the statements below.
DROP SCHEMA IF EXISTS indexes CASCADE;
CREATE SCHEMA indexes;
SET search_path TO indexes;

-- Generate a table without a primary key but with a functional unique index
CREATE TABLE functional_unique_index
(
id INTEGER NOT NULL,
char_c CHAR
);
-- The unique index mixes a plain column (id) with a function expression over char_c,
-- so the second index entry is not a plain column reference.
create unique index test_tbl_idx
on functional_unique_index(id, COALESCE(char_c, ''::text));

-- Per the PostgreSQL documentation, REPLICA IDENTITY FULL records the old values of all
-- columns of a row in the WAL on UPDATE/DELETE.
ALTER TABLE functional_unique_index
REPLICA IDENTITY FULL;

-- Seed a single row (id = 1, char_c = 'a').
INSERT INTO functional_unique_index
VALUES (1, 'a');

0 comments on commit 052e7b8

Please sign in to comment.