From 1f47364aea322ef3747a16b5fac62754c9ea655c Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Fri, 25 Jul 2014 14:25:11 -0700 Subject: [PATCH] Minor fixes. --- .../streaming/flume/FlumePollingInputDStream.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index c93b7fee09f59..148262bb6771e 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -76,7 +76,7 @@ private[streaming] class FlumePollingReceiver( lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build()) - private val connections = new LinkedBlockingQueue[FlumeConnection]() + private lazy val connections = new LinkedBlockingQueue[FlumeConnection]() override def onStart(): Unit = { // Create the connections to each Flume agent. @@ -102,11 +102,11 @@ private[streaming] class FlumePollingReceiver( logDebug( "Received batch of " + events.size() + " events with sequence number: " + seq) try { - // Convert each Flume event to a serializable SparkPollingEvent + // Convert each Flume event to a serializable SparkFlumeEvent val buffer = new ArrayBuffer[SparkFlumeEvent](events.size()) var j = 0 while (j < events.size()) { - buffer += sparkSinkEventToSparkFlumeEvent(events(j)) + buffer += toSparkFlumeEvent(events(j)) j += 1 } store(buffer) @@ -156,9 +156,9 @@ private[streaming] class FlumePollingReceiver( /** * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]] * @param event - Event to convert to SparkFlumeEvent - * @return - The SparkSinkEvent generated from Spar + * @return - The SparkFlumeEvent generated from SparkSinkEvent */ - private def sparkSinkEventToSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = { + private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = { val sparkFlumeEvent = new SparkFlumeEvent() sparkFlumeEvent.event.setBody(event.getBody) sparkFlumeEvent.event.setHeaders(event.getHeaders)