SPARK-1478: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915
tmalaska committed Jun 21, 2014
1 parent ca5d8b5 commit 12617e5
Showing 4 changed files with 142 additions and 16 deletions.
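
For context: FLUME-1915 added optional zlib/deflate compression to Flume's Avro sink (configured through the sink's compression-type and compression-level properties). This commit gives the Spark receiver a matching decompression path, exposed as a new enableDecompression flag on FlumeUtils.createStream. A minimal usage sketch of the new flag (host, port, and the StreamingContext ssc are placeholders, not part of the patch):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// Receive events from a Flume Avro sink that deflate-compresses its batches;
// the receiver's Netty server inflates each request and compresses the ack.
val events = FlumeUtils.createStream(
  ssc, "localhost", 41414, StorageLevel.MEMORY_AND_DISK_SER_2,
  enableDecompression = true)
events.count().print()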
FlumeInputDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
 import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
 
+import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
-import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver
 
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
 }

@@ -134,22 +143,71 @@ private[streaming]
 class FlumeReceiver(
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
     classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+  var server: NettyServer = null
 
+  private def initServer() = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        null)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }
+
   def onStart() {
-    server.start()
+    synchronized {
+      if (server == null) {
+        server = initServer()
+        server.start()
+      } else {
+        logWarning("Flume receiver being asked to start more than once without close")
+      }
+    }
     logInfo("Flume receiver started")
   }
 
   def onStop() {
-    server.close()
+    synchronized {
+      if (server != null) {
+        server.close()
+        server = null
+      }
+    }
     logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
+
+  /** A Netty pipeline factory that decompresses incoming data from
+   * the Netty client and compresses data going back to the client.
+   *
+   * Compression on the response is required because Flume waits for a
+   * successful response before it removes the event/batch from the
+   * configured channel.
+   */
+  private[streaming]
+  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+    def getPipeline() = {
+      val pipeline = Channels.pipeline()
+      val encoder = new ZlibEncoder(6)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      pipeline
+    }
+  }
 }
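
A note on the two addFirst calls in CompressionChannelPipelineFactory above: Netty 3.x runs upstream (inbound) handlers first-to-last and downstream (outbound) handlers last-to-first, so placing the decoder in front of the encoder means requests are inflated before the Avro responder sees them, while responses are deflated on the way back out. A standalone sketch (not part of the patch) that makes the resulting handler order visible:

import org.jboss.netty.channel.Channels
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

object PipelineOrderSketch extends App {
  val pipeline = Channels.pipeline()
  pipeline.addFirst("deflater", new ZlibEncoder(6)) // outbound: compress responses
  pipeline.addFirst("inflater", new ZlibDecoder())  // inbound: decompress requests
  // NettyServer appends the Avro responder's handlers after these two.
  println(pipeline.getNames()) // prints [inflater, deflater]
}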
FlumeUtils.scala
@@ -36,7 +36,25 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Should the Netty server decompress the input stream
+   */
+  def createStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression)
     inputStream
   }

@@ -66,6 +84,23 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
   }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Should the Netty server decompress the input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
+  }
 }
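
Note that both pre-existing overloads now delegate with enableDecompression = false, so existing call sites keep their behavior. A quick equivalence sketch (assumes an existing StreamingContext ssc; host and port are placeholders):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// Wired identically: the short overload defaults to MEMORY_AND_DISK_SER_2
// and now forwards enableDecompression = false to the new overload.
val before = FlumeUtils.createStream(ssc, "localhost", 41414)
val after = FlumeUtils.createStream(ssc, "localhost", 41414,
  StorageLevel.MEMORY_AND_DISK_SER_2, enableDecompression = false)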
JavaFlumeStreamSuite.java
@@ -30,5 +30,7 @@ public void testFlumeStream() {
     JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
     JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), false);
   }
 }
FlumeStreamSuite.scala
Expand Up @@ -33,15 +33,26 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuite
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream

class FlumeStreamSuite extends TestSuiteBase {
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.SocketChannel
import org.jboss.netty.handler.codec.compression._

val testPort = 9999
class FlumeStreamSuite extends TestSuiteBase {

test("flume input stream") {
runFlumeStreamTest(false, 9998)
}

test("flume input compressed stream") {
runFlumeStreamTest(true, 9997)
}

def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,8 +63,17 @@ class FlumeStreamSuite extends TestSuiteBase {
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
     val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+    var client: AvroSourceProtocol = null
+
+    if (enableDecompression) {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol],
+        new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+          new CompressionChannelFactory(6)))
+    } else {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol], transceiver)
+    }
 
     for (i <- 0 until input.size) {
       val event = new AvroFlumeEvent
@@ -64,6 +84,8 @@ class FlumeStreamSuite extends TestSuiteBase {
       clock.addToTime(batchDuration.milliseconds)
     }
 
+    Thread.sleep(1000)
+
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase {
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
+  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      val encoder = new ZlibEncoder(compressionLevel)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
 }
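
Outside the test harness, the suite's CompressionChannelFactory can drive a hand-rolled compressed client against a receiver started with enableDecompression = true, which is handy for manual verification. A sketch under stated assumptions (a receiver listening on localhost:9997; compression level 6 to match the server's ZlibEncoder):

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}

// Mirrors what a deflate-compressing Flume Avro sink puts on the wire.
val client = SpecificRequestor.getClient(
  classOf[AvroSourceProtocol],
  new NettyTransceiver(new InetSocketAddress("localhost", 9997),
    new CompressionChannelFactory(6))) // the factory defined in the suite above

val event = new AvroFlumeEvent
event.setBody(ByteBuffer.wrap("hello".getBytes("UTF-8")))
event.setHeaders(new java.util.HashMap[CharSequence, CharSequence]())
client.append(event) // Status.OK once the server inflates and stores the event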
