diff --git a/docs/en/deploy/conf.md b/docs/en/deploy/conf.md
index f499169cd7c..676de5ef866 100644
--- a/docs/en/deploy/conf.md
+++ b/docs/en/deploy/conf.md
@@ -24,11 +24,11 @@
#--request_max_retry=3
# Configure the request timeout in milliseconds, the default is 12 seconds
#--request_timeout_ms=12000
-# Configure the retry interval when the request is unreachable, generally do not need to be modified, in milliseconds
+# Configure the retry interval when the request is unreachable, generally does not need to be modified, in milliseconds
#--request_sleep_time=1000
# Configure the zookeeper session timeout in milliseconds
--zk_session_timeout=10000
-# Configure the zookeeper health check interval, the unit is milliseconds, generally do not need to be modified
+# Configure the zookeeper health check interval, the unit is milliseconds, generally does not need to be modified
#--zk_keep_alive_check_interval=15000
# Configure the timeout period for tablet heartbeat detection in milliseconds, the default is 1 minute. If the tablet is still unreachable after this time, the nameserver considers that the tablet is unavailable and will perform the operation of offline the node
--tablet_heartbeat_timeout=60000
@@ -265,7 +265,6 @@ spark.default.conf=
spark.eventLog.dir=
spark.yarn.maxAppAttempts=1
batchjob.jar.path=
-namenode.uri=
offline.data.prefix=file:///tmp/openmldb_offline_storage/
hadoop.conf.dir=
#enable.hive.support=false
@@ -275,6 +274,7 @@ hadoop.conf.dir=
Some of the important configurations for Spark Config is as follows:
+
```{note}
Understand the relationships between configurations and environment variables.
@@ -295,47 +295,60 @@ With one-clock deployment, SPARK_HOME will be set as `/spark`. For
`spark.master` configures Spark modes, more information can be found at [Spark Master URL](https://spark.apache.org/docs/latest/submitting-applications.html#master-urls).
-
-
TaskManager only allows `local` and its variants, `yarn`, `yarn-cluster` and `yarn-client` modes. Default mode is `local[*]`, which is milti-process local mode (thread count is cpu counts). Spark cluster `spark://`, Mesos cluster `mesos://` and Kubernetes `k8s://` cluster modes are currently not supported.
##### `local` Mode
-Spark tasks are executed locally on TaskManager deployment machine. Please note the following:
-- `offline.data.prefix` is set by default as `file:///tmp/openmldb_offline_storage/`, which is on TaskManager deployment machine. This can be set to other locations as required.
-- **Before starting TaskManager**, HDFS path can be configured by setting environment variable `HADOOP_CONF_DIR` to Hadoop configuration directory (Note: it is the environment variable, not the configuration item). The directory needs to include `core-site.xml`, `hdfs-site.xml` configuration files. For more information, refer to [Spark documentation](https://spark.apache.org/docs/3.2.1/configuration.html#inheriting-hadoop-cluster-configuration).
+The local mode means that the Spark task runs on the local machine (where the TaskManager is located). In this mode, not many configurations are required, but two points should be noted:
+- The storage location of offline tables, `offline.data.prefix`, is set to `file:///tmp/openmldb_offline_storage/` by default, which refers to the `/tmp` directory on the TaskManager's machine. If the TaskManager is moved to another machine, the data cannot be migrated automatically, so using `file://` is not recommended when deploying multiple TaskManagers on different machines. You can instead configure it as an HDFS path, in which case the variables `hadoop.conf.dir` and `hadoop.user.name` must also be configured (see the sketch after this list). For more details, see [Hadoop-related configurations](#hadoop-related-configurations).
-```{note}
-Currently, `namenode.uri` needs to be configured. When deleting an offline table, HDFS FileSystem `namenode.uri` will be connected, and offline table path will be deleted. This item will be discarded in future updates.
-```
-- batchjob path `batchjob.jar.path` can be set automativally. It can be configured to other paths.
+- The path of the batchjob `batchjob.jar.path` can be automatically obtained and does not need to be configured. If you want to use a batchjob from elsewhere, you can configure this parameter.
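+
+A minimal sketch of the relevant `conf/taskmanager.properties` entries when pointing local-mode offline storage at HDFS (the HDFS path, configuration directory, and user name below are placeholders; adjust them for your environment):
+
+```
+offline.data.prefix=hdfs:///openmldb_offline_storage/
+hadoop.conf.dir=/etc/hadoop/conf
+hadoop.user.name=openmldb
+```
+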
```{seealso}
-if Hadoop/Yarm requires Kerberos authentication, refer to [FAQ](../faq.md).
+If Hadoop/Yarn requires Kerberos authentication, refer to the [Client FAQ](../faq/client_faq.md).
```
+
##### `yarn/yarn-cluster` Mode
+"yarn" and "yarn-cluster" are the same mode, where Spark tasks run on a Yarn cluster. This mode requires several configurations, including:
+
+- The yarn mode must connect to a Hadoop cluster and requires the proper configuration of Hadoop variables `hadoop.conf.dir` and `hadoop.user.name`. For more details, refer to [Hadoop-related configurations](#hadoop-related-configurations).
-`yarn` and `yarn-cluster` is the same mode, where Spark tasks execute on the Yarn cluster. The configuration items are:
+The HDFS used in the following configurations should usually belong to the same Hadoop cluster as Yarn; otherwise, only a directly reachable `hdfs://` address can be used. A sketch of the resulting settings follows this list.
-- **Before starting TaskManager**, configure environment variable `HADOOP_CONF_DIR` to Hadoop and Yarn configuration directory. The directory should include `core-site.xml` and `hdfs-site.xml` for hadoop, and `yarn-site.xml` for Yarn. For more details, refer to [Spark documentation](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn).
+- The `spark.yarn.jars` configuration specifies the location of Spark runtime JAR files that Yarn needs to read. It must be an `hdfs://` address. You can upload the `jars` directory from the [OpenMLDB Spark distribution](../../tutorial/openmldbspark_distribution.md) to HDFS and configure it as `hdfs:///jars/*` (note the wildcard). [If this parameter is not configured, Yarn will package and distribute `$SPARK_HOME/jars` for each offline task, which is inefficient](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#preparations). Therefore, it is recommended to configure this parameter.
+- `batchjob.jar.path` must be an HDFS path that points to the jar file itself, not just a directory. Upload the batch job JAR file to HDFS and configure the corresponding address to ensure that all workers in the Yarn cluster can access the batch job package.
+- `offline.data.prefix` must be an HDFS path to ensure that all workers in the Yarn cluster can read and write data.
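+
+A minimal sketch of the corresponding `conf/taskmanager.properties` entries for yarn mode (every path and the user name below are placeholders, and the batch job jar name is purely illustrative):
+
+```
+spark.master=yarn
+spark.yarn.jars=hdfs:///jars/*
+batchjob.jar.path=hdfs:///openmldb/batchjob/openmldb-batchjob.jar
+offline.data.prefix=hdfs:///openmldb_offline_storage/
+hadoop.conf.dir=/etc/hadoop/conf
+hadoop.user.name=openmldb
+```
+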
-- `spark.yarn.jars` Configure Spark jar for Yarn. It has to be a `hdfs://` path. You can update the `jars` directory from [OpenMLDB Spark distribution](../../tutorial/openmldbspark_distribution.md) to HDFS, and set it as `hdfs:///jars/*`. If not set, Yarn will package and distribute `$SPARK_HOME/jars`, and will do so [for each offline task](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#preparations). Therefore, we recommend it to be set.
+##### `yarn-client` Mode
-- `batchjob.jar.path` It has to be a HDFS path. Upload batchjob jar to HDFS and configure the respective path. Make sure that all Workers in Yarn cluster have access to the batchjob jar.
+The [driver executes locally](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn), and the executors run on the Yarn cluster. The configurations are the same as for `yarn-cluster`.
-- `offline.data.prefix` It has to be a HDFS path. Make sure that all Workers in Yarn cluster have access. Use the environment variable `HADOOP_CONF_DIR`.
+#### spark.default.conf
+`spark.default.conf` configures Spark parameters in the format of `key=value`. Multiple configurations are separated by `;`, for example:
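+
+```
+spark.default.conf=spark.executor.instances=2;spark.executor.memory=2g;spark.executor.cores=2
+```
+
+This has the same effect as Spark's `--conf` parameter; if you are asked to modify advanced Spark parameters, add them to this item. For more parameters, refer to the [Spark configuration documentation](https://spark.apache.org/docs/3.1.2/configuration.html).
+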
-##### `yarn-client` Mode
+#### Hadoop-related configurations
-[Driver executes locally](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn),and executor execute on the Yarn cluster. Configurations are the same as `yarn-cluster`.
+`hadoop.conf.dir` and `hadoop.user.name` are configurations for TaskManager. They will be passed to the Spark Job when TaskManager submits the job, which is equivalent to configuring the environment variables `HADOOP_CONF_DIR` and `HADOOP_USER_NAME` before creating the Spark Job.
-#### `spark.default.conf`
+Details of the configurations:
-Format is `key=value`, use `;` to separate. For example:
+- `hadoop.conf.dir` represents the directory where Hadoop and Yarn configuration files are located (note that this directory is on the TaskManager node; the file directory should include Hadoop's `core-site.xml`, `hdfs-site.xml`, `yarn-site.xml`, and other configuration files, refer to the [Spark official documentation](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn)).
+- `hadoop.user.name` represents the Hadoop user.
-```
-spark.default.conf=spark.executor.instances=2;spark.executor.memory=2g;spark.executor.cores=2
-```
-Same affect as `--conf`. For more configurations, refer to [Spark documentation](https://spark.apache.org/docs/3.1.2/configuration.html).
+Essentially, these items configure environment variables; the scope in which they take effect is explained in the note "Understand the relationships between configurations and environment variables" above. If there are special requirements, you can bypass the TaskManager configuration and set the environment variables in other ways. However, it is recommended not to mix the two methods; using only one makes debugging easier.
+
+Please note that an sbin deployment cannot pass through unspecified environment variables; currently, TaskManager only receives `SPARK_HOME` and `RUNNER_JAVA_HOME`. Therefore, if you deploy with sbin, it is recommended to use the TaskManager configuration file.
+
+Other configuration methods:
+- Copy Hadoop and Yarn configuration files (`core-site.xml`, `hdfs-site.xml`, etc.) to the `{spark.home}/conf` directory.
+- Use environment variables that already exist on the TaskManager node, or configure `HADOOP_CONF_DIR` and `HADOOP_USER_NAME` **before manually starting TaskManager**.
+ > Similar to the following steps:
+ > ```bash
+ > cd
+ > export HADOOP_CONF_DIR=<replace with the Hadoop configuration directory>
+ > export HADOOP_USER_NAME=<replace with the Hadoop user name>
+ > bash bin/start.sh start taskmanager
+ > ```
+ > Note that SSH remote startup may lose environment variables, so it is recommended to export them correctly before starting.
diff --git a/docs/en/integration/offline_data_sources/tidb.md b/docs/en/integration/offline_data_sources/tidb.md
index c778a4caf17..f814c8a1b14 100644
--- a/docs/en/integration/offline_data_sources/tidb.md
+++ b/docs/en/integration/offline_data_sources/tidb.md
@@ -8,7 +8,7 @@
### Installation
-[OpenMLDB Spark Distribution](../../tutorial/openmldbspark_distribution.md) v0.8.5 and later versions utilize the TiSpark tool to interact with TiDB. The current release includes TiSpark 3.1.x dependencies (`tispark-assembly-3.2_2.12-3.1.5.jar`, `mysql-connector-java-8.0.29.jar`). If your TiSpark version doesn't match your TiDB version, refer to the [TiSpark documentation](https://docs.pingcap.com/tidb/stable/tispark-overview) for compatible dependencies to add to Spark's classpath/jars.
+The current version utilizes TiSpark to interact with the TiDB database. To get started, download the necessary dependencies for TiSpark 3.1.x (`tispark-assembly-3.2_2.12-3.1.5.jar` and `mysql-connector-java-8.0.29.jar`). If the TiSpark version is not compatible with your current TiDB version, refer to the [TiSpark documentation](https://docs.pingcap.com/tidb/stable/tispark-overview) to download the corresponding TiSpark dependencies. Then, add them to Spark's classpath/jars.
### Configuration
@@ -32,17 +32,20 @@ Once either configuration is successful, access TiDB tables using the format `ti
TiDB schema reference can be found at [TiDB Schema](https://docs.pingcap.com/tidb/stable/data-type-overview). Currently, only the following TiDB data formats are supported:
-| OpenMLDB Data Format | TiDB Data Format |
-|----------------------|-------------------------|
-| BOOL | BOOL |
-| SMALLINT | Currently not supported |
-| INT | Currently not supported |
-| BIGINT | BIGINT |
-| FLOAT | FLOAT |
-| DOUBLE | DOUBLE |
-| DATE | DATE |
-| TIMESTAMP | TIMESTAMP |
-| STRING | VARCHAR(M) |
+| OpenMLDB Data Format | TiDB Data Format |
+|----------------------|------------------|
+| BOOL | BOOL |
+| SMALLINT | SMALLINT |
+| INT | INT |
+| BIGINT | BIGINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| DATE | DATE |
+| TIMESTAMP | DATETIME |
+| TIMESTAMP | TIMESTAMP |
+| STRING | VARCHAR(M) |
+
+Tip: Integer conversions between types of different widths are constrained by the value range, so try to follow the mappings above.
## Importing TiDB Data into OpenMLDB
@@ -50,7 +53,8 @@ Importing data from TiDB sources is supported through the [`LOAD DATA INFILE`](.
- Both offline and online engines can import TiDB data sources.
- TiDB import supports symbolic links, which can reduce hard copying and ensure that OpenMLDB always reads the latest data from TiDB. To enable soft link data import, use the parameter `deep_copy=false`.
-- The `OPTIONS` parameter only supports `deep_copy`, `mode`, and `sql`.
+- In `@@execute_mode='online'` mode, TiDB imports support the parameter `skip_cvt`, which controls whether field type conversion is skipped. The default is `false`, meaning field type conversion and strict schema checking are performed. If it is set to `true`, no conversion or schema checking is done; this performs better but may produce errors such as type overflow, which require manual inspection (see the sketch after this list).
+- The `OPTIONS` parameter only supports `deep_copy`, `mode`, `sql`, and `skip_cvt`.
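+
+For instance, a minimal sketch of an online import that skips conversion (the catalog, database, and table names are illustrative):
+
+```sql
+SET @@execute_mode='online';
+LOAD DATA INFILE 'tidb://tidb_catalog.db1.t1' INTO TABLE db1.t1 OPTIONS(skip_cvt=true);
+```
+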
For example:
@@ -70,11 +74,12 @@ LOAD DATA INFILE 'tidb://tidb_catalog.db1.t1' INTO TABLE tidb_catalog.db1.t1 OPT
Exporting data from OpenMLDB to TiDB sources is supported through the [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md) API, using the specific URI interface format `tidb://tidb_catalog.[db].[table]` to export data to the TiDB data warehouse. Note:
+- The offline engine supports exporting to TiDB data sources; the online engine does not support it yet.
- The database and table must already exist. Currently, automatic creation of non-existent databases or tables is not supported.
-- Only the export mode `mode` is effective in the `OPTIONS` parameter. Other parameters are not effective, and the current parameter is mandatory.
+- In the `OPTIONS` parameter, only `mode='append'` is supported. Other modes such as `overwrite` and `errorifexists` are invalid because the current version of TiSpark does not support them; if a later TiSpark version adds support, you can upgrade for compatibility.
For example:
```sql
SELECT col1, col2, col3 FROM t1 INTO OUTFILE 'tidb://tidb_catalog.db1.t1' options(mode='append');
-```
\ No newline at end of file
+```
diff --git a/docs/en/openmldb_sql/data_types/date_and_time_types.md b/docs/en/openmldb_sql/data_types/date_and_time_types.md
index 7c35f0a94d1..f2aa18a2b8b 100644
--- a/docs/en/openmldb_sql/data_types/date_and_time_types.md
+++ b/docs/en/openmldb_sql/data_types/date_and_time_types.md
@@ -4,10 +4,10 @@ OpenMLDB supports date type `DATE` and timestamp `TIMESTAMP`.
Each time type has a valid range of values and a NULL value. The NULL value is used when specifying an invalid value that cannot be represented.
-| Type | Size (bytes) | Scope | Format | Use |
-| :-------- | :----------- | :----------------------------------------------------------- | :-------------- | :----------------------- |
-| DATE | 4 | 1900-01-01 ~ | YYYY-MM-DD | Date Value |
-| TIMESTAMP | 8 | ~ INT64_MAX | online: int64, offline `LOAD DATA`: int64 or 'yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]' | Mixed Date and Time Value, Timestamp |
+| Type | Size (bytes) | Scope | Format | Use |
+| :-------- | :----------- | :----------------------------------------------------------- |:-------------------------------------------------------------------------------------------| :----------------------- |
+| DATE | 4 | 1900-01-01 ~ | YYYY-MM-DD | Date Value |
+| TIMESTAMP | 8 | ~ INT64_MAX | online: int64, offline(`LOAD DATA`, `INSERT`): int64 or 'yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]' | Mixed Date and Time Value, Timestamp |
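+
+For instance, a minimal sketch of supplying a `TIMESTAMP` value as an int64 millisecond count (the table and column names are illustrative; offline `LOAD DATA`/`INSERT` additionally accept the string format listed above):
+
+```sql
+INSERT INTO t1 (id, ts) VALUES (1, 1590738989000);
+```
+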
## Time Zone Handling
diff --git a/docs/en/openmldb_sql/ddl/CREATE_INDEX_STATEMENT.md b/docs/en/openmldb_sql/ddl/CREATE_INDEX_STATEMENT.md
index 051c1a9e3ba..1d901e66b51 100644
--- a/docs/en/openmldb_sql/ddl/CREATE_INDEX_STATEMENT.md
+++ b/docs/en/openmldb_sql/ddl/CREATE_INDEX_STATEMENT.md
@@ -1,6 +1,6 @@
# CREATE INDEX
-The `CREATE INDEX` statement is used to create a new index on existing table. Running `CREATE INDEX` will initiates an asynchronous job, and you can check the status of the job by executing `SHOW JOBS FROM NAMESERVER`.
+The `CREATE INDEX` statement is used to create a new index on an existing table. Running `CREATE INDEX` initiates an asynchronous job, and you can check its status by executing `SHOW JOBS FROM NAMESERVER`. Please note that the index is not available until the asynchronous job completes, and any operation that requires the new index will fail before then.
## Syntax
diff --git a/docs/en/openmldb_sql/dml/INSERT_STATEMENT.md b/docs/en/openmldb_sql/dml/INSERT_STATEMENT.md
index 9bbab72e61b..fb8530668d5 100644
--- a/docs/en/openmldb_sql/dml/INSERT_STATEMENT.md
+++ b/docs/en/openmldb_sql/dml/INSERT_STATEMENT.md
@@ -5,7 +5,7 @@ OpenMLDB supports single-row and multi-row insert statements.
## Syntax
```
-INSERT INFO tbl_name (column_list) VALUES (value_list) [, value_list ...]
+INSERT [[OR] IGNORE] INTO tbl_name (column_list) VALUES (value_list) [, value_list ...]
column_list:
col_name [, col_name] ...
@@ -15,7 +15,9 @@ value_list:
```
**Description**
-- `INSERT` statement only works in online execute mode
+- By default, `INSERT` does not deduplicate records, whereas `INSERT OR IGNORE` allows ignoring data that already exists in the table, making it suitable for repeated attempts.
+- Offline execute mode only supports `INSERT`, not `INSERT OR IGNORE`.
+- The `INSERT` statement in offline execution mode is not supported on tables with symbolic paths. In OpenMLDB, a table's offline data can have two types of addresses: the data path and symbolic paths, as detailed in [Offline Import Rules](./LOAD_DATA_STATEMENT.md#offline-import-rules). In offline execution mode, `INSERT` writes data to the data path, always in Parquet format. Since the data format of a symbolic path can be set freely, inserting data into a table that has symbolic paths could cause format conflicts, so this is currently unsupported.
## Examples
diff --git a/docs/poetry.lock b/docs/poetry.lock
index 3b2ecfb5fc7..ca6c9ccb8c6 100644
--- a/docs/poetry.lock
+++ b/docs/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
[[package]]
name = "alabaster"
@@ -92,13 +92,13 @@ files = [
[[package]]
name = "idna"
-version = "3.3"
+version = "3.7"
description = "Internationalized Domain Names in Applications (IDNA)"
optional = false
python-versions = ">=3.5"
files = [
- {file = "idna-3.3-py3-none-any.whl", hash = "sha256:84d9dd047ffa80596e0f246e2eab0b391788b0503584e8945f2368256d2735ff"},
- {file = "idna-3.3.tar.gz", hash = "sha256:9d643ff0a55b762d5cdb124b8eaa99c66322e2157b69160bc32796e824360e6d"},
+ {file = "idna-3.7-py3-none-any.whl", hash = "sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0"},
+ {file = "idna-3.7.tar.gz", hash = "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"},
]
[[package]]
diff --git a/docs/zh/deploy/conf.md b/docs/zh/deploy/conf.md
index 500455a64a4..20342a83145 100644
--- a/docs/zh/deploy/conf.md
+++ b/docs/zh/deploy/conf.md
@@ -270,7 +270,6 @@ spark.default.conf=
spark.eventLog.dir=
spark.yarn.maxAppAttempts=1
batchjob.jar.path=
-namenode.uri=
offline.data.prefix=file:///tmp/openmldb_offline_storage/
hadoop.conf.dir=
hadoop.user.name=
@@ -308,52 +307,24 @@ TaskManager只接受`local`及其变种、`yarn`、`yarn-cluster`、`yarn-client
##### local模式
local模式即Spark任务运行在本地(TaskManager所在主机),该模式下不需要太多配置,只需要注意两点:
-- 离线表的存储地址`offline.data.prefix`,默认为`file:///tmp/openmldb_offline_storage/`,即TaskManager所在主机的`/tmp`目录,你可以修改该配置为其他目录。
- - 可以配置为HDFS路径,如果配置为HDFS路径,需要正确配置变量 `hadoop.conf.dir` 和 `hadoop.user.name`,其中 `hadoop.conf.dir` 表示Hadoop配置文件所在目录(注意该目录是TaskManager节点目录;文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`等配置文件,更多见[Spark官方文档](https://spark.apache.org/docs/3.2.1/configuration.html#inheriting-hadoop-cluster-configuration)),`hadoop.user.name` 表示hadoop运行用户,可以通过以下三种方式之一配置这两个变量:
- 1. 在 `conf/taskmanager.properties` 配置文件中配置变量 `hadoop.conf.dir`, `hadoop.user.name`
- 2. 在(TaskManager节点)**启动TaskManager前**配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME`
- 3. 拷贝Hadoop配置文件(`core-site.xml`、`hdfs-site.xml`等)到 `{spark.home}/conf` 目录中
- > sbin部署不能传递非指定的变量,目前TaskManager只会传递环境变量 `SPARK_HOME` 和 `RUNNER_JAVA_HOME`。所以如果是sbin部署,尽量使用第一种方法。
- >
- > 如果使用第二种方式,配置的环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 最好是永久生效的,如果不希望环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 永久生效,可以在一个session里,先临时配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` ,然后启动TaskManager,例如
- > ```bash
- > cd
- > export HADOOP_CONF_DIR=<这里替换为Hadoop配置目录>
- > export HADOOP_USER_NAME=<这里替换为Hadoop用户名>
- > bash bin/start.sh start taskmanager
- > ```
- >
- > 环境变量生效范围参考 理解配置项与环境变量的关系
- ```{note}
- HDFS路径目前需要配置`namenode.uri`,删除离线表时会连接HDFS FileSystem`namenode.uri`,并删除离线表的存储目录(Offline Table Path)。未来将废弃此配置项。
- ```
+- 离线表的存储地址`offline.data.prefix`,默认为`file:///tmp/openmldb_offline_storage/`,即TaskManager所在主机的`/tmp`目录。如果TaskManager换机器,数据无法自动迁移,多机部署TaskManager时不建议使用`file://`。可以配置为HDFS路径,需要配置变量 `hadoop.conf.dir` 和 `hadoop.user.name`,详情见[Hadoop相关配置](#hadoop相关配置)。
+
- batchjob的路径`batchjob.jar.path`可自动获取,无需配置,如果你要使用别处的batchjob,可以配置该参数。
```{seealso}
如果Hadoop/Yarn需要Kerberos认证,参考[FAQ](../faq/client_faq.md#如何配置taskmanager来访问开启kerberos的yarn集群)。
```
-
##### yarn/yarn-cluster模式
"yarn"和"yarn-cluster"是同一个模式,即Spark任务运行在Yarn集群上,该模式下需要配置的参数较多,主要包括:
-- 正确配置变量 `hadoop.conf.dir` 和 `hadoop.user.name`,其中 `hadoop.conf.dir` 表示Hadoop和Yarn配置文件所在目录(注意该目录是TaskManager节点目录;文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`, `yarn-site.xml`等配置文件,参考[Spark官方文档](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn)),`hadoop.user.name` 表示hadoop运行用户,可以通过以下三种方式之一配置这两个变量:
- 1. 在 `conf/taskmanager.properties` 配置文件中配置变量 `hadoop.conf.dir`, `hadoop.user.name`
- 2. 在(TaskManager节点)**启动TaskManager前**配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME`
- 3. 拷贝Hadoop和Yarn配置文件(`core-site.xml`、`hdfs-site.xml`等)到 `{spark.home}/conf` 目录中
- > sbin部署不能传递非指定的变量,目前TaskManager只会传递环境变量 `SPARK_HOME` 和 `RUNNER_JAVA_HOME`。所以如果是sbin部署,尽量使用第一种方法。
- >
- > 如果使用第二种方式,配置的环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 最好是永久生效的,如果不希望环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 永久生效,可以在一个session里,先临时配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` ,然后启动TaskManager,例如
- > ```bash
- > cd
- > export HADOOP_CONF_DIR=<这里替换为Hadoop配置目录>
- > export HADOOP_USER_NAME=<这里替换为Hadoop用户名>
- > bash bin/start.sh start taskmanager
- > ```
- >
- > 环境变量生效范围参考 理解配置项与环境变量的关系
+
+- yarn模式必须连接Hadoop集群,需要配置好Hadoop相关变量 `hadoop.conf.dir` 和 `hadoop.user.name`,详情见[Hadoop相关配置](#hadoop相关配置)。
+
+以下配置的HDFS通常和yarn属于一个Hadoop集群,否则只能使用可直连的`hdfs://`地址。
+
- `spark.yarn.jars`配置Yarn需要读取的Spark运行jar包地址,必须是`hdfs://`地址。可以上传[OpenMLDB Spark 发行版](../../tutorial/openmldbspark_distribution.md)解压后的`jars`目录到HDFS上,并配置为`hdfs:///jars/*`(注意通配符)。[如果不配置该参数,Yarn会将`$SPARK_HOME/jars`打包上传分发,并且每次离线任务都要分发](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#preparations),效率较低,所以推荐配置。
- `batchjob.jar.path`必须是HDFS路径(具体到包名),上传batchjob jar包到HDFS上,并配置为对应地址,保证Yarn集群上所有Worker可以获得batchjob包。
-- `offline.data.prefix`必须是HDFS路径,保证Yarn集群上所有Worker可读写数据。应使用前面配置的环境变量`HADOOP_CONF_DIR`中的Hadoop集群地址。
+- `offline.data.prefix`必须是HDFS路径,保证Yarn集群上所有Worker可读写数据。
##### yarn-client模式
@@ -366,3 +337,29 @@ local模式即Spark任务运行在本地(TaskManager所在主机),该模
spark.default.conf=spark.executor.instances=2;spark.executor.memory=2g;spark.executor.cores=2
```
等效于Spark的`--conf`参数,如果提示修改Spark高级参数,请将参数加入此项中。更多参数,参考[Spark 配置](https://spark.apache.org/docs/3.1.2/configuration.html)。
+
+#### Hadoop相关配置
+
+`hadoop.conf.dir`与`hadoop.user.name`属于TaskManager的配置,它们将在TaskManager提交Spark Job时传给Job,等价于创建Spark Job前配置环境变量`HADOOP_CONF_DIR`和`HADOOP_USER_NAME`。
+
+配置项详情:
+
+- `hadoop.conf.dir` 表示Hadoop和Yarn配置文件所在目录(注意该目录是TaskManager节点目录;文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`, `yarn-site.xml`等配置文件,参考[Spark官方文档](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn))。
+- `hadoop.user.name` 表示hadoop运行用户。
+
+本质是在配置环境变量,生效范围参考 理解配置项与环境变量的关系。如果有特殊需求,可以绕过在TaskManager中配置,用其他方式进行环境变量配置。但建议不要混合使用,只用一种方式更容易调试。
+
+请注意,sbin部署不能传递非指定的变量,目前TaskManager只会接收到环境变量 `SPARK_HOME` 和 `RUNNER_JAVA_HOME`。所以如果是sbin部署,尽量使用TaskManager配置文件。
+
+其他配置方法:
+- 拷贝Hadoop和Yarn配置文件(`core-site.xml`、`hdfs-site.xml`等)到 `{spark.home}/conf` 目录中。
+
+- TaskManager节点上已有环境变量,或**手动启动TaskManager前**配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME`。
+ > 类似以下步骤:
+ > ```bash
+ > cd
+ > export HADOOP_CONF_DIR=<这里替换为Hadoop配置目录>
+ > export HADOOP_USER_NAME=<这里替换为Hadoop用户名>
+ > bash bin/start.sh start taskmanager
+ > ```
+ > 注意,ssh远程启动可能会丢失环境变量,建议启动前export保证无误。
diff --git a/docs/zh/integration/offline_data_sources/tidb.md b/docs/zh/integration/offline_data_sources/tidb.md
index dad2196a75f..26c303ac00b 100644
--- a/docs/zh/integration/offline_data_sources/tidb.md
+++ b/docs/zh/integration/offline_data_sources/tidb.md
@@ -8,7 +8,7 @@
### 安装
-[OpenMLDB Spark 发行版](../../tutorial/openmldbspark_distribution.md) v0.8.5 及以上版本使用了TiSpark工具来操作TiDB数据库, 当前版本已包含 TiSpark 3.1.x 依赖(tispark-assembly-3.2_2.12-3.1.5.jarh、mysql-connector-java-8.0.29.jar)。如果TiSpark版本不兼容现有的TiDB版本,你可以从[TiSpark文档](https://docs.pingcap.com/zh/tidb/stable/tispark-overview)查找下载对应的TiSpark依赖,并将其添加到Spark的classpath/jars中。
+当前版本使用TiSpark来操作TiDB数据库, 需要先下载 TiSpark 3.1.x 的相关依赖(`tispark-assembly-3.2_2.12-3.1.5.jar`、`mysql-connector-java-8.0.29.jar`)。如果TiSpark版本不兼容现有的TiDB版本,可以在[TiSpark文档](https://docs.pingcap.com/zh/tidb/stable/tispark-overview)查找下载对应的TiSpark依赖,然后将其添加到Spark的classpath/jars中。
### 配置
@@ -32,25 +32,29 @@ spark.default.conf=spark.sql.extensions=org.apache.spark.sql.TiExtensions;spark.
TiDB schema参考[TiDB Schema](https://docs.pingcap.com/zh/tidb/stable/data-type-overview)。目前,仅支持以下TiDB数据格式:
-| OpenMLDB 数据格式 | TiDB 数据格式 |
-| ----------------- |---------|
-| BOOL | BOOL |
-| SMALLINT | 暂不支持 |
-| INT | 暂不支持 |
-| BIGINT | BIGINT |
-| FLOAT | FLOAT |
-| DOUBLE | DOUBLE |
-| DATE | DATE |
-| TIMESTAMP | TIMESTAMP |
+| OpenMLDB 数据格式 | TiDB 数据格式 |
+| ----------------- |------------|
+| BOOL | BOOL |
+| SMALLINT | SMALLINT |
+| INT | INT |
+| BIGINT | BIGINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| DATE | DATE |
+| TIMESTAMP | DATETIME |
+| TIMESTAMP | TIMESTAMP |
| STRING | VARCHAR(M) |
+提示:不对称的整型转换会被取值范围影响,请尽量参考以上数据类型进行映射。
+
## 导入 TiDB 数据到 OpenMLDB
对于 TiDB 数据源的导入是通过 API [`LOAD DATA INFILE`](../../openmldb_sql/dml/LOAD_DATA_STATEMENT.md) 进行支持,通过使用特定的 URI 接口 `tidb://tidb_catalog.[db].[table]` 的格式进行导入 TiDB 内的数据。注意:
- 离线和在线引擎均可以导入 TiDB 数据源
- TiDB 导入支持软连接,可以减少硬拷贝并且保证 OpenMLDB 随时读取到 TiDB 的最新数据。启用软链接方式进行数据导入:使用参数 `deep_copy=false`
-- `OPTIONS` 参数仅有 `deep_copy` 、`mode` 和 `sql` 有效
+- TiDB 在`@@execute_mode='online'`模式下支持参数`skip_cvt`:是否跳过字段类型转换,默认为`false`。为`false`时会进行字段类型转换以及严格的schema检查;为`true`时则跳过转换和schema检查,性能更好一些,但可能存在类型溢出等错误,需要人工检查。
+- `OPTIONS` 参数仅有 `deep_copy` 、`mode` 、`sql` 和 `skip_cvt` 有效
举例:
@@ -70,8 +74,9 @@ LOAD DATA INFILE 'tidb://tidb_catalog.db1.t1' INTO TABLE tidb_catalog.db1.t1 OPT
对于 TiDB 数据源的导出是通过 API [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md) 进行支持,通过使用特定的 URI 接口 `tidb://tidb_catalog.[db].[table]` 的格式进行导出到 TiDB 数仓。注意:
+- 离线引擎可以支持导出 TiDB 数据源,在线引擎还不支持
- 数据库和数据表必须已经存在,目前不支持对于不存在的数据库或数据表进行自动创建
-- `OPTIONS` 参数只有导出模式`mode`生效,其他参数均不生效,当前参数为必填项
+- `OPTIONS` 参数仅`mode='append'`有效,其他参数`overwrite`、`errorifexists`均无效,这是由于TiSpark当前版本不支持,如果TiSpark后续版本支持可以进行升级兼容。
举例:
diff --git a/docs/zh/openmldb_sql/data_types/date_and_time_types.md b/docs/zh/openmldb_sql/data_types/date_and_time_types.md
index 9bba51438e7..d67a691a2ad 100644
--- a/docs/zh/openmldb_sql/data_types/date_and_time_types.md
+++ b/docs/zh/openmldb_sql/data_types/date_and_time_types.md
@@ -4,10 +4,10 @@ OpenMLDB支持日期类型`DATE`和时间戳`TIMESTAMP`。
每个时间类型有一个有效值范围和一个NULL值,当指定不合法不能表示的值时使用NULL值。
-| 类型 | 大小 (bytes) | 范围 | 格式 | 用途 |
-| :-------- | :----------- | :----------------------------------------------------------- | :-------------- | :----------------------- |
-| DATE | 4 | 1900-01-01 ~ | YYYY-MM-DD | 日期值 |
-| TIMESTAMP | 8 | ~ INT64_MAX | 在线: int64, 离线`LOAD DATA`: int64 或 'yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]' | 混合日期和时间值,时间戳 |
+| 类型 | 大小 (bytes) | 范围 | 格式 | 用途 |
+| :-------- | :----------- | :----------------------------------------------------------- |:---------------------------------------------------------------------------------| :----------------------- |
+| DATE | 4 | 1900-01-01 ~ | YYYY-MM-DD | 日期值 |
+| TIMESTAMP | 8 | ~ INT64_MAX | 在线: int64, 离线(`LOAD DATA`, `INSERT`): int64 或 'yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]' | 混合日期和时间值,时间戳 |
## 时区处理
diff --git a/docs/zh/openmldb_sql/ddl/CREATE_INDEX_STATEMENT.md b/docs/zh/openmldb_sql/ddl/CREATE_INDEX_STATEMENT.md
index dd8813f0afa..abfa201ab29 100644
--- a/docs/zh/openmldb_sql/ddl/CREATE_INDEX_STATEMENT.md
+++ b/docs/zh/openmldb_sql/ddl/CREATE_INDEX_STATEMENT.md
@@ -1,6 +1,6 @@
# CREATE INDEX
-`CREATE INDEX` 语句用来创建索引。添加索引会发起异步任务来加载数据, 可以通过执行`SHOW JOBS FROM NAMESERVER`来查看任务状态
+`CREATE INDEX` 语句用来创建索引。添加索引会发起异步任务来加载数据, 可以通过执行`SHOW JOBS FROM NAMESERVER`来查看任务状态。请注意,异步任务未完成之前,索引不可用,需要新索引的场景会失败。
## 语法
diff --git a/docs/zh/openmldb_sql/dml/INSERT_STATEMENT.md b/docs/zh/openmldb_sql/dml/INSERT_STATEMENT.md
index 4799e557577..cd07b39d6cc 100644
--- a/docs/zh/openmldb_sql/dml/INSERT_STATEMENT.md
+++ b/docs/zh/openmldb_sql/dml/INSERT_STATEMENT.md
@@ -15,8 +15,9 @@ value_list:
```
**说明**
-- `INSERT` 只能用在在线模式
- 默认`INSERT`不会去重,`INSERT OR IGNORE` 则可以忽略已存在于表中的数据,可以反复重试。
+- 离线模式仅支持`INSERT`,不支持`INSERT OR IGNORE`。
+- 离线模式`INSERT`不支持写入有软链接的表。OpenMLDB 中,表的离线数据地址分为两类,离线地址和软链接地址,详见[离线导入规则](./LOAD_DATA_STATEMENT.md#离线导入规则),离线模式`INSERT`的数据会写入离线地址中,写入格式固定为parquet格式。由于软链接的数据格式设置自由,若表存在软链接地址,写入数据可能导致数据格式冲突,因此当前离线模式`INSERT`不支持写入有软链接的表。
## Examples
diff --git a/docs/zh/quickstart/function_boundary.md b/docs/zh/quickstart/function_boundary.md
index 3e7dfc94884..1288454ca7f 100644
--- a/docs/zh/quickstart/function_boundary.md
+++ b/docs/zh/quickstart/function_boundary.md
@@ -10,10 +10,18 @@
通过配置 TaskManager 可以决定离线存储地址 `offline.data.prefix`、离线 job 计算所需 Spark 模式 `spark.master` 等。
-`offline.data.prefix`:可配置为文件路径或 HDFS 路径。生产环境建议配置 HDFS 路径,测试环境(特指 onebox 型,例如在 Docker 容器内启动)可以配置本地文件路径。文件路径作为离线存储,将无法支持多 TaskManager 分布式部署(TaskManager 之间不会传输数据)。如果想在多台主机上部署 TaskManager,请使用 HDFS 等多机可同时访问到的存储介质。如果想测试多 TaskManager 工作协同,可以在一台主机上部署多个 TaskManager,此时可以使用文件路径作为离线存储。
+`offline.data.prefix`:可配置为文件路径或 HDFS 路径。生产环境建议配置 HDFS 路径,测试环境(特指 onebox 型,例如在 Docker 容器内启动,或所有组件都在一台机器上)可以配置本地文件路径。这是因为,TaskManager提交local的Spark Job,这个Job可以访问到本地文件路径。但这样的话,将无法支持多 TaskManager 分布式部署(TaskManager 之间不会传输数据)。
+
+- 如果想在多台主机上部署 TaskManager,请使用 HDFS 等多机可同时访问到的存储介质。
+
+- 如果想测试多 TaskManager 工作协同,可以在一台主机上部署多个 TaskManager,此时可以使用文件路径作为离线存储。
`spark.master=local[*]`:Spark 默认配置为 `local[*]` 模式(自动绑定 CPU 核数,如果发现离线任务比较慢,建议使用 yarn 模式,改变配置后重启 TaskManager 生效。更多配置可参考 [master-urls](https://spark.apache.org/docs/3.1.2/submitting-applications.htmlmaster-urls)。
+### 配置更新
+
+TaskManager除了`spark.default.conf`,其他所有配置都需要重启生效。TaskManager是无状态的,只要不是正在访问中,重启不会有副作用。如果你只需要临时改变离线命令相关的配置,可以不用在TaskManager的配置中更新,可以使用[临时spark配置](#临时spark配置)的方式,只对单个离线任务进行配置调整。
+
### spark.default.conf
更多可选配置,可以写在 `spark.default.conf` 参数中,格式为 `k1=v1;k2=v2`。例如:
@@ -26,7 +34,7 @@ spark.default.conf=spark.port.maxRetries=32;foo=bar
### 临时Spark配置
-见[客户端Spark配置文件](../reference/client_config/client_spark_config.md),CLI支持临时更改Spark配置,不需要重启TaskManager。但此配置方式不可以改变spark.master等配置。
+见[客户端Spark配置文件](../reference/client_config/client_spark_config.md),CLI支持临时更改Spark配置,不需要重启TaskManager。但此配置方式不可以改变spark.master等配置,只能改变`spark.default.conf`中的配置项。
## DDL 边界——DEPLOY 语句
@@ -71,7 +79,9 @@ spark.default.conf=spark.port.maxRetries=32;foo=bar
`LOAD DATA` 无论导入到在线或离线,都是离线 job。源数据的格式规则,离线在线没有区别。
-推荐使用 HDFS 文件作为源数据,无论 TaskManager 是 local/yarn 模式,还是 TaskManager 在别的主机上运行,都可以导入。如果源数据为本地文件,是否可以顺利导入需要考虑 TaskManager 模式和运行主机。
+推荐使用 HDFS 文件作为源数据,无论 TaskManager 是 local/yarn 模式,还是 TaskManager 在别的主机上运行,都可以导入。
+
+如果源数据为本地文件,是否可以顺利导入需要考虑 TaskManager 模式和运行主机:
- TaskManager 是 local 模式,只有将源数据放在 TaskManager 进程的主机上才能顺利导入。
- TaskManager 是 yarn (client and cluster) 模式时,由于不知道运行容器是哪台主机,不可使用文件路径作为源数据地址。
diff --git a/hybridse/include/vm/physical_op.h b/hybridse/include/vm/physical_op.h
index 030d71dcb53..eaea21050f2 100644
--- a/hybridse/include/vm/physical_op.h
+++ b/hybridse/include/vm/physical_op.h
@@ -1945,6 +1945,8 @@ class PhysicalInsertNode : public PhysicalOpNode {
const node::InsertStmt* GetInsertStmt() const { return insert_stmt_; }
+ static PhysicalInsertNode *CastFrom(PhysicalOpNode *node);
+
private:
const node::InsertStmt* insert_stmt_;
};
diff --git a/hybridse/src/sdk/hybridse_interface_core.i b/hybridse/src/sdk/hybridse_interface_core.i
index 71cd2aab39a..99a997ec1a9 100644
--- a/hybridse/src/sdk/hybridse_interface_core.i
+++ b/hybridse/src/sdk/hybridse_interface_core.i
@@ -210,3 +210,4 @@ using hybridse::node::DataType;
%template(VectorDataType) std::vector;
%template(ExprNodeVector) std::vector;
+%template(VectorString) std::vector<std::string>;
diff --git a/hybridse/src/vm/physical_op.cc b/hybridse/src/vm/physical_op.cc
index 290ac202d8e..7c0e33bdf1b 100644
--- a/hybridse/src/vm/physical_op.cc
+++ b/hybridse/src/vm/physical_op.cc
@@ -548,6 +548,11 @@ PhysicalLimitNode* PhysicalLimitNode::CastFrom(PhysicalOpNode* node) { return dy
PhysicalRenameNode* PhysicalRenameNode::CastFrom(PhysicalOpNode* node) {
return dynamic_cast(node);
}
+
+PhysicalInsertNode* PhysicalInsertNode::CastFrom(PhysicalOpNode* node) {
+    return dynamic_cast<PhysicalInsertNode*>(node);
+}
+
void PhysicalConstProjectNode::Print(std::ostream& output, const std::string& tab) const {
PhysicalOpNode::Print(output, tab);
}
diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/SparkPlanner.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/SparkPlanner.scala
index 0cf2470dcf8..6a6d60d51b7 100644
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/SparkPlanner.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/SparkPlanner.scala
@@ -20,14 +20,14 @@ import com._4paradigm.hybridse.`type`.TypeOuterClass.Database
import com._4paradigm.hybridse.node.{DataType, JoinType}
import com._4paradigm.hybridse.sdk.{SqlEngine, UnsupportedHybridSeException}
import com._4paradigm.hybridse.vm.{CoreAPI, Engine, PhysicalConstProjectNode, PhysicalCreateTableNode,
- PhysicalDataProviderNode, PhysicalFilterNode, PhysicalGroupAggrerationNode, PhysicalGroupNode, PhysicalJoinNode,
- PhysicalLimitNode, PhysicalLoadDataNode, PhysicalOpNode, PhysicalOpType, PhysicalProjectNode, PhysicalRenameNode,
- PhysicalSelectIntoNode, PhysicalSimpleProjectNode, PhysicalSortNode, PhysicalTableProjectNode,
- PhysicalWindowAggrerationNode, ProjectType, PhysicalSetOperationNode}
+ PhysicalDataProviderNode, PhysicalFilterNode, PhysicalGroupAggrerationNode, PhysicalGroupNode, PhysicalInsertNode,
+ PhysicalJoinNode, PhysicalLimitNode, PhysicalLoadDataNode, PhysicalOpNode, PhysicalOpType, PhysicalProjectNode,
+ PhysicalRenameNode, PhysicalSelectIntoNode, PhysicalSetOperationNode, PhysicalSimpleProjectNode, PhysicalSortNode,
+ PhysicalTableProjectNode, PhysicalWindowAggrerationNode, ProjectType}
import com._4paradigm.openmldb.batch.api.OpenmldbSession
import com._4paradigm.openmldb.batch.nodes.{ConstProjectPlan, CreateTablePlan, DataProviderPlan, FilterPlan,
- GroupByAggregationPlan, GroupByPlan, JoinPlan, LimitPlan, LoadDataPlan, RenamePlan, RowProjectPlan, SelectIntoPlan,
- SimpleProjectPlan, SortByPlan, WindowAggPlan, SetOperationPlan}
+ GroupByAggregationPlan, GroupByPlan, InsertPlan, JoinPlan, LimitPlan, LoadDataPlan, RenamePlan, RowProjectPlan,
+ SelectIntoPlan, SetOperationPlan, SimpleProjectPlan, SortByPlan, WindowAggPlan}
import com._4paradigm.openmldb.batch.utils.{DataTypeUtil, ExternalUdfUtil, GraphvizUtil, HybridseUtil, NodeIndexInfo,
NodeIndexType}
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor
@@ -35,6 +35,7 @@ import com._4paradigm.std.VectorDataType
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
+
import scala.collection.JavaConverters.seqAsJavaList
import scala.collection.mutable
import scala.reflect.io.File
@@ -273,6 +274,8 @@ class SparkPlanner(session: SparkSession, config: OpenmldbBatchConfig, sparkAppN
CreateTablePlan.gen(ctx, PhysicalCreateTableNode.CastFrom(root))
case PhysicalOpType.kPhysicalOpSetOperation =>
SetOperationPlan.gen(ctx, PhysicalSetOperationNode.CastFrom(root), children)
+ case PhysicalOpType.kPhysicalOpInsert =>
+ InsertPlan.gen(ctx, PhysicalInsertNode.CastFrom(root))
case _ =>
throw new UnsupportedHybridSeException(s"Plan type $opType not supported")
}
diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/InsertPlan.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/InsertPlan.scala
new file mode 100644
index 00000000000..36c721891d0
--- /dev/null
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/InsertPlan.scala
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2021 4Paradigm
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com._4paradigm.openmldb.batch.nodes
+
+import com._4paradigm.hybridse.node.ExprNode
+import com._4paradigm.hybridse.vm.PhysicalInsertNode
+import com._4paradigm.openmldb.batch.utils.SparkRowUtil
+import com._4paradigm.openmldb.batch.{PlanContext, SparkInstance}
+import com._4paradigm.openmldb.proto.Common.ColumnDesc
+import com._4paradigm.openmldb.proto.NS.OfflineTableInfo
+import com._4paradigm.std.{ExprNodeVector, VectorString}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{BooleanType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType,
+ ShortType, StringType, StructField, StructType, TimestampType}
+import org.slf4j.LoggerFactory
+
+import java.sql.{Date, Timestamp}
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+
+object InsertPlan {
+ case class ColInfo(colDesc: ColumnDesc, field: StructField)
+
+ private val logger = LoggerFactory.getLogger(this.getClass)
+
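+  // Translates a PhysicalInsertNode into an offline write: validates the statement and the target table,
+  // builds a DataFrame from the VALUES literals, appends it to the table's offline storage path, and
+  // registers a parquet OfflineTableInfo when the table has no offline info yet.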
+ def gen(ctx: PlanContext, node: PhysicalInsertNode): SparkInstance = {
+ val stmt = node.GetInsertStmt()
+ require(stmt != null, "Fail to get insert statement")
+
+ val insertMode = stmt.getInsert_mode_
+ require("DEFAULT_MODE".equals(insertMode.toString), s"insert mode: $insertMode is unsupported in offline mode")
+
+ val dbInStmt = stmt.getDb_name_
+ val db = if (dbInStmt.nonEmpty) dbInStmt else ctx.getConf.defaultDb
+ val table = stmt.getTable_name_
+ val tableInfo = ctx.getOpenmldbSession.openmldbCatalogService.getTableInfo(db, table)
+ require(tableInfo != null && tableInfo.getName.nonEmpty,
+ s"table $db.$table info is not existed(no table name): $tableInfo")
+
+ val hasOfflineTableInfo = tableInfo.hasOfflineTableInfo
+ logger.info(s"hasOfflineTableInfo: $hasOfflineTableInfo")
+ if (hasOfflineTableInfo) {
+ val symbolicPaths = tableInfo.getOfflineTableInfo.getSymbolicPathsList
+ require(symbolicPaths == null || symbolicPaths.isEmpty, "can't insert into table with soft copied data")
+ }
+
+ val colDescList = tableInfo.getColumnDescList
+ var oriSchema = new StructType
+ val colInfoMap = mutable.Map[String, ColInfo]()
+ colDescList.foreach(col => {
+ val colName = col.getName
+ val field = StructField(colName, SparkRowUtil.protoTypeToScalaType(col.getDataType), !col.getNotNull)
+ oriSchema = oriSchema.add(field)
+ colInfoMap.put(colName, ColInfo(col, field))
+ })
+
+ val (insertSchema, insertColInfos, stmtColNum) = parseInsertCols(stmt.getColumns_, oriSchema, colInfoMap)
+ val insertRows = parseInsertRows(stmt.getValues_, insertColInfos, stmtColNum)
+
+ val spark = ctx.getSparkSession
+ var insertDf = spark.createDataFrame(spark.sparkContext.parallelize(insertRows), insertSchema)
+ val schemaDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], oriSchema)
+ insertDf = schemaDf.unionByName(insertDf, allowMissingColumns = true)
+
+ val offlineDataPath = getOfflineDataPath(ctx, db, table)
+ val newTableInfoBuilder = tableInfo.toBuilder
+ if (!hasOfflineTableInfo) {
+ val newOfflineInfo = OfflineTableInfo
+ .newBuilder()
+ .setPath(offlineDataPath)
+ .setFormat("parquet")
+ .build()
+ newTableInfoBuilder.setOfflineTableInfo(newOfflineInfo)
+ }
+ val newTableInfo = newTableInfoBuilder.build()
+
+ insertDf.write.mode("append").format(newTableInfo.getOfflineTableInfo.getFormat).save(offlineDataPath)
+ if (!hasOfflineTableInfo) {
+ ctx.getOpenmldbSession.openmldbCatalogService.updateOfflineTableInfo(newTableInfo)
+ }
+
+ SparkInstance.fromDataFrame(spark.emptyDataFrame)
+ }
+
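+  // Builds the insert schema: the columns named in the statement (or the full table schema when no column
+  // list is given), plus unnamed columns that carry default values; every NOT NULL column must be covered
+  // by one of the two. Returns (insert schema, column infos, number of columns named in the statement).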
+ def parseInsertCols(cols: VectorString, oriSchema: StructType, colInfoMap: mutable.Map[String, ColInfo]):
+ (StructType, ArrayBuffer[ColInfo], Int) = {
+ var insertSchema = new StructType
+ val insertColInfos = ArrayBuffer[ColInfo]()
+ var stmtColNum = 0
+ if (cols == null || cols.size() == 0) {
+ insertSchema = oriSchema
+ insertSchema.foreach(field => insertColInfos += colInfoMap(field.name))
+ stmtColNum = oriSchema.size
+ } else {
+ stmtColNum = cols.size()
+ val insertColSet = mutable.Set[String]()
+ for (i <- 0 until stmtColNum) {
+ val colName = cols.get(i)
+ require(colInfoMap.contains(colName), s"Fail to get insert info--can't recognize column $colName")
+
+ val colInfo = colInfoMap(colName)
+ insertColInfos += colInfo
+ insertSchema = insertSchema.add(colInfo.field)
+ insertColSet.add(colName)
+ }
+
+ for ((colName, colInfo) <- colInfoMap) {
+ val colDesc = colInfo.colDesc
+ if (colDesc.hasDefaultValue && !insertColSet.contains(colName)) {
+ val colInfo = colInfoMap(colName)
+ insertColInfos += colInfo
+ insertSchema = insertSchema.add(colInfo.field)
+ insertColSet.add(colName)
+ }
+ require(!colDesc.getNotNull || insertColSet.contains(colName),
+ s"Fail to get insert info--require not null column ${colName}")
+ }
+ }
+
+ (insertSchema, insertColInfos, stmtColNum)
+ }
+
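+  // Converts each VALUES row expression into a Spark Row: literals are cast to the target column types,
+  // then the default values of the remaining (unnamed) columns are appended.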
+ def parseInsertRows(valuesExpr: ExprNodeVector, insertColInfos: ArrayBuffer[ColInfo], stmtColNum: Int):
+ ListBuffer[Row] = {
+ val defaultValues = getDefaultColValues(insertColInfos, stmtColNum)
+
+ val insertRows = ListBuffer[Row]()
+ for (i <- 0 until valuesExpr.size()) {
+ val rowExpr = valuesExpr.get(i)
+ var rowValues = ListBuffer[Any]()
+ try {
+ rowValues = parseRowExpr(rowExpr, insertColInfos, stmtColNum)
+ } catch {
+ case _: IllegalArgumentException | _: NumberFormatException =>
+ throw new IllegalArgumentException(
+ s"Fail to get insert info--fail to parse row[$i]: ${rowExpr.GetExprString()}"
+ )
+ }
+
+ rowValues = rowValues ++ defaultValues
+ insertRows += Row.fromSeq(rowValues)
+ }
+ insertRows
+ }
+
+ def getDefaultColValues(insertColInfos: ArrayBuffer[ColInfo], stmtColNum: Int): ListBuffer[Any] = {
+ val defaultValues = ListBuffer[Any]()
+ for (i <- stmtColNum until insertColInfos.size) {
+ val colInfo = insertColInfos(i)
+ defaultValues += castVal(colInfo.colDesc.getDefaultValue, colInfo.field.dataType)
+ }
+ defaultValues
+ }
+
+ def parseRowExpr(rowExpr: ExprNode, insertColInfos: ArrayBuffer[ColInfo], stmtColNum: Int): ListBuffer[Any] = {
+ val valNum = rowExpr.GetChildNum()
+ require(valNum == stmtColNum)
+
+ val rowValues = ListBuffer[Any]()
+ for (i <- 0 until valNum) {
+ val valueExpr = rowExpr.GetChild(i)
+ val colInfo = insertColInfos(i)
+ val value = castVal(valueExpr.GetExprString(), colInfo.field.dataType)
+ require(value != null || !colInfo.colDesc.getNotNull)
+
+ rowValues += value
+ }
+ rowValues
+ }
+
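+  // Casts a literal string from the statement to the Scala value matching the Spark column type; the
+  // literal "null" (case-insensitive) becomes null for every type except string columns.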
+ def castVal(oriStr: String, dataType: DataType): Any = {
+ if (dataType == StringType) {
+ return oriStr
+ }
+ if ("null".equals(oriStr.toLowerCase)) {
+ return null
+ }
+ dataType match {
+ case BooleanType => oriStr.toBoolean
+ case ShortType => oriStr.toShort
+ case LongType => oriStr.toLong
+ case IntegerType => oriStr.toInt
+ case FloatType => oriStr.toFloat
+ case DoubleType => oriStr.toDouble
+ case DateType => Date.valueOf(oriStr)
+ case TimestampType => castStr2Timestamp(oriStr)
+ case StringType => oriStr
+ }
+ }
+
+ def castStr2Timestamp(oriStr: String): Timestamp = {
+ try {
+ Timestamp.valueOf(oriStr)
+ } catch {
+ case _: IllegalArgumentException =>
+ new Timestamp(oriStr.toLong)
+ }
+ }
+
+ def getOfflineDataPath(ctx: PlanContext, db: String, table: String): String = {
+ val offlineDataPrefix = if (ctx.getConf.offlineDataPrefix.endsWith("/")) {
+ ctx.getConf.offlineDataPrefix.dropRight(1)
+ } else {
+ ctx.getConf.offlineDataPrefix
+ }
+ s"$offlineDataPrefix/$db/$table"
+ }
+}
diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala
index 7f87c55ffce..887c40bd8f4 100644
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala
@@ -40,7 +40,11 @@ object LoadDataPlan {
val (format, options, mode, extra) = HybridseUtil.parseOptions(inputFile, node)
// load have the option deep_copy
val deepCopy = extra.get("deep_copy").get.toBoolean
-
+ // auto schema conversion option skip_cvt
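+    // only online imports in the tidb format honor skip_cvt; any other storage/format keeps it false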
+ val skipCvt = (storage, format) match {
+ case ("online", "tidb") => extra.getOrElse("skip_cvt", "false").toBoolean
+ case _ => false
+ }
require(ctx.getOpenmldbSession != null, "LOAD DATA must use OpenmldbSession, not SparkSession")
val info = ctx.getOpenmldbSession.openmldbCatalogService.getTableInfo(db, table)
@@ -52,7 +56,7 @@ object LoadDataPlan {
// we read input file even in soft copy,
// cause we want to check if "the input file schema == openmldb table schema"
val df = DataSourceUtil.autoLoad(ctx.getOpenmldbSession, inputFile, format, options, info.getColumnDescList,
- loadDataSql)
+ loadDataSql, skipCvt)
// write
logger.info("write data to storage {}, writer mode {}, is deep {}", storage, mode, deepCopy.toString)
diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala
index 4f366774cd5..e42fcc4b18b 100644
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala
@@ -32,7 +32,6 @@ object SelectIntoPlan {
logger.debug("select {} rows", input.getDf().count())
input.getDf().show(10)
}
-
// write options don't need deepCopy, may have coalesce
val (format, options, mode, extra) = HybridseUtil.parseOptions(outPath, node)
if (input.getSchema.size == 0 && input.getDf().isEmpty) {
@@ -66,10 +65,10 @@ object SelectIntoPlan {
} else {
logger.info("offline select into: format[{}], options[{}], write mode[{}], out path {}", format, options,
- mode, outPath)
+ mode, outPath)
var ds = input.getDf()
val coalesce = extra.get("coalesce").map(_.toInt)
- if (coalesce.nonEmpty && coalesce.get > 0){
+ if (coalesce.nonEmpty && coalesce.get > 0) {
ds = ds.coalesce(coalesce.get)
logger.info("coalesce to {} part", coalesce.get)
}
diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtil.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtil.scala
index 12de283497c..1fba8b9dbfc 100644
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtil.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtil.scala
@@ -26,9 +26,9 @@ import com._4paradigm.openmldb.proto
import com._4paradigm.openmldb.proto.Common
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.functions.{col, first}
-import org.apache.spark.sql.types.{BooleanType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType,
- ShortType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{IntegerType, LongType, ShortType, StructType}
import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
+import _root_.org.apache.spark.sql.Column
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters.asScalaBufferConverter
@@ -41,12 +41,13 @@ object DataSourceUtil {
def autoLoad(openmldbSession: OpenmldbSession, file: String, format: String, options: Map[String, String],
columns: util.List[Common.ColumnDesc]): DataFrame = {
- autoLoad(openmldbSession, file, List.empty[String], format, options, columns, "")
+ autoLoad(openmldbSession, file, List.empty[String], format, options, columns, "", skipCvt = false)
}
def autoLoad(openmldbSession: OpenmldbSession, file: String, format: String, options: Map[String, String],
- columns: util.List[Common.ColumnDesc], loadDataSql: String): DataFrame = {
- autoLoad(openmldbSession, file, List.empty[String], format, options, columns, loadDataSql)
+ columns: util.List[Common.ColumnDesc], loadDataSql: String,
+ skipCvt: Boolean): DataFrame = {
+ autoLoad(openmldbSession, file, List.empty[String], format, options, columns, loadDataSql, skipCvt)
}
// otherwise isCatalog
@@ -60,7 +61,30 @@ object DataSourceUtil {
}
private def checkSchemaIgnoreNullable(actual: StructType, expect: StructType): Boolean = {
- actual.zip(expect).forall{case (a, b) => (a.name, a.dataType) == (b.name, b.dataType)}
+ actual.zip(expect).forall { case (a, b) => (a.name, a.dataType) == (b.name, b.dataType) }
+ }
+
+ private def checkSchemaColumnsName(actual: StructType, expect: StructType): Boolean = {
+ actual.zip(expect).forall { case (a, b) => (a.name) == (b.name) }
+ }
+
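+  // Builds a select list that maps the columns read from TiDB onto the table schema, down-casting
+  // BIGINT (LongType) values to the table's INT/SMALLINT columns; unmappable columns are dropped, so the
+  // caller compares the result length against the expected schema.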
+ private def getMappingSchemaColumnsForTidb(actual: StructType, expect: StructType):
+ Seq[Column] = {
+ actual.zip(expect).flatMap { case (a, b) =>
+ if (a.name == b.name) {
+ if (a.dataType == b.dataType) {
+ Seq(col(b.name))
+ } else if (a.dataType == LongType && b.dataType == IntegerType) {
+ Seq(col(a.name).cast(IntegerType).alias(b.name))
+ } else if (a.dataType == LongType && b.dataType == ShortType) {
+ Seq(col(a.name).cast(ShortType).alias(b.name))
+ } else {
+ Seq.empty
+ }
+ } else {
+ Seq.empty
+ }
+ }
}
// Load df from file **and** symbol paths, they should in the same format and options.
@@ -74,52 +98,82 @@ object DataSourceUtil {
// We use OpenmldbSession for running sparksql in hiveLoad. If in 4pd Spark distribution, SparkSession.sql
// will do openmldbSql first, and if DISABLE_OPENMLDB_FALLBACK, we can't use sparksql.
def autoLoad(openmldbSession: OpenmldbSession, file: String, symbolPaths: List[String], format: String,
- options: Map[String, String], columns: util.List[Common.ColumnDesc], loadDataSql: String = "")
- : DataFrame = {
+ options: Map[String, String], columns: util.List[Common.ColumnDesc], loadDataSql: String = ""
+ , skipCvt: Boolean = false)
+ : DataFrame = {
val fmt = format.toLowerCase
- if (isCatalog(fmt)) {
- logger.info(s"load data from catalog table, format $fmt, paths: $file $symbolPaths")
- if (file.isEmpty) {
- // no file, read all symbol paths
- var outputDf: DataFrame = null
- symbolPaths.zipWithIndex.foreach { case (path, index) =>
- if (index == 0) {
- outputDf = catalogLoad(openmldbSession, path, columns, loadDataSql)
- } else {
- outputDf = outputDf.union(catalogLoad(openmldbSession, path, columns, loadDataSql))
- }
- }
- outputDf
+ val isCataLog = isCatalog(fmt)
+ val fileTypeDesc = if (isCataLog) "catalog table" else "file"
+ logger.info(s"load data from ${fileTypeDesc} ${file} & ${symbolPaths} " +
+ s"reader[format ${fmt}, options ${options}, skipCvt ${skipCvt}]")
+ val getDataLoad = (path: String) => {
+ val df: DataFrame = if (isCataLog) {
+ catalogLoad(openmldbSession, path, fmt, options, columns, loadDataSql)
} else {
- var outputDf = catalogLoad(openmldbSession, file, columns, loadDataSql)
- for (path: String <- symbolPaths) {
- outputDf = outputDf.union(catalogLoad(openmldbSession, path, columns, loadDataSql))
- }
- outputDf
+ autoFileLoad(openmldbSession, path, fmt, options, columns, loadDataSql)
}
- } else {
- logger.info("load data from file {} & {} reader[format {}, options {}]", file, symbolPaths, fmt, options)
-
- if (file.isEmpty) {
- var outputDf: DataFrame = null
- symbolPaths.zipWithIndex.foreach { case (path, index) =>
- if (index == 0) {
- outputDf = autoFileLoad(openmldbSession, path, fmt, options, columns, loadDataSql)
- } else {
- outputDf = outputDf.union(autoFileLoad(openmldbSession, path, fmt, options, columns,
- loadDataSql))
- }
+ if (columns != null) {
+ if (skipCvt) {
+ val (oriSchema, _, _) = HybridseUtil.extractOriginAndReadSchema(columns)
+ require(checkSchemaColumnsName(df.schema, oriSchema),
+ s"schema mismatch(name), loaded data ${df.schema}!= table $oriSchema, check $file")
+ df
+ } else {
+ autoSchemaMappingAndCheck(df, file, format, columns)
}
- outputDf
} else {
- var outputDf = autoFileLoad(openmldbSession, file, fmt, options, columns, loadDataSql)
- for (path: String <- symbolPaths) {
- outputDf = outputDf.union(autoFileLoad(openmldbSession, path, fmt, options, columns,
- loadDataSql))
+ df
+ }
+ }
+ if (file.isEmpty) {
+ // no file, read all symbol paths
+ var outputDf: DataFrame = null
+ symbolPaths.zipWithIndex.foreach { case (path, index) =>
+ if (index == 0) {
+ outputDf = getDataLoad(path)
+ } else {
+ outputDf = outputDf.union(getDataLoad(path))
}
- outputDf
}
+ outputDf
+ } else {
+ var outputDf: DataFrame = getDataLoad(file)
+ for (path: String <- symbolPaths) {
+ outputDf = outputDf.union(getDataLoad(path))
+ }
+ outputDf
+ }
+ }
+
+ private def autoSchemaMappingAndCheck(df: DataFrame, file: String, format: String,
+ columns: util.List[Common.ColumnDesc]): DataFrame = {
+ var resultDf = df
+ val (oriSchema, _, _) = HybridseUtil.extractOriginAndReadSchema(columns)
+ // tidb schema mapping
+ if (format == "tidb" && !checkSchemaIgnoreNullable(resultDf.schema, oriSchema)) {
+ val convertedColumns = getMappingSchemaColumnsForTidb(resultDf.schema, oriSchema)
+ if (convertedColumns.length != oriSchema.length) {
+ throw new IllegalArgumentException(s"tidb schema mapping failed, " +
+ s"loaded tidb ${resultDf.schema}!= table $oriSchema, check $file")
+ }
+ logger.info(s"convert tidb data columns, convert select: ${convertedColumns}, table: $oriSchema")
+ resultDf = resultDf.select(convertedColumns: _*)
}
+ // check schema
+ (isCatalog(format), format) match {
+ case (true, "tidb") | (false, "parquet") =>
+ //df.schema == oriSchema, hive table always nullable?
+ require(checkSchemaIgnoreNullable(resultDf.schema, oriSchema),
+ s"schema mismatch(name and dataType), loaded data ${resultDf.schema}!= table $oriSchema, check $file")
+ if (!resultDf.schema.equals(oriSchema)) {
+ logger.info(s"df schema: ${resultDf.schema}, reset schema")
+ resultDf = resultDf.sqlContext.createDataFrame(resultDf.rdd, oriSchema)
+ }
+ case _ =>
+ require(resultDf.schema == oriSchema, s"schema mismatch, " +
+ s"loaded ${resultDf.schema} != table $oriSchema, check $file")
+ }
+ resultDf
}
// We want df with oriSchema, but if the file format is csv:
@@ -127,12 +181,13 @@ object DataSourceUtil {
// 2. spark read may change the df schema to all nullable
// So we should fix it.
private def autoFileLoad(openmldbSession: OpenmldbSession, file: String, format: String,
- options: Map[String, String], columns: util.List[Common.ColumnDesc], loadDataSql: String): DataFrame = {
+ options: Map[String, String], columns: util.List[Common.ColumnDesc],
+ loadDataSql: String): DataFrame = {
require(format.equals("csv") || format.equals("parquet"), s"unsupported format $format")
val reader = openmldbSession.getSparkSession.read.options(options)
val (oriSchema, readSchema, tsCols) = HybridseUtil.extractOriginAndReadSchema(columns)
- var df = if (format.equals("parquet")) {
+ if (format.equals("parquet")) {
// When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.
// ref https://spark.apache.org/docs/3.2.1/sql-data-sources-parquet.html
val df = if (loadDataSql != null && loadDataSql.nonEmpty) {
@@ -141,11 +196,7 @@ object DataSourceUtil {
} else {
reader.format(format).load(file)
}
-
- require(checkSchemaIgnoreNullable(df.schema, oriSchema),
- s"schema mismatch(ignore nullable), loaded ${df.schema}!= table $oriSchema, check $file")
- // reset nullable property
- df.sqlContext.createDataFrame(df.rdd, oriSchema)
+ df
} else {
// csv should auto detect the timestamp format
reader.format(format)
@@ -176,13 +227,10 @@ object DataSourceUtil {
if (!df.schema.equals(oriSchema)) {
logger.info(s"df schema: ${df.schema}, reset schema")
df.sqlContext.createDataFrame(df.rdd, oriSchema)
- } else{
+ } else {
df
}
}
-
- require(df.schema == oriSchema, s"schema mismatch, loaded ${df.schema} != table $oriSchema, check $file")
- df
}
// path can have prefix or not, we should remove it if exists
@@ -190,8 +238,9 @@ object DataSourceUtil {
path.split("://").last
}
- private def catalogLoad(openmldbSession: OpenmldbSession, file: String, columns: util.List[Common.ColumnDesc],
- loadDataSql: String = ""): DataFrame = {
+ private def catalogLoad(openmldbSession: OpenmldbSession, file: String, format: String,
+ options: Map[String, String], columns: util.List[Common.ColumnDesc],
+ loadDataSql: String = ""): DataFrame = {
if (logger.isDebugEnabled()) {
logger.debug("session catalog {}", openmldbSession.getSparkSession.sessionState.catalog)
openmldbSession.sparksql("show tables").show()
@@ -207,22 +256,6 @@ object DataSourceUtil {
logger.debug(s"read dataframe schema: ${df.schema}, count: ${df.count()}")
df.show(10)
}
-
- if (columns != null) {
- val (oriSchema, readSchema, tsCols) = HybridseUtil.extractOriginAndReadSchema(columns)
-
- require(checkSchemaIgnoreNullable(df.schema, oriSchema), //df.schema == oriSchema, hive table always nullable?
- s"schema mismatch(ignore nullable), loaded hive ${df.schema}!= table $oriSchema, check $file")
-
- if (!df.schema.equals(oriSchema)) {
- logger.info(s"df schema: ${df.schema}, reset schema")
- df.sqlContext.createDataFrame(df.rdd, oriSchema)
- } else{
- df
- }
- } else {
- df
- }
-
+ df
}
}
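
The schema handling above hinges on one Spark behavior: Parquet reads (and, per the comments, catalog reads) mark every column nullable, so the loader rebuilds the DataFrame on its own RDD to restore the table's declared schema. A minimal standalone sketch of that pattern; the object and names below are illustrative, not part of this patch:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object NullableResetSketch {
  // Rebuild the DataFrame on the same rows, but with the exact StructType
  // (including nullability flags) that the table declares.
  def resetSchema(df: DataFrame, target: StructType): DataFrame =
    df.sqlContext.createDataFrame(df.rdd, target)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("nullable-reset").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "a"), (2, "b")).toDF("c1", "c2")
    val target = StructType(Seq(
      StructField("c1", IntegerType, nullable = true),
      StructField("c2", StringType, nullable = true)))
    resetSchema(df, target).printSchema() // both columns now report nullable = true
    spark.stop()
  }
}
```
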
diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala
index 398f674f293..9de50a8bfc8 100644
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala
@@ -22,12 +22,15 @@ import com._4paradigm.hybridse.node.ConstNode
import com._4paradigm.hybridse.sdk.UnsupportedHybridSeException
import com._4paradigm.hybridse.vm.{PhysicalLoadDataNode, PhysicalOpNode, PhysicalSelectIntoNode}
import com._4paradigm.openmldb.batch.api.OpenmldbSession
+import com._4paradigm.openmldb.batch.PlanContext
import com._4paradigm.openmldb.proto
import com._4paradigm.openmldb.proto.Common
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.functions.{col, first}
-import org.apache.spark.sql.types.{BooleanType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType,
- ShortType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{
+ BooleanType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType,
+ ShortType, StringType, StructField, StructType, TimestampType
+}
import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
import org.slf4j.LoggerFactory
@@ -134,7 +137,7 @@ object HybridseUtil {
}
def createUnsafeGroupKeyComparator(keyIdxs: Array[Int], dataTypes: Array[DataType]):
- (UnsafeRow, UnsafeRow) => Boolean = {
+ (UnsafeRow, UnsafeRow) => Boolean = {
// TODO(tobe): check for different data types
if (keyIdxs.length == 1) {
@@ -216,7 +219,8 @@ object HybridseUtil {
// If file starts with 'openmldb', format is openmldb, not the detail format in openmldb
// Others, format is the origin format option
// **Result**: format, options(spark write/read options), mode is common, if more options, set them to extra map
- def parseOptions[T](file: String, node: T): (String, Map[String, String], String, Map[String, String]) = {
+ def parseOptions[T](file: String, node: T):
+ (String, Map[String, String], String, Map[String, String]) = {
// load data: read format, select into: write format
// parse hive/iceberg to avoid user forget to set format
val format = if (file.toLowerCase().startsWith("hive://")) {
@@ -234,7 +238,7 @@ object HybridseUtil {
// load data: read options, select into: write options
// parquet/hive format doesn't support any option now, consistent with write options(empty) when deep copy
val options: mutable.Map[String, String] = mutable.Map()
- if (format.equals("csv")){
+ if (format.equals("csv")) {
// default values: https://spark.apache.org/docs/3.2.1/sql-data-sources-csv.html
// delimiter -> sep: ,(the same with spark3 default sep)
// header: true(different with spark)
@@ -272,7 +276,8 @@ object HybridseUtil {
extraOptions += ("coalesce" -> parseOption(getOptionFromNode(node, "coalesce"), "0", getIntOrDefault))
extraOptions += ("create_if_not_exists" -> parseOption(getOptionFromNode(node, "create_if_not_exists"),
"true", getBoolOrDefault))
-
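+ // skip_cvt: when true, skip automatic schema conversion on load and only check column names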
+ extraOptions += ("skip_cvt" -> parseOption(getOptionFromNode(node, "skip_cvt"),
+ "false", getBoolOrDefault))
(format, options.toMap, mode, extraOptions.toMap)
}
diff --git a/java/openmldb-batch/src/test/resources/insert_test_src/test.csv b/java/openmldb-batch/src/test/resources/insert_test_src/test.csv
new file mode 100644
index 00000000000..73b76fd411b
--- /dev/null
+++ b/java/openmldb-batch/src/test/resources/insert_test_src/test.csv
@@ -0,0 +1,3 @@
+c1_x, c2_x, c3_x
+1, 111, 1.1
+2, null, 0.0
\ No newline at end of file
diff --git a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/TestInsertPlan.scala b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/TestInsertPlan.scala
new file mode 100644
index 00000000000..b0cdde88ec7
--- /dev/null
+++ b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/TestInsertPlan.scala
@@ -0,0 +1,249 @@
+/*
+ * Copyright 2021 4Paradigm
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com._4paradigm.openmldb.batch
+
+import com._4paradigm.openmldb.batch.api.OpenmldbSession
+import com._4paradigm.openmldb.batch.utils.SparkUtil
+import com._4paradigm.openmldb.proto.NS.TableInfo
+import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types.{BooleanType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType,
+ StructField, StructType, TimestampType}
+
+import java.sql.{Date, Timestamp}
+import java.util.Properties
+
+
+class TestInsertPlan extends SparkTestSuite {
+ var sparkSession: SparkSession = _
+ var openmldbSession: OpenmldbSession = _
+ var openmldbConnector: SqlClusterExecutor = _
+ val db = "offline_insert_test"
+
+ override def customizedBefore(): Unit = {
+ val prop = new Properties
+ prop.load(getClass.getResourceAsStream("/test.properties"))
+ val cluster = prop.getProperty("openmldb.zk.cluster", "127.0.0.1:6181")
+ val path = prop.getProperty("openmldb.zk.root.path", "/onebox")
+ sparkSession = getSparkSession
+ sparkSession.conf.set("openmldb.zk.cluster", cluster)
+ sparkSession.conf.set("openmldb.zk.root.path", path)
+
+ openmldbSession = new OpenmldbSession(sparkSession)
+ openmldbConnector = openmldbSession.openmldbCatalogService.sqlExecutor
+ openmldbConnector.createDB(db)
+ openmldbConnector.refreshCatalog()
+ }
+
+ override def customizedAfter(): Unit = {
+ val tables = openmldbConnector.getTableNames(db)
+ tables.forEach(table => openmldbConnector.executeDDL(db, s"drop table $table;"))
+ openmldbConnector.dropDB(db)
+ }
+
+ test("Test multi data type") {
+ val table = "t1"
+ openmldbConnector.executeDDL(db,
+ s"create table $table(c1 string, c2 int, c3 bigint, c4 float, c5 double, c6 timestamp, c7 timestamp," +
+ s" c8 date, c9 bool);")
+ openmldbConnector.refreshCatalog()
+ assert(openmldbConnector.getTableInfo(db, table).getName.nonEmpty)
+
+ val sql = s"insert into $db.$table values ('aa', 1, 5, 1.2, 2.4, '2024-04-08 12:00:00', 1712548801000, " +
+ s"'2024-04-08', true)"
+ openmldbSession.sql(sql)
+ val querySess = new OpenmldbSession(sparkSession)
+ val queryResult = querySess.sql(s"select * from $db.$table")
+
+ val schema = StructType(Seq(
+ StructField("c1", StringType, nullable = true),
+ StructField("c2", IntegerType, nullable = true),
+ StructField("c3", LongType, nullable = true),
+ StructField("c4", FloatType, nullable = true),
+ StructField("c5", DoubleType, nullable = true),
+ StructField("c6", TimestampType, nullable = true),
+ StructField("c7", TimestampType, nullable = true),
+ StructField("c8", DateType, nullable = true),
+ StructField("c9", BooleanType, nullable = true)
+ ))
+ val expectDf = sparkSession.createDataFrame(
+ sparkSession.sparkContext.parallelize(Seq(Row("aa", 1, 5L, 1.2f, 2.4d, Timestamp.valueOf("2024-04-08 12:00:00"),
+ Timestamp.valueOf("2024-04-08 12:00:01"), Date.valueOf("2024-04-08"), true))),
+ schema)
+ assert(SparkUtil.approximateDfEqual(expectDf, queryResult.getSparkDf()))
+ }
+
+ test("Test multi rows") {
+ val table = "t2"
+ openmldbConnector.executeDDL(db, s"create table $table(c1 string, c2 int);")
+ openmldbConnector.refreshCatalog()
+ assert(openmldbConnector.getTableInfo(db, table).getName.nonEmpty)
+
+ val sql = s"insert into $db.$table values ('a', 1), ('b', 2)"
+ openmldbSession.sql(sql)
+
+ val querySess = new OpenmldbSession(sparkSession)
+ val queryResult = querySess.sql(s"select * from $db.$table")
+
+ val schema = StructType(Seq(
+ StructField("c1", StringType, nullable = true),
+ StructField("c2", IntegerType, nullable = true)
+ ))
+ val expectDf = sparkSession.createDataFrame(
+ sparkSession.sparkContext.parallelize(Seq(Row("a", 1), Row("b", 2))),
+ schema)
+ assert(SparkUtil.approximateDfEqual(expectDf, queryResult.getSparkDf()))
+ }
+
+ test("Test random columns and empty column") {
+ val table = "t3"
+ openmldbConnector.executeDDL(db, s"create table $table(c1 string, c2 int);")
+ openmldbConnector.refreshCatalog()
+ assert(openmldbConnector.getTableInfo(db, table).getName.nonEmpty)
+
+ val sql1 = s"insert into $db.$table (c2, c1) values (1, 'a')"
+ openmldbSession.sql(sql1)
+ val sql2 = s"insert into $db.$table (c1) values ('b')"
+ openmldbSession.sql(sql2)
+
+ val querySess = new OpenmldbSession(sparkSession)
+ val queryResult = querySess.sql(s"select * from $db.$table")
+
+ val schema = StructType(Seq(
+ StructField("c1", StringType, nullable = true),
+ StructField("c2", IntegerType, nullable = true)
+ ))
+ val expectDf = sparkSession.createDataFrame(
+ sparkSession.sparkContext.parallelize(Seq(Row("a", 1), Row("b", null))),
+ schema)
+ assert(SparkUtil.approximateDfEqual(expectDf, queryResult.getSparkDf()))
+ }
+
+ test("Test exceptions") {
+ val table = "t4"
+ openmldbConnector.executeDDL(db, s"create table $table(c1 int not null, c2 int);")
+ openmldbConnector.refreshCatalog()
+ assert(openmldbConnector.getTableInfo(db, table).getName.nonEmpty)
+
+ val sql1 = s"insert into $db.$table (c1, c2) values (1, 'a')"
+ assertThrows[IllegalArgumentException](openmldbSession.sql(sql1))
+
+ val sql2 = s"insert into $db.$table (c1, c3) values (1, 1)"
+ assertThrows[IllegalArgumentException](openmldbSession.sql(sql2))
+
+ val sql3 = s"insert into $db.$table values (1, 1, 1)"
+ assertThrows[IllegalArgumentException](openmldbSession.sql(sql3))
+
+ val sql4 = s"insert into $db.$table (c2) values (1)"
+ assertThrows[IllegalArgumentException](openmldbSession.sql(sql4))
+
+ val sql5 = s"insert into $db.$table (c1, c2) values (1)"
+ assertThrows[IllegalArgumentException](openmldbSession.sql(sql5))
+
+ val sql6 = s"insert into $db.$table (c1) values (1, 1)"
+ assertThrows[IllegalArgumentException](openmldbSession.sql(sql6))
+ }
+
+ test("Test column with default value") {
+ val table = "t5"
+ openmldbConnector.executeDDL(db, s"create table $table(c1 int default 1, c2 int, c3 string, c4 string);")
+ openmldbConnector.refreshCatalog()
+ assert(openmldbConnector.getTableInfo(db, table).getName.nonEmpty)
+
+ val sql1 = s"insert into $db.$table (c3) values ('a')"
+ openmldbSession.sql(sql1)
+ val sql2 = s"insert into $db.$table values (5, NuLl, 'NuLl', NuLl)"
+ openmldbSession.sql(sql2)
+ val sql3 = s"insert into $db.$table (c1) values (NULL)"
+ openmldbSession.sql(sql3)
+
+ val querySess = new OpenmldbSession(sparkSession)
+ val queryResult = querySess.sql(s"select * from $db.$table")
+
+ val schema = StructType(Seq(
+ StructField("c1", IntegerType, nullable = true),
+ StructField("c2", IntegerType, nullable = true),
+ StructField("c3", StringType, nullable = true),
+ StructField("c4", StringType, nullable = true)
+ ))
+ val expectDf = sparkSession.createDataFrame(
+ sparkSession.sparkContext.parallelize(Seq(
+ Row(1, null, "a", null),
+ // If a column's type is string and the inserted value is NULL, InsertPlan currently can't tell
+ // whether it is a NULL value or the literal string "null", so c4 here comes back as the string "null"
+ Row(5, null, "NuLl", "null"),
+ Row(null, null, null, null))),
+ schema)
+ assert(SparkUtil.approximateDfEqual(expectDf, queryResult.getSparkDf()))
+ }
+
+ test("Test table with loaded deep copied data") {
+ val table = "t6"
+ openmldbConnector.executeDDL(db, s"create table $table(c1 int, c2 int64, c3 double);")
+ openmldbConnector.refreshCatalog()
+ assert(openmldbConnector.getTableInfo(db, table).getName.nonEmpty)
+
+ val testFileWithHeader = "file://" + getClass.getResource("/insert_test_src/test.csv")
+ .getPath
+ openmldbSession.sql(s"load data infile '$testFileWithHeader' into table $db.$table " +
+ s"options(format='csv', mode='append', deep_copy=true);")
+ val loadInfo = getLatestTableInfo(db, table)
+ val oldFormat = loadInfo.getOfflineTableInfo.getFormat
+
+ var querySess = new OpenmldbSession(sparkSession)
+ var queryResult = querySess.sql(s"select * from $db.$table")
+ val oldCount = queryResult.count()
+
+ val sql = s"insert into $db.$table values (1, 1, 1)"
+ openmldbSession.sql(sql)
+ val info = getLatestTableInfo(db, table)
+ assert(oldFormat.equals(info.getOfflineTableInfo.getFormat))
+
+ querySess = new OpenmldbSession(sparkSession)
+ queryResult = querySess.sql(s"select * from $db.$table")
+ assert(queryResult.count() == oldCount + 1)
+ }
+
+ def getLatestTableInfo(db: String, table: String): TableInfo = {
+ openmldbConnector.refreshCatalog()
+ openmldbConnector.getTableInfo(db, table)
+ }
+
+ test("Test table with loaded soft copied data") {
+ val table = "t7"
+ openmldbConnector.executeDDL(db, s"create table $table(c1 int, c2 int64, c3 double);")
+ openmldbConnector.refreshCatalog()
+ assert(openmldbConnector.getTableInfo(db, table).getName.nonEmpty)
+
+ val testFileWithHeader = "file://" + getClass.getResource("/insert_test_src/test.csv")
+ .getPath
+ openmldbSession.sql(s"load data infile '$testFileWithHeader' into table $db.$table " +
+ "options(format='csv', mode='append', deep_copy=false);")
+ val newSess = new OpenmldbSession(sparkSession)
+ assertThrows[IllegalArgumentException](newSess.sql(s"insert into $db.$table values (1, 1, 1)"))
+ }
+
+ test("Test insert mode") {
+ val table = "t8"
+ openmldbConnector.executeDDL(db, s"create table $table(c1 string, c2 int);")
+ openmldbConnector.refreshCatalog()
+ assert(openmldbConnector.getTableInfo(db, table).getName.nonEmpty)
+
+ val sql = s"insert or ignore into $db.$table values ('a', 1)"
+ assertThrows[IllegalArgumentException](openmldbSession.sql(sql))
+ }
+}
diff --git a/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/InsertOfflineData.scala b/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/InsertOfflineData.scala
new file mode 100644
index 00000000000..94315ef7621
--- /dev/null
+++ b/java/openmldb-batchjob/src/main/scala/com/_4paradigm/openmldb/batchjob/InsertOfflineData.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021 4Paradigm
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com._4paradigm.openmldb.batchjob
+
+import com._4paradigm.openmldb.batchjob.util.OpenmldbJobUtil
+import org.apache.spark.sql.SparkSession
+
+object InsertOfflineData {
+
+ def main(args: Array[String]): Unit = {
+ // args(0): path of the SQL file to run
+ OpenmldbJobUtil.checkArgumentSize(args, 1)
+ insertOfflineData(args(0))
+ }
+
+ def insertOfflineData(sqlFilePath: String): Unit = {
+ val builder = SparkSession.builder()
+ val spark = builder.getOrCreate()
+ OpenmldbJobUtil.runOpenmldbSql(spark, sqlFilePath)
+ }
+
+}
diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerInterface.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerInterface.java
index cfb9226c1ed..814c771bc9f 100644
--- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerInterface.java
+++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerInterface.java
@@ -50,6 +50,9 @@ public interface TaskManagerInterface {
@BrpcMeta(serviceName = "openmldb.taskmanager.TaskManagerServer", methodName = "ExportOfflineData")
TaskManager.ShowJobResponse ExportOfflineData(TaskManager.ExportOfflineDataRequest request);
+ @BrpcMeta(serviceName = "openmldb.taskmanager.TaskManagerServer", methodName = "InsertOfflineData")
+ TaskManager.ShowJobResponse InsertOfflineData(TaskManager.InsertOfflineDataRequest request);
+
@BrpcMeta(serviceName = "openmldb.taskmanager.TaskManagerServer", methodName = "DropOfflineTable")
TaskManager.DropOfflineTableResponse DropOfflineTable(TaskManager.DropOfflineTableRequest request);
diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java
index 695338925d8..68e4e9036b4 100644
--- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java
+++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java
@@ -350,6 +350,23 @@ public TaskManager.ShowJobResponse ExportOfflineData(TaskManager.ExportOfflineDa
}
}
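+    // Runs an offline INSERT statement as a Spark batch job; when sync_job is set,
+    // waits for the job to reach a final state before responding.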
+ @Override
+ public TaskManager.ShowJobResponse InsertOfflineData(TaskManager.InsertOfflineDataRequest request) {
+ try {
+ JobInfo jobInfo = OpenmldbBatchjobManager.insertOfflineData(request.getSql(), request.getConfMap(),
+ request.getDefaultDb());
+ if (request.getSyncJob()) {
+ // wait for final state
+ jobInfo = waitJobInfoWrapper(jobInfo.getId());
+ }
+ return TaskManager.ShowJobResponse.newBuilder().setCode(StatusCode.SUCCESS).setJob(jobInfoToProto(jobInfo))
+ .build();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return TaskManager.ShowJobResponse.newBuilder().setCode(StatusCode.FAILED).setMsg(e.getMessage()).build();
+ }
+ }
+
@Override
public TaskManager.DropOfflineTableResponse DropOfflineTable(TaskManager.DropOfflineTableRequest request) {
try {
diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala
index 6d942b1eb9e..409bd77e671 100644
--- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala
+++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala
@@ -133,4 +133,21 @@ object OpenmldbBatchjobManager {
}
}
+ def insertOfflineData(sql: String, sparkConf: java.util.Map[String, String], defaultDb: String): JobInfo = {
+ val jobType = "InsertOfflineData"
+ val mainClass = "com._4paradigm.openmldb.batchjob.InsertOfflineData"
+
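+    // The SQL is written to a temp file; on K8s the statement itself is passed as the job argument,
+    // otherwise the batch job receives the temp file path and reads the SQL from it.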
+ val tempSqlFile = SqlFileUtil.createTempSqlFile(sql)
+
+ if (TaskManagerConfig.isK8s) {
+ val args = List(sql)
+ K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
+ defaultDb)
+ } else {
+ val args = List(tempSqlFile.getAbsolutePath)
+ SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
+ defaultDb)
+ }
+ }
+
}
diff --git a/src/base/ddl_parser.cc b/src/base/ddl_parser.cc
index 4f3d631a1fc..d0aa30d168c 100644
--- a/src/base/ddl_parser.cc
+++ b/src/base/ddl_parser.cc
@@ -22,6 +22,7 @@
#include
#include
#include
+#include <set>
#include "codec/schema_codec.h"
#include "google/protobuf/util/message_differencer.h"
@@ -441,6 +442,7 @@ std::vector DDLParser::ValidateSQLInRequest(const std::string& sql,
void IndexMapBuilder::Report(absl::string_view db, absl::string_view table, absl::Span keys,
absl::string_view ts, const PhysicalOpNode* expr_node) {
// we encode table, keys and ts to one string
+ // keys may contain duplicates; Encode() dedups them
auto index = Encode(db, table, keys, ts);
if (index.empty()) {
LOG(WARNING) << "index encode failed for table " << db << "." << table;
@@ -604,8 +606,8 @@ MultiDBIndexMap IndexMapBuilder::ToMap() {
std::string IndexMapBuilder::Encode(absl::string_view db, absl::string_view table, absl::Span keys,
absl::string_view ts) {
// children are ColumnRefNode
- std::vector cols(keys.begin(), keys.end());
- std::sort(cols.begin(), cols.end());
+ // dedup and sort keys
+ std::set cols(keys.begin(), keys.end());
if (cols.empty()) {
return {};
}
diff --git a/src/base/ddl_parser_test.cc b/src/base/ddl_parser_test.cc
index be35b17934e..acb27fdd042 100644
--- a/src/base/ddl_parser_test.cc
+++ b/src/base/ddl_parser_test.cc
@@ -15,6 +15,7 @@
*/
#include "base/ddl_parser.h"
+
#include
#include "absl/cleanup/cleanup.h"
@@ -50,7 +51,7 @@ std::ostream& operator<<(std::ostream& os, const std::map
// a human readable string for one index: key1,key2,...;ts;. (ts is optional and only one, if no ts, it should be
-// key;;) : type,abs_value,lat_value, e.g. abs,10,0 lat,0,20 abs&lat,10,20 abs||lat,10,20
+// key;;) : type,abs_value,lat_value, e.g. abs,10,0 lat,0,20 absandlat,10,20 absorlat,10,20
void CheckEqual(const IndexMap& map, std::map>&& readable_map) {
auto error_message = [](const IndexMap& map, const std::map>& readable) {
std::stringstream ss;
@@ -121,7 +122,7 @@ void CheckEqual(const IndexMap& map, std::map
diff --git a/src/client/taskmanager_client.cc b/src/client/taskmanager_client.cc
--- a/src/client/taskmanager_client.cc
+++ b/src/client/taskmanager_client.cc
+::openmldb::base::Status TaskManagerClient::InsertOfflineData(const std::string& sql,
+ const std::map<std::string, std::string>& config,
+ const std::string& default_db, bool sync_job,
+ int job_timeout,
+ ::openmldb::taskmanager::JobInfo* job_info) {
+ ::openmldb::taskmanager::InsertOfflineDataRequest request;
+ ::openmldb::taskmanager::ShowJobResponse response;
+
+ request.set_sql(sql);
+ request.set_default_db(default_db);
+ request.set_sync_job(sync_job);
+ for (const auto& it : config) {
+ (*request.mutable_conf())[it.first] = it.second;
+ }
+
+ auto st = client_.SendRequestSt(&::openmldb::taskmanager::TaskManagerServer_Stub::InsertOfflineData, &request,
+ &response, job_timeout, 1);
+
+ if (st.OK()) {
+ if (response.code() == 0) {
+ if (response.has_job()) {
+ job_info->CopyFrom(response.job());
+ }
+ }
+ return {response.code(), response.msg()};
+ }
+ return st;
+}
+
::openmldb::base::Status TaskManagerClient::DropOfflineTable(const std::string& db, const std::string& table,
int job_timeout) {
::openmldb::taskmanager::DropOfflineTableRequest request;
diff --git a/src/client/taskmanager_client.h b/src/client/taskmanager_client.h
index c45fa273d85..fd17cd7aeeb 100644
--- a/src/client/taskmanager_client.h
+++ b/src/client/taskmanager_client.h
@@ -72,6 +72,10 @@ class TaskManagerClient : public Client {
const std::string& default_db, bool sync_job, int job_timeout,
::openmldb::taskmanager::JobInfo* job_info);
+ ::openmldb::base::Status InsertOfflineData(const std::string& sql, const std::map& config,
+ const std::string& default_db, bool sync_job, int job_timeout,
+ ::openmldb::taskmanager::JobInfo* job_info);
+
::openmldb::base::Status DropOfflineTable(const std::string& db, const std::string& table, int job_timeout);
::openmldb::base::Status CreateFunction(const std::shared_ptr<::openmldb::common::ExternalFun>& fun,
diff --git a/src/nameserver/new_server_env_test.cc b/src/nameserver/new_server_env_test.cc
index 30a83f268b2..f24a29810ee 100644
--- a/src/nameserver/new_server_env_test.cc
+++ b/src/nameserver/new_server_env_test.cc
@@ -79,6 +79,24 @@ void StartNameServer(brpc::Server& server, const std::string& real_ep) { // NOL
sleep(2);
}
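+// Like StartNameServer, but sleeps before RegisterName() to exercise delayed name registration.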
+void StartNameServerWithDelay(brpc::Server& server, const std::string& real_ep) { // NOLINT
+ NameServerImpl* nameserver = new NameServerImpl();
+ bool ok = nameserver->Init(real_ep);
+ sleep(5);
+ ASSERT_EQ(true, nameserver->RegisterName());
+ ASSERT_TRUE(ok);
+ brpc::ServerOptions options;
+ if (server.AddService(nameserver, brpc::SERVER_OWNS_SERVICE) != 0) {
+ PDLOG(WARNING, "Fail to add service");
+ exit(1);
+ }
+ if (server.Start(real_ep.c_str(), &options) != 0) {
+ PDLOG(WARNING, "Fail to start server");
+ exit(1);
+ }
+ sleep(2);
+}
+
void StartTablet(brpc::Server& server, const std::string& real_ep) { // NOLINT
::openmldb::tablet::TabletImpl* tablet = new ::openmldb::tablet::TabletImpl();
bool ok = tablet->Init(real_ep);
@@ -230,6 +248,96 @@ TEST_F(NewServerEnvTest, ShowRealEndpoint) {
}
}
+TEST_F(NewServerEnvTest, ShowRealEndpointDelayNameserverStart) {
+ FLAGS_zk_cluster = "127.0.0.1:6181";
+ FLAGS_zk_root_path = "/rtidb4" + ::openmldb::test::GenRand();
+
+ // ns1
+ FLAGS_use_name = true;
+ FLAGS_endpoint = "ns1";
+ std::string ns_real_ep = "127.0.0.1:9631";
+ brpc::Server ns_server;
+ StartNameServerWithDelay(ns_server, ns_real_ep);
+ ::openmldb::RpcClient<::openmldb::nameserver::NameServer_Stub> name_server_client(ns_real_ep);
+ name_server_client.Init();
+
+ // tablet1
+ FLAGS_use_name = true;
+ FLAGS_endpoint = "tb1";
+ std::string tb_real_ep_1 = "127.0.0.1:9831";
+ ::openmldb::test::TempPath tmp_path;
+ FLAGS_db_root_path = tmp_path.GetTempPath();
+ brpc::Server tb_server1;
+ StartTablet(tb_server1, tb_real_ep_1);
+
+ // tablet2
+ FLAGS_use_name = true;
+ FLAGS_endpoint = "tb2";
+ std::string tb_real_ep_2 = "127.0.0.1:9931";
+ FLAGS_db_root_path = tmp_path.GetTempPath();
+ brpc::Server tb_server2;
+ StartTablet(tb_server2, tb_real_ep_2);
+
+ {
+ std::map<std::string, std::string> map;
+ ShowNameServer(&map);
+ ASSERT_EQ(1u, map.size());
+ auto it = map.find("ns1");
+ if (it != map.end()) {
+ ASSERT_EQ(ns_real_ep, it->second);
+ } else {
+ ASSERT_TRUE(false);
+ }
+ }
+ {
+ // showtablet
+ ::openmldb::nameserver::ShowTabletRequest request;
+ ::openmldb::nameserver::ShowTabletResponse response;
+ bool ok = name_server_client.SendRequest(&::openmldb::nameserver::NameServer_Stub::ShowTablet, &request,
+ &response, FLAGS_request_timeout_ms, 1);
+ ASSERT_TRUE(ok);
+
+ ::openmldb::nameserver::TabletStatus status = response.tablets(0);
+ ASSERT_EQ("tb1", status.endpoint());
+ ASSERT_EQ(tb_real_ep_1, status.real_endpoint());
+ ASSERT_EQ("kHealthy", status.state());
+
+ status = response.tablets(1);
+ ASSERT_EQ("tb2", status.endpoint());
+ ASSERT_EQ(tb_real_ep_2, status.real_endpoint());
+ ASSERT_EQ("kHealthy", status.state());
+ }
+ std::string ns_sdk_ep = "127.0.0.1:8881";
+ std::string tb_sdk_ep_1 = "127.0.0.1:8882";
+ std::string tb_sdk_ep_2 = "127.0.0.1:8883";
+ {
+ // set sdkendpoint
+ SetSdkEndpoint(name_server_client, "ns1", ns_sdk_ep);
+ SetSdkEndpoint(name_server_client, "tb1", tb_sdk_ep_1);
+ SetSdkEndpoint(name_server_client, "tb2", tb_sdk_ep_2);
+ }
+ {
+ // show sdkendpoint
+ ::openmldb::nameserver::ShowSdkEndpointRequest request;
+ ::openmldb::nameserver::ShowSdkEndpointResponse response;
+ bool ok = name_server_client.SendRequest(&::openmldb::nameserver::NameServer_Stub::ShowSdkEndpoint, &request,
+ &response, FLAGS_request_timeout_ms, 1);
+ ASSERT_TRUE(ok);
+
+ auto status = response.tablets(0);
+ ASSERT_EQ("ns1", status.endpoint());
+ ASSERT_EQ(ns_sdk_ep, status.real_endpoint());
+
+ status = response.tablets(1);
+ ASSERT_EQ("tb1", status.endpoint());
+ ASSERT_EQ(tb_sdk_ep_1, status.real_endpoint());
+
+ status = response.tablets(2);
+ ASSERT_EQ("tb2", status.endpoint());
+ ASSERT_EQ(tb_sdk_ep_2, status.real_endpoint());
+ }
+}
+
/*TEST_F(NewServerEnvTest, SyncMultiReplicaData) {
FLAGS_zk_cluster = "127.0.0.1:6181";
FLAGS_zk_root_path = "/rtidb4" + GenRand();
diff --git a/src/proto/taskmanager.proto b/src/proto/taskmanager.proto
index 8328920a9c3..907d0d157b2 100644
--- a/src/proto/taskmanager.proto
+++ b/src/proto/taskmanager.proto
@@ -105,6 +105,13 @@ message ExportOfflineDataRequest {
optional bool sync_job = 4 [default = false];
}
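+// Runs an offline INSERT statement as a TaskManager job.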
+message InsertOfflineDataRequest {
+ required string sql = 1;
+ map<string, string> conf = 2;
+ optional string default_db = 3 [default = ""];
+ optional bool sync_job = 4 [default = false];
+}
+
message DropOfflineTableRequest {
required string db = 1;
required string table = 2;
@@ -174,6 +181,7 @@ service TaskManagerServer {
rpc ImportOnlineData(ImportOnlineDataRequest) returns (ShowJobResponse);
rpc ImportOfflineData(ImportOfflineDataRequest) returns (ShowJobResponse);
rpc ExportOfflineData(ExportOfflineDataRequest) returns (ShowJobResponse);
+ rpc InsertOfflineData(InsertOfflineDataRequest) returns (ShowJobResponse);
rpc DropOfflineTable(DropOfflineTableRequest) returns (DropOfflineTableResponse);
rpc GetJobLog(GetJobLogRequest) returns (GetJobLogResponse);
rpc CreateFunction(CreateFunctionRequest) returns (CreateFunctionResponse);
diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc
index b302c080846..e84d2b251f8 100644
--- a/src/sdk/sql_cluster_router.cc
+++ b/src/sdk/sql_cluster_router.cc
@@ -2555,6 +2555,18 @@ ::openmldb::base::Status SQLClusterRouter::ExportOfflineData(const std::string&
return taskmanager_client_ptr->ExportOfflineData(sql, config, default_db, sync_job, job_timeout, job_info);
}
+::openmldb::base::Status SQLClusterRouter::InsertOfflineData(const std::string& sql,
+ const std::map& config,
+ const std::string& default_db, bool sync_job,
+ int job_timeout,
+ ::openmldb::taskmanager::JobInfo* job_info) {
+ auto taskmanager_client_ptr = cluster_sdk_->GetTaskManagerClient();
+ if (!taskmanager_client_ptr) {
+ return {base::ReturnCode::kServerConnError, "Fail to get TaskManager client"};
+ }
+ return taskmanager_client_ptr->InsertOfflineData(sql, config, default_db, sync_job, job_timeout, job_info);
+}
+
::openmldb::base::Status SQLClusterRouter::CreatePreAggrTable(const std::string& aggr_db, const std::string& aggr_table,
const ::openmldb::base::LongWindowInfo& window_info,
const ::openmldb::nameserver::TableInfo& base_table_info,
@@ -2801,14 +2813,21 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL(
return {};
}
case hybridse::node::kPlanTypeInsert: {
- if (cluster_sdk_->IsClusterMode() && !is_online_mode) {
- // Not support for inserting into offline storage
- *status = {StatusCode::kCmdError,
- "Can not insert in offline mode, please set @@SESSION.execute_mode='online'"};
- return {};
+ if (!cluster_sdk_->IsClusterMode() || is_online_mode) {
+ ExecuteInsert(db, sql, status);
+ } else {
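+ // cluster mode with offline execution: submit the INSERT as a TaskManager Spark job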
+ ::openmldb::taskmanager::JobInfo job_info;
+ std::map<std::string, std::string> config = ParseSparkConfigString(GetSparkConfig());
+ ReadSparkConfFromFile(std::dynamic_pointer_cast<SQLRouterOptions>(options_)->spark_conf_path, &config);
+ AddUserToConfig(&config);
+
+ auto base_status = InsertOfflineData(sql, config, db, is_sync_job, offline_job_timeout, &job_info);
+ if (base_status.OK()) {
+ return this->GetJobResultSet(job_info.id(), status);
+ } else {
+ *status = {StatusCode::kCmdError, base_status.msg};
+ }
}
- // if db name has been specified in sql, db parameter will be ignored
- ExecuteInsert(db, sql, status);
return {};
}
case hybridse::node::kPlanTypeDeploy: {
diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h
index 154a53f17d6..3d13cafa240 100644
--- a/src/sdk/sql_cluster_router.h
+++ b/src/sdk/sql_cluster_router.h
@@ -261,6 +261,10 @@ class SQLClusterRouter : public SQLRouter {
const std::string& default_db, bool sync_job, int job_timeout,
::openmldb::taskmanager::JobInfo* job_info);
+ ::openmldb::base::Status InsertOfflineData(const std::string& sql, const std::map& config,
+ const std::string& default_db, bool sync_job, int job_timeout,
+ ::openmldb::taskmanager::JobInfo* job_info);
+
::openmldb::base::Status CreatePreAggrTable(const std::string& aggr_db, const std::string& aggr_table,
const ::openmldb::base::LongWindowInfo& window_info,
const ::openmldb::nameserver::TableInfo& base_table_info,
diff --git a/src/zk/zk_client.cc b/src/zk/zk_client.cc
index d2224f4c825..b52027cd4c9 100644
--- a/src/zk/zk_client.cc
+++ b/src/zk/zk_client.cc
@@ -228,14 +228,26 @@ bool ZkClient::RegisterName() {
sname_vec.push_back(*it);
}
}
+
if (std::find(sname_vec.begin(), sname_vec.end(), sname) != sname_vec.end()) {
- std::string ep;
- if (GetNodeValue(names_root_path_ + "/" + sname, ep) && ep == real_endpoint_) {
- LOG(INFO) << "node:" << sname << "value:" << ep << " exist";
- return true;
+ auto node_path = names_root_path_ + "/" + sname;
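+ // Re-check the node before declaring a duplicate: only a live node registered with a different endpoint is a real conflict.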
+ if (auto code = IsExistNode(node_path); code == 0) {
+ std::string ep;
+ if (GetNodeValue(node_path, ep)) {
+ if (ep == real_endpoint_) {
+ LOG(INFO) << "node:" << sname << "value:" << ep << " exist";
+ return true;
+ } else {
+ LOG(WARNING) << "server name:" << sname << " duplicate";
+ return false;
+ }
+ } else {
+ LOG(WARNING) << "server name:" << sname << " GetNodeValue failed";
+ return false;
+ }
+ } else {
+ LOG(INFO) << "node:" << sname << "does not exist";
}
- LOG(WARNING) << "server name:" << sname << " duplicate";
- return false;
}
std::string name = names_root_path_ + "/" + sname;