From 3640c868d8010ae765837896d2e22e7924948f75 Mon Sep 17 00:00:00 2001 From: Ilayaperumal Gopinathan Date: Tue, 9 Dec 2014 11:20:47 -0800 Subject: [PATCH 1/2] Remove receiverInfo once receiver is de-registered Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`. --- .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 32e481dabc8ca..ff3a04ea4c31a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -152,6 +152,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } receiverInfo(streamId) = newReceiverInfo listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId))) + receiverInfo.remove(streamId) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" } else { From 6eb97d5d079b6a6f5edce7a3dac519449e81e117 Mon Sep 17 00:00:00 2001 From: Ilayaperumal Gopinathan Date: Tue, 16 Dec 2014 15:22:30 -0800 Subject: [PATCH 2/2] Polishing based on the review --- .../apache/spark/streaming/scheduler/ReceiverTracker.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index ff3a04ea4c31a..1f0e442a12283 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -150,9 +150,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logWarning("No prior receiver info") ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) } - receiverInfo(streamId) = newReceiverInfo - listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId))) - receiverInfo.remove(streamId) + receiverInfo -= streamId + listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo)) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" } else {