diff --git a/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md b/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md index 609e797008..62798a8210 100644 --- a/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md +++ b/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md @@ -42,16 +42,23 @@ under the License. version: '2.1' services: observer: - image: oceanbase/oceanbase-ce:4.0.0.0 + image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515' container_name: observer - network_mode: "host" + environment: + - 'MODE=mini' + - 'OB_SYS_PASSWORD=123456' + - 'OB_TENANT_PASSWORD=654321' + ports: + - '2881:2881' + - '2882:2882' oblogproxy: - image: whhe/oblogproxy:1.1.0_4x + image: 'oceanbase/oblogproxy-ce:latest' container_name: oblogproxy environment: - 'OB_SYS_USERNAME=root' - - 'OB_SYS_PASSWORD=pswd' - network_mode: "host" + - 'OB_SYS_PASSWORD=123456' + ports: + - '2983:2983' elasticsearch: image: 'elastic/elasticsearch:7.6.0' container_name: elasticsearch @@ -85,42 +92,26 @@ services: docker-compose up -d ``` -### 设置密码 - -OceanBase 中 root 用户默认是没有密码的,但是 oblogproxy 需要配置一个使用非空密码的系统租户用户,因此这里我们需要先为 root@sys 用户设置一个密码。 +### 查询 Root Service List 登陆 sys 租户的 root 用户: ```shell -docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -``` - -设置密码,注意这里的密码需要与上一步中 oblogproxy 服务的环境变量 'OB_SYS_PASSWORD' 保持一样。 - -```mysql -ALTER USER root IDENTIFIED BY 'pswd'; -``` - -OceanBase 从社区版 4.0.0.0 开始只支持对非 sys 租户的增量数据拉取,这里我们使用 test 租户的 root 用户作为示例。 - -登陆 test 租户的 root 用户: - -```shell -docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test +docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456 ``` -设置密码: +执行以下 sql 以查询 root service list,将 VALUE 列的值保存下来。 ```mysql -ALTER USER root IDENTIFIED BY 'test'; +SHOW PARAMETERS LIKE 'rootservice_list'; ``` ### 准备数据 -使用 'root@test' 用户登陆。 +使用测试用的 test 租户的 root 用户登陆。 ```shell -docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest +docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321 ``` ```sql @@ -169,6 +160,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), ### 在 Flink SQL CLI 中使用 Flink DDL 创建表 +注意在 OceanBase 源表的 SQL 中替换 root_service_list 为真实值。 + ```sql -- 设置间隔时间为3秒 Flink SQL> SET execution.checkpointing.interval = 3s; @@ -189,13 +182,13 @@ Flink SQL> CREATE TABLE orders ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'username' = 'root@test', - 'password' = 'test', + 'password' = '654321', 'tenant-name' = 'test', 'database-name' = '^ob$', 'table-name' = '^orders$', 'hostname' = 'localhost', 'port' = '2881', - 'rootserver-list' = '127.0.0.1:2882:2881', + 'rootserver-list' = '${root_service_list}', 'logproxy.host' = 'localhost', 'logproxy.port' = '2983', 'working-mode' = 'memory' @@ -211,13 +204,13 @@ Flink SQL> CREATE TABLE products ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'username' = 'root@test', - 'password' = 'test', + 'password' = '654321', 'tenant-name' = 'test', 'database-name' = '^ob$', 'table-name' = '^products$', 'hostname' = 'localhost', 'port' = '2881', - 'rootserver-list' = '127.0.0.1:2882:2881', + 'rootserver-list' = '${root_service_list}', 'logproxy.host' = 'localhost', 'logproxy.port' = '2983', 'working-mode' = 'memory' diff --git a/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md b/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md index a9bdc3b98d..744e8b79bb 100644 --- a/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md +++ b/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md @@ -37,25 +37,27 @@ under the License. Create `docker-compose.yml`. -*Note*: `host` network mode is required in this demo, so it can only work on Linux, see [network-tutorial-host](https://docs.docker.com/network/network-tutorial-host/). - ```yaml version: '2.1' services: observer: - image: oceanbase/oceanbase-ce:4.2.0.0 + image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515' container_name: observer environment: - - 'MODE=slim' - - 'OB_ROOT_PASSWORD=pswd' - network_mode: "host" + - 'MODE=mini' + - 'OB_SYS_PASSWORD=123456' + - 'OB_TENANT_PASSWORD=654321' + ports: + - '2881:2881' + - '2882:2882' oblogproxy: - image: whhe/oblogproxy:1.1.3_4x + image: 'oceanbase/oblogproxy-ce:latest' container_name: oblogproxy environment: - 'OB_SYS_USERNAME=root' - - 'OB_SYS_PASSWORD=pswd' - network_mode: "host" + - 'OB_SYS_PASSWORD=123456' + ports: + - '2983:2983' elasticsearch: image: 'elastic/elasticsearch:7.6.0' container_name: elasticsearch @@ -89,22 +91,18 @@ Execute the following command in the directory where `docker-compose.yml` is loc docker-compose up -d ``` -### Set password +### Query Root Service List -From OceanBase 4.0.0.0 CE, we can only fetch the commit log of non-sys tenant. - -Here we use the 'test' tenant for example. - -Login with 'root' user of 'test' tenant: +Login with 'root' user of 'sys' tenant: ```shell -docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test +docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456 ``` -Set a password: +Query the root service list by following SQL and store the value. ```mysql -ALTER USER root IDENTIFIED BY 'test'; +SHOW PARAMETERS LIKE 'rootservice_list'; ``` ### Create data for reading snapshot @@ -112,7 +110,7 @@ ALTER USER root IDENTIFIED BY 'test'; Login 'root' user of 'test' tenant. ```shell -docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest +docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321 ``` Insert data: @@ -163,6 +161,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), ### Use Flink DDL to create dynamic table in Flink SQL CLI +Note that in the SQL of the OceanBase source table, replace root_service_list with the actual value. + ```sql -- checkpoint every 3000 milliseconds Flink SQL> SET execution.checkpointing.interval = 3s; @@ -183,13 +183,13 @@ Flink SQL> CREATE TABLE orders ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'username' = 'root@test', - 'password' = 'test', + 'password' = '654321', 'tenant-name' = 'test', 'database-name' = '^ob$', 'table-name' = '^orders$', 'hostname' = 'localhost', 'port' = '2881', - 'rootserver-list' = '127.0.0.1:2882:2881', + 'rootserver-list' = '${root_service_list}', 'logproxy.host' = 'localhost', 'logproxy.port' = '2983', 'working-mode' = 'memory' @@ -205,13 +205,13 @@ Flink SQL> CREATE TABLE products ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'username' = 'root@test', - 'password' = 'test', + 'password' = '654321', 'tenant-name' = 'test', 'database-name' = '^ob$', 'table-name' = '^products$', 'hostname' = 'localhost', 'port' = '2881', - 'rootserver-list' = '127.0.0.1:2882:2881', + 'rootserver-list' = '${root_service_list}', 'logproxy.host' = 'localhost', 'logproxy.port' = '2983', 'working-mode' = 'memory' diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml index 1ed25b89fb..726d7f4382 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml @@ -163,4 +163,21 @@ limitations under the License. + + + + org.apache.maven.plugins + maven-jar-plugin + + + test-jar + + test-jar + + + + + + + diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java index c3a4c2ba5d..413b7494fe 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java @@ -17,20 +17,18 @@ package org.apache.flink.cdc.connectors.oceanbase; -import org.apache.flink.runtime.minicluster.RpcServiceSharing; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.table.utils.LegacyRowResource; -import org.apache.flink.test.util.MiniClusterWithClientResource; -import org.apache.flink.util.TestLogger; +import org.apache.flink.test.util.AbstractTestBase; import org.junit.ClassRule; -import org.junit.Rule; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; @@ -43,51 +41,13 @@ import static org.junit.Assert.assertTrue; /** Basic class for testing OceanBase source. */ -public abstract class OceanBaseTestBase extends TestLogger { +public abstract class OceanBaseTestBase extends AbstractTestBase { private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); - protected static final int DEFAULT_PARALLELISM = 4; - - @Rule - public final MiniClusterWithClientResource miniClusterResource = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) - .setRpcServiceSharing(RpcServiceSharing.DEDICATED) - .withHaLeadershipControl() - .build()); - @ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE; - protected final String compatibleMode; - protected final String username; - protected final String password; - protected final String hostname; - protected final int port; - protected final String logProxyHost; - protected final int logProxyPort; - protected final String tenant; - - public OceanBaseTestBase( - String compatibleMode, - String username, - String password, - String hostname, - int port, - String logProxyHost, - int logProxyPort, - String tenant) { - this.compatibleMode = compatibleMode; - this.username = username; - this.password = password; - this.hostname = hostname; - this.port = port; - this.logProxyHost = logProxyHost; - this.logProxyPort = logProxyPort; - this.tenant = tenant; - } + protected abstract OceanBaseCdcMetadata metadata(); protected String commonOptionsString() { return String.format( @@ -96,8 +56,14 @@ protected String commonOptionsString() { + " 'password' = '%s', " + " 'hostname' = '%s', " + " 'port' = '%s', " - + " 'compatible-mode' = '%s'", - username, password, hostname, port, compatibleMode); + + " 'compatible-mode' = '%s', " + + " 'jdbc.driver' = '%s'", + metadata().getUsername(), + metadata().getPassword(), + metadata().getHostname(), + metadata().getPort(), + metadata().getCompatibleMode(), + metadata().getDriverClass()); } protected String logProxyOptionsString() { @@ -106,7 +72,9 @@ protected String logProxyOptionsString() { + " 'tenant-name' = '%s'," + " 'logproxy.host' = '%s'," + " 'logproxy.port' = '%s'", - tenant, logProxyHost, logProxyPort); + metadata().getTenantName(), + metadata().getLogProxyHost(), + metadata().getLogProxyPort()); } protected String initialOptionsString() { @@ -120,7 +88,10 @@ protected String snapshotOptionsString() { return " 'scan.startup.mode' = 'snapshot', " + commonOptionsString(); } - protected abstract Connection getJdbcConnection() throws SQLException; + protected Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + metadata().getJdbcUrl(), metadata().getUsername(), metadata().getPassword()); + } protected void setGlobalTimeZone(String serverTimeZone) throws SQLException { try (Connection connection = getJdbcConnection(); @@ -130,7 +101,8 @@ protected void setGlobalTimeZone(String serverTimeZone) throws SQLException { } protected void initializeTable(String sqlFile) { - final String ddlFile = String.format("ddl/%s/%s.sql", compatibleMode, sqlFile); + final String ddlFile = + String.format("ddl/%s/%s.sql", metadata().getCompatibleMode(), sqlFile); final URL ddlTestFile = getClass().getClassLoader().getResource(ddlFile); assertNotNull("Cannot locate " + ddlFile, ddlTestFile); try (Connection connection = getJdbcConnection(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java new file mode 100644 index 0000000000..50885089e6 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase; + +import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.time.Duration; + +/** Utils to help test. */ +@SuppressWarnings("resource") +public class OceanBaseTestUtils { + + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestUtils.class); + + private static final String LATEST_VERSION = "latest"; + private static final String CDC_TEST_OB_VERSION = "4.2.1.6-106000012024042515"; + + private static final String SYS_PASSWORD = "123456"; + private static final String TEST_PASSWORD = "654321"; + + public static OceanBaseContainer createOceanBaseContainerForCDC() { + return createOceanBaseContainer(CDC_TEST_OB_VERSION, "mini") + .withSysPassword(SYS_PASSWORD) + .withStartupTimeout(Duration.ofMinutes(4)); + } + + public static OceanBaseContainer createOceanBaseContainerForJdbc() { + return createOceanBaseContainer(LATEST_VERSION, "slim") + .withStartupTimeout(Duration.ofMinutes(2)); + } + + public static OceanBaseContainer createOceanBaseContainer(String version, String mode) { + return new OceanBaseContainer(version) + .withMode(mode) + .withTenantPassword(TEST_PASSWORD) + .withEnv("OB_DATAFILE_SIZE", "2G") + .withEnv("OB_LOG_DISK_SIZE", "4G") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } + + public static LogProxyContainer createLogProxyContainer() { + return new LogProxyContainer(LATEST_VERSION) + .withSysPassword(SYS_PASSWORD) + .withStartupTimeout(Duration.ofMinutes(1)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java index 4388b60af5..616ccd5e23 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java @@ -18,89 +18,55 @@ package org.apache.flink.cdc.connectors.oceanbase.table; import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase; +import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; +import org.testcontainers.containers.Network; import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; import java.sql.Statement; -import java.time.Duration; import java.time.ZoneId; import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer; +import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC; /** Integration tests for OceanBase MySQL mode table source. */ -@RunWith(Parameterized.class) public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { - private static final Logger LOG = LoggerFactory.getLogger(OceanBaseMySQLModeITCase.class); - private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private final StreamTableEnvironment tEnv = StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().inStreamingMode().build()); - private static final String NETWORK_MODE = "host"; - private static final String OB_SYS_PASSWORD = "123456"; + @ClassRule public static final Network NETWORK = Network.newNetwork(); @ClassRule - public static final GenericContainer OB_SERVER = - new GenericContainer<>("oceanbase/oceanbase-ce:4.2.0.0") - .withNetworkMode(NETWORK_MODE) - .withEnv("MODE", "slim") - .withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD) - .withEnv("OB_DATAFILE_SIZE", "1G") - .withEnv("OB_LOG_DISK_SIZE", "4G") - .withCopyFileToContainer( - MountableFile.forClasspathResource("ddl/mysql/docker_init.sql"), - "/root/boot/init.d/init.sql") - .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) - .withStartupTimeout(Duration.ofMinutes(4)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + public static final OceanBaseContainer OB_SERVER = + createOceanBaseContainerForCDC().withNetwork(NETWORK); @ClassRule - public static final GenericContainer LOG_PROXY = - new GenericContainer<>("whhe/oblogproxy:1.1.3_4x") - .withNetworkMode(NETWORK_MODE) - .withEnv("OB_SYS_PASSWORD", OB_SYS_PASSWORD) - .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) - .withStartupTimeout(Duration.ofMinutes(1)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - @BeforeClass - public static void startContainers() { - LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(OB_SERVER, LOG_PROXY)).join(); - LOG.info("Containers are started."); - } + public static final LogProxyContainer LOG_PROXY = + createLogProxyContainer().withNetwork(NETWORK); + + private static final OceanBaseCdcMetadata METADATA = + new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY); - @AfterClass - public static void stopContainers() { - LOG.info("Stopping containers..."); - Stream.of(OB_SERVER, LOG_PROXY).forEach(GenericContainer::stop); - LOG.info("Containers are stopped."); + @Override + protected OceanBaseCdcMetadata metadata() { + return METADATA; } @Before @@ -110,47 +76,11 @@ public void before() { env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); } - private final String rsList; - - public OceanBaseMySQLModeITCase( - String username, - String password, - String hostname, - int port, - String logProxyHost, - int logProxyPort, - String tenant, - String rsList) { - super("mysql", username, password, hostname, port, logProxyHost, logProxyPort, tenant); - this.rsList = rsList; - } - - @Parameterized.Parameters - public static List parameters() { - return Collections.singletonList( - new Object[] { - "root@test", - "123456", - "127.0.0.1", - 2881, - "127.0.0.1", - 2983, - "test", - "127.0.0.1:2882:2881" - }); - } - @Override protected String logProxyOptionsString() { return super.logProxyOptionsString() + " , " - + String.format(" 'rootserver-list' = '%s'", rsList); - } - - @Override - protected Connection getJdbcConnection() throws SQLException { - return DriverManager.getConnection( - "jdbc:mysql://" + hostname + ":" + port + "/?useSSL=false", username, password); + + String.format(" 'rootserver-list' = '%s'", METADATA.getRsList()); } @Test @@ -312,6 +242,8 @@ public void testMetadataColumns() throws Exception { waitForSinkSize("sink", snapshotSize + 1); + String tenant = metadata().getTenantName(); + List expected = Arrays.asList( "+I(" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java index b11da43aec..e7345baba1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java @@ -18,6 +18,8 @@ package org.apache.flink.cdc.connectors.oceanbase.table; import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseOracleCdcMetadata; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; @@ -26,20 +28,14 @@ import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; -import java.util.Collections; import java.util.List; /** Integration tests for OceanBase Oracle mode table source. */ @Ignore("Test ignored before oceanbase-xe docker image is available") -@RunWith(Parameterized.class) public class OceanBaseOracleModeITCase extends OceanBaseTestBase { private final StreamExecutionEnvironment env = @@ -48,61 +44,25 @@ public class OceanBaseOracleModeITCase extends OceanBaseTestBase { StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().inStreamingMode().build()); - private final String schema; - private final String configUrl; - - public OceanBaseOracleModeITCase( - String username, - String password, - String hostname, - int port, - String logProxyHost, - int logProxyPort, - String tenant, - String schema, - String configUrl) { - super("oracle", username, password, hostname, port, logProxyHost, logProxyPort, tenant); - this.schema = schema; - this.configUrl = configUrl; - } - - @Parameterized.Parameters - public static List parameters() { - return Collections.singletonList( - new Object[] { - "SYS@test", - "123456", - "127.0.0.1", - 2881, - "127.0.0.1", - 2983, - "test", - "SYS", - "http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster" - }); - } + private static final OceanBaseCdcMetadata METADATA = new OceanBaseOracleCdcMetadata(); @Override - protected String commonOptionsString() { - return super.commonOptionsString() + " , " + " 'jdbc.driver' = 'com.oceanbase.jdbc.Driver'"; + protected OceanBaseCdcMetadata metadata() { + return METADATA; } @Override protected String logProxyOptionsString() { return super.logProxyOptionsString() + " , " - + String.format(" 'config-url' = '%s'", configUrl); - } - - @Override - protected Connection getJdbcConnection() throws SQLException { - return DriverManager.getConnection( - "jdbc:oceanbase://" + hostname + ":" + port + "/" + schema, username, password); + + String.format(" 'config-url' = '%s'", METADATA.getConfigUrl()); } @Test public void testAllDataTypes() throws Exception { initializeTable("column_type_test"); + + String schema = metadata().getDatabase(); String sourceDDL = String.format( "CREATE TABLE full_types (" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java new file mode 100644 index 0000000000..c33eccbb42 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +import org.jetbrains.annotations.NotNull; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.util.Collections; +import java.util.Set; + +/** OceanBase Log Proxy container. */ +public class LogProxyContainer extends GenericContainer { + + private static final String IMAGE = "oceanbase/oblogproxy-ce"; + + private static final int PORT = 2983; + private static final String ROOT_USER = "root"; + + private String sysPassword; + + public LogProxyContainer(String version) { + super(DockerImageName.parse(IMAGE + ":" + version)); + addExposedPorts(PORT); + setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1)); + } + + @Override + protected void configure() { + addEnv("OB_SYS_USERNAME", ROOT_USER); + addEnv("OB_SYS_PASSWORD", sysPassword); + } + + public @NotNull Set getLivenessCheckPortNumbers() { + return Collections.singleton(this.getMappedPort(PORT)); + } + + public int getPort() { + return getMappedPort(PORT); + } + + public LogProxyContainer withSysPassword(String sysPassword) { + this.sysPassword = sysPassword; + return this; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java new file mode 100644 index 0000000000..bb2469509d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +import java.io.Serializable; + +/** OceanBase CDC metadata. */ +public interface OceanBaseCdcMetadata extends Serializable { + + String getCompatibleMode(); + + String getHostname(); + + int getPort(); + + String getUsername(); + + String getPassword(); + + String getDriverClass(); + + String getDatabase(); + + String getJdbcUrl(); + + String getTenantName(); + + String getLogProxyHost(); + + int getLogProxyPort(); + + default String getConfigUrl() { + return null; + } + + default String getRsList() { + return null; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java new file mode 100644 index 0000000000..1cd4984605 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +import org.jetbrains.annotations.NotNull; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.util.Collections; +import java.util.Set; + +/** OceanBase container. */ +public class OceanBaseContainer extends JdbcDatabaseContainer { + + private static final String IMAGE = "oceanbase/oceanbase-ce"; + + private static final int SQL_PORT = 2881; + private static final int RPC_PORT = 2882; + private static final String ROOT_USER = "root"; + private static final String TEST_DATABASE = "test"; + private static final String DEFAULT_TENANT = "test"; + private static final String DEFAULT_PASSWORD = ""; + + private String mode = "mini"; + private String tenantName = DEFAULT_TENANT; + private String sysPassword = DEFAULT_PASSWORD; + private String tenantPassword = DEFAULT_PASSWORD; + + public OceanBaseContainer(String version) { + super(DockerImageName.parse(IMAGE + ":" + version)); + addExposedPorts(SQL_PORT, RPC_PORT); + setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1)); + } + + @Override + protected void configure() { + addEnv("MODE", mode); + addEnv("OB_CLUSTER_NAME", "flink-cdc-ci"); + if (!DEFAULT_PASSWORD.equals(sysPassword)) { + addEnv("OB_SYS_PASSWORD", sysPassword); + } + if (!DEFAULT_TENANT.equals(tenantName)) { + addEnv("OB_TENANT_NAME", tenantName); + } + if (!DEFAULT_PASSWORD.equals(tenantPassword)) { + addEnv("OB_TENANT_PASSWORD", tenantPassword); + } + } + + protected void waitUntilContainerStarted() { + this.getWaitStrategy().waitUntilReady(this); + } + + public @NotNull Set getLivenessCheckPortNumbers() { + return Collections.singleton(this.getMappedPort(SQL_PORT)); + } + + @Override + public String getDriverClassName() { + return "com.mysql.cj.jdbc.Driver"; + } + + public String getJdbcUrl(String databaseName) { + return "jdbc:mysql://" + + getHost() + + ":" + + getDatabasePort() + + "/" + + databaseName + + "?useSSL=false"; + } + + @Override + public String getJdbcUrl() { + return getJdbcUrl(""); + } + + public int getDatabasePort() { + return getMappedPort(SQL_PORT); + } + + public String getTenantName() { + return tenantName; + } + + @Override + public String getDatabaseName() { + return TEST_DATABASE; + } + + @Override + public String getUsername() { + return ROOT_USER + "@" + tenantName; + } + + @Override + public String getPassword() { + return tenantPassword; + } + + @Override + protected String getTestQueryString() { + return "SELECT 1"; + } + + public OceanBaseContainer withMode(String mode) { + this.mode = mode; + return this; + } + + public OceanBaseContainer withTenantName(String tenantName) { + this.tenantName = tenantName; + return this; + } + + public OceanBaseContainer withSysPassword(String sysPassword) { + this.sysPassword = sysPassword; + return this; + } + + public OceanBaseContainer withTenantPassword(String tenantPassword) { + this.tenantPassword = tenantPassword; + return this; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java new file mode 100644 index 0000000000..aede8a2924 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +/** OceanBase CDC MySQL mode metadata. */ +public class OceanBaseMySQLCdcMetadata implements OceanBaseCdcMetadata { + + private final OceanBaseContainer obServerContainer; + private final LogProxyContainer logProxyContainer; + + private String rsList; + + public OceanBaseMySQLCdcMetadata( + OceanBaseContainer obServerContainer, LogProxyContainer logProxyContainer) { + this.obServerContainer = obServerContainer; + this.logProxyContainer = logProxyContainer; + } + + @Override + public String getCompatibleMode() { + return "mysql"; + } + + @Override + public String getHostname() { + return obServerContainer.getHost(); + } + + @Override + public int getPort() { + return obServerContainer.getDatabasePort(); + } + + @Override + public String getUsername() { + return obServerContainer.getUsername(); + } + + @Override + public String getPassword() { + return obServerContainer.getPassword(); + } + + @Override + public String getDriverClass() { + return obServerContainer.getDriverClassName(); + } + + @Override + public String getDatabase() { + return obServerContainer.getDatabaseName(); + } + + @Override + public String getJdbcUrl() { + return "jdbc:mysql://" + getHostname() + ":" + getPort() + "/?useSSL=false"; + } + + @Override + public String getTenantName() { + return obServerContainer.getTenantName(); + } + + @Override + public String getLogProxyHost() { + return logProxyContainer.getHost(); + } + + @Override + public int getLogProxyPort() { + return logProxyContainer.getPort(); + } + + @Override + public String getRsList() { + if (rsList == null) { + try (Connection connection = + DriverManager.getConnection( + getJdbcUrl(), getUsername(), getPassword()); + Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE 'rootservice_list'"); + rsList = rs.next() ? rs.getString("VALUE") : null; + } catch (SQLException e) { + throw new RuntimeException("Failed to query rs list", e); + } + if (rsList == null) { + throw new RuntimeException("Got empty rs list"); + } + } + return rsList; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java new file mode 100644 index 0000000000..c68fdda037 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +/** OceanBase CDC Oracle mode metadata. */ +public class OceanBaseOracleCdcMetadata implements OceanBaseCdcMetadata { + + @Override + public String getCompatibleMode() { + return "oracle"; + } + + @Override + public String getHostname() { + return System.getenv("host"); + } + + @Override + public int getPort() { + return Integer.parseInt(System.getenv("port")); + } + + @Override + public String getUsername() { + return System.getenv("username"); + } + + @Override + public String getPassword() { + return System.getenv("password"); + } + + @Override + public String getDatabase() { + return System.getenv("schema"); + } + + @Override + public String getDriverClass() { + return "com.oceanbase.jdbc.Driver"; + } + + @Override + public String getJdbcUrl() { + return "jdbc:oceanbase://" + getHostname() + ":" + getPort() + "/" + getDatabase(); + } + + @Override + public String getTenantName() { + return System.getenv("tenant"); + } + + @Override + public String getLogProxyHost() { + return System.getenv("log_proxy_host"); + } + + @Override + public int getLogProxyPort() { + return Integer.parseInt(System.getenv("log_proxy_port")); + } + + @Override + public String getConfigUrl() { + return System.getenv("config_url"); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java new file mode 100644 index 0000000000..d189d030f4 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertNotNull; + +/** + * Create and populate a unique instance of an OceanBase database for each run of JUnit test. A user + * of class needs to provide a logical name for Debezium and database name. It is expected that + * there is an init file in src/test/resources/ddl/<database_name>.sql. The + * database name is enriched with a unique suffix that guarantees complete isolation between runs + * + * <database_name>_<suffix> + * + *

This class is inspired from Debezium project. + */ +public class UniqueDatabase { + + private static final String[] CREATE_DATABASE_DDL = + new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"}; + private static final String DROP_DATABASE_DDL = "DROP DATABASE IF EXISTS `$DBNAME$`;"; + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + + private final OceanBaseContainer container; + private final String databaseName; + private final String templateName; + + public UniqueDatabase(OceanBaseContainer container, String databaseName) { + this(container, databaseName, Integer.toUnsignedString(new Random().nextInt(), 36)); + } + + private UniqueDatabase( + OceanBaseContainer container, String databaseName, final String identifier) { + this.container = container; + this.databaseName = databaseName + "_" + identifier; + this.templateName = databaseName; + } + + public String getHost() { + return container.getHost(); + } + + public int getDatabasePort() { + return container.getDatabasePort(); + } + + public String getDatabaseName() { + return databaseName; + } + + public String getUsername() { + return container.getUsername(); + } + + public String getPassword() { + return container.getPassword(); + } + + /** @return Fully qualified table name <databaseName>.<tableName> */ + public String qualifiedTableName(final String tableName) { + return String.format("%s.%s", databaseName, tableName); + } + + /** Creates the database and populates it with initialization SQL script. */ + public void createAndInitialize() { + final String ddlFile = String.format("ddl/%s.sql", templateName); + final URL ddlTestFile = UniqueDatabase.class.getClassLoader().getResource(ddlFile); + assertNotNull("Cannot locate " + ddlFile, ddlTestFile); + try { + try (Connection connection = + DriverManager.getConnection( + container.getJdbcUrl(), getUsername(), getPassword()); + Statement statement = connection.createStatement()) { + final List statements = + Arrays.stream( + Stream.concat( + Arrays.stream(CREATE_DATABASE_DDL), + Files.readAllLines( + Paths.get(ddlTestFile.toURI())) + .stream()) + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ? m.group(1) : x; + }) + .map(this::convertSQL) + .collect(Collectors.joining("\n")) + .split(";")) + .map(x -> x.replace("$$", ";")) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } + } catch (final Exception e) { + throw new IllegalStateException(e); + } + } + + /** Drop the database if it is existing. */ + public void dropDatabase() { + try { + try (Connection connection = + DriverManager.getConnection( + container.getJdbcUrl(), getUsername(), getPassword()); + Statement statement = connection.createStatement()) { + final String dropDatabaseStatement = convertSQL(DROP_DATABASE_DDL); + statement.execute(dropDatabaseStatement); + } + } catch (final Exception e) { + throw new IllegalStateException(e); + } + } + + public Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + container.getJdbcUrl(databaseName), getUsername(), getPassword()); + } + + private String convertSQL(final String sql) { + return sql.replace("$DBNAME$", databaseName); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql deleted file mode 100644 index 0db9c71db4..0000000000 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql +++ /dev/null @@ -1,17 +0,0 @@ --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - --- Set the root user password of test tenant -ALTER USER root IDENTIFIED BY '123456'; diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java index d1c0bb7e73..72576bf178 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java @@ -102,7 +102,6 @@ public void before() throws Exception { new GenericContainer<>(getFlinkDockerImageTag()) .withCommand("jobmanager") .withNetwork(NETWORK) - .withExtraHost("host.docker.internal", "host-gateway") .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) .withExposedPorts(JOB_MANAGER_REST_PORT) .withEnv("FLINK_PROPERTIES", flinkProperties) @@ -111,7 +110,6 @@ public void before() throws Exception { taskManager = new GenericContainer<>(getFlinkDockerImageTag()) .withCommand("taskmanager") - .withExtraHost("host.docker.internal", "host-gateway") .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) .withEnv("FLINK_PROPERTIES", flinkProperties) diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml index c1e50101ae..cdb0b43e8e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml @@ -91,6 +91,13 @@ limitations under the License. test-jar test + + org.apache.flink + flink-connector-oceanbase-cdc + ${project.version} + test-jar + test + org.apache.flink flink-connector-oracle-cdc diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java index e257311a5b..2cd2f88eb9 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java @@ -19,99 +19,68 @@ import org.apache.flink.cdc.common.test.utils.JdbcProxy; import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata; +import org.apache.flink.cdc.connectors.oceanbase.testutils.UniqueDatabase; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; +import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.MountableFile; - -import java.net.URL; -import java.nio.file.Files; + import java.nio.file.Path; -import java.nio.file.Paths; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; import java.util.Arrays; import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import static org.junit.Assert.assertNotNull; +import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer; +import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC; /** End-to-end tests for oceanbase-cdc connector uber jar. */ public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment { - private static final Logger LOG = LoggerFactory.getLogger(OceanBaseE2eITCase.class); - - private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + private static final String INTER_CONTAINER_OB_SERVER_ALIAS = "oceanbase"; + private static final String INTER_CONTAINER_LOG_PROXY_ALIAS = "oblogproxy"; private static final Path obCdcJar = TestUtils.getResource("oceanbase-cdc-connector.jar"); private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); - // ------------------------------------------------------------------------------------------ - // OceanBase container variables - // ------------------------------------------------------------------------------------------ - private static final String OB_SERVER_IMAGE = "oceanbase/oceanbase-ce:4.2.0.0"; - private static final String OB_LOG_PROXY_IMAGE = "whhe/oblogproxy:1.1.3_4x"; - private static final String NETWORK_MODE = "host"; - private static final String INTER_CONTAINER_OB_HOST = "host.docker.internal"; - private static final String SYS_PASSWORD = "1234567"; - private static final String TEST_TENANT = "test"; - private static final String TEST_USER = "root@" + TEST_TENANT; - private static final String TEST_PASSWORD = "7654321"; - @ClassRule - public static final GenericContainer OB_SERVER = - new GenericContainer<>(OB_SERVER_IMAGE) - .withNetworkMode(NETWORK_MODE) - .withEnv("MODE", "slim") - .withEnv("OB_DATAFILE_SIZE", "1G") - .withEnv("OB_LOG_DISK_SIZE", "4G") - .withEnv("OB_ROOT_PASSWORD", SYS_PASSWORD) - .withEnv("OB_TENANT_NAME", TEST_TENANT) - .withCopyFileToContainer( - MountableFile.forClasspathResource("docker/oceanbase/setup.sql"), - "/root/boot/init.d/init.sql") - .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) - .withStartupTimeout(Duration.ofMinutes(3)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + public static final OceanBaseContainer OB_SERVER = + createOceanBaseContainerForCDC() + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_OB_SERVER_ALIAS); @ClassRule - public static final GenericContainer LOG_PROXY = - new GenericContainer<>(OB_LOG_PROXY_IMAGE) - .withNetworkMode(NETWORK_MODE) - .withEnv("OB_SYS_PASSWORD", SYS_PASSWORD) - .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) - .withStartupTimeout(Duration.ofMinutes(1)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + public static final LogProxyContainer LOG_PROXY = + createLogProxyContainer() + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_LOG_PROXY_ALIAS); + + private static final OceanBaseCdcMetadata METADATA = + new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY); + + protected final UniqueDatabase obInventoryDatabase = + new UniqueDatabase(OB_SERVER, "oceanbase_inventory"); @Before public void before() { super.before(); - initializeTable("oceanbase_inventory"); + obInventoryDatabase.createAndInitialize(); } - private Connection getTestConnection(String databaseName) { - try { - Class.forName(MYSQL_DRIVER_CLASS); - return DriverManager.getConnection( - String.format("jdbc:mysql://127.0.0.1:2881/%s?useSSL=false", databaseName), - TEST_USER, - TEST_PASSWORD); - } catch (Exception e) { - throw new RuntimeException("Failed to get test jdbc connection", e); - } + @After + public void after() { + super.after(); + + obInventoryDatabase.dropDatabase(); } @Test @@ -131,16 +100,18 @@ public void testOceanBaseCDC() throws Exception { ") WITH (", " 'connector' = 'oceanbase-cdc',", " 'scan.startup.mode' = 'initial',", - " 'username' = '" + TEST_USER + "',", - " 'password' = '" + TEST_PASSWORD + "',", - " 'tenant-name' = '" + TEST_TENANT + "',", - " 'table-list' = 'inventory.products_source',", - " 'hostname' = '" + INTER_CONTAINER_OB_HOST + "',", + " 'username' = '" + METADATA.getUsername() + "',", + " 'password' = '" + METADATA.getPassword() + "',", + " 'tenant-name' = '" + METADATA.getTenantName() + "',", + " 'table-list' = '" + + obInventoryDatabase.qualifiedTableName("products_source") + + "',", + " 'hostname' = '" + INTER_CONTAINER_OB_SERVER_ALIAS + "',", " 'port' = '2881',", - " 'jdbc.driver' = '" + MYSQL_DRIVER_CLASS + "',", - " 'logproxy.host' = '" + INTER_CONTAINER_OB_HOST + "',", + " 'jdbc.driver' = '" + METADATA.getDriverClass() + "',", + " 'logproxy.host' = '" + INTER_CONTAINER_LOG_PROXY_ALIAS + "',", " 'logproxy.port' = '2983',", - " 'rootserver-list' = '127.0.0.1:2882:2881',", + " 'rootserver-list' = '" + METADATA.getRsList() + "',", " 'working-mode' = 'memory',", " 'jdbc.properties.useSSL' = 'false'", ");", @@ -168,7 +139,7 @@ public void testOceanBaseCDC() throws Exception { submitSQLJob(sqlLines, obCdcJar, jdbcJar, mysqlDriverJar); waitUntilJobRunning(Duration.ofSeconds(30)); - try (Connection conn = getTestConnection("inventory"); + try (Connection conn = obInventoryDatabase.getJdbcConnection(); Statement stat = conn.createStatement()) { stat.execute( "UPDATE products_source SET description='18oz carpenter hammer' WHERE id=106;"); @@ -211,32 +182,4 @@ public void testOceanBaseCDC() throws Exception { new String[] {"id", "name", "description", "weight", "enum_c", "json_c"}, 60000L); } - - protected void initializeTable(String sqlFile) { - final String ddlFile = String.format("ddl/%s.sql", sqlFile); - final URL ddlTestFile = OceanBaseE2eITCase.class.getClassLoader().getResource(ddlFile); - assertNotNull("Cannot locate " + ddlFile, ddlTestFile); - try (Connection connection = getTestConnection(""); - Statement statement = connection.createStatement()) { - final List statements = - Arrays.stream( - Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() - .map(String::trim) - .filter(x -> !x.startsWith("--") && !x.isEmpty()) - .map( - x -> { - final Matcher m = - COMMENT_PATTERN.matcher(x); - return m.matches() ? m.group(1) : x; - }) - .collect(Collectors.joining("\n")) - .split(";")) - .collect(Collectors.toList()); - for (String stmt : statements) { - statement.execute(stmt); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } } diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java index 91e7e5c6f0..49ef039e03 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java @@ -135,7 +135,6 @@ public void before() { new GenericContainer<>(getFlinkDockerImageTag()) .withCommand("jobmanager") .withNetwork(NETWORK) - .withExtraHost("host.docker.internal", "host-gateway") .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) .withExposedPorts(JOB_MANAGER_REST_PORT) .withEnv("FLINK_PROPERTIES", flinkProperties) @@ -143,7 +142,6 @@ public void before() { taskManager = new GenericContainer<>(getFlinkDockerImageTag()) .withCommand("taskmanager") - .withExtraHost("host.docker.internal", "host-gateway") .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) .withEnv("FLINK_PROPERTIES", flinkProperties) diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql index 6afe82d878..9c4ec59697 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql @@ -16,13 +16,8 @@ -- ---------------------------------------------------------------------------------------------------------------- -- DATABASE: inventory -- ---------------------------------------------------------------------------------------------------------------- --- Create and populate our products using a single insert with many rows -DROP DATABASE IF EXISTS inventory; - -CREATE DATABASE inventory; - -USE inventory; +-- Create and populate our products using a single insert with many rows CREATE TABLE products_source ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL DEFAULT 'flink', diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql deleted file mode 100644 index ac15e8cb92..0000000000 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql +++ /dev/null @@ -1,16 +0,0 @@ --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - -ALTER USER root IDENTIFIED BY '7654321';