Skip to content

Commit

Permalink
Fixes HadoopFsRelation partition columns initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed May 15, 2015
1 parent 2fc680a commit 37d1738
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,18 +366,24 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio

private val codegenEnabled = sqlContext.conf.codegenEnabled

private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
spec.copy(partitionColumns = spec.partitionColumns.asNullable)
}.getOrElse {
if (sqlContext.conf.partitionDiscoveryEnabled()) {
discoverPartitions()
} else {
PartitionSpec(StructType(Nil), Array.empty[Partition])
private var _partitionSpec: PartitionSpec = _

final private[sql] def partitionSpec: PartitionSpec = {
if (_partitionSpec == null) {
_partitionSpec = maybePartitionSpec
.map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
.orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
.getOrElse {
if (sqlContext.conf.partitionDiscoveryEnabled()) {
discoverPartitions()
} else {
PartitionSpec(StructType(Nil), Array.empty[Partition])
}
}
}
_partitionSpec
}

final private[sql] def partitionSpec: PartitionSpec = _partitionSpec

/**
* Base paths of this relation. For partitioned relations, it should be either root directories
* of all partition directories.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ class SimpleTextSource extends HadoopFsRelationProvider {
schema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField]))
new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext)
new SimpleTextRelation(paths, schema, partitionColumns, parameters)(sqlContext)
}
}

Expand Down Expand Up @@ -77,17 +76,15 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends
* option `"dataSchema"`.
*/
class SimpleTextRelation(
val paths: Array[String],
override val paths: Array[String],
val maybeDataSchema: Option[StructType],
partitionsSchema: StructType,
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
extends HadoopFsRelation {

import sqlContext.sparkContext

override def userDefinedPartitionColumns: Option[StructType] = Some(partitionsSchema)

override val dataSchema: StructType =
maybeDataSchema.getOrElse(DataType.fromJson(parameters("dataSchema")).asInstanceOf[StructType])

Expand Down

0 comments on commit 37d1738

Please sign in to comment.