From 828d4d4a8381dad8d9cf5b466ab95168829324aa Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 6 May 2015 21:46:36 +0800 Subject: [PATCH] Bug fixes, fixes test suites, and rebases to new partitioning support branch --- .../apache/spark/deploy/SparkHadoopUtil.scala | 2 +- .../org/apache/spark/sql/SQLContext.scala | 3 +- .../spark/sql/parquet/fsBasedParquet.scala | 48 ++++++++----- .../apache/spark/sql/sources/interfaces.scala | 11 +-- .../sql/parquet/ParquetFilterSuite.scala | 6 +- .../spark/sql/parquet/ParquetIOSuite.scala | 18 ++--- .../spark/sql/hive/HiveMetastoreCatalog.scala | 69 ++++++++++++++++++- 7 files changed, 118 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 31b07998d8630..20d59ce6b4d5e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -222,7 +222,7 @@ class SparkHadoopUtil extends Logging { def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = { def recurse(status: FileStatus): Seq[FileStatus] = { - val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir) + val directories = fs.listStatus(status.getPath).filter(_.isDir) val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus] leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 55cd440f775ef..53f3c503b3954 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -70,7 +70,7 @@ import org.apache.spark.{Partition, SparkContext} * spark-sql> SELECT * FROM src LIMIT 1; * *-- Exception will be thrown and switch to dialect - *-- "sql" (for SQLContext) or + *-- "sql" (for SQLContext) or *-- "hiveql" (for HiveContext) * }}} */ @@ -597,7 +597,6 @@ class SQLContext(@transient val sparkContext: SparkContext) if (paths.isEmpty) { emptyDataFrame } else if (conf.parquetUseDataSourceApi) { - // baseRelationToDataFrame(parquet.ParquetRelation2(paths, Map.empty)(this)) baseRelationToDataFrame( new parquet.FSBasedParquetRelation( paths.toArray, None, None, Map.empty[String, String])(this)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala index 057836645ccd6..793622542db89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala @@ -34,6 +34,8 @@ import parquet.filter2.predicate.FilterApi import parquet.format.converter.ParquetMetadataConverter import parquet.hadoop._ import parquet.hadoop.codec.CodecConfig +import parquet.hadoop.metadata.CompressionCodecName +import parquet.hadoop.util.ContextUtil import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.mapred.SparkHadoopMapRedUtil @@ -65,19 +67,7 @@ private[sql] class ParquetOutputWriter extends OutputWriter with SparkHadoopMapR path: String, dataSchema: StructType, context: TaskAttemptContext): Unit = { - // TODO There's no need to use two kinds of WriteSupport - // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and - // complex types. 
- val writeSupportClass = { - if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) { - classOf[MutableRowWriteSupport].getName - } else { - classOf[RowWriteSupport].getName - } - } - val conf = context.getConfiguration - conf.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, writeSupportClass) RowWriteSupport.setSchema(dataSchema.toAttributes, conf) val outputFormat = new ParquetOutputFormat[Row]() @@ -168,10 +158,6 @@ private[sql] class FSBasedParquetRelation( classOf[ParquetOutputWriter] } - override def outputCommitterClass: Class[_ <: FileOutputCommitter] = { - classOf[ParquetOutputCommitter] - } - // Skips type conversion override val needConversion: Boolean = false @@ -180,6 +166,32 @@ private[sql] class FSBasedParquetRelation( // whole Parquet file disables some optimizations in this case (e.g. broadcast join). override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum + override def prepareForWrite(job: Job): Unit = { + job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) + + // TODO There's no need to use two kinds of WriteSupport + // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and + // complex types. + val writeSupportClass = + if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) { + classOf[MutableRowWriteSupport] + } else { + classOf[RowWriteSupport] + } + + ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass) + RowWriteSupport.setSchema(dataSchema.toAttributes, job.getConfiguration) + + // Sets compression scheme + ContextUtil.getConfiguration(job).set( + ParquetOutputFormat.COMPRESSION, + ParquetRelation + .shortParquetCompressionCodecNames + .getOrElse( + sqlContext.conf.parquetCompressionCodec.toUpperCase, + CompressionCodecName.UNCOMPRESSED).name()) + } + override def buildScan( requiredColumns: Array[String], filters: Array[Filter], @@ -376,7 +388,9 @@ private[sql] class FSBasedParquetRelation( .toSeq } - assert(filesToTouch.nonEmpty, "") + assert( + filesToTouch.nonEmpty, + s"No Parquet data file or summary file found under ${paths.mkString(", ")}.") ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index e42384c4cee32..3ddd3d683d88e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.sources +import scala.util.Try + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -350,11 +352,10 @@ abstract class FSBasedRelation private[sql]( val basePaths = paths.map(new Path(_)) val leafDirs = basePaths.flatMap { path => val fs = path.getFileSystem(hadoopConf) - if (fs.exists(path)) { - SparkHadoopUtil.get.listLeafDirStatuses(fs, fs.makeQualified(path)) - } else { - Seq.empty[FileStatus] - } + Try(fs.getFileStatus(fs.makeQualified(path))) + .filter(_.isDir) + .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _)) + .getOrElse(Seq.empty[FileStatus]) }.map(_.getPath) if (leafDirs.nonEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index 10d0ede4dc0dc..3bbc5b05868af 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { }.flatten.reduceOption(_ && _) val forParquetDataSource = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters + case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters }.flatten.reduceOption(_ && _) forParquetTableScan.orElse(forParquetDataSource) @@ -350,7 +350,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before override protected def afterAll(): Unit = { sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) } - + test("SPARK-6742: don't push down predicates which reference partition columns") { import sqlContext.implicits._ @@ -365,7 +365,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before path, Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext, Seq(AttributeReference("part", IntegerType, false)()) )) - + checkAnswer( df.filter("a = 1 or part = 1"), (1 to 3).map(i => Row(1, i, i.toString))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index b504842053690..b648614384798 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -28,11 +28,12 @@ import parquet.example.data.simple.SimpleGroup import parquet.example.data.{Group, GroupWriter} import parquet.hadoop.api.WriteSupport import parquet.hadoop.api.WriteSupport.WriteContext -import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName} +import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter} import parquet.io.api.RecordConsumer import parquet.schema.{MessageType, MessageTypeParser} +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.test.TestSQLContext @@ -101,7 +102,6 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { } test("fixed-length decimals") { - def makeDecimalRDD(decimal: DecimalType): DataFrame = sparkContext .parallelize(0 to 1000) @@ -119,7 +119,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { } // Decimals with precision above 18 are not yet supported - intercept[RuntimeException] { + intercept[SparkException] { withTempPath { dir => makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath) parquetFile(dir.getCanonicalPath).collect() @@ -127,7 +127,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { } // Unlimited-length decimals are not yet supported - intercept[RuntimeException] { + intercept[SparkException] { withTempPath { dir => makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath) parquetFile(dir.getCanonicalPath).collect() @@ -310,7 +310,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { test("save - overwrite") { withParquetFile((1 to 10).map(i => (i, i.toString))) { file => val newData = (11 to 20).map(i => (i, i.toString)) - newData.toDF().save("org.apache.spark.sql.parquet", 
SaveMode.Overwrite, Map("path" -> file)) + newData.toDF().save("parquet", SaveMode.Overwrite, Map("path" -> file)) checkAnswer(parquetFile(file), newData.map(Row.fromTuple)) } } @@ -319,7 +319,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { val data = (1 to 10).map(i => (i, i.toString)) withParquetFile(data) { file => val newData = (11 to 20).map(i => (i, i.toString)) - newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Ignore, Map("path" -> file)) + newData.toDF().save("parquet", SaveMode.Ignore, Map("path" -> file)) checkAnswer(parquetFile(file), data.map(Row.fromTuple)) } } @@ -330,7 +330,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { val newData = (11 to 20).map(i => (i, i.toString)) val errorMessage = intercept[Throwable] { newData.toDF().save( - "org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> file)) + "parquet", SaveMode.ErrorIfExists, Map("path" -> file)) }.getMessage assert(errorMessage.contains("already exists")) } @@ -340,7 +340,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { val data = (1 to 10).map(i => (i, i.toString)) withParquetFile(data) { file => val newData = (11 to 20).map(i => (i, i.toString)) - newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Append, Map("path" -> file)) + newData.toDF().save("parquet", SaveMode.Append, Map("path" -> file)) checkAnswer(parquetFile(file), (data ++ newData).map(Row.fromTuple)) } } @@ -419,7 +419,7 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA test("SPARK-6330 regression test") { // In 1.3.0, save to fs other than file: without configuring core-site.xml would get: // IllegalArgumentException: Wrong FS: hdfs://..., expected: file:/// - intercept[java.io.FileNotFoundException] { + intercept[AssertionError] { sqlContext.parquetFile("file:///nonexistent") } val errorMessage = intercept[Throwable] { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0a2292f6539af..c28c60bcddd89 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} import org.apache.hadoop.util.ReflectionUtils -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, NoSuchTableException, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -273,7 +273,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with case other => logWarning( s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " + - s"as Parquet. However, we are getting a ${other} from the metastore cache. " + + s"as Parquet. However, we are getting a $other from the metastore cache. " + s"This cached entry will be invalidated.") cachedDataSourceTables.invalidate(tableIdentifier) None @@ -319,6 +319,71 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with result.newInstance() } + /** + * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore + * schema and another given schema. 
+ * + * Hive doesn't retain case information, while other data sources may be case sensitive. On the + * other hand, type information within schemas read from other data sources may be incomplete or + * inaccurate (e.g. older versions of Parquet doesn't distinguish binary and string). This method + * generates a correct schema by merging Metastore schema data types and data source schema field + * names. + */ + private[sql] def mergeMetastoreSchema( + metastoreSchema: StructType, + schema: StructType): StructType = { + def schemaConflictMessage: String = + s"""Converting Hive Metastore table, but detected conflicting schemas. Metastore schema: + |${metastoreSchema.prettyJson} + | + |Data source schema: + |${schema.prettyJson} + """.stripMargin + + val mergedSchema = mergeMissingNullableFields(metastoreSchema, schema) + + assert(metastoreSchema.size <= mergedSchema.size, schemaConflictMessage) + + val ordinalMap = metastoreSchema.zipWithIndex.map { + case (field, index) => field.name.toLowerCase -> index + }.toMap + val reorderedSchema = mergedSchema.sortBy(f => + ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1)) + + StructType(metastoreSchema.zip(reorderedSchema).map { + // Uses data source schema field names but retains Metastore data types. + case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase => + mSchema.copy(name = pSchema.name) + case _ => + sys.error(schemaConflictMessage) + }) + } + + /** + * Returns the original schema from the data source with any missing nullable fields from the + * Hive Metastore schema merged in. + * + * When constructing a DataFrame from a collection of structured data, the resulting object has + * a schema corresponding to the union of the fields present in each element of the collection. + * Spark SQL simply assigns a null value to any field that isn't present for a particular row. + * In some cases, it is possible that a given table partition stored as a data source file doesn't + * contain a particular nullable field in its schema despite that field being present in the + * table schema obtained from the Hive Metastore. This method returns a schema representing the + * data source schema along with any additional nullable fields from the Metastore schema + * merged in. + */ + private def mergeMissingNullableFields( + metastoreSchema: StructType, + schema: StructType): StructType = { + val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap + val missingFields = metastoreSchema + .map(_.name.toLowerCase) + .diff(schema.map(_.name.toLowerCase)) + .map(fieldMap(_)) + .filter(_.nullable) + StructType(schema ++ missingFields) + } + override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized { val dbName = if (!caseSensitive) { if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None