Addresses comments from @marmbrus
liancheng committed May 12, 2015
1 parent 7552168 commit c71ac6c
Showing 5 changed files with 23 additions and 24 deletions.
3 changes: 3 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -238,6 +238,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def defaultDataSourceName: String =
     getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")

+  private[spark] def partitionDiscoveryEnabled() =
+    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
+
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
   private[spark] def schemaStringLengthThreshold: Int =
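The new accessor wraps the partition-discovery flag that FSBasedRelation consults further down in this commit. A hypothetical way to flip it from a Spark shell, assuming PARTITION_DISCOVERY_ENABLED resolves to the key spark.sql.sources.partitionDiscovery.enabled (the constant's value is not shown in this diff):

// Hypothetical usage sketch; the config key is assumed, not shown in this diff.
sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")
// With the flag off, FSBasedRelation skips discoverPartitions() and
// falls back to an empty PartitionSpec (see the hunks below).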
@@ -364,13 +364,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       translate(child).map(sources.Not)

     case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-      Some(sources.StringStartsWith(a.name, v.toString()))
+      Some(sources.StringStartsWith(a.name, v.toString))

     case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-      Some(sources.StringEndsWith(a.name, v.toString()))
+      Some(sources.StringEndsWith(a.name, v.toString))

     case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
-      Some(sources.StringContains(a.name, v.toString()))
+      Some(sources.StringContains(a.name, v.toString))

     case _ => None
   }
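These cases translate Catalyst string predicates into the corresponding org.apache.spark.sql.sources filters so they can be pushed down to a data source. A self-contained sketch (not Spark code) of how a relation receiving such filters might apply them to string-keyed records:

import org.apache.spark.sql.sources._

// Illustrative only: evaluate the pushed-down string filters against a record
// represented as a Map from column name to value.
def evaluate(filter: Filter, record: Map[String, String]): Boolean = filter match {
  case StringStartsWith(attribute, value) => record.get(attribute).exists(_.startsWith(value))
  case StringEndsWith(attribute, value)   => record.get(attribute).exists(_.endsWith(value))
  case StringContains(attribute, value)   => record.get(attribute).exists(_.contains(value))
  case _                                  => true // conservatively keep rows for unhandled filters
}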
@@ -17,16 +17,18 @@

 package org.apache.spark.sql.sources

+import scala.util.Try
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql._

 /**
  * ::DeveloperApi::
@@ -87,7 +89,7 @@ trait SchemaRelationProvider {
  * ::DeveloperApi::
  * Implemented by objects that produce relations for a specific kind of data source
  * with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a
- * USING clause specified (to specify the implemented SchemaRelationProvider), a user defined
+ * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined
  * schema, and an optional list of partition columns, this interface is used to pass in the
  * parameters specified by a user.
  *
@@ -114,7 +116,7 @@ trait FSBasedRelationProvider {
       sqlContext: SQLContext,
       schema: Option[StructType],
       partitionColumns: Option[StructType],
-      parameters: Map[String, String]): BaseRelation
+      parameters: Map[String, String]): FSBasedRelation
 }

 @DeveloperApi
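For context, the Scaladoc above describes the DDL path into createRelation. A hypothetical statement that would route through an FSBasedRelationProvider (the provider class and path are made up):

// The USING clause names the provider, the column list supplies the user-defined
// schema, and the OPTIONS map arrives as the `parameters` argument.
sqlContext.sql(
  """CREATE TEMPORARY TABLE users (name STRING, age INT)
    |USING com.example.MyFSBasedSource
    |OPTIONS (path '/data/users')
  """.stripMargin)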
@@ -282,12 +284,13 @@ abstract class OutputWriter {
    * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
    * the task output is committed.
    */
-  def close(): Unit = ()
+  def close(): Unit
 }

 /**
  * ::Experimental::
- * A [[BaseRelation]] that abstracts file system based data sources.
+ * A [[BaseRelation]] that provides much of the common code required for formats that store their
+ * data to an HDFS compatible filesystem.
  *
  * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
  * filter using selected predicates before producing an RDD containing all matching tuples as
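Making close() abstract forces concrete writers to release their resources explicitly. A minimal sketch of the calling contract, assuming OutputWriter's other abstract member is write(row: Row), which is not shown in this hunk:

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.OutputWriter

// Sketch only: persist every row, then close the writer before the task
// output is committed. Assumes write(row: Row) exists on OutputWriter.
def writeTask(writer: OutputWriter, rows: Iterator[Row]): Unit = {
  try {
    rows.foreach(writer.write)
  } finally {
    writer.close() // concrete writers must now implement this themselves
  }
}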
@@ -338,16 +341,13 @@ abstract class FSBasedRelation private[sql](
   private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
     spec.copy(partitionColumns = spec.partitionColumns.asNullable)
   }.getOrElse {
-    if (partitionDiscoverEnabled()) {
+    if (sqlContext.conf.partitionDiscoveryEnabled()) {
       discoverPartitions()
     } else {
       PartitionSpec(StructType(Nil), Array.empty[Partition])
     }
   }

-  private def partitionDiscoverEnabled() =
-    sqlContext.conf.getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
-
   private[sql] def partitionSpec: PartitionSpec = _partitionSpec

   /**
@@ -356,7 +356,7 @@
   def partitionColumns: StructType = partitionSpec.partitionColumns

   private[sql] def refresh(): Unit = {
-    if (partitionDiscoverEnabled()) {
+    if (sqlContext.conf.partitionDiscoveryEnabled()) {
       _partitionSpec = discoverPartitions()
     }
   }
@@ -365,11 +365,10 @@
     val basePaths = paths.map(new Path(_))
     val leafDirs = basePaths.flatMap { path =>
       val fs = path.getFileSystem(hadoopConf)
-      if (fs.exists(path)) {
-        SparkHadoopUtil.get.listLeafDirStatuses(fs, fs.makeQualified(path))
-      } else {
-        Seq.empty[FileStatus]
-      }
+      Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory)))
+        .filter(_.isDir)
+        .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
+        .getOrElse(Seq.empty[FileStatus])
     }.map(_.getPath)

     if (leafDirs.nonEmpty) {
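The rewritten listing wraps a single getFileStatus call in Try instead of pre-checking exists(), which also skips paths that exist but are plain files. A standalone sketch of the same pattern, with listStatus standing in for SparkHadoopUtil.listLeafDirStatuses to keep it self-contained:

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// Sketch of the Try-based listing pattern used above.
def leafStatuses(rawPath: String, conf: Configuration): Seq[FileStatus] = {
  val path = new Path(rawPath)
  val fs = path.getFileSystem(conf)
  val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
  Try(fs.getFileStatus(qualified))    // a missing path becomes a Failure instead of a pre-check
    .filter(_.isDir)                  // ignore paths that are regular files
    .map(status => fs.listStatus(status.getPath).toSeq)
    .getOrElse(Seq.empty[FileStatus])
}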
@@ -101,12 +101,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           }
         }

-      case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) =>
-        // OK
-
-      case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) =>
-        // OK
-
+      case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK
+      case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK
       case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
@@ -25,6 +25,7 @@ import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.parquet.ParquetTest
 import org.apache.spark.sql.types._

+// TODO Don't extend ParquetTest
 // This test suite extends ParquetTest for some convenient utility methods. These methods should be
 // moved to some more general places, maybe QueryTest.
 class FSBasedRelationSuite extends QueryTest with ParquetTest {
