diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index ed35e34ad45ab..07ae88febf916 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
 import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
-
+import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver
 
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
+
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
 }
 
@@ -134,22 +143,71 @@ private[streaming]
 class FlumeReceiver(
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
     classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        null)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }
 
   def onStart() {
-    server.start()
+    synchronized {
+      if (server == null) {
+        server = initServer()
+        server.start()
+      } else {
+        logWarning("Flume receiver being asked to start more than once without close")
+      }
+    }
     logInfo("Flume receiver started")
   }
 
   def onStop() {
-    server.close()
+    synchronized {
+      if (server != null) {
+        server.close()
+        server = null
+      }
+    }
     logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
+
+  /** A Netty pipeline factory that will decompress incoming data from
+    * the Netty client and compress data going back to the client.
+    *
+    * The compression on the return path is required because Flume requires
+    * a successful response to indicate it can remove the event/batch
+    * from the configured channel.
+    */
+  private[streaming]
+  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+    def getPipeline() = {
+      val pipeline = Channels.pipeline()
+      val encoder = new ZlibEncoder(6)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      pipeline
+    }
+  }
 }
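The onStart()/onStop() changes above replace the old `lazy val server` with an explicitly guarded lifecycle, so a stopped receiver can be restarted with a fresh NettyServer instead of reusing a closed one. A minimal sketch of that pattern in isolation (plain Scala, not Spark API; the names GuardedServer and newServer are illustrative):

    // Sketch of the guarded start/stop pattern used by FlumeReceiver above.
    class GuardedServer(newServer: () => AutoCloseable) {
      private var server: AutoCloseable = null
      def start(): Unit = synchronized {
        if (server == null) {
          server = newServer()        // build the server at most once per cycle
        } else {
          println("already started")  // mirrors the receiver's logWarning
        }
      }
      def stop(): Unit = synchronized {
        if (server != null) {
          server.close()              // release sockets and worker threads
          server = null               // permit a clean restart later
        }
      }
    }

Both methods synchronize on the same monitor and null out the reference on stop, which is what lets a stop/start sequence rebuild the server rather than restart a closed one.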
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 499f3560ef768..ec4eefd12ee9b 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -36,7 +36,26 @@ object FlumeUtils {
     port: Int,
     storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
   ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Should the Netty server decompress the input stream
+   */
+  def createStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+      ssc, hostname, port, storageLevel, enableDecompression)
     inputStream
   }
 
@@ -66,6 +85,23 @@ object FlumeUtils {
     port: Int,
     storageLevel: StorageLevel
   ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Should the Netty server decompress the input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
 }
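For context, a driver program would opt into decompression through the new overload roughly as follows. This is a sketch under assumptions, not part of the patch: the app name, port, and the premise that the upstream Flume sink sends deflate-compressed Avro (per FLUME-1915) are all illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object CompressedFlumeEventCount {
      def main(args: Array[String]) {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("CompressedFlumeEventCount"), Seconds(2))
        // enableDecompression = true: the receiver inflates incoming events,
        // so the matching Flume avro sink must compress them.
        val stream = FlumeUtils.createStream(
          ssc, "localhost", 9997, StorageLevel.MEMORY_AND_DISK_SER_2, true)
        stream.count().map(cnt => "Received " + cnt + " flume events.").print()
        ssc.start()
        ssc.awaitTermination()
      }
    }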
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index e0ad4f1015205..3b5e0c7746b2c 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -30,5 +30,7 @@ public void testFlumeStream() {
     JavaReceiverInputDStream test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
     JavaReceiverInputDStream test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), false);
   }
 }
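The decompressing receiver only pays off when the Flume side actually compresses. Assuming the FLUME-1915 avro sink options (the agent, channel, and sink names here are made up), the agent configuration would look roughly like:

    # Sketch of a Flume agent properties file; names are illustrative
    agent.sinks.sparkSink.type = avro
    agent.sinks.sparkSink.channel = memoryChannel
    agent.sinks.sparkSink.hostname = localhost
    agent.sinks.sparkSink.port = 9997
    # must match enableDecompression = true on the Spark side
    agent.sinks.sparkSink.compression-type = deflate
    agent.sinks.sparkSink.compression-level = 6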
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index dd287d0ef90a0..73dffef953309 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -33,15 +33,27 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 
-class FlumeStreamSuite extends TestSuiteBase {
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.handler.codec.compression._
 
-  val testPort = 9999
+class FlumeStreamSuite extends TestSuiteBase {
 
   test("flume input stream") {
+    runFlumeStreamTest(false, 9998)
+  }
+
+  test("flume input compressed stream") {
+    runFlumeStreamTest(true, 9997)
+  }
+
+  def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+      FlumeUtils.createStream(
+        ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,8 +64,17 @@ class FlumeStreamSuite extends TestSuiteBase {
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
     val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+    var client: AvroSourceProtocol = null
+
+    if (enableDecompression) {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol],
+        new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+          new CompressionChannelFactory(6)))
+    } else {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol], transceiver)
+    }
 
     for (i <- 0 until input.size) {
       val event = new AvroFlumeEvent
@@ -64,6 +85,8 @@
       clock.addToTime(batchDuration.milliseconds)
     }
 
+    Thread.sleep(1000)
+
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +108,13 @@
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
+  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      val encoder = new ZlibEncoder(compressionLevel)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
 }
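One subtlety shared by CompressionChannelPipelineFactory on the server and CompressionChannelFactory in the test: addFirst is called twice, so the decoder ends up ahead of the encoder in the pipeline. A self-contained sketch (plain Netty 3, nothing Spark-specific) that shows the resulting order:

    import org.jboss.netty.channel.Channels
    import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

    object PipelineOrder extends App {
      val pipeline = Channels.pipeline()
      pipeline.addFirst("deflater", new ZlibEncoder(6))
      pipeline.addFirst("inflater", new ZlibDecoder())
      // Prints [inflater, deflater]: inbound bytes are inflated before any
      // later handler sees them, and outbound responses are deflated last on
      // the way out, which is what the Flume ack round trip requires.
      println(pipeline.getNames)
    }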