[SPARK-2094][SQL] "Exactly once" semantics for DDL and command statements

## Related JIRA issues

- Main issue:

  - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure exactly once semantics for DDL/Commands

- Issues resolved as dependencies:

  - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): Undefine output() from the abstract class Command and implement it in concrete subclasses
  - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan for DESCRIBE
  - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): SparkSQL Queries with Sorts run before the user asks them to

- Other related issue:

  - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE thrown while lookup a view

    Two test cases, `join_view` and `mergejoin_mixed`, within the `HiveCompatibilitySuite` are removed from the whitelist to work around this issue.

## PR Overview

This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field, `PhysicalCommand.sideEffectResult`, so that they are executed eagerly and exactly once.  As a positive side effect, DDL statements and commands can now be turned into proper `SchemaRDD`s, letting users query their execution results.

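In a nutshell, every physical command node mixes in a small trait that memoizes its side effects. Below is a condensed sketch of the pattern, adapted from the physical command trait (`Command` in the `org.apache.spark.sql.execution` package) and the physical `SetCommand` node further down in this diff (doc comments trimmed, minor details simplified):

```
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}

trait Command {
  // A concrete command overrides this lazy field to wrap up its side effects (or any other
  // computation that must be evaluated exactly once). Being lazy, it is computed the first
  // time it is referenced and the memoized result is reused afterwards.
  protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
}

case class SetCommand(key: Option[String], value: Option[String], output: Seq[Attribute])
                     (@transient context: SQLContext) extends LeafNode with Command {

  // The actual "SET" side effect lives here and runs at most once.
  override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
    case (Some(k), Some(v)) => context.set(k, v); Seq(k -> v)                          // SET key=value
    case (Some(k), _)       => Seq(k -> context.getOption(k).getOrElse("<undefined>")) // SET key
    case (None, None)       => context.getAll.toSeq                                    // SET
    case _                  => throw new IllegalArgumentException()
  }

  // execute() merely turns the memoized result into rows, so re-evaluating the RDD never
  // repeats the side effect.
  def execute(): RDD[Row] = {
    val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
    context.sparkContext.parallelize(rows, 1)
  }

  override def otherCopyArgs = context :: Nil
}
```
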
This PR defines schemas for the following DDL/commands:

- EXPLAIN command

  - `plan`: String, the plan explanation

- SET command

  - `key`: String, the key(s) of the property or properties being set or queried
  - `value`: String, the value(s) of the property or properties being queried

- Other Hive native commands

  - `result`: String, execution result returned by Hive

  **NOTE**: We should refine schemas for different native commands by defining physical plans for them in the future.

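As with the other commands, the `result` column of a native command can be queried with the schema DSL used in the examples below (an illustrative snippet only; output omitted):

```
scala> hql("DESCRIBE src").select('result).collect()
```
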
## Examples

### EXPLAIN command

Taking the `EXPLAIN` command as an example, we first execute the command (obtaining a `SchemaRDD` at the same time), and then query the `plan` field with the schema DSL:

```
scala> loadTestTable("src")
...

scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key")
...
q0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExplainCommandPhysical [plan#11:0]
 Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L]
  Exchange (HashPartitioning [key#4:0], 200)
   Exchange (HashPartitioning [key#4:0], 200)
    Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L]
     HiveTableScan [key#4], (MetastoreRelation default, src, None), None

scala> q0.select('plan).collect()
...
[ExplainCommandPhysical [plan#24:0]
 Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L]
  Exchange (HashPartitioning [key#17:0], 200)
   Exchange (HashPartitioning [key#17:0], 200)
    Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L]
     HiveTableScan [key#17], (MetastoreRelation default, src, None), None]

scala>
```

### SET command

In this example we query all the properties set in `SQLConf`, register the result as a table, and then query the table with HiveQL:

```
scala> val q1 = hql("SET")
...
q1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[7] at RDD at SchemaRDD.scala:98
== Query Plan ==
<SET command: executed by Hive, and noted by SQLContext>

scala> q1.registerAsTable("properties")

scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println)
...
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
 Project [key#51:0,value#52:1]
  SetCommandPhysical None, None, [key#55:0,value#56:1]
...
[datanucleus.autoCreateSchema,true]
[datanucleus.autoStartMechanismMode,checked]
[datanucleus.cache.level2,false]
[datanucleus.cache.level2.type,none]
[datanucleus.connectionPoolingType,BONECP]
[datanucleus.fixedDatastore,false]
[datanucleus.identifierFactory,datanucleus1]
[datanucleus.plugin.pluginRegistryBundleCheck,LOG]
[datanucleus.rdbms.useLegacyNativeValueStrategy,true]
[datanucleus.storeManagerType,rdbms]

scala>
```

### "Exactly once" semantics

Finally, an example of the "exactly once" semantics:

```
scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)")
...
q2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[28] at RDD at SchemaRDD.scala:98
== Query Plan ==
<Native command: executed by Hive>

scala> table("t1")
...
res9: org.apache.spark.sql.SchemaRDD =
SchemaRDD[32] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#58,value#59], (MetastoreRelation default, t1, None), None

scala> q2.collect()
...
res10: Array[org.apache.spark.sql.Row] = Array([])

scala>
```

As we can see, the `CREATE TABLE` command is executed eagerly right after the `SchemaRDD` is created, and referencing the `SchemaRDD` again does not trigger a duplicate execution.

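The eager-execution hook itself is small. Roughly (condensed from the `SchemaRDDLike` change in the diff below, with explanatory comments added), the `SchemaRDD` construction pattern-matches on the logical plan and forces `queryExecution.toRdd` for command-like plans, while ordinary queries stay lazy:

```
@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
  // Commands (and other side-effecting plans) are planned and executed right away;
  // later references reuse the already-executed physical plan instead of re-running it.
  case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
    queryExecution.toRdd
    SparkLogicalPlan(queryExecution.executedPlan)
  // Ordinary queries keep their usual lazy semantics.
  case _ =>
    baseLogicalPlan
}
```
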
Author: Cheng Lian <[email protected]>

Closes apache#1071 from liancheng/exactlyOnceCommand and squashes the following commits:

d005b03 [Cheng Lian] Made "SET key=value" returns the newly set key value pair
f6c7715 [Cheng Lian] Added test cases for DDL/command statement RDDs
1d00937 [Cheng Lian] Makes SchemaRDD DSLs work for DDL/command statement RDDs
5c7e680 [Cheng Lian] Bug fix: wrong type used in pattern matching
48aa2e5 [Cheng Lian] Refined SQLContext.emptyResult as an empty RDD[Row]
cc64f32 [Cheng Lian] Renamed physical plan classes for DDL/commands
74789c1 [Cheng Lian] Fixed failing test cases
0ad343a [Cheng Lian] Added physical plan for DDL and commands to ensure the "exactly once" semantics
liancheng authored and marmbrus committed Jun 13, 2014
1 parent 1c2fd01 commit ac96d96
Showing 15 changed files with 251 additions and 167 deletions.
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
import org.apache.spark.sql.catalyst.types.StringType

/**
@@ -26,35 +26,37 @@ import org.apache.spark.sql.catalyst.types.StringType
*/
abstract class Command extends LeafNode {
self: Product =>
def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
def output: Seq[Attribute] = Seq.empty
}

/**
* Returned for commands supported by a given parser, but not catalyst. In general these are DDL
* commands that are passed directly to another system.
*/
case class NativeCommand(cmd: String) extends Command
case class NativeCommand(cmd: String) extends Command {
override def output =
Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
}

/**
* Commands of the form "SET (key) (= value)".
*/
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
override def output = Seq(
AttributeReference("key", StringType, nullable = false)(),
AttributeReference("value", StringType, nullable = false)()
)
BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
}

/**
* Returned by a parser when the users only wants to see what query plan would be executed, without
* actually performing the execution.
*/
case class ExplainCommand(plan: LogicalPlan) extends Command {
override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
override def output =
Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
}

/**
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command

@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {

comparePlans(optimized, correctAnswer)
}

test("joins: push down left outer join #1") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
39 changes: 5 additions & 34 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

import org.apache.spark.sql.columnar.InMemoryRelation
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @group userf
*/
def sql(sqlText: String): SchemaRDD = {
val result = new SchemaRDD(this, parseSql(sqlText))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but do not perform any execution.
result.queryExecution.toRdd
result
}
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))

/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
@@ -259,8 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] val planner = new SparkPlanner

@transient
protected[sql] lazy val emptyResult =
sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)

/**
* Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
@@ -280,35 +272,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected abstract class QueryExecution {
def logical: LogicalPlan

def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
case SetCommand(key, value) =>
// Only this case needs to be executed eagerly. The other cases will
// be taken care of when the actual results are being extracted.
// In the case of HiveContext, sqlConf is overridden to also pass the
// pair into its HiveConf.
if (key.isDefined && value.isDefined) {
set(key.get, value.get)
}
// It doesn't matter what we return here, since this is only used
// to force the evaluation to happen eagerly. To query the results,
// one must use SchemaRDD operations to extract them.
emptyResult
case _ => executedPlan.execute()
}

lazy val analyzed = analyzer(logical)
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = {
logical match {
case s: SetCommand => eagerlyProcess(s)
case _ => executedPlan.execute()
}
}
lazy val toRdd: RDD[Row] = executedPlan.execute()

protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
@@ -330,7 +301,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* TODO: We only support primitive types, add support for nested types.
*/
private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
val schema = rdd.first.map { case (fieldName, obj) =>
val schema = rdd.first().map { case (fieldName, obj) =>
val dataType = obj.getClass match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
@@ -97,7 +97,7 @@ import java.util.{Map => JMap}
@AlphaComponent
class SchemaRDD(
@transient val sqlContext: SQLContext,
@transient protected[spark] val logicalPlan: LogicalPlan)
@transient val baseLogicalPlan: LogicalPlan)
extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {

def baseSchemaRDD = this
15 changes: 13 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -20,13 +20,14 @@ package org.apache.spark.sql
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.SparkLogicalPlan

/**
* Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
*/
private[sql] trait SchemaRDDLike {
@transient val sqlContext: SQLContext
@transient protected[spark] val logicalPlan: LogicalPlan
@transient val baseLogicalPlan: LogicalPlan

private[sql] def baseSchemaRDD: SchemaRDD

@@ -48,7 +49,17 @@ private[sql] trait SchemaRDDLike {
*/
@transient
@DeveloperApi
lazy val queryExecution = sqlContext.executePlan(logicalPlan)
lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)

@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
queryExecution.toRdd
SparkLogicalPlan(queryExecution.executedPlan)
case _ =>
baseLogicalPlan
}

override def toString =
s"""${super.toString}
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
*/
class JavaSchemaRDD(
@transient val sqlContext: SQLContext,
@transient protected[spark] val logicalPlan: LogicalPlan)
@transient val baseLogicalPlan: LogicalPlan)
extends JavaRDDLike[Row, JavaRDD[Row]]
with SchemaRDDLike {

@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.{SQLConf, SQLContext, execution}
import org.apache.spark.sql.{SQLContext, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
@@ -157,7 +157,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
val prunePushedDownFilters =
if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
(filters: Seq[Expression]) => {
@@ -186,7 +186,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
filters,
prunePushedDownFilters,
ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
}

case _ => Nil
}
@@ -250,12 +249,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case class CommandStrategy(context: SQLContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.SetCommand(key, value) =>
Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
Seq(execution.SetCommand(key, value, plan.output)(context))
case logical.ExplainCommand(child) =>
val executedPlan = context.executePlan(child).executedPlan
Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
case logical.CacheCommand(tableName, cache) =>
Seq(execution.CacheCommandPhysical(tableName, cache)(context))
Seq(execution.CacheCommand(tableName, cache)(context))
case _ => Nil
}
}
@@ -22,45 +22,69 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}

trait Command {
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
* can be used as the contents of the corresponding RDD generated from the physical plan of this
* command.
*
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
(@transient context: SQLContext) extends LeafNode {
def execute(): RDD[Row] = (key, value) match {
// Set value for key k; the action itself would
// have been performed in QueryExecution eagerly.
case (Some(k), Some(v)) => context.emptyResult
case class SetCommand(
key: Option[String], value: Option[String], output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
context.set(k, v)
Array(k -> v)

// Query the value bound to key k.
case (Some(k), None) =>
val resultString = context.getOption(k) match {
case Some(v) => s"$k=$v"
case None => s"$k is undefined"
}
context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
case (Some(k), _) =>
Array(k -> context.getOption(k).getOrElse("<undefined>"))

// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
val pairs = context.getAll
val rows = pairs.map { case (k, v) =>
new GenericRow(Array[Any](s"$k=$v"))
}.toSeq
// Assume config parameters can fit into one split (machine) ;)
context.sparkContext.parallelize(rows, 1)
// The only other case is invalid semantics and is impossible.
case _ => context.emptyResult
context.getAll

case _ =>
throw new IllegalArgumentException()
}

def execute(): RDD[Row] = {
val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
context.sparkContext.parallelize(rows, 1)
}

override def otherCopyArgs = context :: Nil
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
(@transient context: SQLContext) extends UnaryNode {
case class ExplainCommand(
child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends UnaryNode with Command {

// Actually "EXPLAIN" command doesn't cause any side effect.
override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")

def execute(): RDD[Row] = {
val planString = new GenericRow(Array[Any](child.toString))
context.sparkContext.parallelize(Seq(planString))
val explanation = sideEffectResult.mkString("\n")
context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
}

override def otherCopyArgs = context :: Nil
@@ -70,19 +94,20 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
* :: DeveloperApi ::
*/
@DeveloperApi
case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
extends LeafNode {
case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
extends LeafNode with Command {

lazy val commandSideEffect = {
override protected[sql] lazy val sideEffectResult = {
if (doCache) {
context.cacheTable(tableName)
} else {
context.uncacheTable(tableName)
}
Seq.empty[Any]
}

override def execute(): RDD[Row] = {
commandSideEffect
sideEffectResult
context.emptyResult
}
