-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-25297][Streaming][Test] Fix blocking unit tests for Scala 2.12 #22304
Conversation
There is simplified project for this specific problem. |
@@ -65,7 +65,8 @@ private[streaming] class FileBasedWriteAheadLog( | |||
"WriteAheadLogManager" + callerName.map(c => s" for $c").getOrElse("") | |||
} | |||
private val forkJoinPool = ThreadUtils.newForkJoinPool(threadpoolName, 20) | |||
private val executionContext = ExecutionContext.fromExecutorService(forkJoinPool) | |||
private val executionContext = ExecutionContext | |||
.fromExecutorService(forkJoinPool, { e: Throwable => throw e }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mean in scala 2.12 the default reporter is not throwing the exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default reporter does not throw the exception in Scala 2.11 either.
I guess there are bugs in the Future impl in Scala 2.12, since there is a big change at Future.scala in Scala 2.12.
This is a temporary fix.
I will dig into Future.scala, and hopefully propose a PR for Scala.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @sadhen I was just going to start looking into this. This seems like a reasonable solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The suggested solution will not work as intended as it will prevent subsequent callbacks (esp for other ExecutionContexts) to be submitted, esp. in the case of RejectedExecutionException.
See:
https://github.com/scala/scala/blob/2.12.x/src/library/scala/concurrent/impl/Promise.scala#L68
https://github.com/scala/scala/blob/2.12.x/src/library/scala/concurrent/impl/Promise.scala#L284
Or, in other words, rethrowing is not "reporting" the error.
The new Future encoding included in Scala 2.13.x deals much better with RejectedExecutionExceptions and InterruptedExceptions, but it is unclear right now what parts could be backported within the binary compatibility constraints.
Test build #95540 has finished for PR 22304 at commit
|
More info ... the test failure symptom is that this test never completes, just repeating a
|
The one concern is whether this behavior change impacts anything else. Now some exceptions will cause an actual exception in a caller to methods like Await, rather than just logging and continuing. That might even be a good thing, and is crucial here. But that's what I have in mind. That said, tests all pass. |
@srowen are we targeting this patch to Spark 2.4? |
@srowen @cloud-fan The merged #22292 already fixed it. |
What changes were proposed in this pull request?
Customize ExecutorContext's reporter to fix blocking unit tests for Scala 2.12
How was this patch tested?