diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index ff129964faea..d974e216d585 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -48,6 +48,8 @@ object HoodieProcedures {
     mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
     mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
     mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
+    mapBuilder.put(ShowAllFileSystemViewProcedure.NAME, ShowAllFileSystemViewProcedure.builder)
+    mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
new file mode 100644
index 000000000000..26196e3615f6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.google.common.collect.Lists
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieDefaultTimeline, HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.util
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.{Function, Supplier}
+import java.util.stream.Collectors
+import scala.collection.JavaConverters.{asJavaIteratorConverter, asScalaIteratorConverter}
+
+class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS_ALL: Array[ProcedureParameter] = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, "*/*/*")
+  )
+
+  private val OUTPUT_TYPE_ALL: StructType = StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("base_instant", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("data_file", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("data_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("num_delta_files", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_delta_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("delta_files", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  private val PARAMETERS_LATEST: Array[ProcedureParameter] = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.required(6, "partition_path", DataTypes.StringType, None),
+    ProcedureParameter.optional(7, "merge", DataTypes.BooleanType, true)
+  )
+
+  private val OUTPUT_TYPE_LATEST: StructType = StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("base_instant", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("data_file", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("data_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("num_delta_files", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_delta_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("delta_size_compaction_scheduled", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("delta_size_compaction_unscheduled", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("delta_to_base_ratio_compaction_scheduled", DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("delta_to_base_ratio_compaction_unscheduled", DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("delta_files_compaction_scheduled", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("delta_files_compaction_unscheduled", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  private def buildFileSystemView(table: Option[Any],
+                                  globRegex: String,
+                                  maxInstant: String,
+                                  includeMaxInstant: Boolean,
+                                  includeInflight: Boolean,
+                                  excludeCompaction: Boolean
+                                 ): HoodieTableFileSystemView = {
+    val basePath = getBasePath(table)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val fs = metaClient.getFs
+    val globPath = String.format("%s/%s/*", basePath, globRegex)
+    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))
+    var timeline: HoodieTimeline = if (excludeCompaction) {
+      metaClient.getActiveTimeline.getCommitsTimeline
+    } else {
+      metaClient.getActiveTimeline.getWriteTimeline
+    }
+    if (!includeInflight) {
+      timeline = timeline.filterCompletedInstants()
+    }
+    var instants = timeline.getInstants.iterator().asScala
+    if (maxInstant.nonEmpty) {
+      val predicate = if (includeMaxInstant) {
+        HoodieTimeline.GREATER_THAN_OR_EQUALS
+      } else {
+        HoodieTimeline.GREATER_THAN
+      }
+      instants = instants.filter(instant => predicate.test(maxInstant, instant.getTimestamp))
+    }
+
+    val details = new Function[HoodieInstant, org.apache.hudi.common.util.Option[Array[Byte]]]
+      with java.io.Serializable {
+      override def apply(instant: HoodieInstant): util.Option[Array[Byte]] = {
+        metaClient.getActiveTimeline.getInstantDetails(instant)
+      }
+    }
+    val filteredTimeline = new HoodieDefaultTimeline(Lists.newArrayList(instants.asJava).stream(), details)
+    new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](0)))
+  }
+
+  private def showAllFileSlices(fsView: HoodieTableFileSystemView): java.util.List[Row] = {
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    fsView.getAllFileGroups.iterator().asScala.foreach(fg => {
+      fg.getAllFileSlices.iterator().asScala.foreach(fs => {
+        val fileId = fg.getFileGroupId.getFileId
+        var baseFilePath = ""
+        var baseFileSize = 0L
+        if (fs.getBaseFile.isPresent) {
+          baseFilePath = fs.getBaseFile.get.getPath
+          baseFileSize = fs.getBaseFile.get.getFileSize
+        }
+        val numLogFiles = fs.getLogFiles.count()
+        val sumLogFileSize = fs.getLogFiles.iterator().asScala.map(_.getFileSize).sum
+        val logFiles = fs.getLogFiles.collect(Collectors.toList[HoodieLogFile]).toString
+
+        rows.add(Row(fg.getPartitionPath, fileId, fs.getBaseInstantTime, baseFilePath, baseFileSize, numLogFiles,
+          sumLogFileSize, logFiles))
+      })
+    })
+    rows
+  }
+
+  private def showLatestFileSlices(table: Option[Any],
+                                   fsView: HoodieTableFileSystemView,
+                                   partition: String,
+                                   maxInstant: String,
+                                   merge: Boolean): java.util.List[Row] = {
+    var fileSliceStream: java.util.stream.Stream[FileSlice] = null
+    if (!merge) {
+      fileSliceStream = fsView.getLatestFileSlices(partition)
+    } else {
+      var maxInstantTime = maxInstant
+      if (maxInstantTime.isEmpty) {
+        val basePath = getBasePath(table)
+        val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+        maxInstantTime = metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp
+      }
+      fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, maxInstantTime)
+    }
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    fileSliceStream.iterator().asScala.foreach {
+      fs => {
+        val fileId = fs.getFileId
+        val baseInstantTime = fs.getBaseInstantTime
+        var baseFilePath = ""
+        var baseFileSize = 0L
+        if (fs.getBaseFile.isPresent) {
+          baseFilePath = fs.getBaseFile.get.getPath
+          baseFileSize = fs.getBaseFile.get.getFileSize
+        }
+        val numLogFiles = fs.getLogFiles.count()
+        val sumLogFileSize = fs.getLogFiles.iterator().asScala.map(_.getFileSize).sum
+        val logFilesScheduledForCompactionTotalSize = fs.getLogFiles.iterator().asScala
+          .filter(logFile => logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .map(_.getFileSize).sum
+        val logFilesUnscheduledTotalSize = fs.getLogFiles.iterator().asScala
+          .filter(logFile => !logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .map(_.getFileSize).sum
+        val logSelectedForCompactionToBaseRatio = if (baseFileSize > 0) {
+          logFilesScheduledForCompactionTotalSize / (baseFileSize * 1.0)
+        } else {
+          -1.0
+        }
+        val logUnscheduledToBaseRatio = if (baseFileSize > 0) {
+          logFilesUnscheduledTotalSize / (baseFileSize * 1.0)
+        } else {
+          -1.0
+        }
+        val logFilesCommitTimeEqualInstantTime = fs.getLogFiles.iterator().asScala
+          .filter(logFile => logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .mkString("[", ",", "]")
+        val logFilesCommitTimeNonEqualInstantTime = fs.getLogFiles.iterator().asScala
+          .filter(logFile => !logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .mkString("[", ",", "]")
+        rows.add(Row(partition, fileId, baseInstantTime, baseFilePath, baseFileSize, numLogFiles, sumLogFileSize,
+          logFilesScheduledForCompactionTotalSize, logFilesUnscheduledTotalSize, logSelectedForCompactionToBaseRatio,
+          logUnscheduledToBaseRatio, logFilesCommitTimeEqualInstantTime, logFilesCommitTimeNonEqualInstantTime
+        ))
+      }
+    }
+    rows
+  }
+
+  override def parameters: Array[ProcedureParameter] = if (showLatest) {
+    PARAMETERS_LATEST
+  } else {
+    PARAMETERS_ALL
+  }
+
+  override def outputType: StructType = if (showLatest) {
+    OUTPUT_TYPE_LATEST
+  } else {
+    OUTPUT_TYPE_ALL
+  }
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(parameters, args)
+    val table = getArgValueOrDefault(args, parameters(0))
+    val maxInstant = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
+    val includeMax = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Boolean]
+    val includeInflight = getArgValueOrDefault(args, parameters(3)).get.asInstanceOf[Boolean]
+    val excludeCompaction = getArgValueOrDefault(args, parameters(4)).get.asInstanceOf[Boolean]
+    val limit = getArgValueOrDefault(args, parameters(5)).get.asInstanceOf[Int]
+    val rows: java.util.List[Row] = if (!showLatest) {
+      val globRegex = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
+      val fsView = buildFileSystemView(table, globRegex, maxInstant, includeMax, includeInflight, excludeCompaction)
+      showAllFileSlices(fsView)
+    } else {
+      val partitionPath = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
+      val merge = getArgValueOrDefault(args, parameters(7)).get.asInstanceOf[Boolean]
+      val fsView = buildFileSystemView(table, partitionPath, maxInstant, includeMax, includeInflight, excludeCompaction)
+      showLatestFileSlices(table, fsView, partitionPath, maxInstant, merge)
+    }
+    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ShowFileSystemViewProcedure(showLatest)
+}
+
+object ShowAllFileSystemViewProcedure {
+  val NAME = "show_fsview_all"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new ShowFileSystemViewProcedure(false)
+  }
+}
+
+object ShowLatestFileSystemViewProcedure {
+  val NAME = "show_fsview_latest"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new ShowFileSystemViewProcedure(true)
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
new file mode 100644
index 000000000000..69d08e37dfc0
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestFsViewProcedure extends HoodieSparkSqlTestBase {
+  test("Test Call show_fsview_all Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // check required fields
+      checkExceptionContain(s"""call show_fsview_all(limit => 10)""")(
+        s"Argument: table is required")
+
+      // collect the result for the table
+      val result = spark.sql(
+        s"""call show_fsview_all(table => '$tableName', path_regex => '*/', limit => 10)""").collect()
+      assertResult(2) {
+        result.length
+      }
+    }
+  }
+
+  test("Test Call show_fsview_latest Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // check required fields
+      checkExceptionContain(s"""call show_fsview_latest(limit => 10)""")(
+        s"Argument: table is required")
+
+      // collect the result for the table
+      val result = spark.sql(
+        s"""call show_fsview_latest(table => '$tableName', partition_path => 'ts=1000', limit => 10)""").collect()
+      assertResult(1) {
+        result.length
+      }
+    }
+  }
+}
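
A minimal usage sketch for the two procedures registered above, run through Spark SQL with the Hudi extensions enabled; the table name and partition value are illustrative, and the parameters mirror the tests:

    call show_fsview_all(table => 'hudi_table', path_regex => '*/*/*', limit => 10);
    call show_fsview_latest(table => 'hudi_table', partition_path => 'ts=1000', merge => true);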