[SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3 #20755

Closed · wants to merge 1 commit

Summary of the change: the streaming source relations (StreamingRelation, StreamingExecutionRelation, StreamingRelationV2, ContinuousExecutionRelation) now implement MultiInstanceRelation, so the analyzer can assign fresh attribute IDs when the same streaming relation appears on both sides of a join; a stream-stream self-join regression test is added.
StreamingRelation.scala
@@ -20,9 +20,9 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LeafNode
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2}
@@ -42,7 +42,7 @@ object StreamingRelation {
  * passing to [[StreamExecution]] to run a query.
  */
 case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
@@ -53,6 +53,8 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(dataSource.sparkSession.sessionState.conf.defaultSizeInBytes)
   )
+
+  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))
 }
 
 /**
@@ -62,7 +64,7 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
 case class StreamingExecutionRelation(
     source: BaseStreamingSource,
     output: Seq[Attribute])(session: SparkSession)
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
 
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
@@ -74,6 +76,8 @@ case class StreamingExecutionRelation(
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
   )
+
+  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
 }
 
 // We have to pack in the V1 data source as a shim, for the case when a source implements
@@ -92,13 +96,15 @@ case class StreamingRelationV2(
     extraOptions: Map[String, String],
     output: Seq[Attribute],
     v1Relation: Option[StreamingRelation])(session: SparkSession)
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
   )
+
+  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
 }
 
 /**
@@ -108,7 +114,7 @@ case class ContinuousExecutionRelation(
     source: ContinuousReadSupport,
     extraOptions: Map[String, String],
     output: Seq[Attribute])(session: SparkSession)
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
 
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
@@ -120,6 +126,8 @@ case class ContinuousExecutionRelation(
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
   )
+
+  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
 }
 
 /**
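Background, not part of the diff: MultiInstanceRelation is the hook Catalyst's analyzer uses to re-instantiate a relation with fresh attribute expression IDs when the same relation appears on both sides of a join, which is exactly the self-join case. A minimal sketch of the pattern each newInstance() override above follows, using a hypothetical ToyRelation in place of the streaming relations:

// Illustration only; ToyRelation is hypothetical, not part of this PR.
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.types.IntegerType

case class ToyRelation(output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
  // Copy the relation with fresh expression IDs so that the two sides of a
  // self-join no longer reference the same attribute IDs.
  override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
}

object ToyRelationDemo extends App {
  val rel = ToyRelation(Seq(AttributeReference("value", IntegerType)()))
  val fresh = rel.newInstance().asInstanceOf[ToyRelation]
  // Same attribute name, different exprId on the new instance:
  println(rel.output.head.exprId == fresh.output.head.exprId) // false
}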
StreamingJoinSuite.scala
@@ -28,7 +28,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter}
-import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.execution.{FileSourceScanExec, LogicalRDD}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinHelper}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreProviderId}
 import org.apache.spark.sql.functions._
@@ -323,6 +325,27 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
     assert(e.toString.contains("Stream stream joins without equality predicate is not supported"))
   }
 
+  test("stream stream self join") {
+    val input = MemoryStream[Int]
+    val df = input.toDF
+    val join =
+      df.select('value % 5 as "key", 'value as "leftValue").join(
+        df.select('value % 5 as "key", 'value as "rightValue"), "key")
+
+    testStream(join)(
+      AddData(input, 1, 2),
+      CheckAnswer((1, 1, 1), (2, 2, 2)),
+      StopStream,
+      StartStream(),
+      AddData(input, 3, 6),
+      /*
+      (1, 1)     (1, 1)
+      (2, 2)  x  (2, 2)  =  (1, 1, 1), (1, 1, 6), (2, 2, 2), (1, 6, 1), (1, 6, 6)
+      (1, 6)     (1, 6)
+      */
+      CheckAnswer((3, 3, 3), (1, 1, 1), (1, 1, 6), (2, 2, 2), (1, 6, 1), (1, 6, 6)))
+  }
+
   test("locality preferences of StateStoreAwareZippedRDD") {
     import StreamingSymmetricHashJoinHelper._
 
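For readers who want to try the feature outside the test harness, here is a hedged end-to-end sketch (not from the PR) of the query shape this change enables, assuming a Spark 2.3 build that includes this patch; the session setup and rate-source options are illustrative:

// Hypothetical demo, not part of this PR.
import org.apache.spark.sql.SparkSession

object SelfJoinDemo extends App {
  val spark = SparkSession.builder().master("local[2]").appName("self-join").getOrCreate()
  import spark.implicits._

  // One streaming source referenced twice. Before this patch the analyzer
  // could not deduplicate the conflicting attribute references in the join;
  // with MultiInstanceRelation the right side gets fresh attribute IDs.
  val stream = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
  val left = stream.select(($"value" % 5).as("key"), $"value".as("leftValue"))
  val right = stream.select(($"value" % 5).as("key"), $"value".as("rightValue"))

  val query = left.join(right, "key")
    .writeStream.format("console").outputMode("append").start()
  query.awaitTermination()
}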