[SPARK-26626][SQL] Maximum size for repeatedly substituted aliases in SQL expressions #23556

Closed (wants to merge 4 commits)

@@ -691,7 +691,8 @@ object CollapseProject extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {

Member:
Is the purpose basically to give up this rule? Why don't we consider using spark.sql.optimizer.excludedRules? I think it's a more general way to resolve such issues.
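
For reference, a sketch of the existing mechanism being suggested here (not from the PR); it disables the rule for every query in the session, which is the trade-off discussed below:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// Exclude CollapseProject for the whole session via the general-purpose escape hatch.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.CollapseProject")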

@j-esse (Author), Mar 14, 2019:
@HyukjinKwon The purpose is to improve the rule so that it only applies when it will yield a performance improvement (and not when it could cause memory issues). This was the preferred solution, since excluding the rule entirely would mean losing it in the cases where it is beneficial.

@HyukjinKwon (Member), Mar 21, 2019:
So, basically what you want to do is to give up a rule under a certain condition, because processing a huge tree causes an OOM issue only in the driver. Am I correct?

What's the difference between setting the spark.sql.maxRepeatedAliasSize threshold to a specific number based on a rough estimation, versus explicitly excluding the rule via spark.sql.optimizer.excludedRules based on the user's rough estimation?

Author:
@HyukjinKwon it's not that processing a huge tree causes an OOM, it's that the user can write a small tree that seems very reasonable to execute, but under the hood the optimiser turns it into a huge tree that OOMs. The user doesn't know beforehand that the optimiser issue is going to happen, so they can't know to disable the rule. It takes a lot of debugging, looking through stack traces, etc., to identify that the OOM is caused by CollapseProject and that you can disable it. Also, we typically run many different queries within a Spark session, and wouldn't want to disable CollapseProject for all of them.

This change means that we can still run CollapseProject; we just don't substitute overly large aliases. In the types of query we had problems with, this means that it will collapse the query until the aliases get too large, and then stop. So we still apply CollapseProject to every query; we just stop substituting any alias that gets too large.

spark.sql.maxRepeatedAliasSize just sets the size at which an alias tree is considered too large to be efficiently substituted multiple times. The default value of 100 was chosen through some basic testing to find the best performance balance (see charts at top), but I'm happy to tweak this if you don't think it's appropriate.
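
To make the failure mode concrete, here is a sketch (DataFrame API; not part of the PR itself) of the query shape exercised by the new tests at the bottom of this diff: each level references both previously aliased columns, so fully collapsing the projections roughly doubles the expression tree at every level.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
var df: DataFrame = spark.range(10).selectExpr("id AS a", "id AS b")
for (_ <- 1 to 100) {
  // Each select refers to both aliases produced by the previous select.
  df = df.select((col("a") + col("b")).as("a"), (col("a") - col("b")).as("b"))
}
// Without a size cap, CollapseProject would try to fold all 100 projections into one,
// producing expression trees on the order of 2^100 nodes on the driver.
df.explain(true)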

    case p1 @ Project(_, p2: Project) =>
-     if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) {
+     if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
+         hasOversizedRepeatedAliases(p1.projectList, p2.projectList)) {
        p1
      } else {
        p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))

@@ -735,6 +736,28 @@ object CollapseProject extends Rule[LogicalPlan] {
    }.exists(!_.deterministic))
  }

  private def hasOversizedRepeatedAliases(
      upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Boolean = {
    val aliases = collectAliases(lower)

    // Count how many times each alias is used in the upper Project.
    // If an alias is only used once, we can safely substitute it without increasing the overall
    // tree size
    val referenceCounts = AttributeMap(
      upper
        .flatMap(_.collect { case a: Attribute => a })
        .groupBy(identity)
        .mapValues(_.size).toSeq
    )

    // Check for any aliases that are used more than once, and are larger than the configured
    // maximum size
    aliases.exists({ case (attribute, expression) =>
      referenceCounts.getOrElse(attribute, 0) > 1 &&
        expression.treeSize > SQLConf.get.maxRepeatedAliasSize

Contributor:
I'm not sure about using treeSize as the cost of an expression. A UDF can be very expensive even if its treeSize is 1.

How about we simplify it with a blacklist? E.g., a UDF is expensive, and we shouldn't collapse projects if a UDF is repeated.
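
A rough sketch of what that alternative could look like (hypothetical; not part of this PR): treat an alias as too expensive to duplicate whenever it contains a Scala UDF, regardless of its tree size.

import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}

// Hypothetical predicate: an alias is "too expensive to repeat" if it contains a UDF.
def containsUdf(e: Expression): Boolean =
  e.find(_.isInstanceOf[ScalaUDF]).isDefined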

Author:
This isn't trying to determine the cost of the expression; the cost is irrelevant here. We're just trying to determine the size of the expression itself (using tree size as a proxy for memory size). That way, if the expression is too large (takes up too much memory), we can prevent OOMs by not de-aliasing it multiple times (which would greatly increase the amount of heap the expression tree takes up).
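
A back-of-the-envelope model of that growth (a simplification, not Spark code): if an alias tree is referenced more than once at each collapsed level, repeated substitution compounds multiplicatively.

// Simplified model: each collapse inlines `refs` copies of the current tree.
def collapsedSize(size: Long, refs: Int, levels: Int): Long =
  (1 to levels).foldLeft(size)((s, _) => refs * s + 1)

collapsedSize(size = 3, refs = 2, levels = 30)  // already ~4.3 billion nodes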

Contributor:
So your fix only cares about the memory usage of the expressions, not execution time?

Author:
@cloud-fan That's right, the primary concern is memory usage, since the exponential increase in memory usage currently causes crashes (due to OOMs), timeouts, and performance issues.

    })
  }

  private def buildCleanedProjectList(
      upper: Seq[NamedExpression],
      lower: Seq[NamedExpression]): Seq[NamedExpression] = {

@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf

/**
 * A pattern that matches any number of project or filter operations on top of another relational

@@ -60,8 +61,13 @@ object PhysicalOperation extends PredicateHelper {
    plan match {
      case Project(fields, child) if fields.forall(_.deterministic) =>
        val (_, filters, other, aliases) = collectProjectsAndFilters(child)
-       val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
-       (Some(substitutedFields), filters, other, collectAliases(substitutedFields))
+       if (hasOversizedRepeatedAliases(fields, aliases)) {
+         // Skip substitution if it could overly increase the overall tree size and risk OOMs
+         (None, Nil, plan, Map.empty)
+       } else {
+         val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+         (Some(substitutedFields), filters, other, collectAliases(substitutedFields))
+       }

      case Filter(condition, child) if condition.deterministic =>
        val (fields, filters, other, aliases) = collectProjectsAndFilters(child)

@@ -79,6 +85,26 @@ object PhysicalOperation extends PredicateHelper {
      case a @ Alias(child, _) => a.toAttribute -> child
    }.toMap

  private def hasOversizedRepeatedAliases(fields: Seq[Expression],
      aliases: Map[Attribute, Expression]): Boolean = {
    // Count how many times each alias is used in the fields.
    // If an alias is only used once, we can safely substitute it without increasing the overall
    // tree size
    val referenceCounts = AttributeMap(
      fields
        .flatMap(_.collect { case a: Attribute => a })
        .groupBy(identity)
        .mapValues(_.size).toSeq
    )

    // Check for any aliases that are used more than once, and are larger than the configured
    // maximum size
    aliases.exists({ case (attribute, expression) =>
      referenceCounts.getOrElse(attribute, 0) > 1 &&
        expression.treeSize > SQLConf.get.maxRepeatedAliasSize
    })
  }

  private def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = {
    expr.transform {
      case a @ Alias(ref: AttributeReference, name) =>

@@ -89,6 +89,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {

  lazy val containsChild: Set[TreeNode[_]] = children.toSet

  lazy val treeSize: Long = children.map(_.treeSize).sum + 1

  private lazy val _hashCode: Int = scala.util.hashing.MurmurHash3.productHash(this)
  override def hashCode(): Int = _hashCode
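
For intuition, the new treeSize metric simply counts nodes. A quick sketch using the Catalyst expression DSL (assumes a REPL with spark-catalyst, and this PR's treeSize change, on the classpath):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Add, Multiply}

val a = 'a.int                          // AttributeReference named "a" of IntegerType
val b = 'b.int
Add(a, b).treeSize                      // 3: the Add node plus its two children
Multiply(Add(a, b), 'c.int).treeSize    // 5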


@@ -1717,6 +1717,15 @@ object SQLConf {
      "and java.sql.Date are used for the same purpose.")
    .booleanConf
    .createWithDefault(false)

  val MAX_REPEATED_ALIAS_SIZE =
    buildConf("spark.sql.maxRepeatedAliasSize")
      .internal()
      .doc("The maximum size of alias expression that will be substituted multiple times " +
        "(size defined by the number of nodes in the expression tree). " +
        "Used by the CollapseProject optimizer, and PhysicalOperation.")
      .intConf
      .createWithDefault(100)

Member:
It does add some automatic behaviour, but I don't think it's worth that much since we already have a general mechanism. Note that you can also increase the driver side's memory. How does that relate to this configuration?

Author:
@HyukjinKwon increasing the driver memory unfortunately isn't an option, because due to the exponential tree size explosion, the necessary memory would be much larger than that available on most servers. Also, users wouldn't know that they needed a very large driver memory size, because they can be running small queries over small data.
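
For completeness, a hypothetical tuning sketch (the conf is marked internal, so this is illustration only; assumes an active spark session): the threshold can be raised for workloads where duplicating larger aliases is known to be safe.

// Raise the cap from the default of 100 expression-tree nodes to 500 for this session.
spark.conf.set("spark.sql.maxRepeatedAliasSize", "500")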

}

/**

@@ -2162,6 +2171,8 @@ class SQLConf extends Serializable with Logging {
  def setCommandRejectsSparkCoreConfs: Boolean =
    getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)

  def maxRepeatedAliasSize: Int = getConf(SQLConf.MAX_REPEATED_ALIAS_SIZE)

  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */

@@ -170,4 +170,22 @@ class CollapseProjectSuite extends PlanTest {
    val expected = Sample(0.0, 0.6, false, 11L, relation.select('a as 'c)).analyze
    comparePlans(optimized, expected)
  }

  test("ensure oversize aliases are not repeatedly substituted") {
    var query: LogicalPlan = testRelation
    for (a <- 1 to 100) {
      query = query.select(('a + 'b).as('a), ('a - 'b).as('b))
    }
    val projects = Optimize.execute(query.analyze).collect { case p: Project => p }
    assert(projects.size >= 12)
  }

  test("ensure oversize aliases are still substituted once") {
    var query: LogicalPlan = testRelation
    for (a <- 1 to 20) {
      query = query.select(('a + 'b).as('a), 'b)
    }
    val projects = Optimize.execute(query.analyze).collect { case p: Project => p }
    assert(projects.size === 1)
  }
}