Skip to content

Commit

Permalink
[SPARK-23408][SS][BRANCH-2.3] Synchronize successive AddData actions …
Browse files Browse the repository at this point in the history
…in Streaming*JoinSuite

## What changes were proposed in this pull request?

**The best way to review this PR is to ignore whitespace/indent changes. Use this link - https://github.com/apache/spark/pull/20650/files?w=1**

The stream-stream join tests add data to multiple sources and expect it all to show up in the next batch. But there's a race condition; the new batch might trigger when only one of the AddData actions has been reached.

Prior attempt to solve this issue by jose-torres in #20646 attempted to simultaneously synchronize on all memory sources together when consecutive AddData was found in the actions. However, this carries the risk of deadlock as well as unintended modification of stress tests (see the above PR for a detailed explanation). Instead, this PR attempts the following.

- A new action called `StreamProgressBlockedActions` that allows multiple actions to be executed while the streaming query is blocked from making progress. This allows data to be added to multiple sources that are made visible simultaneously in the next batch.
- An alias of `StreamProgressBlockedActions` called `MultiAddData` is explicitly used in the `Streaming*JoinSuites` to add data to two memory sources simultaneously.

This should avoid unintentional modification of the stress tests (or any other test for that matter) while making sure that the flaky tests are deterministic.

NOTE: This patch is modified a bit from origin PR (#20650) to cover DSv2 incompatibility between Spark 2.3 and 2.4: StreamingDataSourceV2Relation is a class for 2.3, whereas it is a case class for 2.4

## How was this patch tested?

Modified test cases in `Streaming*JoinSuites` where there are consecutive `AddData` actions.

Closes #23757 from HeartSaVioR/fix-streaming-join-test-flakiness-branch-2.3.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Co-authored-by: Tathagata Das <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
  • Loading branch information
2 people authored and srowen committed Feb 12, 2019
1 parent 02e9890 commit abce846
Show file tree
Hide file tree
Showing 3 changed files with 285 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,16 @@ class MicroBatchExecution(
}
}

/** Execute a function while locking the stream from making an progress */
private[sql] def withProgressLocked(f: => Unit): Unit = {
awaitProgressLock.lock()
try {
f
} finally {
awaitProgressLock.unlock()
}
}

private def toJava(scalaOption: Option[OffsetV2]): Optional[OffsetV2] = {
Optional.ofNullable(scalaOption.orNull)
}
Expand Down
Loading

0 comments on commit abce846

Please sign in to comment.