Skip to content

Commit

Permalink
feat: support multi db in parse and add getDependenceTables (#3314)
Browse files Browse the repository at this point in the history
  • Loading branch information
vagetablechicken authored Jun 25, 2023
1 parent bf4e614 commit 4b88184
Show file tree
Hide file tree
Showing 19 changed files with 942 additions and 510 deletions.
50 changes: 34 additions & 16 deletions docs/zh/quickstart/sdk/java_sdk.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,17 +506,17 @@ java -cp target/demo-1.0-SNAPSHOT.jar com.openmldb.demo.App

### 可选配置项

| **可选配置项** | **说明** |
| -------------- | ------------------------------------------------------------ |
| **可选配置项** | **说明** |
| --------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| enableDebug | 默认 false,开启 hybridse 的 debug 日志(注意不是全局的 debug 日志),可以查看到更多 sql 编译和运行的日志。但这些日志不是全部被客户端收集,需要查看 tablet server 日志。 |
| requestTimeout | 默认 60000 ms,这个 timeout 是客户端发送的 rpc 超时时间,发送到 taskmanager 的除外(job 的 rpc timeout 由 variable `job_timeout` 控制)。 |
| glogLevel | 默认 0,和 glog 的 minloglevel 类似,`INFO/WARNING/ERROR/FATAL` 日志分别对应 `0/1/2/3`。0 表示打印 INFO 以及上的等级。 |
| glogDir | 默认为 empty,日志目录为空时,打印到 stderr,即控制台。 |
| maxSqlCacheSize | 默认 50,客户端单个 db 单种执行模式的最大 sql cache 数量,如果出现 cache淘汰引发的错误,可以增大这一 size 避开问题。 |
| sessionTimeout | 默认 10000 ms,zk 的 session timeout。 |
| zkLogLevel | 默认 3,`0/1/2/3/4` 分别代表 `禁止所有 zk log/error/warn/info/debug` |
| zkLogFile | 默认 empty,打印到 stdout。 |
| sparkConfPath | 默认 empty,可以通过此配置更改 job 使用的 spark conf,而不需要配置 taskmanager 重启。 |
| requestTimeout | 默认 60000 ms,这个 timeout 是客户端发送的 rpc 超时时间,发送到 taskmanager 的除外(job 的 rpc timeout 由 variable `job_timeout` 控制)。 |
| glogLevel | 默认 0,和 glog 的 minloglevel 类似,`INFO/WARNING/ERROR/FATAL` 日志分别对应 `0/1/2/3`。0 表示打印 INFO 以及上的等级。 |
| glogDir | 默认为 empty,日志目录为空时,打印到 stderr,即控制台。 |
| maxSqlCacheSize | 默认 50,客户端单个 db 单种执行模式的最大 sql cache 数量,如果出现 cache淘汰引发的错误,可以增大这一 size 避开问题。 |
| sessionTimeout | 默认 10000 ms,zk 的 session timeout。 |
| zkLogLevel | 默认 3,`0/1/2/3/4` 分别代表 `禁止所有 zk log/error/warn/info/debug` |
| zkLogFile | 默认 empty,打印到 stdout。 |
| sparkConfPath | 默认 empty,可以通过此配置更改 job 使用的 spark conf,而不需要配置 taskmanager 重启。 |

## SQL 校验

Expand All @@ -525,26 +525,43 @@ Java 客户端支持对 SQL 进行正确性校验,验证是否可执行。分
- `validateSQLInBatch` 可以验证 SQL 是否能在离线端执行。
- `validateSQLInRequest` 可以验证 SQL 是否能被部署上线。

两个接口都需要传入 SQL 所需要的所有表 schema。目前只支持单 db,请不要在 SQL 语句中使用 `db.table` 格式
两种接口都需要传入 SQL 所需要的所有表 schema,支持多 db。为了向后兼容,允许参数中不填写`db`(当前use的db),等价于use schema表中的第一个db。这种情况下,输入 SQL 语句需要保证`<table>`格式的表来自第一个db,不影响`<db>.<table>`格式的 SQL

例如:验证 SQL `select count(c1) over w1 from t3 window w1 as(partition by c1 order by c2 rows between unbounded preceding and current row);`,那么除了这个语句,还需要将表 `t3` 的 schema 作为第二参数 schemaMaps 传入。格式为 Map,key 为 db 名,value 为每个 db 的所有 table schema(Map)。实际只支持单 db,所以这里通常只有 1 个 db,如下所示的 db3。db 下的 table schema map key 为 table name,value 为 com.\_4paradigm.openmldb.sdk.Schema,由每列的 name 和 type 构成。
例如:验证 SQL `select count(c1) over w1 from t3 window w1 as(partition by c1 order by c2 rows between unbounded preceding and current row);`,那么除了这个语句,还需要将表 `t3` 的 schema 作为第二参数 schemaMaps 传入。格式为 Map,key 为 db 名,value 为每个 db 的所有 table schema(Map)。这里为了演示简单,只有 1 个 db,如下所示的 db3。db 下的 table schema map key 为 table name,value 为 `com.\_4paradigm.openmldb.sdk.Schema`,由每列的 name 和 type 构成。

返回结果`List<String>`,如果校验正确,返回空列表;如果校验失败,返回错误信息列表`[error_msg, error_trace]`

```java
Map<String, Map<String, Schema>> schemaMaps = new HashMap<>();
Map<String, Schema> dbSchema = new HashMap<>();
dbSchema = new HashMap<>();
dbSchema.put("t3", new Schema(Arrays.asList(new Column("c1", Types.VARCHAR), new Column("c2", Types.BIGINT))));
schemaMaps.put("db3", dbSchema);
// 可以使用no db参数的格式,需保证schemaMaps中只有一个db,且sql中只是用<table>格式
// List<String> ret = SqlClusterExecutor.validateSQLInRequest("select count(c1) over w1 from t3 window "+
// "w1 as(partition by c1 order by c2 rows between unbounded preceding and current row);", schemaMaps);
List<String> ret = SqlClusterExecutor.validateSQLInRequest("select count(c1) over w1 from t3 window "+
"w1 as(partition by c1 order by c2 rows between unbounded preceding and current row);", schemaMaps);
"w1 as(partition by c1 order by c2 rows between unbounded preceding and current row);", "db3", schemaMaps);
Assert.assertEquals(ret.size(), 0);
```

## 生成建表DDL

`public static List<String> genDDL(String sql, Map<String, Map<String, Schema>> tableSchema)`方法可以帮助用户,根据想要deploy的 SQL,自动生成建表语句,**目前只支持单db**。参数`sql`不可以是使用`<db>.<table>`格式,`tableSchema`输入sql依赖的所有table的schema,格式和前文一致,即使此处`tableSchema`存在多db,db信息也会被丢弃,所有表都等价于在同一个不知名的db中。

## SQL Output Schema

`public static Schema genOutputSchema(String sql, String usedDB, Map<String, Map<String, Schema>> tableSchema)`方法可以得到 SQL 的 Output Schema,支持多db。如果使用`usedDB``sql`中使用该db的表,可以使用`<table>`格式。为了向后兼容,还支持了`public static Schema genOutputSchema(String sql, Map<String, Map<String, Schema>> tableSchema)`无db的接口,等价于使用第一个db作为used db,因此,也需要保证`sql``<table>`格式的表来自此db。

## SQL 表血缘

`public static List<Pair<String, String>> getDependentTables(String sql, String usedDB, Map<String, Map<String, Schema>> tableSchema)`可以获得`sql`依赖的所有表,`Pair<String, String>`分别对应库名和表名,列表的第一个元素为主表,`[1,end)`为其他依赖表(不包括主表)。输入参数`usedDB`若为空串,即无use db下进行查询。(区别于前面的`genDDL`等兼容规则)

## SQL 合并

Java 客户端支持对多个 SQL 进行合并,并进行 request 模式的正确性校验,接口为`mergeSQL`,只能在所有输入SQL的主表一致的情况下合并。

输入参数:想要合并的 SQL 组,主表名与主表的join key(可多个),以及所有表的schema。
输入参数:想要合并的 SQL 组,当前使用的库名,主表的join key(可多个),以及所有表的schema。

例如,我们有这样四个特征组SQL:
```
Expand All @@ -561,7 +578,8 @@ select sum(c2) over w1 from main window w1 as (union (select \"\" as id, * from
它们的主表均为main表,所以它们可以进行 SQL 合并。合并本质是进行join,所以我们还需要知道main表的unique列,它们可以定位到唯一一行数据。例如,main表id并不唯一,可能存在多行的id值相同,但不会出现id与c1两列值都相同,那么我们可以用id与c1两列来进行join。类似 SQL 校验,我们也传入表的schema map。

```java
String merged = SqlClusterExecutor.mergeSQL(sqls, "main", Arrays.asList("id", "c1"), schemaMaps);
// 为了展示简单,我们仅使用单个db的表,所以只需要填写used db,sql中均使用<table>格式的表名。如果sql均使用<db>.<table>格式,used db可以填空串。
String merged = SqlClusterExecutor.mergeSQL(sqls, "db", Arrays.asList("id", "c1"), schemaMaps);
```

输出结果为单个合并后的 SQL,见下。输入的SQL一共选择四个特征,所以合并 SQL 只会输出这四个特征列。(我们会自动过滤join keys)
Expand All @@ -572,4 +590,4 @@ select `c1`, `of2`, `of4`, `sum(c2)over w1` from (select main.id as merge_id_0,

```{note}
如果合并出现`Ambiguous column name`错误,可能是不同特征组里有相同的特征名,请在输入SQL中使用别名区分它们。
```
```
Loading

0 comments on commit 4b88184

Please sign in to comment.