Skip to content

Commit

Permalink
[SPARK-24214][SS] Fix toJSON for StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

We should override "otherCopyArgs" to provide the SparkSession parameter; otherwise TreeNode.toJSON cannot get the full constructor parameter list.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <[email protected]>

Closes apache#21275 from zsxwing/SPARK-24214.

(cherry picked from commit fd1179c)
Signed-off-by: Shixiong Zhu <[email protected]>
  • Loading branch information
zsxwing committed May 9, 2018
1 parent aba52f4 commit 8889d78
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ case class StreamingExecutionRelation(
output: Seq[Attribute])(session: SparkSession)
extends LeafNode with MultiInstanceRelation {

override def otherCopyArgs: Seq[AnyRef] = session :: Nil
override def isStreaming: Boolean = true
override def toString: String = source.toString

Expand Down Expand Up @@ -97,6 +98,7 @@ case class StreamingRelationV2(
output: Seq[Attribute],
v1Relation: Option[StreamingRelation])(session: SparkSession)
extends LeafNode with MultiInstanceRelation {
override def otherCopyArgs: Seq[AnyRef] = session :: Nil
override def isStreaming: Boolean = true
override def toString: String = sourceName

Expand All @@ -116,6 +118,7 @@ case class ContinuousExecutionRelation(
output: Seq[Attribute])(session: SparkSession)
extends LeafNode with MultiInstanceRelation {

override def otherCopyArgs: Seq[AnyRef] = session :: Nil
override def isStreaming: Boolean = true
override def toString: String = source.toString

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,21 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
CheckLastBatch(("A", 1)))
}

// Regression test for SPARK-24214: serializing the logical plan of a streaming
// query to JSON used to fail because the SparkSession constructor parameter
// (passed in a second parameter list) was not exposed via otherCopyArgs.
// Each assertion below checks that toJSON both succeeds and actually contains
// the expected relation node name.
test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON " +
  "should not fail") {
  // Before a query starts, the DataFrame's plan contains StreamingRelationV2.
  val df = spark.readStream.format("rate").load()
  assert(df.logicalPlan.toJSON.contains("StreamingRelationV2"))

  // Once a micro-batch query is running, the relation is rewritten to
  // StreamingExecutionRelation; verify toJSON works on the executed plan.
  testStream(df)(
    AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingExecutionRelation"))
  )

  // With a continuous trigger and a V2 sink, the relation becomes
  // ContinuousExecutionRelation; verify toJSON works there as well.
  testStream(df, useV2Sink = true)(
    StartStream(trigger = Trigger.Continuous(100)),
    AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
  )
}

/** Create a streaming DF that only execute one batch in which it returns the given static DF */
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
require(!triggerDF.isStreaming)
Expand Down

0 comments on commit 8889d78

Please sign in to comment.