feat: optimize tidb data integration (#3839)
feat: optimize tidb data integration (#3839)
* feat: add read-write tidb sdk and update related usage docs

* update tidb sdk usage docs

* feat: add data-schema check switch and rtidb data-schema mapping

* fix: optimize rtidb data-schema mapping

* feat: update tidb integration documents

* feat: optimize tidb document description

* docs: optimize tidb document description

* docs: update installation description about tispark

* feat: disable schema checking for online imports

* fix: initialize is_check_schema and store in options

* Update tidb.md typo

* feat: optimize code structure and user documentation

* feat: optimize schema mapping and check

* fixbug: autoSchemaMappingAndCheck

* fixbug: limit the usage scope of skip_cvt parameter

* fixbug: skipCvt init error

* docs: update tidb sdk usage docs

* docs: clarify tispark’s restrictions on the use of option parameters

---------

Co-authored-by: Yuan Haitao <[email protected]>
yht520100 and Yuan Haitao authored Apr 15, 2024
1 parent d447e46 commit bfe5c1c
Showing 6 changed files with 164 additions and 113 deletions.
35 changes: 20 additions & 15 deletions docs/en/integration/offline_data_sources/tidb.md
@@ -8,7 +8,7 @@

### Installation

[OpenMLDB Spark Distribution](../../tutorial/openmldbspark_distribution.md) v0.8.5 and later versions utilize the TiSpark tool to interact with TiDB. The current release includes TiSpark 3.1.x dependencies (`tispark-assembly-3.2_2.12-3.1.5.jar`, `mysql-connector-java-8.0.29.jar`). If your TiSpark version doesn't match your TiDB version, refer to the [TiSpark documentation](https://docs.pingcap.com/tidb/stable/tispark-overview) for compatible dependencies to add to Spark's classpath/jars.
The current version utilizes TiSpark for interacting with the TiDB database. To get started, download the necessary dependencies for TiSpark 3.1.x (`tispark-assembly-3.2_2.12-3.1.5.jar` and `mysql-connector-java-8.0.29.jar`). If the TiSpark version is not compatible with your current TiDB version, refer to the [TiSpark documentation](https://docs.pingcap.com/tidb/stable/tispark-overview) for downloading the corresponding TiSpark dependencies. Then, add them to the Spark classpath/jars.


### Configuration
@@ -32,25 +32,29 @@ Once either configuration is successful, access TiDB tables using the format `ti

TiDB schema reference can be found at [TiDB Schema](https://docs.pingcap.com/tidb/stable/data-type-overview). Currently, only the following TiDB data formats are supported:

| OpenMLDB Data Format | TiDB Data Format |
|----------------------|-------------------------|
| BOOL | BOOL |
| SMALLINT | Currently not supported |
| INT | Currently not supported |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIMESTAMP | TIMESTAMP |
| STRING | VARCHAR(M) |
| OpenMLDB Data Format | TiDB Data Format |
|----------------------|------------------|
| BOOL | BOOL |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIMESTAMP | DATETIME |
| TIMESTAMP | TIMESTAMP |
| STRING | VARCHAR(M) |

Tip: Conversions between integer types of different widths are constrained by each type's value range. Follow the mappings above to avoid overflow.
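As a rough illustration of the mapping table above, a TiDB table and its OpenMLDB counterpart might be declared as follows (table and column names are hypothetical, not taken from the source):

```sql
-- Hypothetical TiDB table; each column type maps per the table above
CREATE TABLE db1.t1 (
    id BIGINT,          -- OpenMLDB BIGINT
    score FLOAT,        -- OpenMLDB FLOAT
    born DATE,          -- OpenMLDB DATE
    updated TIMESTAMP,  -- OpenMLDB TIMESTAMP
    name VARCHAR(64)    -- OpenMLDB STRING
);
```

Note that DATETIME columns also map to OpenMLDB TIMESTAMP, per the second table.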

## Importing TiDB Data into OpenMLDB

Importing data from TiDB sources is supported through the [`LOAD DATA INFILE`](../../openmldb_sql/dml/LOAD_DATA_STATEMENT.md) API, using the specific URI interface format `tidb://tidb_catalog.[db].[table]` to import data from TiDB. Note:

- Both offline and online engines can import TiDB data sources.
- TiDB import supports symbolic links, which can reduce hard copying and ensure that OpenMLDB always reads the latest data from TiDB. To enable soft link data import, use the parameter `deep_copy=false`.
- The `OPTIONS` parameter only supports `deep_copy`, `mode`, and `sql`.
- In `@@execute_mode='online'` mode, TiDB supports the parameter `skip_cvt`, which controls whether field type conversion is skipped. The default is `false`: field type conversion and strict schema checking are performed. If set to `true`, conversion and schema checking are skipped, which performs better but may allow errors such as type overflow, so manual inspection is required.
- The `OPTIONS` parameter only supports `deep_copy`, `mode`, `sql`, and `skip_cvt`.

For example:

@@ -70,11 +74,12 @@ LOAD DATA INFILE 'tidb://tidb_catalog.db1.t1' INTO TABLE tidb_catalog.db1.t1 OPT
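For instance, a sketch of an online import that opts out of type conversion via `skip_cvt` might look like the following (database and table names are illustrative):

```sql
SET @@execute_mode='online';
-- skip_cvt=true skips conversion and strict schema checking: faster,
-- but type overflow must be checked manually (illustrative names)
LOAD DATA INFILE 'tidb://tidb_catalog.db1.t1' INTO TABLE db1.t1
OPTIONS(mode='append', skip_cvt=true);
```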

Exporting data from OpenMLDB to TiDB sources is supported through the [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md) API, using the specific URI interface format `tidb://tidb_catalog.[db].[table]` to export data to the TiDB data warehouse. Note:

- The offline engine supports exporting to TiDB data sources; the online engine does not yet.
- The database and table must already exist. Currently, automatic creation of non-existent databases or tables is not supported.
- Only the export mode `mode` is effective in the `OPTIONS` parameter. Other parameters are not effective, and the current parameter is mandatory.
- The `OPTIONS` parameter is only valid with `mode='append'`. Other modes such as `overwrite` and `errorifexists` are invalid because the current TiSpark version does not support them; if a future TiSpark version adds support, you can upgrade for compatibility.

For example:

```sql
SELECT col1, col2, col3 FROM t1 INTO OUTFILE 'tidb://tidb_catalog.db1.t1' options(mode='append');
```
31 changes: 18 additions & 13 deletions docs/zh/integration/offline_data_sources/tidb.md
@@ -8,7 +8,7 @@

### 安装

[OpenMLDB Spark Distribution](../../tutorial/openmldbspark_distribution.md) v0.8.5 and later use the TiSpark tool to operate the TiDB database. The current release already includes the TiSpark 3.1.x dependencies (`tispark-assembly-3.2_2.12-3.1.5.jar`, `mysql-connector-java-8.0.29.jar`). If the TiSpark version is not compatible with your existing TiDB version, you can find and download the matching TiSpark dependencies from the [TiSpark documentation](https://docs.pingcap.com/zh/tidb/stable/tispark-overview) and add them to Spark's classpath/jars.
The current version uses TiSpark to operate the TiDB database. First download the TiSpark 3.1.x dependencies (`tispark-assembly-3.2_2.12-3.1.5.jar` and `mysql-connector-java-8.0.29.jar`). If the TiSpark version is not compatible with your existing TiDB version, find and download the matching TiSpark dependencies from the [TiSpark documentation](https://docs.pingcap.com/zh/tidb/stable/tispark-overview), then add them to Spark's classpath/jars.


### 配置
@@ -32,25 +32,29 @@ spark.default.conf=spark.sql.extensions=org.apache.spark.sql.TiExtensions;spark.

For the TiDB schema, refer to [TiDB Schema](https://docs.pingcap.com/zh/tidb/stable/data-type-overview). Currently, only the following TiDB data formats are supported:

| OpenMLDB Data Format | TiDB Data Format        |
| -------------------- |-------------------------|
| BOOL                 | BOOL                    |
| SMALLINT             | Currently not supported |
| INT                  | Currently not supported |
| BIGINT               | BIGINT                  |
| FLOAT                | FLOAT                   |
| DOUBLE               | DOUBLE                  |
| DATE                 | DATE                    |
| TIMESTAMP            | TIMESTAMP               |
| OpenMLDB Data Format | TiDB Data Format |
| -------------------- |------------------|
| BOOL                 | BOOL             |
| SMALLINT             | SMALLINT         |
| INT                  | INT              |
| BIGINT               | BIGINT           |
| FLOAT                | FLOAT            |
| DOUBLE               | DOUBLE           |
| DATE                 | DATE             |
| TIMESTAMP            | DATETIME         |
| TIMESTAMP            | TIMESTAMP        |
| STRING               | VARCHAR(M)       |

Tip: Conversions between integer types of different widths are constrained by each type's value range. Follow the data-type mappings above.

## Importing TiDB Data into OpenMLDB

Importing from TiDB data sources is supported through the [`LOAD DATA INFILE`](../../openmldb_sql/dml/LOAD_DATA_STATEMENT.md) API, using the specific URI format `tidb://tidb_catalog.[db].[table]` to import data from TiDB. Note:

- Both offline and online engines can import TiDB data sources.
- TiDB import supports soft links, which reduce hard copying and ensure that OpenMLDB always reads the latest data in TiDB. To import data via a soft link, use the parameter `deep_copy=false`.
- The `OPTIONS` parameter only supports `deep_copy`, `mode`, and `sql`.
- In `@@execute_mode='online'` mode, TiDB supports the parameter `skip_cvt`, which controls whether field type conversion is skipped. The default is `false`: field type conversion and strict schema checking are performed. If set to `true`, conversion and schema checking are skipped, which performs better but may allow errors such as type overflow, so manual inspection is required.
- The `OPTIONS` parameter only supports `deep_copy`, `mode`, `sql`, and `skip_cvt`.

For example:

@@ -70,8 +74,9 @@ LOAD DATA INFILE 'tidb://tidb_catalog.db1.t1' INTO TABLE tidb_catalog.db1.t1 OPT

Exporting data from OpenMLDB to TiDB sources is supported through the [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md) API, using the specific URI format `tidb://tidb_catalog.[db].[table]` to export data to the TiDB data warehouse. Note:

- The offline engine supports exporting to TiDB data sources; the online engine does not yet.
- The database and table must already exist; automatic creation of non-existent databases or tables is not supported.
- Only the export mode `mode` takes effect in the `OPTIONS` parameter; other parameters do not, and this parameter is mandatory.
- The `OPTIONS` parameter is only valid with `mode='append'`; other modes such as `overwrite` and `errorifexists` are invalid because the current TiSpark version does not support them. If a future TiSpark version adds support, you can upgrade for compatibility.

For example:

@@ -40,7 +40,11 @@ object LoadDataPlan {
val (format, options, mode, extra) = HybridseUtil.parseOptions(inputFile, node)
// load have the option deep_copy
val deepCopy = extra.get("deep_copy").get.toBoolean

// auto schema conversion option skip_cvt
val skipCvt = (storage, format) match {
case ("online", "tidb") => extra.getOrElse("skip_cvt", "false").toBoolean
case _ => false
}
require(ctx.getOpenmldbSession != null, "LOAD DATA must use OpenmldbSession, not SparkSession")
val info = ctx.getOpenmldbSession.openmldbCatalogService.getTableInfo(db, table)

@@ -52,7 +56,7 @@
// we read input file even in soft copy,
// cause we want to check if "the input file schema == openmldb table schema"
val df = DataSourceUtil.autoLoad(ctx.getOpenmldbSession, inputFile, format, options, info.getColumnDescList,
loadDataSql)
loadDataSql, skipCvt)

// write
logger.info("write data to storage {}, writer mode {}, is deep {}", storage, mode, deepCopy.toString)
Expand Down
@@ -32,7 +32,6 @@ object SelectIntoPlan {
logger.debug("select {} rows", input.getDf().count())
input.getDf().show(10)
}

// write options don't need deepCopy, may have coalesce
val (format, options, mode, extra) = HybridseUtil.parseOptions(outPath, node)
if (input.getSchema.size == 0 && input.getDf().isEmpty) {
@@ -66,10 +65,10 @@

} else {
logger.info("offline select into: format[{}], options[{}], write mode[{}], out path {}", format, options,
mode, outPath)
mode, outPath)
var ds = input.getDf()
val coalesce = extra.get("coalesce").map(_.toInt)
if (coalesce.nonEmpty && coalesce.get > 0){
if (coalesce.nonEmpty && coalesce.get > 0) {
ds = ds.coalesce(coalesce.get)
logger.info("coalesce to {} part", coalesce.get)
}
Expand Down
