Bug fixes, test suite fixes, and a rebase onto the new partitioning support branch
liancheng committed May 6, 2015
1 parent 8ed5db8 commit 828d4d4
Showing 7 changed files with 118 additions and 39 deletions.
@@ -222,7 +222,7 @@ class SparkHadoopUtil extends Logging {

def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
val directories = fs.listStatus(status.getPath).filter(_.isDir)
val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
}
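For context on the hunk above: `listLeafDirStatuses` now keeps only directories when recursing. A minimal usage sketch (the helper name and partition-style layout are illustrative, not part of this commit):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

// For a layout like base/part=1/file.parquet and base/part=2/file.parquet this returns
// base/part=1 and base/part=2 but not base itself, since base still has sub-directories.
// Files beneath these directories never appear in the result thanks to filter(_.isDir).
def leafPartitionDirs(base: String, hadoopConf: Configuration): Seq[Path] = {
  val path = new Path(base)
  val fs = path.getFileSystem(hadoopConf)
  val baseStatus = fs.getFileStatus(fs.makeQualified(path))
  SparkHadoopUtil.get.listLeafDirStatuses(fs, baseStatus).map(_.getPath)
}
```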
@@ -70,7 +70,7 @@ import org.apache.spark.{Partition, SparkContext}
* spark-sql> SELECT * FROM src LIMIT 1;
*
*-- Exception will be thrown and switch to dialect
*-- "sql" (for SQLContext) or
*-- "sql" (for SQLContext) or
*-- "hiveql" (for HiveContext)
* }}}
*/
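The scaladoc fragment above describes switching back to a simpler dialect after a parse error. A hedged sketch of that switch (the `spark.sql.dialect` key name is an assumption, not something introduced by this commit):

```scala
import org.apache.spark.sql.SQLContext

// After an unsupported statement fails, fall back to the basic SQL parser for a
// SQLContext; a HiveContext would use "hiveql" instead. Key name assumed.
def resetDialect(sqlContext: SQLContext, dialect: String = "sql"): Unit = {
  sqlContext.sql(s"SET spark.sql.dialect=$dialect")
}
```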
@@ -597,7 +597,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
if (paths.isEmpty) {
emptyDataFrame
} else if (conf.parquetUseDataSourceApi) {
// baseRelationToDataFrame(parquet.ParquetRelation2(paths, Map.empty)(this))
baseRelationToDataFrame(
new parquet.FSBasedParquetRelation(
paths.toArray, None, None, Map.empty[String, String])(this))
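The hunk above routes `parquetFile` through the new `FSBasedParquetRelation` when the data source API is enabled. A minimal sketch of toggling that path (the configuration key string is assumed; the test suites below reference it as `SQLConf.PARQUET_USE_DATA_SOURCE_API`):

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// With the flag on, parquetFile() is backed by FSBasedParquetRelation; with it off,
// the older ParquetRelation code path is used. Key name assumed.
def readParquet(sqlContext: SQLContext, path: String, useDataSourceApi: Boolean): DataFrame = {
  sqlContext.setConf("spark.sql.parquet.useDataSourceApi", useDataSourceApi.toString)
  sqlContext.parquetFile(path)
}
```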
@@ -34,6 +34,8 @@ import parquet.filter2.predicate.FilterApi
import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop._
import parquet.hadoop.codec.CodecConfig
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -65,19 +67,7 @@ private[sql] class ParquetOutputWriter extends OutputWriter with SparkHadoopMapR
path: String,
dataSchema: StructType,
context: TaskAttemptContext): Unit = {
// TODO There's no need to use two kinds of WriteSupport
// We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
// complex types.
val writeSupportClass = {
if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
classOf[MutableRowWriteSupport].getName
} else {
classOf[RowWriteSupport].getName
}
}

val conf = context.getConfiguration
conf.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, writeSupportClass)
RowWriteSupport.setSchema(dataSchema.toAttributes, conf)

val outputFormat = new ParquetOutputFormat[Row]()
@@ -168,10 +158,6 @@ private[sql] class FSBasedParquetRelation(
classOf[ParquetOutputWriter]
}

override def outputCommitterClass: Class[_ <: FileOutputCommitter] = {
classOf[ParquetOutputCommitter]
}

// Skips type conversion
override val needConversion: Boolean = false

@@ -180,6 +166,32 @@
// whole Parquet file disables some optimizations in this case (e.g. broadcast join).
override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum

override def prepareForWrite(job: Job): Unit = {
job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])

// TODO There's no need to use two kinds of WriteSupport
// We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
// complex types.
val writeSupportClass =
if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
classOf[MutableRowWriteSupport]
} else {
classOf[RowWriteSupport]
}

ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
RowWriteSupport.setSchema(dataSchema.toAttributes, job.getConfiguration)

// Sets compression scheme
ContextUtil.getConfiguration(job).set(
ParquetOutputFormat.COMPRESSION,
ParquetRelation
.shortParquetCompressionCodecNames
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toUpperCase,
CompressionCodecName.UNCOMPRESSED).name())
}

override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
@@ -376,7 +388,9 @@
.toSeq
}

assert(filesToTouch.nonEmpty, "")
assert(
filesToTouch.nonEmpty,
s"No Parquet data file or summary file found under ${paths.mkString(", ")}.")

ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
}
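For the `prepareForWrite` addition above, a hedged sketch of how the compression setting reaches it from the session configuration (the key string and the short codec name are assumptions inferred from `sqlContext.conf.parquetCompressionCodec` and `shortParquetCompressionCodecNames`):

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

// prepareForWrite upper-cases the configured codec name, looks it up in
// ParquetRelation.shortParquetCompressionCodecNames, and falls back to
// CompressionCodecName.UNCOMPRESSED when the name is unknown.
def saveCompressed(sqlContext: SQLContext, df: DataFrame, path: String, codec: String = "snappy"): Unit = {
  sqlContext.setConf("spark.sql.parquet.compression.codec", codec) // key name assumed
  df.save("parquet", SaveMode.Overwrite, Map("path" -> path))
}
```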
@@ -17,6 +17,8 @@

package org.apache.spark.sql.sources

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -350,11 +352,10 @@ abstract class FSBasedRelation private[sql](
val basePaths = paths.map(new Path(_))
val leafDirs = basePaths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
if (fs.exists(path)) {
SparkHadoopUtil.get.listLeafDirStatuses(fs, fs.makeQualified(path))
} else {
Seq.empty[FileStatus]
}
Try(fs.getFileStatus(fs.makeQualified(path)))
.filter(_.isDir)
.map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
.getOrElse(Seq.empty[FileStatus])
}.map(_.getPath)

if (leafDirs.nonEmpty) {
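The replacement above folds the existence check and the directory check into a single `Try`. A stand-alone sketch of the same pattern (the wrapper function is illustrative):

```scala
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.deploy.SparkHadoopUtil

// A missing path makes getFileStatus throw, turning the Try into a Failure; a plain
// file fails the isDir filter. Both cases collapse to an empty result, and the single
// getFileStatus call replaces the separate fs.exists() round trip used before.
def leafDirStatuses(rawPath: String, hadoopConf: Configuration): Seq[FileStatus] = {
  val path = new Path(rawPath)
  val fs = path.getFileSystem(hadoopConf)
  Try(fs.getFileStatus(fs.makeQualified(path)))
    .filter(_.isDir)
    .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
    .getOrElse(Seq.empty[FileStatus])
}
```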
@@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}.flatten.reduceOption(_ && _)

val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters
}.flatten.reduceOption(_ && _)

forParquetTableScan.orElse(forParquetDataSource)
@@ -350,7 +350,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
override protected def afterAll(): Unit = {
sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}

test("SPARK-6742: don't push down predicates which reference partition columns") {
import sqlContext.implicits._

@@ -365,7 +365,7 @@
path,
Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
Seq(AttributeReference("part", IntegerType, false)()) ))

checkAnswer(
df.filter("a = 1 or part = 1"),
(1 to 3).map(i => Row(1, i, i.toString)))
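The SPARK-6742 test above guards against pushing predicates that reference partition columns down to the Parquet reader. An illustrative sketch of the failure mode it protects against (table and column names are assumptions):

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// `part` exists only as a partition directory (e.g. part=1/), not inside the Parquet
// files, so pushing `part = 1` down would match nothing and silently drop rows that
// satisfy `a = 1`. The disjunction must be evaluated by Spark, not by Parquet.
def mixedPredicate(sqlContext: SQLContext): DataFrame = {
  val df = sqlContext.table("partitioned_src") // assumed partitioned table name
  df.filter("a = 1 or part = 1")
}
```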
@@ -28,11 +28,12 @@ import parquet.example.data.simple.SimpleGroup
import parquet.example.data.{Group, GroupWriter}
import parquet.hadoop.api.WriteSupport
import parquet.hadoop.api.WriteSupport.WriteContext
import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
import parquet.io.api.RecordConsumer
import parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext
@@ -101,7 +102,6 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}

test("fixed-length decimals") {

def makeDecimalRDD(decimal: DecimalType): DataFrame =
sparkContext
.parallelize(0 to 1000)
@@ -119,15 +119,15 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}

// Decimals with precision above 18 are not yet supported
intercept[RuntimeException] {
intercept[SparkException] {
withTempPath { dir =>
makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
parquetFile(dir.getCanonicalPath).collect()
}
}

// Unlimited-length decimals are not yet supported
intercept[RuntimeException] {
intercept[SparkException] {
withTempPath { dir =>
makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
parquetFile(dir.getCanonicalPath).collect()
@@ -310,7 +310,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
test("save - overwrite") {
withParquetFile((1 to 10).map(i => (i, i.toString))) { file =>
val newData = (11 to 20).map(i => (i, i.toString))
newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Overwrite, Map("path" -> file))
newData.toDF().save("parquet", SaveMode.Overwrite, Map("path" -> file))
checkAnswer(parquetFile(file), newData.map(Row.fromTuple))
}
}
@@ -319,7 +319,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val data = (1 to 10).map(i => (i, i.toString))
withParquetFile(data) { file =>
val newData = (11 to 20).map(i => (i, i.toString))
newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Ignore, Map("path" -> file))
newData.toDF().save("parquet", SaveMode.Ignore, Map("path" -> file))
checkAnswer(parquetFile(file), data.map(Row.fromTuple))
}
}
@@ -330,7 +330,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val newData = (11 to 20).map(i => (i, i.toString))
val errorMessage = intercept[Throwable] {
newData.toDF().save(
"org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> file))
"parquet", SaveMode.ErrorIfExists, Map("path" -> file))
}.getMessage
assert(errorMessage.contains("already exists"))
}
@@ -340,7 +340,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val data = (1 to 10).map(i => (i, i.toString))
withParquetFile(data) { file =>
val newData = (11 to 20).map(i => (i, i.toString))
newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Append, Map("path" -> file))
newData.toDF().save("parquet", SaveMode.Append, Map("path" -> file))
checkAnswer(parquetFile(file), (data ++ newData).map(Row.fromTuple))
}
}
@@ -419,7 +419,7 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA
test("SPARK-6330 regression test") {
// In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
// IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
intercept[java.io.FileNotFoundException] {
intercept[AssertionError] {
sqlContext.parquetFile("file:///nonexistent")
}
val errorMessage = intercept[Throwable] {
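Several tests above now pass the short source name "parquet" instead of the full class name "org.apache.spark.sql.parquet". A brief sketch of the same save/read calls outside the test harness (path and sample data are illustrative):

```scala
import org.apache.spark.sql.{SQLContext, SaveMode}

def roundTrip(sqlContext: SQLContext, path: String): Unit = {
  import sqlContext.implicits._

  val df = sqlContext.sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("a", "b")
  // SaveMode.Overwrite / Ignore / ErrorIfExists / Append behave as exercised above;
  // "parquet" resolves to the built-in Parquet data source.
  df.save("parquet", SaveMode.Overwrite, Map("path" -> path))
  sqlContext.parquetFile(path).show()
}
```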
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark.Logging
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, NoSuchTableException, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -273,7 +273,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
s"as Parquet. However, we are getting a $other from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
@@ -319,6 +319,71 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
result.newInstance()
}

/**
* Reconciles the Hive Metastore case insensitivity issue and data type conflicts between the
* Metastore schema and another given schema.
*
* Hive doesn't retain case information, while other data sources may be case sensitive. On the
* other hand, type information within schemas read from other data sources may be incomplete or
* inaccurate (e.g. older versions of Parquet don't distinguish binary and string). This method
* generates a correct schema by merging Metastore schema data types and data source schema field
* names.
*/
private[sql] def mergeMetastoreSchema(
metastoreSchema: StructType,
schema: StructType): StructType = {
def schemaConflictMessage: String =
s"""Converting Hive Metastore table, but detected conflicting schemas. Metastore schema:
|${metastoreSchema.prettyJson}
|
|Data source schema:
|${schema.prettyJson}
""".stripMargin

val mergedSchema = mergeMissingNullableFields(metastoreSchema, schema)

assert(metastoreSchema.size <= mergedSchema.size, schemaConflictMessage)

val ordinalMap = metastoreSchema.zipWithIndex.map {
case (field, index) => field.name.toLowerCase -> index
}.toMap
val reorderedSchema = mergedSchema.sortBy(f =>
ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))

StructType(metastoreSchema.zip(reorderedSchema).map {
// Uses data source schema field names but retains Metastore data types.
case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
mSchema.copy(name = pSchema.name)
case _ =>
sys.error(schemaConflictMessage)
})
}

/**
* Returns the original schema from the data source with any missing nullable fields from the
* Hive Metastore schema merged in.
*
* When constructing a DataFrame from a collection of structured data, the resulting object has
* a schema corresponding to the union of the fields present in each element of the collection.
* Spark SQL simply assigns a null value to any field that isn't present for a particular row.
* In some cases, it is possible that a given table partition stored as a data source file doesn't
* contain a particular nullable field in its schema despite that field being present in the
* table schema obtained from the Hive Metastore. This method returns a schema representing the
* data source schema along with any additional nullable fields from the Metastore schema
* merged in.
*/
private def mergeMissingNullableFields(
metastoreSchema: StructType,
schema: StructType): StructType = {
val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
val missingFields = metastoreSchema
.map(_.name.toLowerCase)
.diff(schema.map(_.name.toLowerCase))
.map(fieldMap(_))
.filter(_.nullable)
StructType(schema ++ missingFields)
}

override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
val dbName = if (!caseSensitive) {
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
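To make the two schema-merging helpers above concrete, here is an illustrative pair of schemas (field names and types are assumptions) and what reconciliation would produce:

```scala
import org.apache.spark.sql.types._

object SchemaReconciliationExample {
  // What the Metastore reports: lower-cased names, authoritative data types, and a
  // nullable field that some partitions may lack on disk.
  val metastoreSchema = StructType(Seq(
    StructField("key", IntegerType, nullable = true),
    StructField("value", StringType, nullable = true),
    StructField("extra", StringType, nullable = true)))

  // What the data source (e.g. a Parquet footer) reports: original casing preserved,
  // binary where the Metastore says string, and "extra" missing entirely.
  val dataSourceSchema = StructType(Seq(
    StructField("Key", IntegerType, nullable = true),
    StructField("Value", BinaryType, nullable = true)))

  // mergeMetastoreSchema(metastoreSchema, dataSourceSchema) would first merge in the
  // missing nullable "extra" field (via mergeMissingNullableFields), then keep the data
  // source field names ("Key", "Value") while retaining the Metastore data types
  // (IntegerType, StringType).
}
```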
