feat: online to offline sync tool #3256

Merged: 11 commits, merged May 11, 2023.
5 changes: 1 addition & 4 deletions .gitignore
@@ -64,10 +64,7 @@ src/version.h
src/config.h
java/openmldb-native/src/main/resources
java/openmldb-native/src/main/java
java/openmldb-common/src/main/java/com/_4paradigm/openmldb/proto/
java/openmldb-import/src/main/java/com/_4paradigm/openmldb/proto/
java/openmldb-nearline-tablet/src/main/java/com/_4paradigm/openmldb/proto/
java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/proto/
java/openmldb-*/src/main/java/com/_4paradigm/openmldb/proto/
java/hybridse-native/src/main/java
java/hybridse-proto/src
**/.mvn/wrapper/maven-wrapper.jar
1 change: 1 addition & 0 deletions docs/zh/deploy/index.rst
@@ -10,3 +10,4 @@
compile
integrate_hadoop
offline_integrate_kubernetes
online_offline_sync
85 changes: 85 additions & 0 deletions docs/zh/deploy/online_offline_sync.md
@@ -0,0 +1,85 @@
# Online-to-Offline Data Synchronization

Online-to-offline data synchronization means syncing online data to an offline address. The offline address is a high-capacity persistent storage location that the user specifies; it does not have to be the offline data path of an OpenMLDB table.

Enabling the sync feature requires deploying two components: DataCollector and SyncTool. The first phase supports only a single SyncTool, and **at least** one DataCollector must be deployed on **every machine that runs a TabletServer**. For example, a machine may host multiple TabletServers; sync tasks will use one DataCollector on that machine. If you deploy additional DataCollectors, they stay idle until the running DataCollector goes offline, at which point the next DataCollector takes over.

Although SyncTool only runs as a single instance, it recovers its work progress on startup; no extra action is needed.

## Deployment

Because SyncTool is stateful, starting it first may cause it to try to assign sync tasks while no DataCollector is available. Therefore, make sure all DataCollectors are started before starting SyncTool.

Download the deployment package (version > 0.7.3) from the GitHub releases page or a mirror site, and extract it.

### DataCollector

#### Configuration (important)

Update the `<package>/conf/data_collector.flags` configuration file.

Fill in the correct ZooKeeper address and path, and set an `endpoint` that has no port conflicts. Keep the endpoint style consistent with the TabletServer: if the TabletServer uses the machine's public IP, the DataCollector endpoint cannot use the 127.0.0.1 address, as there is no automatic translation between them.

Note that `collector_datadir` must be chosen carefully. During synchronization we hard-link the TabletServer's on-disk data, so `collector_datadir` must be on the same disk as the TabletServer data paths `hdd_root_path`/`ssd_root_path`; otherwise the error `Invalid cross-device link` is reported.

`max_pack_size` defaults to 1 MB. If there are many sync tasks, the error `[E1011]The server is overcrowded` is likely to appear; reduce this setting appropriately. You can also increase `socket_max_unwritten_bytes` to tolerate a larger write buffer.
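The flags above can be collected into a sketch of `data_collector.flags`. Only the flags named in this section are taken from the document; the ZooKeeper flag names (`zk_cluster`, `zk_root_path`) and all values are illustrative assumptions, so check the template shipped in the package:

```
# Illustrative data_collector.flags sketch -- values are examples, not defaults.
--zk_cluster=127.0.0.1:2181        # assumed flag name for the zk address
--zk_root_path=/openmldb           # assumed flag name for the zk path
--endpoint=172.16.0.10:8888        # must match the TabletServer's address style
--collector_datadir=/data/openmldb/collector  # same disk as hdd_root_path/ssd_root_path
--max_pack_size=524288             # lower than the 1 MB default if tasks are many
--socket_max_unwritten_bytes=268435456  # larger write-buffer tolerance
```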

#### Startup

```bash
./bin/start.sh start data_collector
```
#### Status check

After startup, use the following command to get the live DataCollector RPC status page. If it fails, check the logs.
```bash
curl http://<data_collector>/status
```
TODO: `synctool_helper` should be able to list the DataCollectors.
### SyncTool

#### Configuration
- Update the `<package>/conf/synctool.properties` configuration; on start it overwrites `<package>/synctool/conf/synctool.properties`.
- Currently only direct writes to HDFS are supported. Configure the HDFS connection via the `hadoop.conf.dir` property or the `HADOOP_CONF_DIR` environment variable, and make sure the OS user that starts SyncTool has write permission on the HDFS path (the path is specified when each sync task is created).
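As a sketch, the HDFS connection can be configured in `synctool.properties` like this. Only `hadoop.conf.dir` is named in this document; the value is an illustrative assumption, so check the shipped template:

```
# Illustrative synctool.properties sketch.
# Point at the directory holding core-site.xml / hdfs-site.xml;
# alternatively, export HADOOP_CONF_DIR in the environment instead.
hadoop.conf.dir=/etc/hadoop/conf
```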

#### Startup
```bash
./bin/start.sh start synctool
```

SyncTool currently supports only a single process. If multiple instances are started, they run independently of each other, and it is up to the user to make sure there are no duplicate sync tasks. SyncTool persists its progress in real time; if it goes offline, restarting it in place resumes the task progress.

SyncTool manages sync tasks, collects the data, and writes it to the offline address. To clarify the task hierarchy: when SyncTool receives a user's "table sync task", it splits it into multiple "partition sync tasks" (called subtasks below) for creation and management.

In task management, if a DataCollector goes offline or errors out, the task is recreated on a DataCollector. If no suitable DataCollector can be found when reassigning, the subtask is marked as failed. Without this, SyncTool would keep trying to reassign, the sync progress would stall, and the error would be easy to miss; marking the subtask FAILED surfaces the problem promptly.
TODO: since it is hard to define the exact starting progress of each subtask when creating a table sync task, consider restarting SyncTool instead; on recovery SyncTool does not check whether a subtask previously failed and starts it as if it were in the init state.

#### SyncTool Helper

Use `<package>/tools/synctool_helper.py` to create, delete, and query sync tasks.

```bash
# create
python tools/synctool_helper.py create -t db.table -m 1 -ts 233 -d /tmp/hdfs-dest [ -s <sync tool endpoint> ]
# delete
python tools/synctool_helper.py delete -t db.table [ -s <sync tool endpoint> ]
# task status
python tools/synctool_helper.py status [ -s <sync tool endpoint> ]
# sync tool status for dev
python tools/synctool_helper.py tool-status [ -f <properties path> ]
```

Mode: ...

Explanation of the `status` output:

Running the `status` command prints the state of every partition task. If you only care about the overall picture, look at the content after `table scope`, which shows the sync task status at the table level; it also flags any table that has `FAILED` subtasks.

For each subtask, watch its 'status' field. A task that has just started and has not yet received its first data sync from the DataCollector is in the INIT state; after the first data sync it becomes RUNNING. (We care especially about the initial state of DataCollector and SyncTool, hence the dedicated INIT state.) If the task was recovered by a SyncTool restart, it goes straight to RUNNING. A task may pass through the REASSIGNING state; this is transient and does not mean the task is unusable. In Mode 0 the SUCCESS state may appear, meaning the task has finished. Once all subtasks of a table have finished, SyncTool automatically cleans up that table's tasks, and the helper will no longer report them.

Only FAILED means the subtask has failed; it is neither retried nor deleted. After confirming and fixing the cause, you can delete and recreate the sync task. If you do not want to lose the progress already imported, restart SyncTool so that it recovers the task and resumes syncing (though more duplicate data may appear).
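The subtask lifecycle described above can be sketched as a small state machine. This is an illustration of the states this document names, not SyncTool's actual implementation:

```python
from enum import Enum

class TaskStatus(Enum):
    INIT = "INIT"                # created, no data received from DataCollector yet
    RUNNING = "RUNNING"          # first data sync received, or recovered on restart
    REASSIGNING = "REASSIGNING"  # transient: task is moving to another DataCollector
    SUCCESS = "SUCCESS"          # Mode 0 only: task finished
    FAILED = "FAILED"            # terminal: no retry, no deletion

# Transitions implied by the description above (assumed; the real set may differ).
ALLOWED = {
    TaskStatus.INIT: {TaskStatus.RUNNING, TaskStatus.REASSIGNING, TaskStatus.FAILED},
    TaskStatus.RUNNING: {TaskStatus.REASSIGNING, TaskStatus.SUCCESS, TaskStatus.FAILED},
    TaskStatus.REASSIGNING: {TaskStatus.RUNNING, TaskStatus.FAILED},
    TaskStatus.SUCCESS: set(),  # table tasks are cleaned up once all subtasks succeed
    TaskStatus.FAILED: set(),   # terminal unless SyncTool restarts (treated as init)
}

def transition(cur: TaskStatus, nxt: TaskStatus) -> TaskStatus:
    """Validate a subtask state change against the documented lifecycle."""
    if nxt not in ALLOWED[cur]:
        raise ValueError(f"illegal transition {cur.name} -> {nxt.name}")
    return nxt
```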

## Limitations

The way DataCollector marks a table's progress (snapshot progress) is not unique: if a subtask shuts down midway and a new task is created from the current progress, a segment of duplicate data may be produced.
SyncTool writes to HDFS first and persists the progress afterwards; if SyncTool shuts down in between, the progress has not been updated, so a segment of data will be written again. Since duplicates are inherent to this feature, no extra work is done here for now.
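Because delivery is effectively at-least-once, a downstream consumer of the exported files may want to deduplicate. A minimal sketch, assuming each exported row carries a unique key column (the key column name is hypothetical; this helper is not part of the tool):

```python
import csv
from typing import Iterable, Iterator

def dedup_rows(paths: Iterable[str], key_field: str) -> Iterator[dict]:
    """Yield rows from exported CSV files, skipping rows whose key was
    already seen -- a downstream guard against duplicate segments."""
    seen = set()
    for path in paths:
        with open(path, newline="") as f:
            for row in csv.DictReader(f):
                key = row[key_field]
                if key in seen:
                    continue  # duplicate segment: drop the repeated row
                seen.add(key)
                yield row
```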
@@ -39,6 +39,7 @@ public ResultSet getResultSet() throws SQLException {
return new SQLResultSet(resultSet);
}

// TODO(hw): why return sqlresultset?
@Override
public SQLResultSet executeQuery(String sql) throws SQLException {
checkClosed();
@@ -77,7 +77,7 @@ PreparedStatement getBatchRequestPreparedStmt(String db, String sql,

ProcedureInfo showProcedure(String dbName, String proName) throws SQLException;

NS.TableInfo getTableInfo(String db, String table) throws SQLException;
NS.TableInfo getTableInfo(String db, String table);

List<String> getTableNames(String db);

218 changes: 218 additions & 0 deletions java/openmldb-synctool/pom.xml
@@ -0,0 +1,218 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>openmldb-parent</artifactId>
<groupId>com.4paradigm.openmldb</groupId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<artifactId>openmldb-synctool</artifactId>
<name>openmldb-synctool</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<hdfs.version>3.2.4</hdfs.version>
<flink.version>1.14.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.4paradigm.openmldb</groupId>
<artifactId>openmldb-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.4paradigm.openmldb</groupId>
<artifactId>openmldb-jdbc</artifactId>
<version>${project.parent.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.16.3</version>
</dependency>

<dependency>
<groupId>com.baidu</groupId>
<artifactId>brpc-java</artifactId>
<version>3.0.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.curator/apache-curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.2.0</version>
</dependency>

<!-- logging: begin -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- logging: end -->

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>

<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<version>1.0</version>
</dependency>

<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<!--Dependency for FileSink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hdfs.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-minicluster -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hdfs.version}</version>
<scope>test</scope>
</dependency>
<!-- csv -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.9.0</version>
</dependency>

</dependencies>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.conf</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>appassembler-maven-plugin</artifactId>
<version>1.3</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assemble</goal>
</goals>
</execution>
</executions>
<configuration>
<configurationDirectory>conf</configurationDirectory>
<configurationSourceDirectory>src/main/resources</configurationSourceDirectory>
<copyConfigurationDirectory>true</copyConfigurationDirectory>
<includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
<repositoryLayout>flat</repositoryLayout>
<repositoryName>lib</repositoryName>
<assembleDirectory>${project.build.directory}/${project.name}-binary</assembleDirectory>
<binFileExtensions>
<unix>.sh</unix>
</binFileExtensions>
<platforms>
<platform>unix</platform>
</platforms>
<programs>
<program>
<mainClass>com._4paradigm.openmldb.synctool.SyncTool</mainClass>
<name>synctool</name>
</program>
</programs>
</configuration>
</plugin>
</plugins>
</build>
<!-- for openmldb-jdbc -->
<repositories>
<repository>
<id>openmldb</id>
<name>openmldb sdk repo</name>
<url>https://repo.maven.apache.org/maven2</url>
</repository>
</repositories>
</project>
@@ -0,0 +1,31 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com._4paradigm.openmldb.synctool;

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.core.fs.Path;

class CsvFilePathFilter extends FilePathFilter {
@Override
public boolean filterPath(Path filePath) {
return filePath == null
|| filePath.getName().startsWith(".")
|| filePath.getName().startsWith("_")
|| filePath.getName().contains(HADOOP_COPYING)
|| !filePath.getName().endsWith(".csv");
}
}
@@ -0,0 +1,26 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com._4paradigm.openmldb.synctool;

import com._4paradigm.openmldb.proto.DataSync;
import com.baidu.brpc.protocol.BrpcMeta;

public interface DataCollectorService {
// c++ serviceName doesn't contain the package name.
@BrpcMeta(serviceName = "DataCollector", methodName = "AddSyncTask")
DataSync.GeneralResponse addSyncTask(DataSync.AddSyncTaskRequest request);
}