Skip to content

Commit

Permalink
Fixed test cases in SQL except ParquetIOSuite.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Jan 27, 2015
1 parent 66d5ef1 commit ce4a5d2
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object NewRelationInstances extends Rule[LogicalPlan] {
.toSet

plan transform {
case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
case l: MultiInstanceRelation if multiAppearance.contains(l) => l.newInstance()
}
}
}
21 changes: 15 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,26 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.ScalaReflection

import scala.language.implicitConversions
import scala.reflect.ClassTag

import com.fasterxml.jackson.core.JsonFactory

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.types.{NumericType, StructType}


class DataFrame(
val sqlContext: SQLContext,
val logicalPlan: LogicalPlan,
val baseLogicalPlan: LogicalPlan,
operatorsEnabled: Boolean)
extends DataFrameSpecificApi with RDDApi[Row] {

Expand All @@ -46,7 +46,16 @@ class DataFrame(
def this(sqlContext: SQLContext, plan: LogicalPlan) = this(sqlContext, plan, true)

@transient
protected[sql] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
protected[sql] lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)

@transient protected[sql] val logicalPlan: LogicalPlan = baseLogicalPlan match {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
baseLogicalPlan
}

private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): DataFrame = {
new DataFrame(sqlContext, logicalPlan, true)
Expand Down Expand Up @@ -123,8 +132,8 @@ class DataFrame(
override def as(name: String): DataFrame = Subquery(name, logicalPlan)

@scala.annotation.varargs
override def select(cols: Column*): DataFrame = {
val exprs = cols.zipWithIndex.map {
override def select(col: Column, cols: Column*): DataFrame = {
val exprs = (col +: cols).zipWithIndex.map {
case (Column(expr: NamedExpression), _) =>
expr
case (Column(expr: Expression), _) =>
Expand Down
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ trait DataFrameSpecificApi {
def apply(projection: Product): DataFrame

@scala.annotation.varargs
def select(cols: Column*): DataFrame
def select(col: Column, cols: Column*): DataFrame

/** Filtering */
def apply(condition: Column): DataFrame = filter(condition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
sparkContext
.parallelize(0 to 1000)
.map(i => Tuple1(i / 100.0))
.select('_1 cast decimal)
.select($"_1" cast decimal)

for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
withTempPath { dir =>
Expand Down

0 comments on commit ce4a5d2

Please sign in to comment.