[SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the data sources API

This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and to provide first-class partitioning support for both the read path and the write path.  Existing data sources like JSON and Parquet can be simplified with this work.

## New features provided

1. Hive-compatible partition discovery

   This generalizes the partition discovery strategy used by the Parquet data source in Spark 1.3.0. An example layout is shown after this list.

1. Generalized partition pruning optimization

   Partition pruning is now handled during the physical planning phase, so individual data sources no longer need to implement this machinery themselves.

   (This also implies that `CatalystScan` can be removed after the Parquet data source is migrated, since Catalyst expressions no longer need to be passed to data source implementations.)

1. Insertion with dynamic partitions

   When inserting data into an `FSBasedRelation`, the data can be partitioned dynamically by the specified partition columns.
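
For illustration, a Hive-compatible partitioned layout looks like the following (the table path and the `year`/`month` partition columns here are hypothetical):

```
/path/to/table/year=2014/month=12/part-00000.parquet
/path/to/table/year=2015/month=01/part-00000.parquet
/path/to/table/year=2015/month=02/part-00000.parquet
```

Partition discovery infers `year` and `month` as partition columns from the directory names and merges them into the relation's schema. A filter on a partition column, e.g. `year = 2015`, can then be pruned at planning time so that only the matching directories are scanned, and dynamic partition insertion writes rows into such a layout according to their partition column values.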

## New structures provided

### Developer API

1. `FSBasedRelation`

   Base abstract class for file system based data sources.

1. `OutputWriter`

   Base abstract class for output row writers, responsible for writing a single row object.

1. `FSBasedRelationProvider`

   A new relation provider for `FSBasedRelation` subclasses. Note that data sources extending `FSBasedRelation` don't need to extend `RelationProvider` and `SchemaRelationProvider`.

### User API

New overloaded versions of

1. `DataFrame.save()`
1. `DataFrame.saveAsTable()`
1. `SQLContext.load()`

are provided to allow users to save and load DataFrames with user-defined dynamic partition columns, as sketched below.
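
A minimal usage sketch of the Scala overloads (assumptions: an existing `sqlContext` and a DataFrame `df` with columns `value`, `year`, and `month`; `com.example.datasource` stands for a data source whose provider implements the new `FSBasedRelationProvider`, since built-in sources such as Parquet are only migrated in a follow-up PR; the output path is hypothetical):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Write `df`, dynamically partitioned by `year` and `month`.
df.save(
  "com.example.datasource",
  SaveMode.Overwrite,
  Map("path" -> "/tmp/partitioned_table"),
  Seq("year", "month"))

// Load the data back, declaring which columns are partition columns.
val schema = StructType(Seq(
  StructField("value", StringType),
  StructField("year", IntegerType),
  StructField("month", IntegerType)))

val loaded = sqlContext.load(
  "com.example.datasource",
  schema,
  Array("year", "month"),
  Map("path" -> "/tmp/partitioned_table"))
```

`DataFrame.saveAsTable()` takes the same trailing `partitionColumns` argument, but, as noted in its Scaladoc, currently only works with DataFrames created from a `HiveContext`, since a standard SQL context has no persisted catalog.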

### Spark SQL query planning

1. `InsertIntoFSBasedRelation`

   Implements the write path for `FSBasedRelation`s.

1. New rules for `FSBasedRelation` in `DataSourceStrategy`

   These hook `FSBasedRelation` into the physical query plan on the read path and perform partition pruning there.

## TODO

- [ ] Use scratch directories when overwriting a table with data selected from itself.

      Currently, this is not supported, because the table being overwritten is always deleted before any data is written to it.

- [ ] When inserting with dynamic partition columns, use an external sorter to group the data first.

      This ensures that only a single `OutputWriter` needs to be open at a time.  For data sources like Parquet, `OutputWriter`s can consume a lot of memory.  One issue is that this approach breaks the row distribution of the original DataFrame; however, we never promised to preserve data distribution when writing a DataFrame.

- [x] More tests.  Specifically, test cases for

      - [x] Self-join
      - [x] Loading partitioned relations with a subset of partition columns stored in data files.
      - [x] `SQLContext.load()` with user defined dynamic partition columns.

## Parquet data source migration

Parquet data source migration is covered in PR liancheng#6, which is against this PR branch and is for preview only. A formal PR needs to be made after this one is merged.

Author: Cheng Lian <[email protected]>

Closes apache#5526 from liancheng/partitioning-support and squashes the following commits:

5351a1b [Cheng Lian] Fixes compilation error introduced while rebasing
1f9b1a5 [Cheng Lian] Tweaks data schema passed to FSBasedRelations
43ba50e [Cheng Lian] Avoids serializing generated projection code
edf49e7 [Cheng Lian] Removed commented stale code block
348a922 [Cheng Lian] Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
ad4d4de [Cheng Lian] Enables HDFS style globbing
8d12e69 [Cheng Lian] Fixes compilation error
c71ac6c [Cheng Lian] Addresses comments from @marmbrus
7552168 [Cheng Lian] Fixes typo in MimaExclude.scala
0349e09 [Cheng Lian] Fixes compilation error introduced while rebasing
52b0c9b [Cheng Lian] Adjusts project/MimaExclude.scala
c466de6 [Cheng Lian] Addresses comments
bc3f9b4 [Cheng Lian] Uses projection to separate partition columns and data columns while inserting rows
795920a [Cheng Lian] Fixes compilation error after rebasing
0b8cd70 [Cheng Lian] Adds Scala/Catalyst row conversion when writing non-partitioned tables
fa543f3 [Cheng Lian] Addresses comments
5849dd0 [Cheng Lian] Fixes doc typos.  Fixes partition discovery refresh.
51be443 [Cheng Lian] Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
c4ed4fe [Cheng Lian] Bug fixes and a new test suite
a29e663 [Cheng Lian] Bug fix: should only pass actuall data files to FSBaseRelation.buildScan
5f423d3 [Cheng Lian] Bug fixes. Lets data source to customize OutputCommitter rather than OutputFormat
54c3d7b [Cheng Lian] Enforces that FileOutputFormat must be used
be0c268 [Cheng Lian] Uses TaskAttempContext rather than Configuration in OutputWriter.init
0bc6ad1 [Cheng Lian] Resorts to new Hadoop API, and now FSBasedRelation can customize output format class
f320766 [Cheng Lian] Adds prepareForWrite() hook, refactored writer containers
422ff4a [Cheng Lian] Fixes style issue
ce52353 [Cheng Lian] Adds new SQLContext.load() overload with user defined dynamic partition columns
8d2ff71 [Cheng Lian] Merges partition columns when reading partitioned relations
ca1805b [Cheng Lian] Removes duplicated partition discovery code in new Parquet
f18dec2 [Cheng Lian] More strict schema checking
b746ab5 [Cheng Lian] More tests
9b487bf [Cheng Lian] Fixes compilation errors introduced while rebasing
ea6c8dd [Cheng Lian] Removes remote debugging stuff
327bb1d [Cheng Lian] Implements partitioning support for data sources API
3c5073a [Cheng Lian] Fixes SaveModes used in test cases
fb5a607 [Cheng Lian] Fixes compilation error
9d17607 [Cheng Lian] Adds the contract that OutputWriter should have zero-arg constructor
5de194a [Cheng Lian] Forgot Apache licence header
95d0b4d [Cheng Lian] Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
770b5ba [Cheng Lian] Adds tests for FSBasedRelation
3ba9bbf [Cheng Lian] Adds DataFrame.saveAsTable() overrides which support partitioning
1b8231f [Cheng Lian] Renames FSBasedPrunedFilteredScan to FSBasedRelation
aa8ba9a [Cheng Lian] Javadoc fix
012ed2d [Cheng Lian] Adds PartitioningOptions
7dd8dd5 [Cheng Lian] Adds new interfaces and stub methods for data sources API partitioning support
liancheng authored and jeanlyn committed May 28, 2015
1 parent 50c9e3d commit b4103f6
Showing 21 changed files with 2,042 additions and 255 deletions.
52 changes: 41 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,22 +22,22 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.{Logging, SparkConf, SparkException}

/**
* :: DeveloperApi ::
@@ -199,13 +199,43 @@ class SparkHadoopUtil extends Logging {
* that file.
*/
def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
def recurse(path: Path): Array[FileStatus] = {
val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
listLeafStatuses(fs, fs.getFileStatus(basePath))
}

/**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing [[FileStatus]] of
* that file.
*/
def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
}

val baseStatus = fs.getFileStatus(basePath)
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
}

def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
listLeafDirStatuses(fs, fs.getFileStatus(basePath))
}

def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
}

assert(baseStatus.isDir)
recurse(baseStatus)
}

def globPath(pattern: Path): Seq[Path] = {
val fs = pattern.getFileSystem(conf)
Option(fs.globStatus(pattern)).map { statuses =>
statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
}.getOrElse(Seq.empty[Path])
}

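A small usage sketch of the new helpers added above (assuming a running Spark application; the HDFS paths below are hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

val util = SparkHadoopUtil.get
val base = new Path("hdfs://namenode:8020/tmp/partitioned_table")
val fs = base.getFileSystem(new Configuration())

// All leaf files under the base path, recursing into partition directories.
val leafFiles = util.listLeafStatuses(fs, base)

// Only the deepest directories, i.e. the partition directories holding data files.
val leafDirs = util.listLeafDirStatuses(fs, base)

// HDFS-style globbing, e.g. selecting all months of a single year.
val matched = util.globPath(new Path("hdfs://namenode:8020/tmp/partitioned_table/year=2015/*"))
```
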
14 changes: 13 additions & 1 deletion project/MimaExcludes.scala
@@ -94,11 +94,23 @@ object MimaExcludes {
// This `protected[sql]` method was removed in 1.3.1
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.sql.SQLContext.checkAnalysis"),
// This `private[sql]` class was removed in 1.4.0:
// These `private[sql]` class were removed in 1.4.0:
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.execution.AddExchange"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.execution.AddExchange$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.PartitionSpec"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.PartitionSpec$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.Partition"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.Partition$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues$"),
// These test support classes were moved out of src/main and into src/test:
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetTestData"),
107 changes: 96 additions & 11 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -27,23 +27,23 @@ import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

import com.fasterxml.jackson.core.JsonFactory

import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, ResolvedStar}
import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
import org.apache.spark.sql.jdbc.JDBCWriteDetails
import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils


@@ -400,7 +400,9 @@ class DataFrame private[sql](
joined.left,
joined.right,
joinType = Inner,
Some(EqualTo(joined.left.resolve(usingColumn), joined.right.resolve(usingColumn))))
Some(expressions.EqualTo(
joined.left.resolve(usingColumn),
joined.right.resolve(usingColumn))))
)
}

@@ -465,8 +467,8 @@ class DataFrame private[sql](
// By the time we get here, since we have already run analysis, all attributes should've been
// resolved and become AttributeReference.
val cond = plan.condition.map { _.transform {
case EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
case expressions.EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
}}
plan.copy(condition = cond)
}
@@ -1324,6 +1326,28 @@ class DataFrame private[sql](
saveAsTable(tableName, source, mode, options.toMap)
}

/**
* :: Experimental ::
* Creates a table at the given path from the the contents of this DataFrame
* based on a given data source, [[SaveMode]] specified by mode, a set of options, and a list of
* partition columns.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: java.util.Map[String, String],
partitionColumns: java.util.List[String]): Unit = {
saveAsTable(tableName, source, mode, options.toMap, partitionColumns)
}

/**
* :: Experimental ::
* (Scala-specific)
@@ -1350,13 +1374,44 @@ class DataFrame private[sql](
tableName,
source,
temporary = false,
Array.empty[String],
mode,
options,
logicalPlan)

sqlContext.executePlan(cmd).toRdd
}

/**
* :: Experimental ::
* Creates a table at the given path from the the contents of this DataFrame
* based on a given data source, [[SaveMode]] specified by mode, a set of options, and a list of
* partition columns.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: Map[String, String],
partitionColumns: Seq[String]): Unit = {
sqlContext.executePlan(
CreateTableUsingAsSelect(
tableName,
source,
temporary = false,
partitionColumns.toArray,
mode,
options,
logicalPlan)).toRdd
}

/**
* :: Experimental ::
* Saves the contents of this DataFrame to the given path,
@@ -1417,6 +1472,21 @@ class DataFrame private[sql](
save(source, mode, options.toMap)
}

/**
* :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source,
* [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
* @group output
*/
@Experimental
def save(
source: String,
mode: SaveMode,
options: java.util.Map[String, String],
partitionColumns: java.util.List[String]): Unit = {
save(source, mode, options.toMap, partitionColumns)
}

/**
* :: Experimental ::
* (Scala-specific)
@@ -1429,7 +1499,22 @@ class DataFrame private[sql](
source: String,
mode: SaveMode,
options: Map[String, String]): Unit = {
ResolvedDataSource(sqlContext, source, mode, options, this)
ResolvedDataSource(sqlContext, source, Array.empty[String], mode, options, this)
}

/**
* :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source,
* [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
* @group output
*/
@Experimental
def save(
source: String,
mode: SaveMode,
options: Map[String, String],
partitionColumns: Seq[String]): Unit = {
ResolvedDataSource(sqlContext, source, partitionColumns.toArray, mode, options, this)
}

6 changes: 6 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -67,6 +67,9 @@ private[spark] object SQLConf {
// to its length exceeds the threshold.
val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"

// Whether to perform partition discovery when loading external data sources. Default to true.
val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"

// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
@@ -241,6 +244,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def defaultDataSourceName: String =
getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")

private[spark] def partitionDiscoveryEnabled() =
getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean

// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
private[spark] def schemaStringLengthThreshold: Int =
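
For reference, the new flag introduced above can be toggled like any other SQL configuration (a sketch assuming an existing `sqlContext`):

```scala
// Partition discovery defaults to "true"; disable it per context if needed.
sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")
```
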
36 changes: 34 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -762,7 +762,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def load(source: String, options: Map[String, String]): DataFrame = {
val resolved = ResolvedDataSource(this, None, source, options)
val resolved = ResolvedDataSource(this, None, Array.empty[String], source, options)
DataFrame(this, LogicalRelation(resolved.relation))
}

@@ -781,6 +781,37 @@ class SQLContext(@transient val sparkContext: SparkContext)
load(source, schema, options.toMap)
}

/**
* :: Experimental ::
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
*/
@Experimental
def load(
source: String,
schema: StructType,
partitionColumns: Array[String],
options: java.util.Map[String, String]): DataFrame = {
load(source, schema, partitionColumns, options.toMap)
}

/**
* :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
* @group genericdata
*/
@Experimental
def load(
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
val resolved = ResolvedDataSource(this, Some(schema), Array.empty[String], source, options)
DataFrame(this, LogicalRelation(resolved.relation))
}

/**
* :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
@@ -791,8 +822,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
def load(
source: String,
schema: StructType,
partitionColumns: Array[String],
options: Map[String, String]): DataFrame = {
val resolved = ResolvedDataSource(this, Some(schema), source, options)
val resolved = ResolvedDataSource(this, Some(schema), partitionColumns, source, options)
DataFrame(this, LogicalRelation(resolved.relation))
}

@@ -371,9 +371,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case c: CreateTableUsing if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")

case CreateTableUsingAsSelect(tableName, provider, true, mode, opts, query) =>
val cmd =
CreateTempTableUsingAsSelect(tableName, provider, mode, opts, query)
case CreateTableUsingAsSelect(tableName, provider, true, partitionsCols, mode, opts, query)
if partitionsCols.nonEmpty =>
sys.error("Cannot create temporary partitioned table.")

case CreateTableUsingAsSelect(tableName, provider, true, _, mode, opts, query) =>
val cmd = CreateTempTableUsingAsSelect(
tableName, provider, Array.empty[String], mode, opts, query)
ExecutedCommand(cmd) :: Nil
case c: CreateTableUsingAsSelect if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
