diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 22941edef2d46..4c5fb3f45bf49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -47,7 +47,7 @@ object NewRelationInstances extends Rule[LogicalPlan] {
       .toSet
 
     plan transform {
-      case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
+      case l: MultiInstanceRelation if multiAppearance.contains(l) => l.newInstance()
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 15da0dce91a10..dde2a21345e3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.ScalaReflection
-
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
@@ -26,17 +24,19 @@ import com.fasterxml.jackson.core.JsonFactory
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.json.JsonRDD
 import org.apache.spark.sql.types.{NumericType, StructType}
 
 
 class DataFrame(
     val sqlContext: SQLContext,
-    val logicalPlan: LogicalPlan,
+    val baseLogicalPlan: LogicalPlan,
     operatorsEnabled: Boolean)
   extends DataFrameSpecificApi with RDDApi[Row] {
 
@@ -46,7 +46,16 @@ class DataFrame(
   def this(sqlContext: SQLContext, plan: LogicalPlan) = this(sqlContext, plan, true)
 
   @transient
-  protected[sql] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
+  protected[sql] lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
+
+  @transient protected[sql] val logicalPlan: LogicalPlan = baseLogicalPlan match {
+    // For various commands (like DDL) and queries with side effects, we force query optimization to
+    // happen right away to let these side effects take place eagerly.
+    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile =>
+      LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
+    case _ =>
+      baseLogicalPlan
+  }
 
   private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): DataFrame = {
     new DataFrame(sqlContext, logicalPlan, true)
@@ -123,8 +132,8 @@ class DataFrame(
   override def as(name: String): DataFrame = Subquery(name, logicalPlan)
 
   @scala.annotation.varargs
-  override def select(cols: Column*): DataFrame = {
-    val exprs = cols.zipWithIndex.map {
+  override def select(col: Column, cols: Column*): DataFrame = {
+    val exprs = (col +: cols).zipWithIndex.map {
       case (Column(expr: NamedExpression), _) =>
         expr
       case (Column(expr: Expression), _) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
index 47c73692f304b..829ba9d25c203 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
@@ -85,7 +85,7 @@ trait DataFrameSpecificApi {
   def apply(projection: Product): DataFrame
 
   @scala.annotation.varargs
-  def select(cols: Column*): DataFrame
+  def select(col: Column, cols: Column*): DataFrame
 
   /** Filtering */
   def apply(condition: Column): DataFrame = filter(condition)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 89f1e6d377102..ad4223fd2de00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -102,7 +102,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
       sparkContext
        .parallelize(0 to 1000)
        .map(i => Tuple1(i / 100.0))
-       .select('_1 cast decimal)
+       .select($"_1" cast decimal)
 
     for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
       withTempPath { dir =>
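
A minimal usage sketch (not part of the patch) of the new select signature, which now requires at least one Column. The DataFrame `df` and the column names are hypothetical, and the `$"..."` column syntax is assumed to be in scope, as in the ParquetIOSuite change above:

  // Hypothetical example: at least one Column must be supplied; further columns go
  // into the varargs tail of select(col: Column, cols: Column*).
  val projected = df.select($"name", $"age")

  // df.select() with zero arguments no longer compiles against the new signature.

Separately, per the DataFrame.scala hunk, command-like plans (Command, InsertIntoTable, CreateTableAsSelect, WriteToFile) are now executed eagerly when the DataFrame is constructed, and their result is exposed as a LogicalRDD rather than the original unevaluated plan.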