-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered #3647
Conversation
Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`.
Can one of the admins verify this patch? |
This is a good catch, since it looks like the original intent was that we'd remove In fact, a much older version of this code actually had the remove call: spark/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala Line 97 in 61c0016
That file got renamed, but the spark/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala Line 141 in 04c37b6
It looks like the bug was actually introduced in #540, possibly as a copy-paste error when copying repeated code between |
@@ -152,6 +152,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false | |||
} | |||
receiverInfo(streamId) = newReceiverInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems a little suspicious to me that we assign to receiverInfo
on this line only to remove the value that we set two lines later. I think it would be clearer to remove this line and change the next line to read
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only difference with the above approach is that, the receiverInfo (at the ReceiverTracker) is not up-to-date (if we remove this line) at least with what is being sent to the StreamingListenerBus. Does the below approach make sense?
receiverInfo -= streamId
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's essentially what I was proposing. I was suggesting
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
receiverInfo.remove(streamId)
but I don't think there's going to be a huge different between this and your suggestion unless there are multi-threading concerns. Feel free to update this with your suggestion.
Jenkins, this is ok to test. |
Test build #24507 has started for PR 3647 at commit
|
Test build #24507 has finished for PR 3647 at commit
|
Test PASSed. |
Test build #24513 has started for PR 3647 at commit
|
@JoshRosen updated the PR. Please see my observation and comments there: https://issues.apache.org/jira/browse/SPARK-2892. There seems to be an issue where the I don't see this issue when running in |
Test build #24513 has finished for PR 3647 at commit
|
Test PASSed. |
This patch looks good to me, so I'm going to merge it into @ilayaperumalg Thanks for the pointer to that other JIRA. Let's keep investigating https://issues.apache.org/jira/browse/SPARK-2892; we can continue discussion on JIRA. |
Actually, one potential concern before I merge this: if the old never removed entries from the |
Hmm, so it looks like /** Report error sent by a receiver */
private def reportError(streamId: Int, message: String, error: String) {
val newReceiverInfo = receiverInfo.get(streamId) match {
case Some(oldInfo) =>
oldInfo.copy(lastErrorMessage = message, lastError = error)
case None =>
logWarning("No prior receiver info")
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
}
receiverInfo(streamId) = newReceiverInfo
listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {
s"$message"
}
logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
} This means that we'll leak |
@JoshRosen yeah, I too believe it is very unlikely. Upon deregistration of the corresponding receiver, both the |
LGTM. Merging this. |
…stered Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`. Author: Ilayaperumal Gopinathan <[email protected]> Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits: 6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review 3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered (cherry picked from commit 10d69e9) Signed-off-by: Tathagata Das <[email protected]>
…stered Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`. Author: Ilayaperumal Gopinathan <[email protected]> Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits: 6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review 3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered (cherry picked from commit 10d69e9) Signed-off-by: Tathagata Das <[email protected]> Conflicts: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
Once the streaming receiver is de-registered at executor, the
ReceiverTrackerActor
needs toremove the corresponding reveiverInfo from the
receiverInfo
map atReceiverTracker
.