Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered #3647

Closed
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
receiverInfo(streamId) = newReceiverInfo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a little suspicious to me that we assign to receiverInfo on this line only to remove the value that we set two lines later. I think it would be clearer to remove this line and change the next line to read

listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))

instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only difference with the above approach is that, the receiverInfo (at the ReceiverTracker) is not up-to-date (if we remove this line) at least with what is being sent to the StreamingListenerBus. Does the below approach make sense?

receiverInfo -= streamId
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's essentially what I was proposing. I was suggesting

listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
receiverInfo.remove(streamId)

but I don't think there's going to be a huge different between this and your suggestion unless there are multi-threading concerns. Feel free to update this with your suggestion.

listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
receiverInfo.remove(streamId)
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {
Expand Down