Skip to content

Commit

Permalink
Checks for missing attributes and unresolved operator for all types o…
Browse files Browse the repository at this point in the history
…f operator
  • Loading branch information
liancheng committed Mar 23, 2015
1 parent 9f3273b commit 029f9bd
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class CheckAnalysis {
s"filter expression '${f.condition.prettyString}' " +
s"of type ${f.condition.dataType.simpleString} is not a boolean.")

case aggregatePlan@Aggregate(groupingExprs, aggregateExprs, child) =>
case Aggregate(groupingExprs, aggregateExprs, child) =>
def checkValidAggregateExpression(expr: Expression): Unit = expr match {
case _: AggregateExpression => // OK
case e: Attribute if !groupingExprs.contains(e) =>
Expand All @@ -85,13 +85,19 @@ class CheckAnalysis {

cleaned.foreach(checkValidAggregateExpression)

case o if o.children.nonEmpty && o.missingInput.nonEmpty =>
val missingAttributes = o.missingInput.map(_.prettyString).mkString(",")
val input = o.inputSet.map(_.prettyString).mkString(",")
case _ => // Fallbacks to the following checks
}

operator match {
case o if o.children.nonEmpty &&
!o.references.filter(_.name != "grouping__id").subsetOf(o.inputSet) =>
val missingAttributes = (o.references -- o.inputSet).mkString(",")
val input = o.inputSet.mkString(",")

failAnalysis(s"resolved attributes $missingAttributes missing from $input")
failAnalysis(
s"resolved attribute(s) $missingAttributes missing from $input " +
s"in operator ${operator.simpleString}")

// Catch all
case o if !o.resolved =>
failAnalysis(
s"unresolved operator ${operator.simpleString}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,21 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
assert(pl(3).dataType == DecimalType.Unlimited)
assert(pl(4).dataType == DoubleType)
}

test("SPARK-6452: CheckAnalysis should throw when Aggregate contains missing attributes") {
val plan =
Aggregate(
Nil,
Alias(Sum(AttributeReference("a", StringType)(exprId = ExprId(1))), "b")() :: Nil,
LocalRelation(
AttributeReference("a", StringType)(exprId = ExprId(2))))

assert(plan.resolved)

val message = intercept[AnalysisException] {
caseSensitiveAnalyze(plan)
}.getMessage

assert(message.contains("resolved attribute(s) a#1 missing from a#2"))
}
}

0 comments on commit 029f9bd

Please sign in to comment.