Skip to content

Commit

Permalink
HUDI-4687 add show_invalid_parquet procedure (apache#6480)
Browse files Browse the repository at this point in the history
Co-authored-by: zhanshaoxiong <shaoxiong0001@@gmail.com>
  • Loading branch information
2 people authored and jiimmyzhan committed Aug 24, 2022
1 parent 9041320 commit ba6b828
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ object HoodieProcedures {
mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder)
mapBuilder.put(RunCleanProcedure.NAME, RunCleanProcedure.builder)
mapBuilder.put(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
mapBuilder.put(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
mapBuilder.build
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hadoop.fs.Path
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.fs.FSUtils
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier

class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "path", DataTypes.StringType, None)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("path", DataTypes.StringType, nullable = true, Metadata.empty))
)

def parameters: Array[ProcedureParameter] = PARAMETERS

def outputType: StructType = OUTPUT_TYPE

override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)

val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
val partitionPaths: java.util.List[String] = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPaths.size())
val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())
javaRdd.rdd.map(part => {
val fs = FSUtils.getFs(new Path(srcPath), serHadoopConf.get())
FSUtils.getAllDataFilesInPartition(fs, FSUtils.getPartitionPath(srcPath, part))
}).flatMap(_.toList)
.filter(status => {
val filePath = status.getPath
var isInvalid = false
if (filePath.toString.endsWith(".parquet")) {
try ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData catch {
case e: Exception =>
isInvalid = e.getMessage.contains("is not a Parquet file")
}
}
isInvalid
})
.map(status => Row(status.getPath.toString))
.collect()
}

override def build = new ShowInvalidParquetProcedure()
}

object ShowInvalidParquetProcedure {
val NAME = "show_invalid_parquet"

def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get(): ProcedureBuilder = new ShowInvalidParquetProcedure()
}
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.procedure

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.fs.FSUtils

class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
test("Test Call show_invalid_parquet Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| partitioned by (ts)
| location '$basePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

// Check required fields
checkExceptionContain(s"""call show_invalid_parquet(limit => 10)""")(
s"Argument: path is required")

val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
val invalidPath1 = new Path(basePath, "ts=1000/1.parquet")
val out1 = fs.create(invalidPath1)
out1.write(1)
out1.close()

val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
val out2 = fs.create(invalidPath2)
out2.write(1)
out2.close()

// collect result for table
val result = spark.sql(
s"""call show_invalid_parquet(path => '$basePath')""".stripMargin).collect()
assertResult(2) {
result.length
}
}
}
}

0 comments on commit ba6b828

Please sign in to comment.