Adds new interfaces and stub methods for data sources API partitioning support
liancheng committed May 12, 2015
1 parent 4b5e1fe commit 7dd8dd5
Showing 2 changed files with 135 additions and 3 deletions.
30 changes: 30 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1417,6 +1417,21 @@ class DataFrame private[sql](
    save(source, mode, options.toMap)
  }

  /**
   * :: Experimental ::
   * Saves the contents of this DataFrame to the given path based on the given data source,
   * [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
   * @group output
   */
  @Experimental
  def save(
      source: String,
      mode: SaveMode,
      options: java.util.Map[String, String],
      partitionColumns: java.util.List[String]): Unit = {
    ???
  }

  /**
   * :: Experimental ::
   * (Scala-specific)
@@ -1432,6 +1447,21 @@ class DataFrame private[sql](
    ResolvedDataSource(sqlContext, source, mode, options, this)
  }

  /**
   * :: Experimental ::
   * Saves the contents of this DataFrame to the given path based on the given data source,
   * [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
   * @group output
   */
  @Experimental
  def save(
      source: String,
      mode: SaveMode,
      options: Map[String, String],
      partitionColumns: Seq[String]): Unit = {
    ???
  }

  /**
   * :: Experimental ::
   * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
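Both new save overloads above are left as ??? stubs in this commit, so calling them would simply throw scala.NotImplementedError; the sketch below only illustrates the intended call shape of the Scala-specific overload. The data source name, option values, partition columns, and the DataFrame `df` are assumptions for illustration, not part of the commit.

import org.apache.spark.sql.SaveMode

// `df` is an existing DataFrame; "year" and "month" are hypothetical partition columns.
df.save(
  source = "parquet",
  mode = SaveMode.Overwrite,
  options = Map("path" -> "/tmp/events"),
  partitionColumns = Seq("year", "month"))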
108 changes: 105 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -17,11 +17,13 @@

package org.apache.spark.sql.sources

-import org.apache.spark.annotation.{Experimental, DeveloperApi}
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}

/**
* ::DeveloperApi::
@@ -78,6 +80,40 @@ trait SchemaRelationProvider {
      schema: StructType): BaseRelation
}

/**
 * ::DeveloperApi::
 * Implemented by objects that produce relations for a specific kind of data source
 * with a given schema and partition columns. When Spark SQL is given a DDL operation with a
 * USING clause specified (to specify the implemented [[PartitionedSchemaRelationProvider]]),
 * a user-defined schema, and an optional list of partition columns, this interface is used to
 * pass in the parameters specified by a user.
 *
 * Users may specify the fully qualified class name of a given data source. When that class is
 * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
 * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
 * data source 'org.apache.spark.sql.json.DefaultSource'.
 *
 * A new instance of this class will be instantiated each time a DDL call is made.
 *
 * The difference between a [[RelationProvider]] and a [[PartitionedSchemaRelationProvider]] is
 * that users need to provide a schema and a (possibly empty) list of partition columns when
 * using a [[PartitionedSchemaRelationProvider]]. A relation provider can inherit
 * [[RelationProvider]], [[SchemaRelationProvider]], and [[PartitionedSchemaRelationProvider]]
 * if it can support schema inference, user-specified schemas, and accessing partitioned
 * relations.
 */
trait PartitionedSchemaRelationProvider {
  /**
   * Returns a new base relation with the given parameters, a user-defined schema, and a list of
   * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
   * is enforced by the Map that is passed to the function.
   */
  def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType,
      partitionColumns: StructType): BaseRelation
}
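To make the contract concrete, here is a minimal, hypothetical implementation of the new trait. The class name `DefaultSource`, the `path` option, and the do-nothing relation are illustrative only; a real data source would build a relation that actually reads and writes data.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, PartitionedSchemaRelationProvider}
import org.apache.spark.sql.types.StructType

class DefaultSource extends PartitionedSchemaRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType,
      partitionColumns: StructType): BaseRelation = {
    // A data source would typically pull its location from the (case-insensitive) options map.
    require(parameters.contains("path"), "'path' must be specified")
    val ctx = sqlContext
    val dataSchema = schema
    // Do-nothing relation, just to show how the pieces fit together.
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = dataSchema
    }
  }
}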

@DeveloperApi
trait CreatableRelationProvider {
  /**
@@ -207,3 +243,69 @@ trait InsertableRelation {
trait CatalystScan {
  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}

/**
 * ::Experimental::
 * [[OutputWriter]] is used together with [[FSBasedPrunedFilteredScan]] for persisting rows to the
 * underlying file system. An [[OutputWriter]] instance is created when a new output file is
 * opened. This instance is used to persist rows to this single output file.
 */
@Experimental
trait OutputWriter {
  /**
   * Persists a single row. Invoked on the executor side.
   */
  def write(row: Row): Unit

  /**
   * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
   * the task output is committed.
   */
  def close(): Unit
}
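As a sketch of how the executor-side half might look, the writer below appends rows as comma-separated lines to a local file. The class name and the CSV formatting are assumptions for illustration; a real implementation would normally write through Hadoop's FileSystem API and respect the task's output committer.

import java.io.{File, PrintWriter}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.OutputWriter

class CsvOutputWriter(path: String) extends OutputWriter {
  // One writer per output file; created when the file is opened.
  private val out = new PrintWriter(new File(path))

  // Persist a single row as one comma-separated line.
  override def write(row: Row): Unit = {
    out.println(row.toSeq.mkString(","))
  }

  // Flush and release the file handle once all rows for this file are written.
  override def close(): Unit = out.close()
}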

/**
 * ::Experimental::
 * A [[BaseRelation]] that abstracts file system based data sources.
 *
 * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
 * filter using selected predicates before producing an RDD containing all matching tuples as
 * [[Row]] objects.
 *
 * In addition, when reading from Hive-style partitioned tables stored in file systems, it's able
 * to discover partitioning information from the paths of input directories, and to perform
 * partition pruning before it starts reading the data.
 *
 * For the write path, it provides the ability to write to both non-partitioned and partitioned
 * tables. The directory layout of partitioned tables is compatible with Hive.
 */
@Experimental
trait FSBasedPrunedFilteredScan extends BaseRelation {
  /**
   * Builds an `RDD[Row]` containing all rows within this relation.
   *
   * @param requiredColumns Required columns.
   * @param filters Candidate filters to be pushed down. The actual filter should be the
   *        conjunction of all `filters`. The pushed down filters are currently purely an
   *        optimization as they will all be evaluated again. This means it is safe to use them
   *        with methods that produce false positives such as filtering partitions based on a
   *        Bloom filter.
   * @param inputPaths Data files to be read. If the underlying relation is partitioned, only data
   *        files within required partition directories are included.
   */
  def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
      inputPaths: Array[String]): RDD[Row]

  /**
   * When writing rows to this relation, this method is invoked on the driver side before the
   * actual write job is issued. It provides an opportunity to configure the write job to be
   * performed.
   */
  def prepareForWrite(conf: Configuration): Unit

  /**
   * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
   * file on the executor side.
   */
  def newOutputWriter(path: String): OutputWriter
}
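Putting the pieces together, a hypothetical relation mixing in the new trait might look like the sketch below. `MyTextRelation`, its single-column schema, and the use of `textFile` are assumptions for illustration; only the three overridden scan/write methods are dictated by the trait, and the writer reused here is the illustrative `CsvOutputWriter` sketched after [[OutputWriter]] above.

import org.apache.hadoop.conf.Configuration

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{FSBasedPrunedFilteredScan, Filter, OutputWriter}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class MyTextRelation(val sqlContext: SQLContext) extends FSBasedPrunedFilteredScan {
  // One string column per input line.
  override val schema: StructType = StructType(StructField("value", StringType) :: Nil)

  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
      inputPaths: Array[String]): RDD[Row] = {
    // Only the listed files are read; pushed-down filters are an optimization hint
    // and may safely be re-evaluated later.
    sqlContext.sparkContext.textFile(inputPaths.mkString(",")).map(Row(_))
  }

  override def prepareForWrite(conf: Configuration): Unit = {
    // Driver-side hook: e.g. stash output format settings into the Hadoop configuration.
  }

  override def newOutputWriter(path: String): OutputWriter =
    new CsvOutputWriter(path) // the illustrative writer sketched earlier
}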
