Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
liancheng committed May 12, 2015
1 parent ad4d4de commit 348a922
Showing 2 changed files with 46 additions and 15 deletions.
@@ -26,8 +26,9 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
@@ -339,6 +340,8 @@ abstract class FSBasedRelation private[sql](
 
   private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
 
+  private val codegenEnabled = sqlContext.conf.codegenEnabled
+
   private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
     spec.copy(partitionColumns = spec.partitionColumns.asNullable)
   }.getOrElse {
@@ -421,7 +424,25 @@
    * selected partition.
    */
   def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = {
-    buildScan(inputPaths)
+    // Local copies so the closure below doesn't capture (and serialize) the whole relation.
+    val dataSchema = this.dataSchema
+    val codegenEnabled = this.codegenEnabled
+
+    val requiredOutput = requiredColumns.map { col =>
+      val field = dataSchema(col)
+      BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable)
+    }.toSeq
+
+    val buildProjection = if (codegenEnabled) {
+      GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
+    } else {
+      () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
+    }
+
+    buildScan(inputPaths).mapPartitions { rows =>
+      val mutableProjection = buildProjection()
+      rows.map(mutableProjection)
+    }
   }
 
   /**
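For context, the default implementation above is what every FSBasedRelation subclass now inherits: the unpruned buildScan(inputPaths) output is wrapped, once per partition, in a mutable projection that keeps only the requested columns, compiled via GenerateMutableProjection when sqlContext.conf.codegenEnabled is set and interpreted otherwise. Below is a minimal, self-contained sketch of the interpreted path, assuming a hypothetical three-column schema; InterpretedMutableProjection and friends are internal Catalyst API (circa Spark 1.4), and the explicit AttributeReference construction stands in for the private[sql] dataSchema.toAttributes used in the commit.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, InterpretedMutableProjection}
import org.apache.spark.sql.types._

// Hypothetical schema standing in for a relation's dataSchema.
val dataSchema = StructType(Seq(
  StructField("a", IntegerType, nullable = true),
  StructField("b", StringType, nullable = true),
  StructField("c", DoubleType, nullable = true)))

// As in the new buildScan: each required column becomes a BoundReference
// (ordinal, data type, nullability) pointing into the full input row.
val requiredOutput = Seq("c", "a").map { col =>
  val field = dataSchema(col)
  BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable)
}

// Equivalent of dataSchema.toAttributes, which is package-private.
val inputAttributes = dataSchema.map { f =>
  AttributeReference(f.name, f.dataType, f.nullable)()
}

val projection = new InterpretedMutableProjection(requiredOutput, inputAttributes)

// A MutableProjection reuses a single output row, hence the copy() before
// keeping a reference to the result.
val pruned = projection(Row(1, "x", 2.5)).copy()  // Row(2.5, 1)

Calling buildProjection() inside mapPartitions means the projection instance is created on the executors; only the factory closure is shipped, which keeps generated classes and the mutable output row off the serialization path.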
@@ -113,19 +113,29 @@ class SimpleTextRelation(
   override def outputWriterClass: Class[_ <: OutputWriter] =
     classOf[SimpleTextOutputWriter]
 
-  override def buildScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputPaths: Array[String]): RDD[Row] = {
-    val fields = dataSchema.fields
-    sparkContext.textFile(inputPaths.mkString(",")).map { record =>
-      val valueMap = record.split(",").zip(fields).map {
-        case (value, StructField(name, dataType, _, _)) =>
-          name -> Cast(Literal(value), dataType).eval()
-      }.toMap
-
-      // This mocks a simple projection
-      Row.fromSeq(requiredColumns.map(valueMap).toSeq)
+  override def buildScan(inputPaths: Array[String]): RDD[Row] = {
+    val fields = dataSchema.map(_.dataType)
+
+    sparkContext.textFile(inputPaths.mkString(",")).map { record =>
+      Row(record.split(",").zip(fields).map { case (value, dataType) =>
+        Cast(Literal(value), dataType).eval()
+      }: _*)
     }
   }
+
+  // override def buildScan(
+  //     requiredColumns: Array[String],
+  //     filters: Array[Filter],
+  //     inputPaths: Array[String]): RDD[Row] = {
+  //   val fields = dataSchema.fields
+  //   sparkContext.textFile(inputPaths.mkString(",")).map { record =>
+  //     val valueMap = record.split(",").zip(fields).map {
+  //       case (value, StructField(name, dataType, _, _)) =>
+  //         name -> Cast(Literal(value), dataType).eval()
+  //     }.toMap
+  //
+  //     // This mocks a simple projection
+  //     Row.fromSeq(requiredColumns.map(valueMap).toSeq)
+  //   }
+  // }
 }
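With this change the test relation only ever produces complete rows, and column pruning is exercised through the inherited FSBasedRelation.buildScan, which is exactly what the commented-out override used to mock by hand. A hedged sketch of the expected behavior, in which relation, inputPaths, and the schema are all hypothetical:

// Hypothetical: `relation` is a SimpleTextRelation whose dataSchema is
// a INT, b STRING, c DOUBLE, and `inputPaths` points at a single file
// containing the record "1,hello,2.5".
relation.buildScan(inputPaths).collect()
// => Array(Row(1, "hello", 2.5)), the full-row scan defined above

relation.buildScan(Array("c", "a"), inputPaths).collect()
// => Array(Row(2.5, 1)), pruned and reordered by the inherited projection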
