diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala index 24e9ad8a21f02..add858530862b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala @@ -38,4 +38,4 @@ object JavaPairInputDStream { inputDStream: InputDStream[(K, V)]): JavaPairInputDStream[K, V] = { new JavaPairInputDStream[K, V](inputDStream) } -} \ No newline at end of file +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/NetworkReceiverExecutorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/NetworkReceiverExecutorImpl.scala index 7796d2a64bf8a..88badca476f10 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/NetworkReceiverExecutorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/NetworkReceiverExecutorImpl.scala @@ -144,7 +144,8 @@ private[streaming] class NetworkReceiverExecutorImpl( /** Report pushed block */ def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) { - trackerActor ! AddBlock(ReceivedBlockInfo(receiverId, blockId, numRecords, optionalMetadata.orNull)) + val blockInfo = ReceivedBlockInfo(receiverId, blockId, numRecords, optionalMetadata.orNull) + trackerActor ! AddBlock(blockInfo) logDebug("Reported block " + blockId) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala index f29ea065f8767..1ca592102e2ce 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala @@ -21,15 +21,14 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SparkConf +import org.apache.spark.storage.{StorageLevel, StreamBlockId} +import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, NetworkReceiver, NetworkReceiverExecutor} import org.scalatest.FunSuite import org.scalatest.concurrent.Timeouts import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.SparkConf -import org.apache.spark.storage.{StorageLevel, StreamBlockId} -import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, NetworkReceiver, NetworkReceiverExecutor} -import org.apache.spark.streaming.receiver.NetworkReceiverExecutor /** Testsuite for testing the network receiver behavior */ class NetworkReceiverSuite extends FunSuite with Timeouts {