From 163b0e0926f6f54c1c9ca808a331a492a1883882 Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 27 Jun 2024 22:05:58 +0800 Subject: [PATCH] [FLINK-35638] Refactor OceanBase test cases and remove dependency on host network --- .../oceanbase/OceanBaseTestBase.java | 72 +++------ .../oceanbase/OceanBaseTestUtils.java | 86 +++++++++++ .../table/OceanBaseMySQLModeITCase.java | 105 +++++--------- .../table/OceanBaseOracleModeITCase.java | 70 ++++----- .../testutils/LogProxyContainer.java | 66 +++++++++ .../testutils/OceanBaseCdcMetadata.java | 46 ++++++ .../testutils/OceanBaseContainer.java | 137 ++++++++++++++++++ .../testutils/OceanBaseMySQLCdcMetadata.java | 86 +++++++++++ .../testutils/OceanBaseOracleCdcMetadata.java | 77 ++++++++++ .../test/resources/ddl/mysql/docker_init.sql | 17 --- 10 files changed, 582 insertions(+), 180 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java delete mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java index c3a4c2ba5d4..413b7494fef 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java @@ -17,20 +17,18 @@ package org.apache.flink.cdc.connectors.oceanbase; -import org.apache.flink.runtime.minicluster.RpcServiceSharing; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.table.utils.LegacyRowResource; -import org.apache.flink.test.util.MiniClusterWithClientResource; -import org.apache.flink.util.TestLogger; +import org.apache.flink.test.util.AbstractTestBase; import org.junit.ClassRule; -import org.junit.Rule; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; @@ -43,51 +41,13 @@ import static org.junit.Assert.assertTrue; /** Basic class for testing OceanBase source. */ -public abstract class OceanBaseTestBase extends TestLogger { +public abstract class OceanBaseTestBase extends AbstractTestBase { private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); - protected static final int DEFAULT_PARALLELISM = 4; - - @Rule - public final MiniClusterWithClientResource miniClusterResource = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) - .setRpcServiceSharing(RpcServiceSharing.DEDICATED) - .withHaLeadershipControl() - .build()); - @ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE; - protected final String compatibleMode; - protected final String username; - protected final String password; - protected final String hostname; - protected final int port; - protected final String logProxyHost; - protected final int logProxyPort; - protected final String tenant; - - public OceanBaseTestBase( - String compatibleMode, - String username, - String password, - String hostname, - int port, - String logProxyHost, - int logProxyPort, - String tenant) { - this.compatibleMode = compatibleMode; - this.username = username; - this.password = password; - this.hostname = hostname; - this.port = port; - this.logProxyHost = logProxyHost; - this.logProxyPort = logProxyPort; - this.tenant = tenant; - } + protected abstract OceanBaseCdcMetadata metadata(); protected String commonOptionsString() { return String.format( @@ -96,8 +56,14 @@ protected String commonOptionsString() { + " 'password' = '%s', " + " 'hostname' = '%s', " + " 'port' = '%s', " - + " 'compatible-mode' = '%s'", - username, password, hostname, port, compatibleMode); + + " 'compatible-mode' = '%s', " + + " 'jdbc.driver' = '%s'", + metadata().getUsername(), + metadata().getPassword(), + metadata().getHostname(), + metadata().getPort(), + metadata().getCompatibleMode(), + metadata().getDriverClass()); } protected String logProxyOptionsString() { @@ -106,7 +72,9 @@ protected String logProxyOptionsString() { + " 'tenant-name' = '%s'," + " 'logproxy.host' = '%s'," + " 'logproxy.port' = '%s'", - tenant, logProxyHost, logProxyPort); + metadata().getTenantName(), + metadata().getLogProxyHost(), + metadata().getLogProxyPort()); } protected String initialOptionsString() { @@ -120,7 +88,10 @@ protected String snapshotOptionsString() { return " 'scan.startup.mode' = 'snapshot', " + commonOptionsString(); } - protected abstract Connection getJdbcConnection() throws SQLException; + protected Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + metadata().getJdbcUrl(), metadata().getUsername(), metadata().getPassword()); + } protected void setGlobalTimeZone(String serverTimeZone) throws SQLException { try (Connection connection = getJdbcConnection(); @@ -130,7 +101,8 @@ protected void setGlobalTimeZone(String serverTimeZone) throws SQLException { } protected void initializeTable(String sqlFile) { - final String ddlFile = String.format("ddl/%s/%s.sql", compatibleMode, sqlFile); + final String ddlFile = + String.format("ddl/%s/%s.sql", metadata().getCompatibleMode(), sqlFile); final URL ddlTestFile = getClass().getClassLoader().getResource(ddlFile); assertNotNull("Cannot locate " + ddlFile, ddlTestFile); try (Connection connection = getJdbcConnection(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java new file mode 100644 index 00000000000..faffba0124e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase; + +import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; + +/** Utils to help test. */ +@SuppressWarnings("resource") +public class OceanBaseTestUtils { + + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestUtils.class); + + public static final Network NETWORK = Network.newNetwork(); + + private static final String SUPPORTED_OB_VERSION = "4.2.1.6-106000012024042515"; + + private static final String SYS_PASSWORD = "123456"; + private static final String TEST_PASSWORD = "654321"; + + public static OceanBaseContainer createOceanBaseContainer() { + return createOceanBaseContainer(SUPPORTED_OB_VERSION, "mini"); + } + + public static OceanBaseContainer createOceanBaseContainer(String version, String mode) { + return new OceanBaseContainer(version) + .withNetwork(NETWORK) + .withMode(mode) + .withSysPassword(SYS_PASSWORD) + .withTenantPassword(TEST_PASSWORD) + .withStartupTimeout(Duration.ofMinutes(4)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } + + public static LogProxyContainer createLogProxyContainer() { + return new LogProxyContainer() + .withNetwork(NETWORK) + .withSysPassword(SYS_PASSWORD) + .withStartupTimeout(Duration.ofMinutes(1)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } + + public static String queryRootServiceList(Connection connection) { + try (Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE 'rootservice_list'"); + return rs.next() ? rs.getString("VALUE") : null; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static String queryConfigUrl(Connection connection) { + try (Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE 'obconfig_url'"); + return rs.next() ? rs.getString("VALUE") : null; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java index 4388b60af5e..eb5bb1fdada 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java @@ -18,6 +18,11 @@ package org.apache.flink.cdc.connectors.oceanbase.table; import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase; +import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils; +import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; @@ -27,31 +32,22 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; -import java.time.Duration; import java.time.ZoneId; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.stream.Stream; /** Integration tests for OceanBase MySQL mode table source. */ -@RunWith(Parameterized.class) public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { private static final Logger LOG = LoggerFactory.getLogger(OceanBaseMySQLModeITCase.class); @@ -62,38 +58,34 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().inStreamingMode().build()); - private static final String NETWORK_MODE = "host"; - private static final String OB_SYS_PASSWORD = "123456"; - - @ClassRule - public static final GenericContainer OB_SERVER = - new GenericContainer<>("oceanbase/oceanbase-ce:4.2.0.0") - .withNetworkMode(NETWORK_MODE) - .withEnv("MODE", "slim") - .withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD) - .withEnv("OB_DATAFILE_SIZE", "1G") - .withEnv("OB_LOG_DISK_SIZE", "4G") - .withCopyFileToContainer( - MountableFile.forClasspathResource("ddl/mysql/docker_init.sql"), - "/root/boot/init.d/init.sql") - .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) - .withStartupTimeout(Duration.ofMinutes(4)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - @ClassRule - public static final GenericContainer LOG_PROXY = - new GenericContainer<>("whhe/oblogproxy:1.1.3_4x") - .withNetworkMode(NETWORK_MODE) - .withEnv("OB_SYS_PASSWORD", OB_SYS_PASSWORD) - .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) - .withStartupTimeout(Duration.ofMinutes(1)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + private static final OceanBaseContainer OB_SERVER = + OceanBaseTestUtils.createOceanBaseContainer(); + + private static final LogProxyContainer LOG_PROXY = OceanBaseTestUtils.createLogProxyContainer(); + + private static final OceanBaseCdcMetadata METADATA = + new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY); + + private static String rsList; @BeforeClass public static void startContainers() { LOG.info("Starting containers..."); Startables.deepStart(Stream.of(OB_SERVER, LOG_PROXY)).join(); LOG.info("Containers are started."); + + try (Connection connection = + DriverManager.getConnection( + METADATA.getJdbcUrl(), METADATA.getUsername(), METADATA.getPassword())) { + rsList = OceanBaseTestUtils.queryRootServiceList(connection); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + if (rsList == null) { + throw new RuntimeException("rootservice_list not found"); + } + LOG.info("Got 'rootservice_list': {}", rsList); } @AfterClass @@ -103,6 +95,11 @@ public static void stopContainers() { LOG.info("Containers are stopped."); } + @Override + protected OceanBaseCdcMetadata metadata() { + return METADATA; + } + @Before public void before() { TestValuesTableFactory.clearAllData(); @@ -110,36 +107,6 @@ public void before() { env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); } - private final String rsList; - - public OceanBaseMySQLModeITCase( - String username, - String password, - String hostname, - int port, - String logProxyHost, - int logProxyPort, - String tenant, - String rsList) { - super("mysql", username, password, hostname, port, logProxyHost, logProxyPort, tenant); - this.rsList = rsList; - } - - @Parameterized.Parameters - public static List parameters() { - return Collections.singletonList( - new Object[] { - "root@test", - "123456", - "127.0.0.1", - 2881, - "127.0.0.1", - 2983, - "test", - "127.0.0.1:2882:2881" - }); - } - @Override protected String logProxyOptionsString() { return super.logProxyOptionsString() @@ -147,12 +114,6 @@ protected String logProxyOptionsString() { + String.format(" 'rootserver-list' = '%s'", rsList); } - @Override - protected Connection getJdbcConnection() throws SQLException { - return DriverManager.getConnection( - "jdbc:mysql://" + hostname + ":" + port + "/?useSSL=false", username, password); - } - @Test public void testTableList() throws Exception { initializeTable("inventory"); @@ -312,6 +273,8 @@ public void testMetadataColumns() throws Exception { waitForSinkSize("sink", snapshotSize + 1); + String tenant = metadata().getTenantName(); + List expected = Arrays.asList( "+I(" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java index b11da43aec9..cdaf8ee5ccf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java @@ -18,73 +18,63 @@ package org.apache.flink.cdc.connectors.oceanbase.table; import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase; +import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata; +import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseOracleCdcMetadata; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; -import java.util.Collections; import java.util.List; /** Integration tests for OceanBase Oracle mode table source. */ @Ignore("Test ignored before oceanbase-xe docker image is available") -@RunWith(Parameterized.class) public class OceanBaseOracleModeITCase extends OceanBaseTestBase { + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseOracleModeITCase.class); + private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private final StreamTableEnvironment tEnv = StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().inStreamingMode().build()); - private final String schema; - private final String configUrl; + private static final OceanBaseCdcMetadata METADATA = new OceanBaseOracleCdcMetadata(); - public OceanBaseOracleModeITCase( - String username, - String password, - String hostname, - int port, - String logProxyHost, - int logProxyPort, - String tenant, - String schema, - String configUrl) { - super("oracle", username, password, hostname, port, logProxyHost, logProxyPort, tenant); - this.schema = schema; - this.configUrl = configUrl; - } + private static String configUrl; - @Parameterized.Parameters - public static List parameters() { - return Collections.singletonList( - new Object[] { - "SYS@test", - "123456", - "127.0.0.1", - 2881, - "127.0.0.1", - 2983, - "test", - "SYS", - "http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster" - }); + @BeforeClass + public static void startContainers() { + try (Connection connection = + DriverManager.getConnection( + METADATA.getJdbcUrl(), METADATA.getUsername(), METADATA.getPassword())) { + configUrl = OceanBaseTestUtils.queryConfigUrl(connection); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + if (configUrl == null) { + throw new RuntimeException("obconfig_url not found"); + } + LOG.info("Got 'obconfig_url': {}", configUrl); } @Override - protected String commonOptionsString() { - return super.commonOptionsString() + " , " + " 'jdbc.driver' = 'com.oceanbase.jdbc.Driver'"; + protected OceanBaseCdcMetadata metadata() { + return METADATA; } @Override @@ -94,15 +84,11 @@ protected String logProxyOptionsString() { + String.format(" 'config-url' = '%s'", configUrl); } - @Override - protected Connection getJdbcConnection() throws SQLException { - return DriverManager.getConnection( - "jdbc:oceanbase://" + hostname + ":" + port + "/" + schema, username, password); - } - @Test public void testAllDataTypes() throws Exception { initializeTable("column_type_test"); + + String schema = metadata().getDatabase(); String sourceDDL = String.format( "CREATE TABLE full_types (" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java new file mode 100644 index 00000000000..e9bc0519c2c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +import org.jetbrains.annotations.NotNull; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.util.Collections; +import java.util.Set; + +/** OceanBase Log Proxy container. */ +public class LogProxyContainer extends GenericContainer { + + private static final String IMAGE = "oceanbase/oblogproxy-ce"; + + private static final int PORT = 2983; + private static final String ROOT_USER = "root"; + + private String sysPassword; + + public LogProxyContainer() { + this("latest"); + } + + public LogProxyContainer(String version) { + super(DockerImageName.parse(IMAGE + ":" + version)); + addExposedPorts(PORT); + setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1)); + } + + @Override + protected void configure() { + addEnv("OB_SYS_USERNAME", ROOT_USER); + addEnv("OB_SYS_PASSWORD", sysPassword); + } + + public @NotNull Set getLivenessCheckPortNumbers() { + return Collections.singleton(this.getMappedPort(PORT)); + } + + public int getPort() { + return getMappedPort(PORT); + } + + public LogProxyContainer withSysPassword(String sysPassword) { + this.sysPassword = sysPassword; + return this; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java new file mode 100644 index 00000000000..7563b283361 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +import java.io.Serializable; + +/** OceanBase CDC metadata. */ +public interface OceanBaseCdcMetadata extends Serializable { + + String getCompatibleMode(); + + String getHostname(); + + int getPort(); + + String getUsername(); + + String getPassword(); + + String getDriverClass(); + + String getDatabase(); + + String getJdbcUrl(); + + String getTenantName(); + + String getLogProxyHost(); + + int getLogProxyPort(); +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java new file mode 100644 index 00000000000..225aba9eee9 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +import org.jetbrains.annotations.NotNull; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.util.Collections; +import java.util.Set; + +/** OceanBase container. */ +public class OceanBaseContainer extends JdbcDatabaseContainer { + + private static final String IMAGE = "oceanbase/oceanbase-ce"; + + private static final int SQL_PORT = 2881; + private static final int RPC_PORT = 2882; + private static final String ROOT_USER = "root"; + private static final String TEST_DATABASE = "test"; + private static final String DEFAULT_TENANT = "test"; + private static final String DEFAULT_PASSWORD = ""; + + private String mode = "mini"; + private String tenantName = DEFAULT_TENANT; + private String sysPassword = DEFAULT_PASSWORD; + private String tenantPassword = DEFAULT_PASSWORD; + + public OceanBaseContainer(String version) { + super(DockerImageName.parse(IMAGE + ":" + version)); + addExposedPorts(SQL_PORT, RPC_PORT); + setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1)); + } + + @Override + protected void configure() { + addEnv("MODE", mode); + addEnv("OB_CLUSTER_NAME", "flink-cdc-ci"); + if (!DEFAULT_PASSWORD.equals(sysPassword)) { + addEnv("OB_SYS_PASSWORD", sysPassword); + } + if (!DEFAULT_TENANT.equals(tenantName)) { + addEnv("OB_TENANT_NAME", tenantName); + } + if (!DEFAULT_PASSWORD.equals(tenantPassword)) { + addEnv("OB_TENANT_PASSWORD", tenantPassword); + } + } + + protected void waitUntilContainerStarted() { + this.getWaitStrategy().waitUntilReady(this); + } + + public @NotNull Set getLivenessCheckPortNumbers() { + return Collections.singleton(this.getMappedPort(SQL_PORT)); + } + + @Override + public String getDriverClassName() { + return "com.mysql.cj.jdbc.driver"; + } + + @Override + public String getJdbcUrl() { + return "jdbc:mysql://" + getHost() + ":" + getDatabasePort() + "/?useSSL=false"; + } + + public int getDatabasePort() { + return getMappedPort(SQL_PORT); + } + + public String getTenantName() { + return tenantName; + } + + @Override + public String getDatabaseName() { + return TEST_DATABASE; + } + + @Override + public String getUsername() { + return ROOT_USER + "@" + tenantName; + } + + @Override + public String getPassword() { + return tenantPassword; + } + + @Override + protected String getTestQueryString() { + return "SELECT 1"; + } + + public OceanBaseContainer withSetupSQL(String sqlPath) { + return withCopyFileToContainer( + MountableFile.forClasspathResource(sqlPath), "/root/boot/init.d/init.sql"); + } + + public OceanBaseContainer withMode(String mode) { + this.mode = mode; + return this; + } + + public OceanBaseContainer withTenantName(String tenantName) { + this.tenantName = tenantName; + return this; + } + + public OceanBaseContainer withSysPassword(String sysPassword) { + this.sysPassword = sysPassword; + return this; + } + + public OceanBaseContainer withTenantPassword(String tenantPassword) { + this.tenantPassword = tenantPassword; + return this; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java new file mode 100644 index 00000000000..8ba0450f53e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +/** OceanBase CDC MySQL mode metadata. */ +public class OceanBaseMySQLCdcMetadata implements OceanBaseCdcMetadata { + + private final OceanBaseContainer obServerContainer; + private final LogProxyContainer logProxyContainer; + + public OceanBaseMySQLCdcMetadata( + OceanBaseContainer obServerContainer, LogProxyContainer logProxyContainer) { + this.obServerContainer = obServerContainer; + this.logProxyContainer = logProxyContainer; + } + + @Override + public String getCompatibleMode() { + return "mysql"; + } + + @Override + public String getHostname() { + return obServerContainer.getHost(); + } + + @Override + public int getPort() { + return obServerContainer.getDatabasePort(); + } + + @Override + public String getUsername() { + return obServerContainer.getUsername(); + } + + @Override + public String getPassword() { + return obServerContainer.getPassword(); + } + + @Override + public String getDriverClass() { + return "com.mysql.cj.jdbc.driver"; + } + + @Override + public String getDatabase() { + return obServerContainer.getDatabaseName(); + } + + @Override + public String getJdbcUrl() { + return "jdbc:mysql://" + getHostname() + ":" + getPort() + "/?useSSL=false"; + } + + @Override + public String getTenantName() { + return obServerContainer.getTenantName(); + } + + @Override + public String getLogProxyHost() { + return logProxyContainer.getHost(); + } + + @Override + public int getLogProxyPort() { + return logProxyContainer.getPort(); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java new file mode 100644 index 00000000000..3a0fe9754b0 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase.testutils; + +/** OceanBase CDC Oracle mode metadata. */ +public class OceanBaseOracleCdcMetadata implements OceanBaseCdcMetadata { + + @Override + public String getCompatibleMode() { + return "oracle"; + } + + @Override + public String getHostname() { + return System.getenv("host"); + } + + @Override + public int getPort() { + return Integer.parseInt(System.getenv("port")); + } + + @Override + public String getUsername() { + return System.getenv("username"); + } + + @Override + public String getPassword() { + return System.getenv("password"); + } + + @Override + public String getDatabase() { + return System.getenv("schema"); + } + + @Override + public String getDriverClass() { + return "com.oceanbase.jdbc.Driver"; + } + + @Override + public String getJdbcUrl() { + return "jdbc:oceanbase://" + getHostname() + ":" + getPort() + "/" + getDatabase(); + } + + @Override + public String getTenantName() { + return System.getenv("tenant"); + } + + @Override + public String getLogProxyHost() { + return System.getenv("log_proxy_host"); + } + + @Override + public int getLogProxyPort() { + return Integer.parseInt(System.getenv("log_proxy_port")); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql deleted file mode 100644 index 0db9c71db4a..00000000000 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql +++ /dev/null @@ -1,17 +0,0 @@ --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - --- Set the root user password of test tenant -ALTER USER root IDENTIFIED BY '123456';