[SPARK-7591] [SQL] Partitioning support API tweaks
Please see [SPARK-7591] [1] for the details.

/cc rxin marmbrus yhuai

[1]: https://issues.apache.org/jira/browse/SPARK-7591

Author: Cheng Lian <[email protected]>

Closes apache#6150 from liancheng/spark-7591 and squashes the following commits:

af422e7 [Cheng Lian] Addresses @rxin's comments
37d1738 [Cheng Lian] Fixes HadoopFsRelation partition columns initialization
2fc680a [Cheng Lian] Fixes Scala style issue
189ad23 [Cheng Lian] Removes HadoopFsRelation constructor arguments
522c24e [Cheng Lian] Adds OutputWriterFactory
047d40d [Cheng Lian] Renames FSBased* to HadoopFs*, also renamed FSBasedParquetRelation back to ParquetRelation2
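
For orientation, the write-path API after this change has roughly the following shape. This is a minimal sketch of a hypothetical text-based data source, not code from the commit: the OutputWriter, OutputWriterFactory, and prepareJobForWrite names and signatures are taken from the hunks below, while SimpleTextOutputWriter and its formatting logic are invented for illustration.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Writers are now constructed directly with their target path and task context,
// replacing the old no-arg instantiation followed by init(path, dataSchema, context).
class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
  private val stream = {
    val file = new Path(path, s"part-${context.getTaskAttemptID.getTaskID.getId}.txt")
    file.getFileSystem(context.getConfiguration).create(file)
  }

  override def write(row: Row): Unit =
    stream.write((row.mkString(",") + "\n").getBytes("UTF-8"))

  override def close(): Unit = stream.close()
}

// A relation no longer advertises an OutputWriter class; instead its
// prepareJobForWrite(job) returns a factory like this one, which is shipped to
// executors and asked for one writer per output (or per partition) directory.
class SimpleTextOutputWriterFactory extends OutputWriterFactory {
  override def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter =
    new SimpleTextOutputWriter(path, context)
}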
liancheng authored and jeanlyn committed Jun 12, 2015
1 parent 0fe0147 commit 0445be5
Showing 17 changed files with 195 additions and 194 deletions.
14 changes: 7 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
} else if (conf.parquetUseDataSourceApi) {
val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
baseRelationToDataFrame(
new FSBasedParquetRelation(
new ParquetRelation2(
globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
} else {
DataFrame(this, parquet.ParquetRelation(
@@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
}

/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), properties)
}

/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
@Experimental
def jdbc(
url: String,
table: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
@@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
val parts = JDBCRelation.columnPartition(partitioning)
jdbc(url, table, parts, properties)
}

/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
jdbc(url, table, parts, properties)
}

private def jdbc(
url: String,
table: String,
@@ -41,27 +41,23 @@ import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}

private[sql] class DefaultSource extends FSBasedRelationProvider {
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
schema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): FSBasedRelation = {
parameters: Map[String, String]): HadoopFsRelation = {
val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext)
new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
}
}

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[sql] class ParquetOutputWriter extends OutputWriter {
private var recordWriter: RecordWriter[Void, Row] = _
private var taskAttemptContext: TaskAttemptContext = _

override def init(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): Unit = {
private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
extends OutputWriter {

private val recordWriter: RecordWriter[Void, Row] = {
val conf = context.getConfiguration
val outputFormat = {
// When appending new Parquet files to an existing Parquet file directory, to avoid
@@ -77,7 +73,7 @@ private[sql] class ParquetOutputWriter extends OutputWriter {
if (fs.exists(outputPath)) {
// Pattern used to match task ID in part file names, e.g.:
//
// part-r-00001.gz.part
// part-r-00001.gz.parquet
// ^~~~~
val partFilePattern = """part-.-(\d{1,}).*""".r

@@ -86,9 +82,8 @@ private[sql] class ParquetOutputWriter extends OutputWriter {
case name if name.startsWith("_") => 0
case name if name.startsWith(".") => 0
case name => sys.error(
s"""Trying to write Parquet files to directory $outputPath,
|but found items with illegal name "$name"
""".stripMargin.replace('\n', ' ').trim)
s"Trying to write Parquet files to directory $outputPath, " +
s"but found items with illegal name '$name'.")
}.reduceOption(_ max _).getOrElse(0)
} else {
0
@@ -111,37 +106,39 @@ private[sql] class ParquetOutputWriter extends OutputWriter {
}
}

recordWriter = outputFormat.getRecordWriter(context)
taskAttemptContext = context
outputFormat.getRecordWriter(context)
}

override def write(row: Row): Unit = recordWriter.write(null, row)

override def close(): Unit = recordWriter.close(taskAttemptContext)
override def close(): Unit = recordWriter.close(context)
}

private[sql] class FSBasedParquetRelation(
paths: Array[String],
private[sql] class ParquetRelation2(
override val paths: Array[String],
private val maybeDataSchema: Option[StructType],
private val maybePartitionSpec: Option[PartitionSpec],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends FSBasedRelation(paths, maybePartitionSpec)
extends HadoopFsRelation(maybePartitionSpec)
with Logging {

// Should we merge schemas from all Parquet part-files?
private val shouldMergeSchemas =
parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean
parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean

private val maybeMetastoreSchema = parameters
.get(FSBasedParquetRelation.METASTORE_SCHEMA)
.get(ParquetRelation2.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])

private val metadataCache = new MetadataCache
metadataCache.refresh()
private lazy val metadataCache: MetadataCache = {
val meta = new MetadataCache
meta.refresh()
meta
}

override def equals(other: scala.Any): Boolean = other match {
case that: FSBasedParquetRelation =>
case that: ParquetRelation2 =>
val schemaEquality = if (shouldMergeSchemas) {
this.shouldMergeSchemas == that.shouldMergeSchemas
} else {
@@ -175,8 +172,6 @@ private[sql] class FSBasedParquetRelation(
}
}

override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter]

override def dataSchema: StructType = metadataCache.dataSchema

override private[sql] def refresh(): Unit = {
@@ -187,9 +182,12 @@ private[sql] class FSBasedParquetRelation(
// Parquet data source always uses Catalyst internal representations.
override val needConversion: Boolean = false

override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum

override def userDefinedPartitionColumns: Option[StructType] =
maybePartitionSpec.map(_.partitionColumns)

override def prepareForWrite(job: Job): Unit = {
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = ContextUtil.getConfiguration(job)

val committerClass =
@@ -224,6 +222,13 @@ private[sql] class FSBasedParquetRelation(
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toUpperCase,
CompressionCodecName.UNCOMPRESSED).name())

new OutputWriterFactory {
override def newInstance(
path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
new ParquetOutputWriter(path, context)
}
}
}

override def buildScan(
@@ -385,7 +390,7 @@ private[sql] class FSBasedParquetRelation(
// case insensitivity issue and possible schema mismatch (probably caused by schema
// evolution).
maybeMetastoreSchema
.map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
.map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
.getOrElse(dataSchema0)
}
}
@@ -439,12 +444,12 @@ private[sql] class FSBasedParquetRelation(
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")

FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext)
ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
}
}
}

private[sql] object FSBasedParquetRelation extends Logging {
private[sql] object ParquetRelation2 extends Logging {
// Whether we should merge schemas collected from all Parquet part-files.
private[sql] val MERGE_SCHEMA = "mergeSchema"

@@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(a, _) => t.buildScan(a)) :: Nil

// Scanning partitioned FSBasedRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation))
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
if t.partitionSpec.partitionColumns.nonEmpty =>
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray

@@ -87,7 +87,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
selectedPartitions) :: Nil

// Scanning non-partitioned FSBasedRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) =>
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -111,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil

case i @ logical.InsertIntoTable(
l @ LogicalRelation(t: FSBasedRelation), part, query, overwrite, false) if part.isEmpty =>
l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty =>
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
execution.ExecutedCommand(
InsertIntoFSBasedRelation(t, query, Array.empty[String], mode)) :: Nil
InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil

case _ => Nil
}
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
partitionColumns: StructType,
partitions: Array[Partition]) = {
val output = projections.map(_.toAttribute)
val relation = logicalRelation.relation.asInstanceOf[FSBasedRelation]
val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]

// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
@@ -35,6 +35,10 @@ private[sql] case class Partition(values: Row, path: String)
private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])

private[sql] object PartitioningUtils {
// This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
// depend on Hive.
private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
require(columnNames.size == literals.size)
}
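
As a side note on the constant added above (an illustration, not code from the commit): it mirrors Hive's placeholder for null partition values, so a row whose partition column is null is written under a directory segment named after the placeholder. A simplified sketch of how a partition value maps to a path segment, ignoring any escaping of special characters a real implementation would need:

// Hypothetical helper, for illustration only.
def partitionPathSegment(column: String, value: Any): String = {
  val rendered = if (value == null) "__HIVE_DEFAULT_PARTITION__" else value.toString
  s"$column=$rendered"
}

// partitionPathSegment("year", 2015)     // "year=2015"
// partitionPathSegment("country", null)  // "country=__HIVE_DEFAULT_PARTITION__"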
23 changes: 10 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -58,8 +58,8 @@ private[sql] case class InsertIntoDataSource(
}
}

private[sql] case class InsertIntoFSBasedRelation(
@transient relation: FSBasedRelation,
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
partitionColumns: Array[String],
mode: SaveMode)
@@ -102,7 +102,7 @@ private[sql] case class InsertIntoFSBasedRelation(
insert(new DefaultWriterContainer(relation, job), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__")
relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
@@ -234,7 +234,7 @@ private[sql] case class InsertIntoFSBasedRelation(
}

private[sql] abstract class BaseWriterContainer(
@transient val relation: FSBasedRelation,
@transient val relation: HadoopFsRelation,
@transient job: Job)
extends SparkHadoopMapReduceUtil
with Logging
@@ -261,15 +261,15 @@ private[sql] abstract class BaseWriterContainer(

protected val dataSchema = relation.dataSchema

protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass
protected var outputWriterFactory: OutputWriterFactory = _

private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _

def driverSideSetup(): Unit = {
setupIDs(0, 0, 0)
setupConf()
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
relation.prepareForWrite(job)
outputWriterFactory = relation.prepareJobForWrite(job)
outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
outputCommitter.setupJob(jobContext)
@@ -346,16 +346,15 @@ private[sql] abstract class BaseWriterContainer(
}

private[sql] class DefaultWriterContainer(
@transient relation: FSBasedRelation,
@transient relation: HadoopFsRelation,
@transient job: Job)
extends BaseWriterContainer(relation, job) {

@transient private var writer: OutputWriter = _

override protected def initWriters(): Unit = {
writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
writer.init(getWorkPath, dataSchema, taskAttemptContext)
writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
}

override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -372,7 +371,7 @@ private[sql] class DefaultWriterContainer(
}

private[sql] class DynamicPartitionWriterContainer(
@transient relation: FSBasedRelation,
@transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
defaultPartitionName: String)
@@ -398,12 +397,10 @@ private[sql] class DynamicPartitionWriterContainer(

outputWriters.getOrElseUpdate(partitionPath, {
val path = new Path(getWorkPath, partitionPath)
val writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set(
"spark.sql.sources.output.path",
new Path(outputPath, partitionPath).toString)
writer.init(path.toString, dataSchema, taskAttemptContext)
writer
outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
})
}
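
For context, a minimal sketch (not the commit's code) of the pattern the dynamic-partition container uses above: keep one OutputWriter per partition directory, creating each lazily through the factory returned by prepareJobForWrite, and close them all when the task finishes.

import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical helper; the real container also sets "spark.sql.sources.output.path"
// and handles task commit/abort, which are omitted here.
class PartitionWriterCache(
    factory: OutputWriterFactory,
    workPath: String,
    dataSchema: StructType,
    context: TaskAttemptContext) {

  private val writers = mutable.Map.empty[String, OutputWriter]

  // partitionPath is a relative segment such as "year=2015/country=US".
  def writerFor(partitionPath: String): OutputWriter =
    writers.getOrElseUpdate(
      partitionPath,
      factory.newInstance(new Path(workPath, partitionPath).toString, dataSchema, context))

  def closeAll(): Unit = writers.values.foreach(_.close())
}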
