Skip to content

Commit

Permalink
Minor fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
harishreedharan committed Jul 25, 2014
1 parent 73d6f6d commit 1f47364
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private[streaming] class FlumePollingReceiver(
lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())

private val connections = new LinkedBlockingQueue[FlumeConnection]()
private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()

override def onStart(): Unit = {
// Create the connections to each Flume agent.
Expand All @@ -102,11 +102,11 @@ private[streaming] class FlumePollingReceiver(
logDebug(
"Received batch of " + events.size() + " events with sequence number: " + seq)
try {
// Convert each Flume event to a serializable SparkPollingEvent
// Convert each Flume event to a serializable SparkFlumeEvent
val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
var j = 0
while (j < events.size()) {
buffer += sparkSinkEventToSparkFlumeEvent(events(j))
buffer += toSparkFlumeEvent(events(j))
j += 1
}
store(buffer)
Expand Down Expand Up @@ -156,9 +156,9 @@ private[streaming] class FlumePollingReceiver(
/**
* Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
* @param event - Event to convert to SparkFlumeEvent
* @return - The SparkSinkEvent generated from Spar
* @return - The SparkFlumeEvent generated from SparkSinkEvent
*/
private def sparkSinkEventToSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
val sparkFlumeEvent = new SparkFlumeEvent()
sparkFlumeEvent.event.setBody(event.getBody)
sparkFlumeEvent.event.setHeaders(event.getHeaders)
Expand Down

0 comments on commit 1f47364

Please sign in to comment.