Adds tests for FSBasedRelation
liancheng committed May 12, 2015
1 parent 3ba9bbf commit 770b5ba
Showing 2 changed files with 458 additions and 18 deletions.
@@ -17,8 +17,6 @@

package org.apache.spark.sql.sources

import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -251,17 +249,20 @@ trait CatalystScan {
* opened. This instance is used to persist rows to this single output file.
*/
@Experimental
trait OutputWriter {
abstract class OutputWriter {
def init(): Unit = ()

/**
* Persists a single row. Invoked on the executor side.
* Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
* tables, dynamic partition columns are not included in rows to be written.
*/
def write(row: Row): Unit

/**
* Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
* the task output is committed.
*/
def close(): Unit
def close(): Unit = ()
}
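
To illustrate the `init()`/`write()`/`close()` lifecycle above, here is a minimal sketch (not part of this commit) of a hypothetical subclass that writes comma-separated lines to a local file; the class name, constructor argument, and formatting are illustrative assumptions:

import java.io.{File, PrintWriter}

import org.apache.spark.sql.Row

// Hypothetical writer, for illustration only: emits one CSV-style line per row.
class DemoCsvOutputWriter(filePath: String) extends OutputWriter {
  private var writer: PrintWriter = _

  // Open the output file once, on the executor side, before any rows arrive.
  override def init(): Unit = {
    writer = new PrintWriter(new File(filePath))
  }

  // Dynamic partition columns are already excluded from `row`, per the contract above.
  override def write(row: Row): Unit = {
    writer.println(row.toSeq.map(v => if (v == null) "" else v.toString).mkString(","))
  }

  // Flush and release the file handle before the task output is committed.
  override def close(): Unit = {
    writer.close()
  }
}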

/**
@@ -270,17 +271,54 @@ trait OutputWriter {
*
* For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
* filter using selected predicates before producing an RDD containing all matching tuples as
* [[Row]] objects.
*
* In addition, when reading from Hive style partitioned tables stored in file systems, it's able to
* discover partitioning information from the paths of input directories, and perform partition
* pruning before starting to read the data.
* [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
* systems, it's able to discover partitioning information from the paths of input directories, and
* perform partition pruning before starting to read the data. Subclasses of [[FSBasedRelation]]
* must override one of the three `buildScan` methods to implement the read path.
*
* For the write path, it provides the ability to write to both non-partitioned and partitioned
* tables. Directory layout of the partitioned tables is compatible with Hive.
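* For example (illustrative, not from this commit), a table partitioned by `year` and `month`
* would be laid out as `path/year=2015/month=05/part-00000` on the file system.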
*/
@Experimental
trait FSBasedRelation extends BaseRelation {
abstract class FSBasedRelation extends BaseRelation {
// Discovers partition columns and merges them with `dataSchema`. Any partition column not
// already present in `dataSchema` is appended to it.
override val schema: StructType = ???
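// For illustration only (not in this commit), the merge described above might be
// sketched as:
//   val partitionColumns: StructType = discoverPartitions(path)  // hypothetical helper
//   StructType(dataSchema.fields ++
//     partitionColumns.filterNot(f => dataSchema.fieldNames.contains(f.name)))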

/**
* Base path of this relation. For partitioned relations, `path` should be the root directory of
* all partition directories.
*/
def path: String

/**
* Specifies schema of actual data files. For partitioned relations, if one or more partitioned
* columns are contained in the data files, they should also appear in `dataSchema`.
*/
def dataSchema: StructType

/**
* Builds an `RDD[Row]` containing all rows within this relation.
*
* @param inputPaths Data files to be read. If the underlying relation is partitioned, only data
* files within required partition directories are included.
*/
def buildScan(inputPaths: Array[String]): RDD[Row] = {
throw new RuntimeException(
"At least one buildScan() method should be overridden to read the relation.")
}

/**
* Builds an `RDD[Row]` containing all rows within this relation.
*
* @param requiredColumns Required columns.
* @param inputPaths Data files to be read. If the underlying relation is partitioned, only data
* files within required partition directories are included.
*/
def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = {
buildScan(inputPaths)
}

/**
* Builds an `RDD[Row]` containing all rows within this relation.
*
@@ -295,13 +333,9 @@ trait FSBasedRelation extends BaseRelation {
def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String]): RDD[Row]

/**
* When writing rows to this relation, this method is invoked on the driver side before the actual
* write job is issued. It provides an opportunity to configure the write job to be performed.
*/
def prepareForWrite(conf: Configuration): Unit
inputPaths: Array[String]): RDD[Row] = {
buildScan(requiredColumns, inputPaths)
}

/**
* This method is responsible for producing a new [[OutputWriter]] for each newly opened output
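To illustrate the read path of the API in this diff, here is a minimal sketch (not part of this commit) of a concrete subclass that overrides the simplest `buildScan` overload; `DemoTextRelation`, its single-column schema, and the line-per-row parsing are illustrative assumptions:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.FSBasedRelation
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical relation, for illustration only: reads plain text files as a
// single string column named "line".
class DemoTextRelation(
    override val path: String,
    override val sqlContext: SQLContext)
  extends FSBasedRelation {

  // Schema of the data files themselves; partition columns discovered under
  // `path` are merged into `schema` by the base class.
  override val dataSchema: StructType =
    StructType(StructField("line", StringType, nullable = true) :: Nil)

  // Overriding the simplest overload is enough: the other two overloads fall
  // back to it when column pruning and filter push-down are not implemented.
  override def buildScan(inputPaths: Array[String]): RDD[Row] =
    sqlContext.sparkContext
      .textFile(inputPaths.mkString(",")) // textFile accepts comma-separated paths
      .map(Row(_))
}

Because the two richer `buildScan` overloads default to delegating down the chain, a source can start with this simplest form and add column pruning and filter push-down later without changing callers.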
