From 0f10788487f10234aa39277d4c20556f7c846796 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Sat, 24 May 2014 01:32:32 -0700
Subject: [PATCH] SPARK-1729. Make Flume pull data from source, rather than
 the current push model

Added support for polling several Flume agents from a single receiver.
---
 .../flume/FlumePollingInputDStream.scala      |  39 +++++--
 .../spark/streaming/flume/FlumeUtils.scala    |  17 ++-
 .../flume/FlumePollingReceiverSuite.scala     | 108 ++++++++++++++----
 3 files changed, 120 insertions(+), 44 deletions(-)

diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 71b0f72f85f53..2571f2d36f3f1 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -32,11 +32,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import java.io.{ObjectOutput, ObjectInput, Externalizable}
 import java.nio.ByteBuffer
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 class FlumePollingInputDStream[T: ClassTag](
     @transient ssc_ : StreamingContext,
-    val host: String,
-    val port: Int,
+    val addresses: Seq[InetSocketAddress],
     val maxBatchSize: Int,
     val parallelism: Int,
     storageLevel: StorageLevel
@@ -47,30 +47,44 @@ class FlumePollingInputDStream[T: ClassTag](
    * of a NetworkInputDStream.
    */
   override def getReceiver(): Receiver[SparkPollingEvent] = {
-    new FlumePollingReceiver(host, port, maxBatchSize, parallelism, storageLevel)
+    new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
 }
 
 private[streaming] class FlumePollingReceiver(
-    host: String,
-    port: Int,
+    addresses: Seq[InetSocketAddress],
     maxBatchSize: Int,
     parallelism: Int,
     storageLevel: StorageLevel
   ) extends Receiver[SparkPollingEvent](storageLevel) with Logging {
 
+  lazy val channelFactoryExecutor =
+    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+      setNameFormat("Flume Receiver Channel Thread - %d").build())
+
   lazy val channelFactory =
-    new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(),
-      Executors.newSingleThreadExecutor())
-  lazy val transceiver = new NettyTransceiver(new InetSocketAddress(host, port), channelFactory)
-  lazy val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+    new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+
   lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
 
+  private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later
+
   override def onStart(): Unit = {
+    val connectionBuilder = new mutable.ArrayBuilder.ofRef[FlumeConnection]()
+    addresses.map(host => {
+      val transceiver = new NettyTransceiver(host, channelFactory)
+      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+      connectionBuilder += new FlumeConnection(transceiver, client)
+    })
+    connections = connectionBuilder.result()
     val dataReceiver = new Runnable {
       override def run(): Unit = {
+        var counter = 0
         while (true) {
+          counter = counter % connections.size
+          val client = connections(counter).client
+          counter += 1
           val batch = client.getEventBatch(maxBatchSize)
           val seq = batch.getSequenceNumber
           val events: java.util.List[SparkSinkEvent] = batch.getEventBatch
@@ -104,11 +118,16 @@ private[streaming] class FlumePollingReceiver(
   override def onStop(): Unit = {
     logInfo("Shutting down Flume Polling Receiver")
     receiverExecutor.shutdownNow()
-    transceiver.close()
+    connections.map(connection => {
+      connection.transceiver.close()
+    })
     channelFactory.releaseExternalResources()
   }
 }
 
+private class FlumeConnection(val transceiver: NettyTransceiver,
+  val client: SparkFlumeProtocol.Callback)
+
 private[streaming] object SparkPollingEvent {
   def fromSparkSinkEvent(in: SparkSinkEvent): SparkPollingEvent = {
     val event = new SparkPollingEvent()
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index f7d9bd3c6e2ab..ca0059ff04dab 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -21,6 +21,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import java.net.InetSocketAddress
 
 object FlumeUtils {
   /**
@@ -72,8 +73,7 @@ object FlumeUtils {
   /**
    * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
    * This stream will poll the sink for data and will pull events as they are available.
-   * @param host The host on which the Flume agent is running
-   * @param port The port the Spark Sink is accepting connections on
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
    * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
    *                     single RPC call
    * @param parallelism Number of concurrent requests this stream should send to the sink. Note
@@ -83,21 +83,19 @@ object FlumeUtils {
    */
   def createPollingStream (
       ssc: StreamingContext,
-      host: String,
-      port: Int,
+      addresses: Seq[InetSocketAddress],
       maxBatchSize: Int = 100,
       parallelism: Int = 5,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[SparkPollingEvent] = {
-    new FlumePollingInputDStream[SparkPollingEvent](ssc, host, port, maxBatchSize,
+    new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 
   /**
    * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
    * This stream will poll the sink for data and will pull events as they are available.
-   * @param host The host on which the Flume agent is running
-   * @param port The port the Spark Sink is accepting connections on
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
    * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
    *                     single RPC call
    * @param parallelism Number of concurrent requests this stream should send to the sink. Note
@@ -107,13 +105,12 @@ object FlumeUtils {
    */
   def createJavaPollingStream (
       ssc: StreamingContext,
-      host: String,
-      port: Int,
+      addresses: Seq[InetSocketAddress],
       maxBatchSize: Int = 100,
       parallelism: Int = 5,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): JavaReceiverInputDStream[SparkPollingEvent] = {
-    new FlumePollingInputDStream[SparkPollingEvent](ssc, host, port, maxBatchSize,
+    new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 }
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
index aa5db4d94ff17..404759f291f39 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
@@ -29,6 +29,8 @@ import org.apache.spark.flume.sink.{SparkSinkConfig, SparkSink}
 import scala.collection.JavaConversions._
 import org.apache.flume.event.EventBuilder
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import java.net.InetSocketAddress
+import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
 
 class FlumePollingReceiverSuite extends TestSuiteBase {
 
@@ -38,7 +40,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
-      FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 1,
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 5,
         StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
       with SynchronizedBuffer[Seq[SparkPollingEvent]]
@@ -60,26 +62,67 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     sink.setChannel(channel)
     sink.start()
     ssc.start()
+    writeAndVerify(Seq(channel), ssc, outputBuffer)
+    sink.stop()
+    channel.stop()
+  }
 
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    var t = 0
-    for (i <- 0 until 5) {
-      val tx = channel.getTransaction
-      tx.begin()
-      for (j <- 0 until 5) {
-        channel.put(EventBuilder.withBody(
-          String.valueOf(t).getBytes("utf-8"),
-          Map[String, String]("test-" + t.toString -> "header")))
-        t += 1
-      }
+  test("flume polling test multiple hosts") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
+        new InetSocketAddress("localhost", testPort + 1)), 100, 5,
+        StorageLevel.MEMORY_AND_DISK)
+    val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
+      with SynchronizedBuffer[Seq[SparkPollingEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
+
+    // Start the channel and sink.
+    val context = new Context()
+    context.put("capacity", "5000")
+    context.put("transactionCapacity", "1000")
+    context.put("keep-alive", "0")
+    val channel = new MemoryChannel()
+    Configurables.configure(channel, context)
+
+    val channel2 = new MemoryChannel()
+    Configurables.configure(channel2, context)
 
-      tx.commit()
-      tx.close()
-      Thread.sleep(500) // Allow some time for the events to reach
-      clock.addToTime(batchDuration.milliseconds)
+    val sink = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    Configurables.configure(sink, context)
+    sink.setChannel(channel)
+    sink.start()
+
+    val sink2 = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+    Configurables.configure(sink2, context)
+    sink2.setChannel(channel2)
+    sink2.start()
+    ssc.start()
+    writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+    sink.stop()
+    channel.stop()
+
+  }
+
+  def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
+                     outputBuffer: ArrayBuffer[Seq[SparkPollingEvent]]) {
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val executor = Executors.newCachedThreadPool()
+    val executorCompletion = new ExecutorCompletionService[Void](executor)
+    channels.map(channel => {
+      executorCompletion.submit(new TxnSubmitter(channel, clock))
+    })
+    for(i <- 0 until channels.size) {
+      executorCompletion.take()
     }
     val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < 5 && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+    while (outputBuffer.size < 5 * channels.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size)
       Thread.sleep(100)
     }
@@ -87,15 +130,13 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
     logInfo("Stopping context")
     ssc.stop()
-    sink.stop()
-    channel.stop()
 
     val flattenedBuffer = outputBuffer.flatten
-    assert(flattenedBuffer.size === 25)
+    assert(flattenedBuffer.size === 25 * channels.size)
     var counter = 0
-    for (i <- 0 until 25) {
-      val eventToVerify = EventBuilder.withBody(
-        String.valueOf(i).getBytes("utf-8"),
+    for (k <- 0 until channels.size; i <- 0 until 25) {
+      val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
+        String.valueOf(i)).getBytes("utf-8"),
         Map[String, String]("test-" + i.toString -> "header"))
       var found = false
       var j = 0
@@ -110,7 +151,26 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
         j += 1
       }
     }
-    assert (counter === 25)
+    assert(counter === 25 * channels.size)
   }
 
+  private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
+    override def call(): Void = {
+      var t = 0
+      for (i <- 0 until 5) {
+        val tx = channel.getTransaction
+        tx.begin()
+        for (j <- 0 until 5) {
+          channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes("utf-8"),
+            Map[String, String]("test-" + t.toString -> "header")))
+          t += 1
+        }
+        tx.commit()
+        tx.close()
+        Thread.sleep(500) // Allow some time for the events to reach
+        clock.addToTime(batchDuration.milliseconds)
+      }
+      null
+    }
+  }
 }
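Usage sketch (commentary, not part of the patch): with two Flume agents each running the Spark Sink that this change polls, the new multi-host createPollingStream API could be driven from a driver program roughly as below. The host names, ports, batch interval, and the batch-size/parallelism values are illustrative assumptions, not values taken from this patch.

    import java.net.InetSocketAddress

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePollingExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("FlumePollingExample")
        val ssc = new StreamingContext(conf, Seconds(2))
        // Hypothetical agent addresses; each host must run the Spark Sink on the given port.
        val addresses = Seq(
          new InetSocketAddress("flume-agent-1", 9999),
          new InetSocketAddress("flume-agent-2", 9999))
        // A single receiver polls both sinks, pulling up to 100 events per RPC
        // with 5 concurrent requests.
        val stream = FlumeUtils.createPollingStream(
          ssc, addresses, 100, 5, StorageLevel.MEMORY_AND_DISK_SER_2)
        // Count the events received in each batch as a simple sanity check.
        stream.count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }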