-
Notifications
You must be signed in to change notification settings - Fork 320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support insert function in offline mode #3854
Conversation
SDK Test Report102 files +1 102 suites +1 2m 12s ⏱️ -1s Results for commit 05dbe85. ± Comparison against base commit 7f758af. This pull request removes 30 and adds 17 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
val newOfflineInfo = OfflineTableInfo | ||
.newBuilder() | ||
.setPath(offlineDataPath) | ||
.setFormat("csv") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
format parquet
val spark = ctx.getSparkSession | ||
var insertDf = spark.createDataFrame(spark.sparkContext.parallelize(insertRows), insertSchema) | ||
val schemaDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], oriSchema) | ||
insertDf = schemaDf.unionByName(insertDf, allowMissingColumns = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default value
val offlineDataPath = getOfflineDataPath(ctx, db, table) | ||
val newTableInfoBuilder = tableInfo.toBuilder | ||
val hasOfflineTableInfo = tableInfo.hasOfflineTableInfo | ||
if (!hasOfflineTableInfo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果有软链接,直接抛异常,拒绝这次写入
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
symbolic path
|
||
|
||
class TestInsertPlan extends SparkTestSuite { | ||
var sparkSession: SparkSession = _ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
考虑写入已load data的table,对应的情况
add desc about offline insert in |
已做出如下修改点: |
- 默认`INSERT`不会去重,`INSERT OR IGNORE` 则可以忽略已存在于表中的数据,可以反复重试。 | ||
- 离线模式仅支持`INSERT`,不支持`INSERT OR IGNORE` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
还有限制:“离线insert不能用于有软链接的表“,由于format对一张表唯一,如果format为hive等,我们没法给它建硬拷贝地址,并保存insert数据到硬拷贝地址的parquet文件。使用insert只能用户先保证无软链接。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What kind of change does this PR introduce? (Bug fix, feature, docs update, ...)
feature: Enable insert function in offline mode, add corresponding test cases
What is the current behavior? (You can also link to an open issue here)
Insert function is not supported in offline mode
What is the new behavior (if this is a feature change)?
We can use insert in offline mode