[SPARK-17356][SQL] Fix out of memory issue when generating JSON for TreeNode

## What changes were proposed in this pull request?

The class `org.apache.spark.sql.types.Metadata` is widely used in MLlib to store ML attributes. A `Metadata` instance is commonly stored in an `Alias` expression:

```
case class Alias(child: Expression, name: String)(
    val exprId: ExprId = NamedExpression.newExprId,
    val qualifier: Option[String] = None,
    val explicitMetadata: Option[Metadata] = None,
    override val isGenerated: java.lang.Boolean = false)
```

A `Metadata` instance can have a large memory footprint because the number of attributes can be large (on the order of millions). When `toJSON` is called on an `Alias` expression, its `Metadata` is converted as well, producing a large JSON string.
If a plan contains many such `Alias` expressions, calling `toJSON` may trigger an out-of-memory error, since converting every `Metadata` reference to JSON takes a huge amount of memory.

With this PR, `Metadata` is skipped during JSON conversion: `toJSON` emits the JSON of `Metadata.empty` in its place. For a reproducer of the OOM and an analysis, see the JIRA ticket: https://issues.apache.org/jira/browse/SPARK-17356.
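
The effect of the change can be illustrated with a minimal, dependency-free sketch. It is a toy model, not Spark code: `Field`, `Text`, `Meta`, and `ToyJson` are hypothetical names invented for this illustration, and the hand-rolled JSON rendering only stands in for the real serialization. It shows the same pattern-match idea as the patch, where any metadata value is rendered as the empty metadata value regardless of its size.

```scala
// Toy model only: these types are hypothetical and are not Spark's own classes.
sealed trait Field
final case class Text(value: String) extends Field
final case class Meta(entries: Map[String, String]) extends Field

object Meta {
  // Stand-in for an "empty metadata" constant.
  val empty: Meta = Meta(Map.empty)
}

object ToyJson {
  // Quote a string as a JSON string literal (escapes backslashes and double quotes only).
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Render metadata as a JSON object; the output grows with the number of entries.
  private def metaJson(m: Meta): String =
    m.entries.map { case (k, v) => quote(k) + ":" + quote(v) }.mkString("{", ",", "}")

  // Behaviour analogous to the old code path: metadata is rendered in full.
  def renderFull(f: Field): String = f match {
    case Text(v) => quote(v)
    case m: Meta => metaJson(m)
  }

  // Behaviour analogous to the patched code path: metadata is replaced by the empty value.
  def renderSkippingMeta(f: Field): String = f match {
    case Text(v) => quote(v)
    case _: Meta => metaJson(Meta.empty)
  }
}
```

With a large value such as `Meta((1 to 1000).map(i => s"attr$i" -> "x").toMap)`, `ToyJson.renderFull` produces a string that grows with the number of entries, while `ToyJson.renderSkippingMeta` returns the same small constant however large the metadata is. The trade-off is that the JSON output no longer carries the metadata at all.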

## How was this patch tested?

Existing tests.

Author: Sean Zhong <[email protected]>

Closes #14915 from clockfly/json_oom.

(cherry picked from commit 6f13aa7)
Signed-off-by: Wenchen Fan <[email protected]>
clockfly authored and cloud-fan committed Sep 6, 2016
1 parent 286ccd6 commit c0f1f53
Showing 2 changed files with 12 additions and 2 deletions.
@@ -613,7 +613,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
  case s: String => JString(s)
  case u: UUID => JString(u.toString)
  case dt: DataType => dt.jsonValue
- case m: Metadata => m.jsonValue
+ // SPARK-17356: In usage of mllib, Metadata may store a huge vector of data, transforming
+ // it to JSON may trigger OutOfMemoryError.
+ case m: Metadata => Metadata.empty.jsonValue
  case s: StorageLevel =>
  ("useDisk" -> s.useDisk) ~ ("useMemory" -> s.useMemory) ~ ("useOffHeap" -> s.useOffHeap) ~
  ("deserialized" -> s.deserialized) ~ ("replication" -> s.replication)
10 changes: 9 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
  import org.apache.spark.sql.execution.columnar.InMemoryRelation
  import org.apache.spark.sql.execution.datasources.LogicalRelation
  import org.apache.spark.sql.execution.streaming.MemoryPlan
- import org.apache.spark.sql.types.ObjectType
+ import org.apache.spark.sql.types.{Metadata, ObjectType}


  abstract class QueryTest extends PlanTest {
@@ -267,6 +267,14 @@ abstract class QueryTest extends PlanTest {
  val normalized1 = logicalPlan.transformAllExpressions {
  case udf: ScalaUDF => udf.copy(function = null)
  case gen: UserDefinedGenerator => gen.copy(function = null)
+ // After SPARK-17356: the JSON representation no longer has the Metadata. We need to remove
+ // the Metadata from the normalized plan so that we can compare this plan with the
+ // JSON-deserialzed plan.
+ case a @ Alias(child, name) if a.explicitMetadata.isDefined =>
+ Alias(child, name)(a.exprId, a.qualifier, Some(Metadata.empty), a.isGenerated)
+ case a: AttributeReference if a.metadata != Metadata.empty =>
+ AttributeReference(a.name, a.dataType, a.nullable, Metadata.empty)(a.exprId, a.qualifier,
+ a.isGenerated)
  }

  // RDDs/data are not serializable to JSON, so we need to collect LogicalPlans that contains
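
The reason the test helper has to strip `Metadata` before comparing plans can be sketched with a toy round trip. This is an illustration under stated assumptions, not the `QueryTest` code: `ToyAlias` and `RoundTrip` are hypothetical names, and the "serialization" here is deliberately trivial. Once serialization drops the metadata, the deserialized value can only match the original after the same field has been cleared on the original side.

```scala
// Hypothetical toy types; these are not Spark's Alias or Metadata.
final case class ToyAlias(name: String, metadata: Map[String, String])

object RoundTrip {
  // Lossy serialization: the metadata is never written out.
  def serialize(a: ToyAlias): String = a.name

  // Deserialization can only restore what was written, so metadata comes back empty.
  def deserialize(s: String): ToyAlias = ToyAlias(s, Map.empty)

  // Normalization applied before comparison: clear metadata on the original as well.
  def normalize(a: ToyAlias): ToyAlias = a.copy(metadata = Map.empty)
}
```

For an original such as `ToyAlias("features", Map("ml_attr" -> "large payload"))`, the result of `RoundTrip.deserialize(RoundTrip.serialize(original))` differs from `original` but equals `RoundTrip.normalize(original)`, which mirrors why the normalized plan in the hunk above replaces the metadata with `Metadata.empty`.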