From 4b0c7fcdf654023f56d3e85b8d52ee1d049d8c65 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Tue, 17 Jun 2014 22:47:49 -0700
Subject: [PATCH] FLUME-1729. New Flume-Spark integration.

Avro does not support inheritance, so the error message needs to be part
of the message itself.
---
 .../flume-sink/src/main/avro/sparkflume.avdl  |  1 +
 .../apache/spark/flume/ErrorEventBatch.scala  | 28 ---------
 .../flume/sink/TransactionProcessor.scala     | 21 ++++---
 .../flume/FlumePollingInputDStream.scala      | 59 ++++++++++---------
 4 files changed, 42 insertions(+), 67 deletions(-)
 delete mode 100644 external/flume-sink/src/main/scala/org/apache/spark/flume/ErrorEventBatch.scala

diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
index fa00b2310a17b..f8edd92f67e3e 100644
--- a/external/flume-sink/src/main/avro/sparkflume.avdl
+++ b/external/flume-sink/src/main/avro/sparkflume.avdl
@@ -27,6 +27,7 @@ protocol SparkFlumeProtocol {
   }

   record EventBatch {
+    string errorMsg = ""; // If this is empty, the batch is valid; otherwise it represents an error
     string sequenceNumber;
     array<SparkSinkEvent> events;
   }
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/ErrorEventBatch.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/ErrorEventBatch.scala
deleted file mode 100644
index 9bee61c71304c..0000000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/flume/ErrorEventBatch.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.flume
-
-import java.util
-
-/**
- * Object representing an empty batch returned by the txn processor due to some error.
- */
-case class ErrorEventBatch(var message: String) extends EventBatch {
-  // Make sure the internal data structures are initialized with non-null values.
-  setEvents(util.Collections.emptyList())
-  setSequenceNumber("")
-}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala
index b1499cdbf54e1..8c4860d364aee 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{TimeUnit, CountDownLatch, Callable}
 import scala.util.control.Breaks

 import org.apache.flume.{Transaction, Channel}
-import org.apache.spark.flume.{SparkSinkEvent, ErrorEventBatch, EventBatch}
+import org.apache.spark.flume.{SparkSinkEvent, EventBatch}
 import org.slf4j.LoggerFactory

@@ -50,7 +50,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
   private val LOG = LoggerFactory.getLogger(classOf[TransactionProcessor])

   // If a real batch is not returned, we always have to return an error batch.
-  @volatile private var eventBatch: EventBatch = new ErrorEventBatch("Unknown Error")
+  @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
+    util.Collections.emptyList())

   // Synchronization primitives
   val batchGeneratedLatch = new CountDownLatch(1)
@@ -70,7 +71,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
   /**
    * Get an event batch from the channel. This method will block until a batch of events is
    * available from the channel. If no events are available after a large number of attempts of
-   * polling the channel, this method will return [[ErrorEventBatch]].
+   * polling the channel, this method will return an [[EventBatch]] with a non-empty error message.
    *
    * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
    *         maximum of maxBatchSize events
@@ -96,16 +97,14 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,

   /**
    * Populates events into the event batch. If the batch cannot be populated,
-   * this method will not set the event batch which will stay [[ErrorEventBatch]]
+   * this method will not set the events into the event batch, but it sets an error message.
    */
   private def populateEvents() {
     try {
       txOpt = Option(channel.getTransaction)
       if(txOpt.isEmpty) {
-        assert(eventBatch.isInstanceOf[ErrorEventBatch])
-        eventBatch.asInstanceOf[ErrorEventBatch].message = "Something went wrong. Channel was " +
-          "unable to create a transaction!"
-        eventBatch
+        eventBatch.setErrorMsg("Something went wrong. Channel was " +
+          "unable to create a transaction!")
       }
       txOpt.map(tx => {
         tx.begin()
@@ -135,16 +134,16 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
           val msg = "Tried several times, " +
             "but did not get any events from the channel!"
           LOG.warn(msg)
-          eventBatch.asInstanceOf[ErrorEventBatch].message = msg
+          eventBatch.setErrorMsg(msg)
         } else {
           // At this point, the events are available, so fill them into the event batch
-          eventBatch = new EventBatch(seqNum, events)
+          eventBatch = new EventBatch("", seqNum, events)
         }
       })
     } catch {
       case e: Exception =>
         LOG.error("Error while processing transaction.", e)
-        eventBatch.asInstanceOf[ErrorEventBatch].message = e.getMessage
+        eventBatch.setErrorMsg(e.getMessage)
         try {
           txOpt.map(tx => {
             rollbackAndClose(tx, close = true)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 90a0b7113e2f7..ff6a5b5ce1d04 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -31,7 +31,7 @@ import org.apache.avro.ipc.specific.SpecificRequestor
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory

 import org.apache.spark.Logging
-import org.apache.spark.flume.{EventBatch, ErrorEventBatch, SparkSinkEvent, SparkFlumeProtocol}
+import org.apache.spark.flume.{EventBatch, SparkSinkEvent, SparkFlumeProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -99,37 +99,40 @@ private[streaming] class FlumePollingReceiver(
           counter = counter % connections.size
           val client = connections(counter).client
           counter += 1
-          client.getEventBatch(maxBatchSize) match {
-            case errorBatch: ErrorEventBatch =>
-              logWarning("Error Event Batch received from Spark Sink. " + errorBatch.message)
-            case batch: EventBatch =>
-              val seq = batch.getSequenceNumber
-              val events: java.util.List[SparkSinkEvent] = batch.getEvents
-              logDebug(
-                "Received batch of " + events.size() + " events with sequence number: " + seq)
-              try {
-                // Convert each Flume event to a serializable SparkPollingEvent
-                events.foreach(event => {
-                  store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
-                })
-                // Send an ack to Flume so that Flume discards the events from its channels.
-                client.ack(seq)
-              } catch {
-                case e: Throwable =>
-                  try {
-                    // Let Flume know that the events need to be pushed back into the channel.
-                    client.nack(seq) // If the agent is down, even this could fail and throw
-                  } catch {
-                    case e: Throwable => logError(
-                      "Sending Nack also failed. A Flume agent is down.")
-                  }
-                  TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
-                  logWarning("Error while attempting to store events", e)
-              }
+          val eventBatch = client.getEventBatch(maxBatchSize)
+          val errorMsg = eventBatch.getErrorMsg
+          if (errorMsg.toString.equals("")) { // No error, proceed with processing data
+            val seq = eventBatch.getSequenceNumber
+            val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
+            logDebug(
+              "Received batch of " + events.size() + " events with sequence number: " + seq)
+            try {
+              // Convert each Flume event to a serializable SparkPollingEvent
+              events.foreach(event => {
+                store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
+              })
+              // Send an ack to Flume so that Flume discards the events from its channels.
+              client.ack(seq)
+            } catch {
+              case e: Exception =>
+                try {
+                  // Let Flume know that the events need to be pushed back into the channel.
+                  client.nack(seq) // If the agent is down, even this could fail and throw
+                } catch {
+                  case e: Exception => logError(
+                    "Sending Nack also failed. A Flume agent is down.")
+                }
+                TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
+                logWarning("Error while attempting to store events", e)
+            }
+          } else {
+            logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
+              errorMsg.toString)
+          }
         }
       }
     }

+    // Create multiple threads and start all of them.
     for (i <- 0 until parallelism) {
       logInfo("Starting Flume Polling Receiver worker threads starting..")
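
Note on the convention this patch adopts: because Avro IDL records cannot extend one another, the ErrorEventBatch subclass is dropped and the error indicator moves in-band, into EventBatch's errorMsg field; both sides agree that an empty errorMsg marks a valid batch. The following is a minimal, self-contained Scala sketch of that pattern. The EventBatch and SparkSinkEvent classes here are hand-written stand-ins for the Avro-generated ones (which come from sparkflume.avdl), so the names and signatures are illustrative only.

import java.util.{Collections, List => JList}

object ErrorBatchSketch {
  // Stand-in for the Avro-generated event record.
  class SparkSinkEvent

  // Stand-in mirroring the accessors of the Avro-generated EventBatch.
  class EventBatch(private var errorMsg: CharSequence,
                   private var sequenceNumber: CharSequence,
                   private var events: JList[SparkSinkEvent]) {
    def getErrorMsg: CharSequence = errorMsg
    def setErrorMsg(msg: CharSequence): Unit = { errorMsg = msg }
    def getSequenceNumber: CharSequence = sequenceNumber
    def getEvents: JList[SparkSinkEvent] = events
  }

  // An "error batch" is simply a batch with a non-empty errorMsg and an
  // empty, non-null payload, as the sink now constructs on failure.
  def errorBatch(msg: String): EventBatch =
    new EventBatch(msg, "", Collections.emptyList[SparkSinkEvent]())

  // Receiver-side branch: an empty errorMsg means the batch is valid.
  def handle(batch: EventBatch): Unit = {
    if (batch.getErrorMsg.toString.isEmpty) {
      println("processing " + batch.getEvents.size() + " events, seq=" + batch.getSequenceNumber)
    } else {
      println("sink reported error: " + batch.getErrorMsg)
    }
  }

  def main(args: Array[String]): Unit = {
    handle(errorBatch("Unknown Error"))
    handle(new EventBatch("", "42", Collections.emptyList[SparkSinkEvent]()))
  }
}

The trade-off versus the deleted ErrorEventBatch subclass is that the error check is a string comparison rather than a type match, so the "empty means valid" invariant must be maintained by every constructor call, as the sink code above does by always passing "" for successful batches.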