From 58d4ca8b92d3ebcd6eaa9801fb0e9834089eaec7 Mon Sep 17 00:00:00 2001 From: Jerry Shao Date: Thu, 2 Jun 2022 14:33:18 +0800 Subject: [PATCH 1/6] Add Call Procedure for marker deletion --- .../procedures/DeleteMarkerProcedure.scala | 78 +++++++++++++++++++ .../command/procedures/HoodieProcedures.scala | 1 + .../hudi/procedure/TestCallProcedure.scala | 40 ++++++++++ 3 files changed, 119 insertions(+) create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala new file mode 100644 index 000000000000..61226de393b9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.table.HoodieSparkTable +import org.apache.hudi.table.marker.WriteMarkersFactory +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util.function.Supplier +import scala.util.{Failure, Success, Try} + +class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None), + ProcedureParameter.required(1, "instant_Time", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("delete_marker_result", DataTypes.BooleanType, nullable = true, Metadata.empty)) + ) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] + val basePath: String = getBasePath(tableName) + + val result = Try { + val client = createHoodieClient(jsc, basePath) + val config = client.getConfig + val context = client.getEngineContext + val table = HoodieSparkTable.create(config, context, true) + WriteMarkersFactory.get(config.getMarkersType, table, instantTime) + .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism) + } match { + case Success(_) => + logInfo(s"Marker $instantTime deleted.") + true + case Failure(e) => + logWarning(s"Failed: Could not clean marker instantTime: $instantTime.", e) + false + } + + Seq(Row(result)) + } + + override def build: Procedure = new DeleteMarkerProcedure() +} + +object DeleteMarkerProcedure { + val NAME: String = "delete_marker" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get(): DeleteMarkerProcedure = new DeleteMarkerProcedure() + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala index e7de3e784a2f..2b720bb94d2d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala @@ -44,6 +44,7 @@ object HoodieProcedures { mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder) mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder) mapBuilder.put(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder) + mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder) mapBuilder.build } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala index f75569a1171f..848d09ab62bd 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hudi.procedure +import org.apache.hudi.common.model.IOType +import org.apache.hudi.common.testutils.FileCreateUtils import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase class TestCallProcedure extends HoodieSparkSqlTestBase { @@ -131,4 +133,42 @@ class TestCallProcedure extends HoodieSparkSqlTestBase { assertResult(1){commits.length} } } + + test("Test Call delete_marker Procedure") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // Check required fields + checkExceptionContain(s"""call delete_marker(table => '$tableName')""")( + s"Argument: instant_Time is required") + + val instantTime = "101" + FileCreateUtils.createMarkerFile(tablePath, "", instantTime, "f0", IOType.APPEND) + assertResult(1) { + FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, IOType.APPEND) + } + + checkAnswer(s"""call delete_marker(table => '$tableName', instant_Time => '$instantTime')""")(Seq(true)) + + assertResult(0) { + FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, IOType.APPEND) + } + } + } } From 0986625246fb299c910cbc6538371035ad3c0c17 Mon Sep 17 00:00:00 2001 From: Jerry Shao Date: Thu, 2 Jun 2022 14:48:34 +0800 Subject: [PATCH 2/6] Remove type declaration --- .../sql/hudi/command/procedures/DeleteMarkerProcedure.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala index 61226de393b9..1931a07bb56e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala @@ -45,7 +45,7 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log val tableName = getArgValueOrDefault(args, PARAMETERS(0)) val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] - val basePath: String = getBasePath(tableName) + val basePath = getBasePath(tableName) val result = Try { val client = createHoodieClient(jsc, basePath) From 5940ac85b4a63683d106aead8018583880531aa9 Mon Sep 17 00:00:00 2001 From: Jerry Shao Date: Thu, 2 Jun 2022 18:50:25 +0800 Subject: [PATCH 3/6] specify paramenter type --- .../sql/hudi/command/procedures/DeleteMarkerProcedure.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala index 1931a07bb56e..5ac886e2eefa 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi.command.procedures +import org.apache.hudi.common.engine.HoodieEngineContext import org.apache.hudi.table.HoodieSparkTable import org.apache.hudi.table.marker.WriteMarkersFactory import org.apache.spark.internal.Logging @@ -51,7 +52,7 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log val client = createHoodieClient(jsc, basePath) val config = client.getConfig val context = client.getEngineContext - val table = HoodieSparkTable.create(config, context, true) + val table = HoodieSparkTable.create(config, context : HoodieEngineContext, true) WriteMarkersFactory.get(config.getMarkersType, table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism) } match { From c9b6d662d6ad1db8628b22da36ecd8cac04426b9 Mon Sep 17 00:00:00 2001 From: Jerry Shao Date: Thu, 2 Jun 2022 19:41:01 +0800 Subject: [PATCH 4/6] remove ambuigity --- .../command/procedures/DeleteMarkerProcedure.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala index 5ac886e2eefa..acdda740f926 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala @@ -17,7 +17,11 @@ package org.apache.spark.sql.hudi.command.procedures +import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.engine.HoodieEngineContext +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion +import org.apache.hudi.common.util.Option import org.apache.hudi.table.HoodieSparkTable import org.apache.hudi.table.marker.WriteMarkersFactory import org.apache.spark.internal.Logging @@ -52,7 +56,13 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log val client = createHoodieClient(jsc, basePath) val config = client.getConfig val context = client.getEngineContext - val table = HoodieSparkTable.create(config, context : HoodieEngineContext, true) + val metaClient = HoodieTableMetaClient.builder() + .setConf(context.getHadoopConf.get).setBasePath(config.getBasePath) + .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig) + .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion))) + .setFileSystemRetryConfig(config.getFileSystemRetryConfig).setProperties(config.getProps) + .build + val table = HoodieSparkTable.create(config, context.asInstanceOf[HoodieSparkEngineContext], metaClient, true) WriteMarkersFactory.get(config.getMarkersType, table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism) } match { From 06ded41960f22828281d7fcffa856a142a5e10f3 Mon Sep 17 00:00:00 2001 From: Jerry Shao Date: Thu, 2 Jun 2022 22:01:41 +0800 Subject: [PATCH 5/6] try again --- .../command/procedures/DeleteMarkerProcedure.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala index acdda740f926..19a8d595dc2a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala @@ -17,11 +17,7 @@ package org.apache.spark.sql.hudi.command.procedures -import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.engine.HoodieEngineContext -import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion -import org.apache.hudi.common.util.Option import org.apache.hudi.table.HoodieSparkTable import org.apache.hudi.table.marker.WriteMarkersFactory import org.apache.spark.internal.Logging @@ -55,14 +51,8 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log val result = Try { val client = createHoodieClient(jsc, basePath) val config = client.getConfig - val context = client.getEngineContext - val metaClient = HoodieTableMetaClient.builder() - .setConf(context.getHadoopConf.get).setBasePath(config.getBasePath) - .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig) - .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion))) - .setFileSystemRetryConfig(config.getFileSystemRetryConfig).setProperties(config.getProps) - .build - val table = HoodieSparkTable.create(config, context.asInstanceOf[HoodieSparkEngineContext], metaClient, true) + val context: HoodieEngineContext = client.getEngineContext + val table = HoodieSparkTable.create(config, context, java.lang.Boolean.TRUE) WriteMarkersFactory.get(config.getMarkersType, table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism) } match { From cc44bc0b5993b34b9269a2e3c3b691dcb7fde549 Mon Sep 17 00:00:00 2001 From: Jerry Shao Date: Thu, 2 Jun 2022 22:30:10 +0800 Subject: [PATCH 6/6] another try --- .../sql/hudi/command/procedures/DeleteMarkerProcedure.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala index 19a8d595dc2a..8804d9fb5fcb 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.hudi.command.procedures -import org.apache.hudi.common.engine.HoodieEngineContext import org.apache.hudi.table.HoodieSparkTable import org.apache.hudi.table.marker.WriteMarkersFactory import org.apache.spark.internal.Logging @@ -51,8 +50,8 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log val result = Try { val client = createHoodieClient(jsc, basePath) val config = client.getConfig - val context: HoodieEngineContext = client.getEngineContext - val table = HoodieSparkTable.create(config, context, java.lang.Boolean.TRUE) + val context = client.getEngineContext + val table = HoodieSparkTable.create(config, context) WriteMarkersFactory.get(config.getMarkersType, table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism) } match {