Skip to content

Commit

Permalink
[FLINK-35638][OceanBase][test] Refactor OceanBase test cases and remo…
Browse files Browse the repository at this point in the history
…ve dependency on host network (#3439)
  • Loading branch information
whhe authored Aug 18, 2024
1 parent c5396fb commit cbb33bb
Show file tree
Hide file tree
Showing 20 changed files with 841 additions and 384 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,23 @@ under the License.
version: '2.1'
services:
observer:
image: oceanbase/oceanbase-ce:4.0.0.0
image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515'
container_name: observer
network_mode: "host"
environment:
- 'MODE=mini'
- 'OB_SYS_PASSWORD=123456'
- 'OB_TENANT_PASSWORD=654321'
ports:
- '2881:2881'
- '2882:2882'
oblogproxy:
image: whhe/oblogproxy:1.1.0_4x
image: 'oceanbase/oblogproxy-ce:latest'
container_name: oblogproxy
environment:
- 'OB_SYS_USERNAME=root'
- 'OB_SYS_PASSWORD=pswd'
network_mode: "host"
- 'OB_SYS_PASSWORD=123456'
ports:
- '2983:2983'
elasticsearch:
image: 'elastic/elasticsearch:7.6.0'
container_name: elasticsearch
Expand Down Expand Up @@ -85,42 +92,26 @@ services:
docker-compose up -d
```

### 设置密码

OceanBase 中 root 用户默认是没有密码的,但是 oblogproxy 需要配置一个使用非空密码的系统租户用户,因此这里我们需要先为 root@sys 用户设置一个密码。
### 查询 Root Service List

登陆 sys 租户的 root 用户:

```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys
```

设置密码,注意这里的密码需要与上一步中 oblogproxy 服务的环境变量 'OB_SYS_PASSWORD' 保持一样。

```mysql
ALTER USER root IDENTIFIED BY 'pswd';
```

OceanBase 从社区版 4.0.0.0 开始只支持对非 sys 租户的增量数据拉取,这里我们使用 test 租户的 root 用户作为示例。

登陆 test 租户的 root 用户:

```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456
```

设置密码:
执行以下 sql 以查询 root service list,将 VALUE 列的值保存下来。

```mysql
ALTER USER root IDENTIFIED BY 'test';
SHOW PARAMETERS LIKE 'rootservice_list';
```

### 准备数据

使用 'root@test' 用户登陆。
使用测试用的 test 租户的 root 用户登陆。

```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321
```

```sql
Expand Down Expand Up @@ -169,6 +160,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),

### 在 Flink SQL CLI 中使用 Flink DDL 创建表

注意在 OceanBase 源表的 SQL 中替换 root_service_list 为真实值。

```sql
-- 设置间隔时间为3秒
Flink SQL> SET execution.checkpointing.interval = 3s;
Expand All @@ -189,13 +182,13 @@ Flink SQL> CREATE TABLE orders (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
'password' = 'test',
'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^orders$',
'hostname' = 'localhost',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
Expand All @@ -211,13 +204,13 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
'password' = 'test',
'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^products$',
'hostname' = 'localhost',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,27 @@ under the License.

Create `docker-compose.yml`.

*Note*: `host` network mode is required in this demo, so it can only work on Linux, see [network-tutorial-host](https://docs.docker.com/network/network-tutorial-host/).

```yaml
version: '2.1'
services:
observer:
image: oceanbase/oceanbase-ce:4.2.0.0
image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515'
container_name: observer
environment:
- 'MODE=slim'
- 'OB_ROOT_PASSWORD=pswd'
network_mode: "host"
- 'MODE=mini'
- 'OB_SYS_PASSWORD=123456'
- 'OB_TENANT_PASSWORD=654321'
ports:
- '2881:2881'
- '2882:2882'
oblogproxy:
image: whhe/oblogproxy:1.1.3_4x
image: 'oceanbase/oblogproxy-ce:latest'
container_name: oblogproxy
environment:
- 'OB_SYS_USERNAME=root'
- 'OB_SYS_PASSWORD=pswd'
network_mode: "host"
- 'OB_SYS_PASSWORD=123456'
ports:
- '2983:2983'
elasticsearch:
image: 'elastic/elasticsearch:7.6.0'
container_name: elasticsearch
Expand Down Expand Up @@ -89,30 +91,26 @@ Execute the following command in the directory where `docker-compose.yml` is loc
docker-compose up -d
```

### Set password
### Query Root Service List

From OceanBase 4.0.0.0 CE, we can only fetch the commit log of non-sys tenant.

Here we use the 'test' tenant for example.

Login with 'root' user of 'test' tenant:
Login with 'root' user of 'sys' tenant:

```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456
```

Set a password:
Query the root service list by following SQL and store the value.

```mysql
ALTER USER root IDENTIFIED BY 'test';
SHOW PARAMETERS LIKE 'rootservice_list';
```

### Create data for reading snapshot

Login 'root' user of 'test' tenant.

```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321
```

Insert data:
Expand Down Expand Up @@ -163,6 +161,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),

### Use Flink DDL to create dynamic table in Flink SQL CLI

Note that in the SQL of the OceanBase source table, replace root_service_list with the actual value.

```sql
-- checkpoint every 3000 milliseconds
Flink SQL> SET execution.checkpointing.interval = 3s;
Expand All @@ -183,13 +183,13 @@ Flink SQL> CREATE TABLE orders (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
'password' = 'test',
'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^orders$',
'hostname' = 'localhost',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
Expand All @@ -205,13 +205,13 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
'password' = 'test',
'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^products$',
'hostname' = 'localhost',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,21 @@ limitations under the License.

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@

package org.apache.flink.cdc.connectors.oceanbase;

import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.test.util.AbstractTestBase;

import org.junit.ClassRule;
import org.junit.Rule;

import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
Expand All @@ -43,51 +41,13 @@
import static org.junit.Assert.assertTrue;

/** Basic class for testing OceanBase source. */
public abstract class OceanBaseTestBase extends TestLogger {
public abstract class OceanBaseTestBase extends AbstractTestBase {

private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

protected static final int DEFAULT_PARALLELISM = 4;

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());

@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

protected final String compatibleMode;
protected final String username;
protected final String password;
protected final String hostname;
protected final int port;
protected final String logProxyHost;
protected final int logProxyPort;
protected final String tenant;

public OceanBaseTestBase(
String compatibleMode,
String username,
String password,
String hostname,
int port,
String logProxyHost,
int logProxyPort,
String tenant) {
this.compatibleMode = compatibleMode;
this.username = username;
this.password = password;
this.hostname = hostname;
this.port = port;
this.logProxyHost = logProxyHost;
this.logProxyPort = logProxyPort;
this.tenant = tenant;
}
protected abstract OceanBaseCdcMetadata metadata();

protected String commonOptionsString() {
return String.format(
Expand All @@ -96,8 +56,14 @@ protected String commonOptionsString() {
+ " 'password' = '%s', "
+ " 'hostname' = '%s', "
+ " 'port' = '%s', "
+ " 'compatible-mode' = '%s'",
username, password, hostname, port, compatibleMode);
+ " 'compatible-mode' = '%s', "
+ " 'jdbc.driver' = '%s'",
metadata().getUsername(),
metadata().getPassword(),
metadata().getHostname(),
metadata().getPort(),
metadata().getCompatibleMode(),
metadata().getDriverClass());
}

protected String logProxyOptionsString() {
Expand All @@ -106,7 +72,9 @@ protected String logProxyOptionsString() {
+ " 'tenant-name' = '%s',"
+ " 'logproxy.host' = '%s',"
+ " 'logproxy.port' = '%s'",
tenant, logProxyHost, logProxyPort);
metadata().getTenantName(),
metadata().getLogProxyHost(),
metadata().getLogProxyPort());
}

protected String initialOptionsString() {
Expand All @@ -120,7 +88,10 @@ protected String snapshotOptionsString() {
return " 'scan.startup.mode' = 'snapshot', " + commonOptionsString();
}

protected abstract Connection getJdbcConnection() throws SQLException;
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
metadata().getJdbcUrl(), metadata().getUsername(), metadata().getPassword());
}

protected void setGlobalTimeZone(String serverTimeZone) throws SQLException {
try (Connection connection = getJdbcConnection();
Expand All @@ -130,7 +101,8 @@ protected void setGlobalTimeZone(String serverTimeZone) throws SQLException {
}

protected void initializeTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s/%s.sql", compatibleMode, sqlFile);
final String ddlFile =
String.format("ddl/%s/%s.sql", metadata().getCompatibleMode(), sqlFile);
final URL ddlTestFile = getClass().getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection();
Expand Down
Loading

0 comments on commit cbb33bb

Please sign in to comment.