From d6fa3aa25e21be508c695067a858afd0d3ddbd64 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Mon, 9 Jun 2014 22:27:19 -0700 Subject: [PATCH] SPARK-1729. New Flume-Spark integration. Made the Flume Sink considerably simpler. Added a lot of documentation. --- .../apache/spark/flume/sink/SparkSink.scala | 527 ++++++++++-------- .../flume/FlumePollingInputDStream.scala | 27 +- 2 files changed, 323 insertions(+), 231 deletions(-) diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala index 030396f12844a..e830b388c12b2 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala @@ -16,13 +16,11 @@ */ package org.apache.spark.flume.sink - import java.net.InetSocketAddress import java.nio.ByteBuffer import java.util import java.util.concurrent._ import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.locks.ReentrantLock import scala.util.control.Breaks @@ -33,63 +31,68 @@ import org.apache.commons.lang.RandomStringUtils import org.apache.flume.Sink.Status import org.apache.flume.conf.{ConfigurationException, Configurable} import org.apache.flume.sink.AbstractSink -import org.apache.flume.{FlumeException, Context} - -import org.apache.spark.flume.{SparkSinkEvent, EventBatch, SparkFlumeProtocol} +import org.apache.flume.{Channel, Transaction, FlumeException, Context} import org.slf4j.LoggerFactory - - +import org.apache.spark.flume.{SparkSinkEvent, EventBatch, SparkFlumeProtocol} +/** + * A sink that uses Avro RPC to run a server that can be polled by Spark's + * FlumePollingInputDStream. This sink has the following configuration parameters: + * + * hostname - The hostname to bind to. Default: 0.0.0.0 + * port - The port to bind to. (No default - mandatory) + * timeout - Time in seconds after which a transaction is rolled back, + * if an ACK is not received from Spark within that time + * threads - Number of threads to use to receive requests from Spark (Default: 10) + * + */ +// Flume forces transactions to be thread-local. So each transaction *must* be committed, or +// rolled back from the thread it was originally created in. So each getEvents call from Spark +// creates a TransactionProcessor which runs in a new thread, in which the transaction is created +// and events are pulled off the channel. Once the events are sent to spark, +// that thread is blocked and the TransactionProcessor is saved in a map, +// until an ACK or NACK comes back or the transaction times out (after the specified timeout). +// When the response comes, the TransactionProcessor is retrieved and then unblocked, +// at which point the transaction is committed or rolled back. class SparkSink extends AbstractSink with Configurable { - private val LOG = LoggerFactory.getLogger(this.getClass) - - // This sink will not persist sequence numbers and reuses them if it gets restarted. - // So it is possible to commit a transaction which may have been meant for the sink before the - // restart. - // Since the new txn may not have the same sequence number we must guard against accidentally - // committing - // a new transaction. To reduce the probability of that happening a random string is prepended - // to the sequence number. 
- // Does not change for life of sink - private val seqBase = RandomStringUtils.randomAlphanumeric(8) - // Incremented for each transaction - private val seqNum = new AtomicLong(0) - private var transactionExecutorOpt: Option[ExecutorService] = None + // Size of the pool to use for holding transaction processors. + private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS - private var numProcessors: Integer = SparkSinkConfig.DEFAULT_PROCESSOR_COUNT + // Timeout for each transaction. If spark does not respond in this much time, + // rollback the transaction private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT - private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]() - - private var processorManager: Option[TransactionProcessorManager] = None + // Address info to bind on private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME private var port: Int = 0 - private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS + + // Handle to the server private var serverOpt: Option[NettyServer] = None + // The handler that handles the callback from Avro + private var handler: Option[SparkAvroCallbackHandler] = None + + // Latch that blocks off the Flume framework from wasting 1 thread. private val blockingLatch = new CountDownLatch(1) override def start() { - transactionExecutorOpt = Option(Executors.newFixedThreadPool(numProcessors, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Spark Sink, " + getName + " Processor Thread - %d").build())) - - processorManager = Option(new TransactionProcessorManager(numProcessors)) - - val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler()) - + handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout)) + val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get) // Using the constructor that takes specific thread-pools requires bringing in netty // dependencies which are being excluded in the build. In practice, // Netty dependencies are already available on the JVM as Flume would have pulled them in. serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port))) - - serverOpt.map(server => server.start()) + serverOpt.map(server => { + server.start() + }) super.start() } override def stop() { - transactionExecutorOpt.map(executor => executor.shutdownNow()) + handler.map(callbackHandler => { + callbackHandler.shutdown() + }) serverOpt.map(server => { server.close() server.join() @@ -98,14 +101,16 @@ class SparkSink extends AbstractSink with Configurable { super.stop() } + /** + * @param ctx + */ override def configure(ctx: Context) { import SparkSinkConfig._ hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME) port = Option(ctx.getInteger(CONF_PORT)). getOrElse(throw new ConfigurationException("The port to bind to must be specified")) - numProcessors = ctx.getInteger(PROCESSOR_COUNT, DEFAULT_PROCESSOR_COUNT) + poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS) transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT) - maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS) } override def process(): Status = { @@ -115,233 +120,306 @@ class SparkSink extends AbstractSink with Configurable { blockingLatch.await() Status.BACKOFF } +} +/** + * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process + * requests. 
Each getEvents, ack and nack call is forwarded to an instance of this class. + * @param threads Number of threads to use to process requests. + * @param channel The channel that the sink pulls events from + * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark + * is rolled back. + */ +private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, + val transactionTimeout: Int) extends SparkFlumeProtocol { + private val LOG = LoggerFactory.getLogger(classOf[SparkAvroCallbackHandler]) + val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Spark Sink Processor Thread - %d").build())) + private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]() + // This sink will not persist sequence numbers and reuses them if it gets restarted. + // So it is possible to commit a transaction which may have been meant for the sink before the + // restart. + // Since the new txn may not have the same sequence number we must guard against accidentally + // committing a new transaction. To reduce the probability of that happening a random string is + // prepended to the sequence number. Does not change for life of sink + private val seqBase = RandomStringUtils.randomAlphanumeric(8) + private val seqCounter = new AtomicLong(0) - // Object representing an empty batch returned by the txn processor due to some error. - case object ErrorEventBatch extends EventBatch + /** + * Returns a bunch of events to Spark over Avro RPC. + * @param n Maximum number of events to return in a batch + * @return [[EventBatch]] instance that has a sequence number and an array of at most n events + */ + override def getEventBatch(n: Int): EventBatch = { + val sequenceNumber = seqBase + seqCounter.incrementAndGet() + val processor = new TransactionProcessor(channel, sequenceNumber, + n, transactionTimeout, this) + transactionExecutorOpt.map(executor => { + executor.submit(processor) + }) + // Wait until a batch is available - can be null if some error was thrown + processor.getEventBatch match { + case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" + + " retrieved from channel.") + case eventBatch: EventBatch => + processorMap.put(sequenceNumber, processor) + if (LOG.isDebugEnabled()) { + LOG.debug("Sent " + eventBatch.getEvents.size() + + " events with sequence number: " + eventBatch.getSequenceNumber) + } + eventBatch + } + } - private class AvroCallbackHandler extends SparkFlumeProtocol { + /** + * Called by Spark to indicate successful commit of a batch + * @param sequenceNumber The sequence number of the event batch that was successful + */ + override def ack(sequenceNumber: CharSequence): Void = { + completeTransaction(sequenceNumber, success = true) + null + } - override def getEventBatch(n: Int): EventBatch = { - val processor = processorManager.get.checkOut(n) - transactionExecutorOpt.map(executor => executor.submit(processor)) - // Wait until a batch is available - can be null if some error was thrown - processor.eventQueue.take() match { - case ErrorEventBatch => throw new FlumeException("Something went wrong. 
No events" + - " retrieved from channel.") - case eventBatch: EventBatch => - processorMap.put(eventBatch.getSequenceNumber, processor) - if (LOG.isDebugEnabled) { - LOG.debug("Sent " + eventBatch.getEvents.size() + - " events with sequence number: " + eventBatch.getSequenceNumber) - } - eventBatch - } - } + /** + * Called by Spark to indicate failed commit of a batch + * @param sequenceNumber The sequence number of the event batch that failed + * @return + */ + override def nack(sequenceNumber: CharSequence): Void = { + completeTransaction(sequenceNumber, success = false) + LOG.info("Spark failed to commit transaction. Will reattempt events.") + null + } - override def ack(sequenceNumber: CharSequence): Void = { - completeTransaction(sequenceNumber, success = true) - null - } + /** + * Helper method to commit or rollback a transaction. + * @param sequenceNumber The sequence number of the batch that was completed + * @param success Whether the batch was successful or not. + */ + private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) { + Option(removeAndGetProcessor(sequenceNumber)).map(processor => { + processor.batchProcessed(success) + }) + } - override def nack(sequenceNumber: CharSequence): Void = { - completeTransaction(sequenceNumber, success = false) - LOG.info("Spark failed to commit transaction. Will reattempt events.") - null - } + /** + * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak. + * @param sequenceNumber + * @return The transaction processor for the corresponding batch. Note that this instance is no + * longer tracked and the caller is responsible for that txn processor. + */ + private[flume] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = { + processorMap.remove(sequenceNumber.toString) // The toString is required! + } - def completeTransaction(sequenceNumber: CharSequence, success: Boolean) { - val processorOpt = Option(processorMap.remove(sequenceNumber)) - if (processorOpt.isDefined) { - val processor = processorOpt.get - processor.resultQueueUpdateLock.lock() - try { - // Is the sequence number the same as the one the processor is processing? If not, - // don't update { - if (processor.eventBatch.getSequenceNumber.equals(sequenceNumber)) { - processor.resultQueue.put(success) - } - } finally { - processor.resultQueueUpdateLock.unlock() - } - } - } + /** + * Shuts down the executor used to process transactions. + */ + def shutdown() { + transactionExecutorOpt.map(executor => { + executor.shutdownNow() + }) } +} + +/** + * Object representing an empty batch returned by the txn processor due to some error. + */ +case object ErrorEventBatch extends EventBatch + +// Flume forces transactions to be thread-local (horrible, I know!) +// So the sink basically spawns a new thread to pull the events out within a transaction. +// The thread fills in the event batch object that is set before the thread is scheduled. +// After filling it in, the thread waits on a condition - which is released only +// when the success message comes back for the specific sequence number for that event batch. +/** + * This class represents a transaction on the Flume channel. This class runs a separate thread + * which owns the transaction. The thread is blocked until the success call for that transaction + * comes back with an ACK or NACK. + * @param channel The channel from which to pull events + * @param seqNum The sequence number to use for the transaction. 
Must be unique + * @param maxBatchSize The maximum number of events to process per batch + * @param transactionTimeout Time in seconds after which a transaction must be rolled back + * without waiting for an ACK from Spark + * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts + */ +private class TransactionProcessor(val channel: Channel, val seqNum: String, + var maxBatchSize: Int, val transactionTimeout: Int, + val parent: SparkAvroCallbackHandler) extends Callable[Void] { + + private val LOG = LoggerFactory.getLogger(classOf[TransactionProcessor]) + + // If a real batch is not returned, we always have to return an error batch. + @volatile private var eventBatch: EventBatch = ErrorEventBatch + + // Synchronization primitives + val batchGeneratedLatch = new CountDownLatch(1) + val batchAckLatch = new CountDownLatch(1) + + // Sanity check to ensure we don't loop like crazy + val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2 + + // OK to use volatile, since the change would only make this true (otherwise it will be + // changed to false - we never apply a negation operation to this) - which means the transaction + // succeeded. + @volatile private var batchSuccess = false + + // The transaction that this processor would handle + var txOpt: Option[Transaction] = None - // Flume forces transactions to be thread-local (horrible, I know!) - // So the sink basically spawns a new thread to pull the events out within a transaction. - // The thread fills in the event batch object that is set before the thread is scheduled. - // After filling it in, the thread waits on a condition - which is released only - // when the success message comes back for the specific sequence number for that event batch. /** - * This class represents a transaction on the Flume channel. This class runs a separate thread - * which owns the transaction. It is blocked until the success call for that transaction comes - * back. - * @param maxBatchSize + * Get an event batch from the channel. This method will block until a batch of events is + * available from the channel. If no events are available after a large number of attempts of + * polling the channel, this method will return [[ErrorEventBatch]]. + * + * @return An [[EventBatch]] instance with sequence number set to [[seqNum]], filled with a + * maximum of [[maxBatchSize]] events */ - private class TransactionProcessor(var maxBatchSize: Int) extends Callable[Void] { - // Must be set to a new event batch before scheduling this!! - val eventBatch = new EventBatch("", new util.LinkedList[SparkSinkEvent]) - val eventQueue = new SynchronousQueue[EventBatch]() - val resultQueue = new SynchronousQueue[Boolean]() - val resultQueueUpdateLock = new ReentrantLock() - - object Zero { - val zero = "0" // Oh, I miss static finals - } + def getEventBatch: EventBatch = { + batchGeneratedLatch.await() + eventBatch + } + /** + * This method is to be called by the sink when it receives an ACK or NACK from Spark. This + * method is a no-op if it is called after [[transactionTimeout]] has expired since + * [[getEventBatch]] returned a batch of events. + * @param success True if an ACK was received and the transaction should be committed, else false. 
+ */ + def batchProcessed(success: Boolean) { + if (LOG.isDebugEnabled) { + LOG.debug("Batch processed for sequence number: " + seqNum) + } + batchSuccess = success + batchAckLatch.countDown() + } - override def call(): Void = { - val tx = getChannel.getTransaction - tx.begin() - try { - eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet()) - val events = eventBatch.getEvents - events.clear() + /** + * Populates events into the event batch. If the batch cannot be populated, + * this method will not set the event batch which will stay [[ErrorEventBatch]] + */ + private def populateEvents() { + try { + txOpt = Option(channel.getTransaction) + txOpt.map(tx => { + tx.begin() + val events = new util.ArrayList[SparkSinkEvent](maxBatchSize) val loop = new Breaks var gotEventsInThisTxn = false + var loopCounter: Int = 0 loop.breakable { - while (events.size() < maxBatchSize) { - Option(getChannel.take()) match { + while (events.size() < maxBatchSize + && loopCounter < totalAttemptsToRemoveFromChannel) { + loopCounter += 1 + Option(channel.take()) match { case Some(event) => events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders), ByteBuffer.wrap(event.getBody))) gotEventsInThisTxn = true case None => if (!gotEventsInThisTxn) { - Thread.sleep(500) + TimeUnit.MILLISECONDS.sleep(500) } else { loop.break() } } } } - // Make the data available to the sender thread - eventQueue.put(eventBatch) - - // Wait till timeout for the ack/nack - val maybeResult = Option(resultQueue.poll(transactionTimeout, TimeUnit.SECONDS)) - // There is a race condition here. - // 1. This times out. - // 2. The result is empty, so timeout exception is thrown. - // 3. The ack comes in before the finally block is entered - // 4. The thread with the ack has a handle to this processor, - // and another thread has the same processor checked out - // (since the finally block was executed and the processor checked back in) - // 5. The thread with the ack now updates the result queue, - // so the processor thinks it is the ack for the current batch. - // To avoid this - update the sequence number to "0" (with or without a result - does not - // matter). - // In the ack method, check if the seq number is the same as the processor's - - // if they are then update the result queue. Now if the - // processor updates the seq number first - the ack/nack never updates the result. If the - // ack/nack updates the - // result after the timeout but before the seq number is updated to "0" it does not - // matter - the processor would - // still timeout and the result is cleared before reusing the processor. 
- // Unfortunately, this needs to be done from within a lock - // to make sure that the new sequence number is actually visible to the ack thread - // (happens-before) - resultQueueUpdateLock.lock() + if (!gotEventsInThisTxn) { + throw new FlumeException("Tried too many times, didn't get any events from the channel") + } + // At this point, the events are available, so fill them into the event batch + eventBatch = new EventBatch(seqNum, events) + }) + } catch { + case e: Throwable => + LOG.error("Error while processing transaction.", e) try { - eventBatch.setSequenceNumber(Zero.zero) + txOpt.map(tx => { + rollbackAndClose(tx, close = true) + }) } finally { - resultQueueUpdateLock.unlock() + // Avro might serialize the exception and cause a NACK, + // so don't bother with the transaction + txOpt = None } - eventBatch.getEvents.clear() - // If the batch failed on spark side, throw a FlumeException - maybeResult.map(success => - if (!success) { - throw new - FlumeException("Spark could not accept events. The transaction will be retried.") - } - ) - // If the operation timed out, throw a TimeoutException - if (maybeResult.isEmpty) { - throw new TimeoutException("Spark did not respond within the timeout period of " + - transactionTimeout + "seconds. Transaction will be retried") - } - null - } catch { - case e: Throwable => - try { - LOG.warn("Error while attempting to remove events from the channel.", e) - tx.rollback() - } catch { - case e1: Throwable => LOG.error( - "Rollback failed while attempting to rollback due to commit failure.", e1) - } - null // No point rethrowing the exception - } finally { - // Must *always* release the caller thread - eventQueue.put(ErrorEventBatch) - // In the case of success coming after the timeout, but before resetting the seq number - // remove the event from the map and then clear the value - resultQueue.clear() - processorMap.remove(eventBatch.getSequenceNumber) - processorManager.get.checkIn(this) - tx.close() - } - } - - def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence, - CharSequence] = { - val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size()) - charSeqMap.putAll(inMap) - charSeqMap + } finally { + batchGeneratedLatch.countDown() } } - private class TransactionProcessorManager(val maxInstances: Int) { - val queue = new scala.collection.mutable.Queue[TransactionProcessor] - val queueModificationLock = new ReentrantLock() - var currentSize = 0 - val waitForCheckIn = queueModificationLock.newCondition() - - def checkOut(n: Int): TransactionProcessor = { - def getProcessor = { - val processor = queue.dequeue() - processor.maxBatchSize = n - processor - } - queueModificationLock.lock() - try { - if (queue.size > 0) { - getProcessor - } - else { - if (currentSize < maxInstances) { - currentSize += 1 - new TransactionProcessor(n) - } else { - // No events in queue and cannot initialize more! - // Since currentSize never reduces, queue size increasing is the only hope - while (queue.size == 0 && currentSize >= maxInstances) { - waitForCheckIn.await() - } - getProcessor - } + /** + * Waits for upto [[transactionTimeout]] seconds for an ACK. If an ACK comes in, + * this method commits the transaction with the channel. If the ACK does not come in within + * that time or a NACK comes in, this method rolls back the transaction. 
+ */ + private def processAckOrNack() { + batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS) + txOpt.map(tx => { + if (batchSuccess) { + try { + tx.commit() + } catch { + case e: Throwable => + rollbackAndClose(tx, close = false) // tx will be closed later anyway + } finally { + tx.close() } - } finally { - queueModificationLock.unlock() + } else { + rollbackAndClose(tx, close = true) + // This might have been due to timeout or a NACK. Either way the following call does not + // cause issues. This is required to ensure the TransactionProcessor instance is not leaked + parent.removeAndGetProcessor(seqNum) } - } + }) + } - def checkIn(processor: TransactionProcessor) { - queueModificationLock.lock() - try { - queue.enqueue(processor) - waitForCheckIn.signal() - } finally { - queueModificationLock.unlock() + /** + * Helper method to rollback and optionally close a transaction + * @param tx The transaction to rollback + * @param close Whether the transaction should be closed or not after rolling back + */ + private def rollbackAndClose(tx: Transaction, close: Boolean) { + try { + tx.rollback() + LOG.warn("Spark was unable to successfully process the events. Transaction is being " + + "rolled back.") + } catch { + case e: Throwable => + LOG.error("Error rolling back transaction. Rollback may have failed!", e) + } finally { + if (close) { + tx.close() } } } + + /** + * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence] + * @param inMap The map to be converted + * @return The converted map + */ + private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence, + CharSequence] = { + val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size()) + charSeqMap.putAll(inMap) + charSeqMap + } + + override def call(): Void = { + populateEvents() + processAckOrNack() + null + } } +/** + * Configuration parameters and their defaults. + */ object SparkSinkConfig { - val PROCESSOR_COUNT = "processorCount" - val DEFAULT_PROCESSOR_COUNT = 10 + val THREADS = "threads" + val DEFAULT_THREADS = 10 val CONF_TRANSACTION_TIMEOUT = "timeout" val DEFAULT_TRANSACTION_TIMEOUT = 60 @@ -350,7 +428,4 @@ object SparkSinkConfig { val DEFAULT_HOSTNAME = "0.0.0.0" val CONF_PORT = "port" - - val CONF_MAX_THREADS = "maxThreads" - val DEFAULT_MAX_THREADS = 5 } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index ee337b5f5507f..324f9551287b1 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -23,7 +23,6 @@ import java.nio.ByteBuffer import java.util.concurrent.{TimeUnit, Executors} import scala.collection.JavaConversions._ -import scala.collection.mutable import scala.reflect.ClassTag import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -38,8 +37,16 @@ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.receiver.Receiver - - +/** + * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running + * [[org.apache.spark.flume.sink.SparkSink]]s. 
+ * @param ssc_ Streaming context that will execute this input stream + * @param addresses List of addresses at which SparkSinks are listening + * @param maxBatchSize Maximum size of a batch + * @param parallelism Number of parallel connections to open + * @param storageLevel The storage level to use. + * @tparam T Class type of the object of this stream + */ class FlumePollingInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, val addresses: Seq[InetSocketAddress], @@ -84,6 +91,7 @@ private[streaming] class FlumePollingReceiver( new FlumeConnection(transceiver, client) }).toArray + // Threads that pull data from Flume. val dataReceiver = new Runnable { override def run(): Unit = { var counter = 0 @@ -96,11 +104,14 @@ private[streaming] class FlumePollingReceiver( val events: java.util.List[SparkSinkEvent] = batch.getEvents logDebug("Received batch of " + events.size() + " events with sequence number: " + seq) try { + // Convert each Flume event to a serializable SparkPollingEvent events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event))) + // Send an ack to Flume so that Flume discards the events from its channels. client.ack(seq) } catch { case e: Throwable => try { + // Let Flume know that the events need to be pushed back into the channel. client.nack(seq) // If the agent is down, even this could fail and throw } catch { case e: Throwable => logError("Sending Nack also failed. A Flume agent is down.") @@ -111,6 +122,7 @@ private[streaming] class FlumePollingReceiver( } } } + // Create multiple threads and start all of them. for (i <- 0 until parallelism) { logInfo("Starting Flume Polling Receiver worker threads starting..") receiverExecutor.submit(dataReceiver) @@ -129,13 +141,18 @@ private[streaming] class FlumePollingReceiver( logInfo("Shutting down Flume Polling Receiver") receiverExecutor.shutdownNow() connections.map(connection => { - connection.tranceiver.close() + connection.transceiver.close() }) channelFactory.releaseExternalResources() } } -private class FlumeConnection(val tranceiver: NettyTransceiver, +/** + * A wrapper around the transceiver and the Avro IPC API. + * @param transceiver The transceiver to use for communication with Flume + * @param client The client that the callbacks are received on. + */ +private class FlumeConnection(val transceiver: NettyTransceiver, val client: SparkFlumeProtocol.Callback) private[streaming] object SparkPollingEvent {
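
Example usage (an illustrative sketch only, not part of this patch). It assumes the FlumePollingInputDStream constructor takes the remaining parameters in the order its Scaladoc lists them (maxBatchSize, parallelism, storageLevel), that SparkPollingEvent is the stream's element type and is visible to user code, and that a FlumeUtils-style factory may eventually wrap this call. The agent-side sink is configured with the keys defined in SparkSinkConfig above (hostname, port, threads, timeout), using the sink's fully qualified class name as the Flume sink type, e.g.:

    agent.sinks.spark.type     = org.apache.spark.flume.sink.SparkSink
    agent.sinks.spark.hostname = 0.0.0.0
    agent.sinks.spark.port     = 9999
    agent.sinks.spark.threads  = 10
    agent.sinks.spark.timeout  = 60

    import java.net.InetSocketAddress

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.{FlumePollingInputDStream, SparkPollingEvent}

    object FlumePollingExample {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("FlumePollingExample")
        val ssc = new StreamingContext(sparkConf, Seconds(10))

        // Addresses on which SparkSink instances are listening (hostname/port above).
        val addresses = Seq(new InetSocketAddress("flume-agent-host", 9999))

        // Example values: pull at most 100 events per batch over a single connection.
        val flumeStream = new FlumePollingInputDStream[SparkPollingEvent](
          ssc, addresses, 100, 1, StorageLevel.MEMORY_AND_DISK_SER_2)

        // Count the events received in each batch interval.
        flumeStream.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Setting parallelism greater than 1 simply submits additional polling runnables against the same set of sink addresses, as the receiver code in this patch shows; each connection acks or nacks its own batches independently.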