Fixes doc typos. Fixes partition discovery refresh.
liancheng committed May 12, 2015
1 parent 51be443 commit 5849dd0
Showing 4 changed files with 42 additions and 58 deletions.
@@ -224,10 +224,16 @@ private[sql] object ResolvedDataSource {
       case dataSource: SchemaRelationProvider =>
         dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
       case dataSource: FSBasedRelationProvider =>
+        val maybePartitionsSchema = if (partitionColumns.isEmpty) {
+          None
+        } else {
+          Some(partitionColumnsSchema(schema, partitionColumns))
+        }
+
         dataSource.createRelation(
           sqlContext,
           Some(schema),
-          Some(partitionColumnsSchema(schema, partitionColumns)),
+          maybePartitionsSchema,
           new CaseInsensitiveMap(options))
       case dataSource: org.apache.spark.sql.sources.RelationProvider =>
         throw new AnalysisException(s"$className does not allow user-specified schemas.")
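The new guard passes None to the data source when no partition columns were specified, instead of always building a partition schema. A minimal sketch of the idea in plain Scala, assuming partitionColumnsSchema simply projects the named partition columns out of the full schema (that helper is not shown in this diff, so the stand-in below is hypothetical):

import org.apache.spark.sql.types.StructType

object PartitionSchemaSketch {
  // Hypothetical stand-in for the helper referenced above: select the named
  // partition columns from the full schema, preserving the given order.
  def partitionColumnsSchema(schema: StructType, partitionColumns: Seq[String]): StructType =
    StructType(partitionColumns.map(name => schema(name)))

  // Only build a partition schema when partition columns were actually supplied.
  def maybePartitionsSchema(schema: StructType, partitionColumns: Seq[String]): Option[StructType] =
    if (partitionColumns.isEmpty) None
    else Some(partitionColumnsSchema(schema, partitionColumns))
}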
@@ -20,7 +20,6 @@ package org.apache.spark.sql.sources
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -294,7 +293,7 @@ abstract class OutputWriter {
  * filter using selected predicates before producing an RDD containing all matching tuples as
  * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
  * systems, it's able to discover partitioning information from the paths of input directories, and
- * perform partition pruning before start reading the data.Subclasses of [[FSBasedRelation()]] must
+ * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must
  * override one of the three `buildScan` methods to implement the read path.
  *
  * For the write path, it provides the ability to write to both non-partitioned and partitioned
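For context, the partition discovery mentioned in this Scaladoc operates on Hive-style directory layouts, where each directory level encodes a column=value pair. A toy illustration in plain Scala of the kind of information recovered from such paths (illustrative only, not Spark's implementation, which the diff below delegates to PartitioningUtils):

object PartitionPathSketch {
  // Hive-style partitioned layout, e.g.:
  //   /data/table/p1=1/p2=hello/part-00000
  //   /data/table/p1=2/p2=world/part-00000
  // Toy extractor for the column/value pairs encoded in one leaf directory path.
  def partitionValues(leafDir: String): Seq[(String, String)] =
    leafDir.split("/").toSeq.collect {
      case segment if segment.contains("=") =>
        val Array(column, value) = segment.split("=", 2)
        column -> value
    }

  // partitionValues("/data/table/p1=1/p2=hello") == Seq("p1" -> "1", "p2" -> "hello")
}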
@@ -329,39 +328,45 @@ abstract class FSBasedRelation private[sql](
   /**
    * Constructs an [[FSBasedRelation]].
    *
-   * @param paths Base paths of this relation. For partitioned relations, it should be either root
+   * @param paths Base paths of this relation. For partitioned relations, it should be root
    * directories of all partition directories.
    */
   def this(paths: Array[String]) = this(paths, None)

   private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)

-  private var _partitionSpec: PartitionSpec = _
-  refreshPartitions()
+  private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
+    spec.copy(partitionColumns = spec.partitionColumns.asNullable)
+  }.getOrElse {
+    discoverPartitions()
+  }

   private[sql] def partitionSpec: PartitionSpec = _partitionSpec

   /**
    * Partition columns. Note that they are always nullable.
    */
   def partitionColumns: StructType = partitionSpec.partitionColumns

   private[sql] def refresh(): Unit = {
-    refreshPartitions()
+    _partitionSpec = discoverPartitions()
   }

-  private def refreshPartitions(): Unit = {
-    _partitionSpec = maybePartitionSpec.getOrElse {
-      val basePaths = paths.map(new Path(_))
-      val leafDirs = basePaths.flatMap { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        if (fs.exists(path)) {
-          SparkHadoopUtil.get.listLeafDirStatuses(fs, fs.makeQualified(path))
-        } else {
-          Seq.empty[FileStatus]
-        }
-      }.map(_.getPath)
-
-      if (leafDirs.nonEmpty) {
-        PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
-      } else {
-        PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
-      }
-    }
-  }
+  private def discoverPartitions(): PartitionSpec = {
+    val basePaths = paths.map(new Path(_))
+    val leafDirs = basePaths.flatMap { path =>
+      val fs = path.getFileSystem(hadoopConf)
+      if (fs.exists(path)) {
+        SparkHadoopUtil.get.listLeafDirStatuses(fs, fs.makeQualified(path))
+      } else {
+        Seq.empty[FileStatus]
+      }
+    }.map(_.getPath)
+
+    if (leafDirs.nonEmpty) {
+      PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
+    } else {
+      PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
+    }
+  }
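This hunk is the "partition discovery refresh" fix from the commit title: refresh() used to delegate to refreshPartitions(), which fell back to the constructor-supplied maybePartitionSpec, so refreshing a relation created with an explicit partition spec never re-scanned the file system. The new discoverPartitions() always lists the leaf directories, and refresh() reassigns _partitionSpec from its result. A minimal plain-Scala sketch of the behavioral difference (illustrative names only, not Spark's API):

// Toy model of the fix: a relation that may be constructed with a cached spec.
case class ToySpec(partitionDirs: Seq[String])

class ToyRelation(cachedSpec: Option[ToySpec], scanFileSystem: () => ToySpec) {
  private var spec: ToySpec = cachedSpec.getOrElse(scanFileSystem())

  // Old behaviour: falls back to the cached spec, so refresh() is a no-op whenever
  // a spec was supplied up front, and newly added partition directories are missed.
  def refreshOld(): Unit = { spec = cachedSpec.getOrElse(scanFileSystem()) }

  // New behaviour: always re-discover partitions from the file system.
  def refreshNew(): Unit = { spec = scanFileSystem() }

  def partitions: Seq[String] = spec.partitionDirs
}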

@@ -269,34 +269,6 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     }
   }

-  ignore("save()/load() - partitioned table - Append - mismatched partition columns") {
-    withTempPath { file =>
-      partitionedTestDF1.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      // Using only a subset of all partition columns
-      intercept[IllegalArgumentException] {
-        partitionedTestDF2.save(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          mode = SaveMode.Append,
-          options = Map("path" -> file.getCanonicalPath),
-          partitionColumns = Seq("p1"))
-      }
-
-      // Using different order of partition columns
-      intercept[IllegalArgumentException] {
-        partitionedTestDF2.save(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          mode = SaveMode.Append,
-          options = Map("path" -> file.getCanonicalPath),
-          partitionColumns = Seq("p2", "p1"))
-      }
-    }
-  }
-
   test("save()/load() - partitioned table - ErrorIfExists") {
     withTempDir { file =>
       intercept[RuntimeException] {
@@ -452,7 +424,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     }
   }

-  ignore("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
+  test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
     partitionedTestDF1.saveAsTable(
       tableName = "t",
       source = classOf[SimpleTextSource].getCanonicalName,
@@ -461,21 +433,21 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
       partitionColumns = Seq("p1", "p2"))

     // Using only a subset of all partition columns
-    intercept[IllegalArgumentException] {
+    intercept[Throwable] {
       partitionedTestDF2.saveAsTable(
         tableName = "t",
         source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite,
+        mode = SaveMode.Append,
         options = Map("dataSchema" -> dataSchema.json),
         partitionColumns = Seq("p1"))
     }

     // Using different order of partition columns
-    intercept[IllegalArgumentException] {
+    intercept[Throwable] {
       partitionedTestDF2.saveAsTable(
         tableName = "t",
         source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite,
+        mode = SaveMode.Append,
         options = Map("dataSchema" -> dataSchema.json),
         partitionColumns = Seq("p2", "p1"))
     }
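A note on the widened intercept type above: ScalaTest's intercept[T] runs the block and returns the thrown exception if it is a T (or a subtype), failing the test otherwise, so expecting Throwable instead of IllegalArgumentException keeps these re-enabled tests valid even if the mismatch error surfaces as a different exception type. A small self-contained usage sketch:

import org.scalatest.Assertions._

object InterceptSketch extends App {
  // intercept fails the assertion if nothing is thrown or the type does not match;
  // otherwise it returns the caught exception for further inspection.
  val e = intercept[Throwable] {
    throw new IllegalArgumentException("mismatched partition columns")
  }
  assert(e.getMessage.contains("partition columns"))
}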
@@ -92,10 +92,10 @@ class SimpleTextOutputWriter extends OutputWriter {
 class SimpleTextRelation(
     paths: Array[String],
     val maybeDataSchema: Option[StructType],
-    val partitionColumns: StructType,
+    partitionsSchema: StructType,
     parameters: Map[String, String])(
     @transient val sqlContext: SQLContext)
-  extends FSBasedRelation(paths, partitionColumns) {
+  extends FSBasedRelation(paths, partitionsSchema) {

   import sqlContext.sparkContext

@@ -106,7 +106,8 @@ class SimpleTextRelation(
     case that: SimpleTextRelation =>
       this.paths.sameElements(that.paths) &&
         this.maybeDataSchema == that.maybeDataSchema &&
-        this.dataSchema == that.dataSchema
+        this.dataSchema == that.dataSchema &&
+        this.partitionColumns == that.partitionColumns

     case _ => false
   }
