Commit

[HUDI-7110] Add call procedure for show column stats information (apa…
majian1998 authored Nov 23, 2023
1 parent aabaa99 commit 8d6d043
Showing 4 changed files with 237 additions and 1 deletion.
@@ -309,7 +309,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
    colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*)
  }

-  private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
+  def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
    // Read Metadata Table's Column Stats Index records into [[HoodieData]] container by
    //    - Fetching the records from CSI by key-prefixes (encoded column names)
    //    - Extracting [[HoodieMetadataColumnStats]] records
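The only change in this hunk is dropping the `private` modifier so that code outside ColumnStatsIndexSupport can load Column Stats Index records. A minimal sketch of the call this enables, mirroring the procedure code added below and assuming `spark`, `schema`, `metadataConfig`, and `metaClient` are already constructed as they are there (the column names are taken from the test table and are purely illustrative):

    val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
    // Returns the raw CSI records for the requested columns as HoodieData[HoodieMetadataColumnStats]
    val colStatsRecords = columnStatsIndex.loadColumnStatsIndexRecords(Seq("c1", "c2"), shouldReadInMemory = false)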
@@ -66,6 +66,7 @@ object HoodieProcedures {
      ,(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
      ,(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
      ,(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
+     ,(ShowMetadataTableColumnStatsProcedure.NAME, ShowMetadataTableColumnStatsProcedure.builder)
      ,(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
      ,(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
      ,(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
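Registering the (NAME, builder) pair here is what makes the new procedure resolvable by the CALL command. A usage sketch, assuming a Hudi table named hudi_tbl (the argument names match the procedure parameters defined in the new file below):

    // Returned columns: file_name, column_name, min_value, max_value, null_num
    spark.sql("call show_metadata_table_column_stats(table => 'hudi_tbl', targetColumns => 'c1,c2')").show(false)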
@@ -0,0 +1,169 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command.procedures

import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.avro.model._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.timeline.{HoodieDefaultTimeline, HoodieInstant}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util
import java.util.function.{Function, Supplier}
import scala.collection.{JavaConversions, mutable}
import scala.jdk.CollectionConverters.{asScalaBufferConverter, asScalaIteratorConverter}


class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType),
    ProcedureParameter.optional(1, "partition", DataTypes.StringType),
    ProcedureParameter.optional(2, "targetColumns", DataTypes.StringType)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("column_name", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("min_value", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("max_value", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("null_num", DataTypes.LongType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0))
    val partitions = getArgValueOrDefault(args, PARAMETERS(1)).getOrElse("").toString
    val partitionsSeq = partitions.split(",").filter(_.nonEmpty).toSeq

    val targetColumns = getArgValueOrDefault(args, PARAMETERS(2)).getOrElse("").toString
    val targetColumnsSeq = targetColumns.split(",").toSeq
    val basePath = getBasePath(table)
    val metadataConfig = HoodieMetadataConfig.newBuilder
      .enable(true)
      .build
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val schemaUtil = new TableSchemaResolver(metaClient)
    val schema = AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
    val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
    val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = columnStatsIndex.loadColumnStatsIndexRecords(targetColumnsSeq, shouldReadInMemory = false)
    val fsView = buildFileSystemView(table)
    val allFileSlices: Set[FileSlice] = {
      if (partitionsSeq.isEmpty) {
        val engineCtx = new HoodieSparkEngineContext(jsc)
        val metaTable = HoodieTableMetadata.create(engineCtx, metadataConfig, basePath)
        metaTable.getAllPartitionPaths
          .asScala
          .flatMap(path => fsView.getLatestFileSlices(path).iterator().asScala)
          .toSet
      } else {
        partitionsSeq
          .flatMap(partition => fsView.getLatestFileSlices(partition).iterator().asScala)
          .toSet
      }
    }

    val allFileNames: Set[String] = allFileSlices.map(_.getBaseFile.get().getFileName)

    val rows = mutable.ListBuffer[Row]()
    colStatsRecords.collectAsList().asScala
      .filter(c => allFileNames.contains(c.getFileName))
      .foreach { c =>
        rows += Row(c.getFileName, c.getColumnName, getColumnStatsValue(c.getMinValue),
          getColumnStatsValue(c.getMaxValue), c.getNullCount.longValue())
      }

    rows.toList
  }

  private def getColumnStatsValue(stats_value: Any): String = {
    stats_value match {
      case _: IntWrapper |
           _: BooleanWrapper |
           _: DateWrapper |
           _: DoubleWrapper |
           _: FloatWrapper |
           _: LongWrapper |
           _: StringWrapper |
           _: TimeMicrosWrapper |
           _: TimestampMicrosWrapper =>
        String.valueOf(stats_value.asInstanceOf[IndexedRecord].get(0))
      case _: BytesWrapper =>
        val bytes_value = stats_value.asInstanceOf[BytesWrapper].getValue
        util.Arrays.toString(bytes_value.array())
      case _: DecimalWrapper =>
        val decimal_value = stats_value.asInstanceOf[DecimalWrapper].getValue
        util.Arrays.toString(decimal_value.array())
      case _ =>
        throw new HoodieException(s"Unsupported type: ${stats_value.getClass.getSimpleName}")
    }
  }

  def buildFileSystemView(table: Option[Any]): HoodieTableFileSystemView = {
    val basePath = getBasePath(table)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val fs = metaClient.getFs
    val globPath = s"$basePath/*/*/*"
    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))

    val timeline = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()

    val maxInstant = metaClient.createNewInstantTime()
    val instants = timeline.getInstants.iterator().asScala.filter(_.getTimestamp < maxInstant)

    val details = new Function[HoodieInstant, org.apache.hudi.common.util.Option[Array[Byte]]]
      with java.io.Serializable {
      override def apply(instant: HoodieInstant): HOption[Array[Byte]] = {
        metaClient.getActiveTimeline.getInstantDetails(instant)
      }
    }

    val filteredTimeline = new HoodieDefaultTimeline(
      new java.util.ArrayList[HoodieInstant](JavaConversions.asJavaCollection(instants.toList)).stream(), details)

    new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](statuses.size)))
  }

  override def build: Procedure = new ShowMetadataTableColumnStatsProcedure()
}

object ShowMetadataTableColumnStatsProcedure {
  val NAME = "show_metadata_table_column_stats"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new ShowMetadataTableColumnStatsProcedure()
  }
}
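Note that min_value and max_value are always returned as strings. For binary and decimal columns the procedure renders util.Arrays.toString over the raw wrapped bytes, so the output is a byte-array dump rather than a human-readable value. A sketch of how such wrapped bytes map back to a decimal, assuming they follow Avro's decimal encoding (two's-complement unscaled value) and the column's scale is known to the caller; decodeDecimal is an illustrative helper, not part of this commit:

    import java.math.{BigDecimal, BigInteger}

    // Rebuild a BigDecimal from the unscaled two's-complement bytes and a known scale.
    def decodeDecimal(unscaledBytes: Array[Byte], scale: Int): BigDecimal =
      new BigDecimal(new BigInteger(unscaledBytes), scale)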

@@ -91,6 +91,72 @@ class TestMetadataProcedure extends HoodieSparkProcedureTestBase {
    }
  }

test("Test Call show_metadata_table_column_stats Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| c1 int,
| c2 boolean,
| c3 binary,
| c4 date,
| c5 decimal(10,1),
| c6 double,
| c7 float,
| c8 long,
| c9 string,
| c10 timestamp
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| primaryKey = 'c1',
| preCombineField = 'c8',
| hoodie.metadata.enable="true",
| hoodie.metadata.index.column.stats.enable="true"
| )
""".stripMargin)
// insert data to table

spark.sql(
s"""
|insert into table $tableName
|values (1, true, CAST('binary data' AS BINARY), CAST('2021-01-01' AS DATE), CAST(10.5 AS DECIMAL(10,1)), CAST(3.14 AS DOUBLE), CAST(2.5 AS FLOAT), 1000, 'example string', CAST('2021-01-01 00:00:00' AS TIMESTAMP))
|""".stripMargin)
spark.sql(
s"""
|insert into table $tableName
|values (10, false, CAST('binary data' AS BINARY), CAST('2022-02-02' AS DATE), CAST(20.5 AS DECIMAL(10,1)), CAST(6.28 AS DOUBLE), CAST(3.14 AS FLOAT), 2000, 'another string', CAST('2022-02-02 00:00:00' AS TIMESTAMP))
|""".stripMargin)

// Only numerical and string types are compared for clarity on min/max values.
val expectedValues = Map(
1 -> ("1", "10"),
2 -> ("false", "true"),
6 -> ("3.14", "6.28"),
7 -> ("2.5", "3.14"),
8 -> ("1000", "2000"),
9 -> ("another string", "example string")
)

for (i <- 1 to 10) {
val columnName = s"c$i"
val metadataStats = spark.sql(s"""call show_metadata_table_column_stats(table => '$tableName', targetColumns => '$columnName')""").collect()
assertResult(1)(metadataStats.length)
val minVal: String = metadataStats(0).getAs[String]("min_value")
val maxVal: String = metadataStats(0).getAs[String]("max_value")

expectedValues.get(i) match {
case Some((expectedMin, expectedMax)) =>
assertResult(expectedMin)(minVal)
assertResult(expectedMax)(maxVal)
case None => // Do nothing if no expected values found
}
}
}
}

test("Test Call show_metadata_table_stats Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
