diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala new file mode 100644 index 0000000000000..1cc8c8d5c23b6 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.SparkConf +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ +import org.apache.spark.util.IntParam +import java.net.InetSocketAddress + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with the Spark Sink running in a Flume agent. See + * the Spark Streaming programming guide for more details. + * + * Usage: FlumePollingEventCount + * `host` is the host on which the Spark Sink is running. + * `port` is the port at which the Spark Sink is listening. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] ` + */ +object FlumePollingEventCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println( + "Usage: FlumePollingEventCount ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(host, IntParam(port)) = args + + val batchInterval = Milliseconds(2000) + + // Create the context and set the batch size + val sparkConf = new SparkConf().setAppName("FlumePollingEventCount") + val ssc = new StreamingContext(sparkConf, batchInterval) + + // Create a flume stream that polls the Spark Sink running in a Flume agent + val stream = FlumeUtils.createPollingStream(ssc, host, port) + + // Print out the count of events received from this server in each batch + stream.count().map(cnt => "Received " + cnt + " flume events." ).print() + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml new file mode 100644 index 0000000000000..d11129ce8d89d --- /dev/null +++ b/external/flume-sink/pom.xml @@ -0,0 +1,100 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.1.0-SNAPSHOT + ../../pom.xml + + + spark-streaming-flume-sink_2.10 + + streaming-flume-sink + + + jar + Spark Project External Flume Sink + http://spark.apache.org/ + + + org.apache.flume + flume-ng-sdk + 1.4.0 + + + io.netty + netty + + + org.apache.thrift + libthrift + + + + + org.apache.flume + flume-ng-core + 1.4.0 + + + io.netty + netty + + + org.apache.thrift + libthrift + + + + + org.scala-lang + scala-library + 2.10.4 + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + org.apache.avro + avro-maven-plugin + 1.7.3 + + + ${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro + + + + generate-sources + + idl-protocol + + + + + + + diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl new file mode 100644 index 0000000000000..8806e863ac7c6 --- /dev/null +++ b/external/flume-sink/src/main/avro/sparkflume.avdl @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +@namespace("org.apache.spark.streaming.flume.sink") + +protocol SparkFlumeProtocol { + + record SparkSinkEvent { + map headers; + bytes body; + } + + record EventBatch { + string errorMsg = ""; // If this is empty it is a valid message, else it represents an error + string sequenceNumber; + array events; + } + + EventBatch getEventBatch (int n); + + void ack (string sequenceNumber); + + void nack (string sequenceNumber); +} diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala new file mode 100644 index 0000000000000..17cbc6707b5ea --- /dev/null +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.flume.sink + +import org.slf4j.{Logger, LoggerFactory} + +/** + * Copy of the org.apache.spark.Logging for being used in the Spark Sink. + * The org.apache.spark.Logging is not used so that all of Spark is not brought + * in as a dependency. + */ +private[sink] trait Logging { + // Make the log field transient so that objects with Logging can + // be serialized and used on another machine + @transient private var log_ : Logger = null + + // Method to get or create the logger for this object + protected def log: Logger = { + if (log_ == null) { + initializeIfNecessary() + var className = this.getClass.getName + // Ignore trailing $'s in the class names for Scala objects + if (className.endsWith("$")) { + className = className.substring(0, className.length - 1) + } + log_ = LoggerFactory.getLogger(className) + } + log_ + } + + // Log methods that take only a String + protected def logInfo(msg: => String) { + if (log.isInfoEnabled) log.info(msg) + } + + protected def logDebug(msg: => String) { + if (log.isDebugEnabled) log.debug(msg) + } + + protected def logTrace(msg: => String) { + if (log.isTraceEnabled) log.trace(msg) + } + + protected def logWarning(msg: => String) { + if (log.isWarnEnabled) log.warn(msg) + } + + protected def logError(msg: => String) { + if (log.isErrorEnabled) log.error(msg) + } + + // Log methods that take Throwables (Exceptions/Errors) too + protected def logInfo(msg: => String, throwable: Throwable) { + if (log.isInfoEnabled) log.info(msg, throwable) + } + + protected def logDebug(msg: => String, throwable: Throwable) { + if (log.isDebugEnabled) log.debug(msg, throwable) + } + + protected def logTrace(msg: => String, throwable: Throwable) { + if (log.isTraceEnabled) log.trace(msg, throwable) + } + + protected def logWarning(msg: => String, throwable: Throwable) { + if (log.isWarnEnabled) log.warn(msg, throwable) + } + + protected def logError(msg: => String, throwable: Throwable) { + if (log.isErrorEnabled) log.error(msg, throwable) + } + + protected def isTraceEnabled(): Boolean = { + log.isTraceEnabled + } + + private def initializeIfNecessary() { + if (!Logging.initialized) { + Logging.initLock.synchronized { + if (!Logging.initialized) { + initializeLogging() + } + } + } + } + + private def initializeLogging() { + Logging.initialized = true + + // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads + // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html + log + } +} + +private[sink] object Logging { + @volatile private var initialized = false + val initLock = new Object() + try { + // We use reflection here to handle the case where users remove the + // slf4j-to-jul bridge order to route their logs to JUL. + val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler") + bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null) + val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean] + if (!installed) { + bridgeClass.getMethod("install").invoke(null) + } + } catch { + case e: ClassNotFoundException => // can't log anything yet so just fail silently + } +} diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala new file mode 100644 index 0000000000000..7da8eb3e35912 --- /dev/null +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.flume.sink + +import java.util.concurrent.{ConcurrentHashMap, Executors} +import java.util.concurrent.atomic.AtomicLong + +import org.apache.flume.Channel +import org.apache.commons.lang.RandomStringUtils +import com.google.common.util.concurrent.ThreadFactoryBuilder + +/** + * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process + * requests. Each getEvents, ack and nack call is forwarded to an instance of this class. + * @param threads Number of threads to use to process requests. + * @param channel The channel that the sink pulls events from + * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark + * is rolled back. + */ +// Flume forces transactions to be thread-local. So each transaction *must* be committed, or +// rolled back from the thread it was originally created in. So each getEvents call from Spark +// creates a TransactionProcessor which runs in a new thread, in which the transaction is created +// and events are pulled off the channel. Once the events are sent to spark, +// that thread is blocked and the TransactionProcessor is saved in a map, +// until an ACK or NACK comes back or the transaction times out (after the specified timeout). +// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then +// unblocked, at which point the transaction is committed or rolled back. + +private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, + val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging { + val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Spark Sink Processor Thread - %d").build())) + private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]() + // This sink will not persist sequence numbers and reuses them if it gets restarted. + // So it is possible to commit a transaction which may have been meant for the sink before the + // restart. + // Since the new txn may not have the same sequence number we must guard against accidentally + // committing a new transaction. To reduce the probability of that happening a random string is + // prepended to the sequence number. Does not change for life of sink + private val seqBase = RandomStringUtils.randomAlphanumeric(8) + private val seqCounter = new AtomicLong(0) + + /** + * Returns a bunch of events to Spark over Avro RPC. + * @param n Maximum number of events to return in a batch + * @return [[EventBatch]] instance that has a sequence number and an array of at most n events + */ + override def getEventBatch(n: Int): EventBatch = { + logDebug("Got getEventBatch call from Spark.") + val sequenceNumber = seqBase + seqCounter.incrementAndGet() + val processor = new TransactionProcessor(channel, sequenceNumber, + n, transactionTimeout, backOffInterval, this) + transactionExecutorOpt.foreach(executor => { + executor.submit(processor) + }) + // Wait until a batch is available - will be an error if error message is non-empty + val batch = processor.getEventBatch + if (!SparkSinkUtils.isErrorBatch(batch)) { + processorMap.put(sequenceNumber.toString, processor) + logDebug("Sending event batch with sequence number: " + sequenceNumber) + } + batch + } + + /** + * Called by Spark to indicate successful commit of a batch + * @param sequenceNumber The sequence number of the event batch that was successful + */ + override def ack(sequenceNumber: CharSequence): Void = { + logDebug("Received Ack for batch with sequence number: " + sequenceNumber) + completeTransaction(sequenceNumber, success = true) + null + } + + /** + * Called by Spark to indicate failed commit of a batch + * @param sequenceNumber The sequence number of the event batch that failed + * @return + */ + override def nack(sequenceNumber: CharSequence): Void = { + completeTransaction(sequenceNumber, success = false) + logInfo("Spark failed to commit transaction. Will reattempt events.") + null + } + + /** + * Helper method to commit or rollback a transaction. + * @param sequenceNumber The sequence number of the batch that was completed + * @param success Whether the batch was successful or not. + */ + private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) { + Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => { + processor.batchProcessed(success) + }) + } + + /** + * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak. + * @param sequenceNumber + * @return The transaction processor for the corresponding batch. Note that this instance is no + * longer tracked and the caller is responsible for that txn processor. + */ + private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = { + processorMap.remove(sequenceNumber.toString) // The toString is required! + } + + /** + * Shuts down the executor used to process transactions. + */ + def shutdown() { + logInfo("Shutting down Spark Avro Callback Handler") + transactionExecutorOpt.foreach(executor => { + executor.shutdownNow() + }) + } +} diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala new file mode 100644 index 0000000000000..7b735133e3d14 --- /dev/null +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.flume.sink + +import java.net.InetSocketAddress +import java.util.concurrent._ + +import org.apache.avro.ipc.NettyServer +import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.flume.Context +import org.apache.flume.Sink.Status +import org.apache.flume.conf.{Configurable, ConfigurationException} +import org.apache.flume.sink.AbstractSink + +/** + * A sink that uses Avro RPC to run a server that can be polled by Spark's + * FlumePollingInputDStream. This sink has the following configuration parameters: + * + * hostname - The hostname to bind to. Default: 0.0.0.0 + * port - The port to bind to. (No default - mandatory) + * timeout - Time in seconds after which a transaction is rolled back, + * if an ACK is not received from Spark within that time + * threads - Number of threads to use to receive requests from Spark (Default: 10) + * + * This sink is unlike other Flume sinks in the sense that it does not push data, + * instead the process method in this sink simply blocks the SinkRunner the first time it is + * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol. + * + * Each time a getEventBatch call comes, creates a transaction and reads events + * from the channel. When enough events are read, the events are sent to the Spark receiver and + * the thread itself is blocked and a reference to it saved off. + * + * When the ack for that batch is received, + * the thread which created the transaction is is retrieved and it commits the transaction with the + * channel from the same thread it was originally created in (since Flume transactions are + * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack + * is received within the specified timeout, the transaction is rolled back too. If an ack comes + * after that, it is simply ignored and the events get re-sent. + * + */ + +private[flume] +class SparkSink extends AbstractSink with Logging with Configurable { + + // Size of the pool to use for holding transaction processors. + private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS + + // Timeout for each transaction. If spark does not respond in this much time, + // rollback the transaction + private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT + + // Address info to bind on + private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME + private var port: Int = 0 + + private var backOffInterval: Int = 200 + + // Handle to the server + private var serverOpt: Option[NettyServer] = None + + // The handler that handles the callback from Avro + private var handler: Option[SparkAvroCallbackHandler] = None + + // Latch that blocks off the Flume framework from wasting 1 thread. + private val blockingLatch = new CountDownLatch(1) + + override def start() { + logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " + + hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " + + transactionTimeout + ".") + handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout, + backOffInterval)) + val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get) + // Using the constructor that takes specific thread-pools requires bringing in netty + // dependencies which are being excluded in the build. In practice, + // Netty dependencies are already available on the JVM as Flume would have pulled them in. + serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port))) + serverOpt.foreach(server => { + logInfo("Starting Avro server for sink: " + getName) + server.start() + }) + super.start() + } + + override def stop() { + logInfo("Stopping Spark Sink: " + getName) + handler.foreach(callbackHandler => { + callbackHandler.shutdown() + }) + serverOpt.foreach(server => { + logInfo("Stopping Avro Server for sink: " + getName) + server.close() + server.join() + }) + blockingLatch.countDown() + super.stop() + } + + override def configure(ctx: Context) { + import SparkSinkConfig._ + hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME) + port = Option(ctx.getInteger(CONF_PORT)). + getOrElse(throw new ConfigurationException("The port to bind to must be specified")) + poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS) + transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT) + backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL) + logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " + + "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " + + "backoffInterval: " + backOffInterval) + } + + override def process(): Status = { + // This method is called in a loop by the Flume framework - block it until the sink is + // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is + // being shut down. + logInfo("Blocking Sink Runner, sink will continue to run..") + blockingLatch.await() + Status.BACKOFF + } +} + +/** + * Configuration parameters and their defaults. + */ +private[flume] +object SparkSinkConfig { + val THREADS = "threads" + val DEFAULT_THREADS = 10 + + val CONF_TRANSACTION_TIMEOUT = "timeout" + val DEFAULT_TRANSACTION_TIMEOUT = 60 + + val CONF_HOSTNAME = "hostname" + val DEFAULT_HOSTNAME = "0.0.0.0" + + val CONF_PORT = "port" + + val CONF_BACKOFF_INTERVAL = "backoffInterval" + val DEFAULT_BACKOFF_INTERVAL = 200 +} diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala new file mode 100644 index 0000000000000..47c0e294d6b52 --- /dev/null +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.flume.sink + +private[flume] object SparkSinkUtils { + /** + * This method determines if this batch represents an error or not. + * @param batch - The batch to check + * @return - true if the batch represents an error + */ + def isErrorBatch(batch: EventBatch): Boolean = { + !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch. + } +} diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala new file mode 100644 index 0000000000000..b9e3c786ebb3b --- /dev/null +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.flume.sink + +import java.nio.ByteBuffer +import java.util +import java.util.concurrent.{Callable, CountDownLatch, TimeUnit} + +import scala.util.control.Breaks + +import org.apache.flume.{Transaction, Channel} + +// Flume forces transactions to be thread-local (horrible, I know!) +// So the sink basically spawns a new thread to pull the events out within a transaction. +// The thread fills in the event batch object that is set before the thread is scheduled. +// After filling it in, the thread waits on a condition - which is released only +// when the success message comes back for the specific sequence number for that event batch. +/** + * This class represents a transaction on the Flume channel. This class runs a separate thread + * which owns the transaction. The thread is blocked until the success call for that transaction + * comes back with an ACK or NACK. + * @param channel The channel from which to pull events + * @param seqNum The sequence number to use for the transaction. Must be unique + * @param maxBatchSize The maximum number of events to process per batch + * @param transactionTimeout Time in seconds after which a transaction must be rolled back + * without waiting for an ACK from Spark + * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts + */ +private class TransactionProcessor(val channel: Channel, val seqNum: String, + var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int, + val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging { + + // If a real batch is not returned, we always have to return an error batch. + @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "", + util.Collections.emptyList()) + + // Synchronization primitives + val batchGeneratedLatch = new CountDownLatch(1) + val batchAckLatch = new CountDownLatch(1) + + // Sanity check to ensure we don't loop like crazy + val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2 + + // OK to use volatile, since the change would only make this true (otherwise it will be + // changed to false - we never apply a negation operation to this) - which means the transaction + // succeeded. + @volatile private var batchSuccess = false + + // The transaction that this processor would handle + var txOpt: Option[Transaction] = None + + /** + * Get an event batch from the channel. This method will block until a batch of events is + * available from the channel. If no events are available after a large number of attempts of + * polling the channel, this method will return an [[EventBatch]] with a non-empty error message + * + * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a + * maximum of maxBatchSize events + */ + def getEventBatch: EventBatch = { + batchGeneratedLatch.await() + eventBatch + } + + /** + * This method is to be called by the sink when it receives an ACK or NACK from Spark. This + * method is a no-op if it is called after transactionTimeout has expired since + * getEventBatch returned a batch of events. + * @param success True if an ACK was received and the transaction should be committed, else false. + */ + def batchProcessed(success: Boolean) { + logDebug("Batch processed for sequence number: " + seqNum) + batchSuccess = success + batchAckLatch.countDown() + } + + /** + * Populates events into the event batch. If the batch cannot be populated, + * this method will not set the events into the event batch, but it sets an error message. + */ + private def populateEvents() { + try { + txOpt = Option(channel.getTransaction) + if(txOpt.isEmpty) { + eventBatch.setErrorMsg("Something went wrong. Channel was " + + "unable to create a transaction!") + } + txOpt.foreach(tx => { + tx.begin() + val events = new util.ArrayList[SparkSinkEvent](maxBatchSize) + val loop = new Breaks + var gotEventsInThisTxn = false + var loopCounter: Int = 0 + loop.breakable { + while (events.size() < maxBatchSize + && loopCounter < totalAttemptsToRemoveFromChannel) { + loopCounter += 1 + Option(channel.take()) match { + case Some(event) => + events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders), + ByteBuffer.wrap(event.getBody))) + gotEventsInThisTxn = true + case None => + if (!gotEventsInThisTxn) { + logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" + + " the current transaction") + TimeUnit.MILLISECONDS.sleep(backOffInterval) + } else { + loop.break() + } + } + } + } + if (!gotEventsInThisTxn) { + val msg = "Tried several times, " + + "but did not get any events from the channel!" + logWarning(msg) + eventBatch.setErrorMsg(msg) + } else { + // At this point, the events are available, so fill them into the event batch + eventBatch = new EventBatch("",seqNum, events) + } + }) + } catch { + case e: Exception => + logWarning("Error while processing transaction.", e) + eventBatch.setErrorMsg(e.getMessage) + try { + txOpt.foreach(tx => { + rollbackAndClose(tx, close = true) + }) + } finally { + txOpt = None + } + } finally { + batchGeneratedLatch.countDown() + } + } + + /** + * Waits for upto transactionTimeout seconds for an ACK. If an ACK comes in + * this method commits the transaction with the channel. If the ACK does not come in within + * that time or a NACK comes in, this method rolls back the transaction. + */ + private def processAckOrNack() { + batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS) + txOpt.foreach(tx => { + if (batchSuccess) { + try { + logDebug("Committing transaction") + tx.commit() + } catch { + case e: Exception => + logWarning("Error while attempting to commit transaction. Transaction will be rolled " + + "back", e) + rollbackAndClose(tx, close = false) // tx will be closed later anyway + } finally { + tx.close() + } + } else { + logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.") + rollbackAndClose(tx, close = true) + // This might have been due to timeout or a NACK. Either way the following call does not + // cause issues. This is required to ensure the TransactionProcessor instance is not leaked + parent.removeAndGetProcessor(seqNum) + } + }) + } + + /** + * Helper method to rollback and optionally close a transaction + * @param tx The transaction to rollback + * @param close Whether the transaction should be closed or not after rolling back + */ + private def rollbackAndClose(tx: Transaction, close: Boolean) { + try { + logWarning("Spark was unable to successfully process the events. Transaction is being " + + "rolled back.") + tx.rollback() + } catch { + case e: Exception => + logError("Error rolling back transaction. Rollback may have failed!", e) + } finally { + if (close) { + tx.close() + } + } + } + + /** + * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence] + * @param inMap The map to be converted + * @return The converted map + */ + private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence, + CharSequence] = { + val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size()) + charSeqMap.putAll(inMap) + charSeqMap + } + + /** + * When the thread is started it sets as many events as the batch size or less (if enough + * events aren't available) into the eventBatch and object and lets any threads waiting on the + * [[getEventBatch]] method to proceed. Then this thread waits for acks or nacks to come in, + * or for a specified timeout and commits or rolls back the transaction. + * @return + */ + override def call(): Void = { + populateEvents() + processAckOrNack() + null + } +} diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 874b8a7959bb6..9f680b27c3308 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -77,6 +77,11 @@ junit-interface test + + org.apache.spark + spark-streaming-flume-sink_2.10 + ${project.version} + target/scala-${scala.binary.version}/classes diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala new file mode 100644 index 0000000000000..dc629df4f4ac2 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import java.io.{ObjectOutput, ObjectInput} + +import scala.collection.JavaConversions._ + +import org.apache.spark.util.Utils +import org.apache.spark.Logging + +/** + * A simple object that provides the implementation of readExternal and writeExternal for both + * the wrapper classes for Flume-style Events. + */ +private[streaming] object EventTransformer extends Logging { + def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence], + Array[Byte]) = { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.readFully(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.readFully(keyBuff) + val key: String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.readFully(valBuff) + val value: String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + (headers, bodyBuff) + } + + def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence], + body: Array[Byte]) { + out.writeInt(body.length) + out.write(body) + val numHeaders = headers.size() + out.writeInt(numHeaders) + for ((k,v) <- headers) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 56d2886b26878..4b2ea45fb81d0 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -39,11 +39,8 @@ import org.apache.spark.streaming.receiver.Receiver import org.jboss.netty.channel.ChannelPipelineFactory import org.jboss.netty.channel.Channels -import org.jboss.netty.channel.ChannelPipeline -import org.jboss.netty.channel.ChannelFactory import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.handler.codec.compression._ -import org.jboss.netty.handler.execution.ExecutionHandler private[streaming] class FlumeInputDStream[T: ClassTag]( diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala new file mode 100644 index 0000000000000..148262bb6771e --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.flume + + +import java.net.InetSocketAddress +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors} + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.avro.ipc.NettyTransceiver +import org.apache.avro.ipc.specific.SpecificRequestor +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.flume.sink._ + +/** + * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running + * [[org.apache.spark.streaming.flume.sink.SparkSink]]s. + * @param _ssc Streaming context that will execute this input stream + * @param addresses List of addresses at which SparkSinks are listening + * @param maxBatchSize Maximum size of a batch + * @param parallelism Number of parallel connections to open + * @param storageLevel The storage level to use. + * @tparam T Class type of the object of this stream + */ +private[streaming] class FlumePollingInputDStream[T: ClassTag]( + @transient _ssc: StreamingContext, + val addresses: Seq[InetSocketAddress], + val maxBatchSize: Int, + val parallelism: Int, + storageLevel: StorageLevel + ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) { + + override def getReceiver(): Receiver[SparkFlumeEvent] = { + new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel) + } +} + +private[streaming] class FlumePollingReceiver( + addresses: Seq[InetSocketAddress], + maxBatchSize: Int, + parallelism: Int, + storageLevel: StorageLevel + ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging { + + lazy val channelFactoryExecutor = + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true). + setNameFormat("Flume Receiver Channel Thread - %d").build()) + + lazy val channelFactory = + new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor) + + lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build()) + + private lazy val connections = new LinkedBlockingQueue[FlumeConnection]() + + override def onStart(): Unit = { + // Create the connections to each Flume agent. + addresses.foreach(host => { + val transceiver = new NettyTransceiver(host, channelFactory) + val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver) + connections.add(new FlumeConnection(transceiver, client)) + }) + for (i <- 0 until parallelism) { + logInfo("Starting Flume Polling Receiver worker threads starting..") + // Threads that pull data from Flume. + receiverExecutor.submit(new Runnable { + override def run(): Unit = { + while (true) { + val connection = connections.poll() + val client = connection.client + try { + val eventBatch = client.getEventBatch(maxBatchSize) + if (!SparkSinkUtils.isErrorBatch(eventBatch)) { + // No error, proceed with processing data + val seq = eventBatch.getSequenceNumber + val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents + logDebug( + "Received batch of " + events.size() + " events with sequence number: " + seq) + try { + // Convert each Flume event to a serializable SparkFlumeEvent + val buffer = new ArrayBuffer[SparkFlumeEvent](events.size()) + var j = 0 + while (j < events.size()) { + buffer += toSparkFlumeEvent(events(j)) + j += 1 + } + store(buffer) + logDebug("Sending ack for sequence number: " + seq) + // Send an ack to Flume so that Flume discards the events from its channels. + client.ack(seq) + logDebug("Ack sent for sequence number: " + seq) + } catch { + case e: Exception => + try { + // Let Flume know that the events need to be pushed back into the channel. + logDebug("Sending nack for sequence number: " + seq) + client.nack(seq) // If the agent is down, even this could fail and throw + logDebug("Nack sent for sequence number: " + seq) + } catch { + case e: Exception => logError( + "Sending Nack also failed. A Flume agent is down.") + } + TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds. + logWarning("Error while attempting to store events", e) + } + } else { + logWarning("Did not receive events from Flume agent due to error on the Flume " + + "agent: " + eventBatch.getErrorMsg) + } + } catch { + case e: Exception => + logWarning("Error while reading data from Flume", e) + } finally { + connections.add(connection) + } + } + } + }) + } + } + + override def onStop(): Unit = { + logInfo("Shutting down Flume Polling Receiver") + receiverExecutor.shutdownNow() + connections.foreach(connection => { + connection.transceiver.close() + }) + channelFactory.releaseExternalResources() + } + + /** + * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]] + * @param event - Event to convert to SparkFlumeEvent + * @return - The SparkFlumeEvent generated from SparkSinkEvent + */ + private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = { + val sparkFlumeEvent = new SparkFlumeEvent() + sparkFlumeEvent.event.setBody(event.getBody) + sparkFlumeEvent.event.setHeaders(event.getHeaders) + sparkFlumeEvent + } +} + +/** + * A wrapper around the transceiver and the Avro IPC API. + * @param transceiver The transceiver to use for communication with Flume + * @param client The client that the callbacks are received on. + */ +private class FlumeConnection(val transceiver: NettyTransceiver, + val client: SparkFlumeProtocol.Callback) + + + diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 716db9fa76031..4b732c1592ab2 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -17,12 +17,19 @@ package org.apache.spark.streaming.flume +import java.net.InetSocketAddress + +import org.apache.spark.annotation.Experimental import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream} -import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + object FlumeUtils { + private val DEFAULT_POLLING_PARALLELISM = 5 + private val DEFAULT_POLLING_BATCH_SIZE = 1000 + /** * Create a input stream from a Flume source. * @param ssc StreamingContext object @@ -56,7 +63,7 @@ object FlumeUtils { ): ReceiverInputDStream[SparkFlumeEvent] = { val inputStream = new FlumeInputDStream[SparkFlumeEvent]( ssc, hostname, port, storageLevel, enableDecompression) - + inputStream } @@ -105,4 +112,135 @@ object FlumeUtils { ): JavaReceiverInputDStream[SparkFlumeEvent] = { createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression) } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * This stream will use a batch size of 1000 events and run 5 threads to pull data. + * @param hostname Address of the host on which the Spark Sink is running + * @param port Port of the host at which the Spark Sink is listening + * @param storageLevel Storage level to use for storing the received objects + */ + @Experimental + def createPollingStream( + ssc: StreamingContext, + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): ReceiverInputDStream[SparkFlumeEvent] = { + createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel) + } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * This stream will use a batch size of 1000 events and run 5 threads to pull data. + * @param addresses List of InetSocketAddresses representing the hosts to connect to. + * @param storageLevel Storage level to use for storing the received objects + */ + @Experimental + def createPollingStream( + ssc: StreamingContext, + addresses: Seq[InetSocketAddress], + storageLevel: StorageLevel + ): ReceiverInputDStream[SparkFlumeEvent] = { + createPollingStream(ssc, addresses, storageLevel, + DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) + } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * @param addresses List of InetSocketAddresses representing the hosts to connect to. + * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a + * single RPC call + * @param parallelism Number of concurrent requests this stream should send to the sink. Note + * that having a higher number of requests concurrently being pulled will + * result in this stream using more threads + * @param storageLevel Storage level to use for storing the received objects + */ + @Experimental + def createPollingStream( + ssc: StreamingContext, + addresses: Seq[InetSocketAddress], + storageLevel: StorageLevel, + maxBatchSize: Int, + parallelism: Int + ): ReceiverInputDStream[SparkFlumeEvent] = { + new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize, + parallelism, storageLevel) + } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * This stream will use a batch size of 1000 events and run 5 threads to pull data. + * @param hostname Hostname of the host on which the Spark Sink is running + * @param port Port of the host at which the Spark Sink is listening + */ + @Experimental + def createPollingStream( + jssc: JavaStreamingContext, + hostname: String, + port: Int + ): JavaReceiverInputDStream[SparkFlumeEvent] = { + createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2) + } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * This stream will use a batch size of 1000 events and run 5 threads to pull data. + * @param hostname Hostname of the host on which the Spark Sink is running + * @param port Port of the host at which the Spark Sink is listening + * @param storageLevel Storage level to use for storing the received objects + */ + @Experimental + def createPollingStream( + jssc: JavaStreamingContext, + hostname: String, + port: Int, + storageLevel: StorageLevel + ): JavaReceiverInputDStream[SparkFlumeEvent] = { + createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel) + } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * This stream will use a batch size of 1000 events and run 5 threads to pull data. + * @param addresses List of InetSocketAddresses on which the Spark Sink is running. + * @param storageLevel Storage level to use for storing the received objects + */ + @Experimental + def createPollingStream( + jssc: JavaStreamingContext, + addresses: Array[InetSocketAddress], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[SparkFlumeEvent] = { + createPollingStream(jssc, addresses, storageLevel, + DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) + } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * @param addresses List of InetSocketAddresses on which the Spark Sink is running + * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a + * single RPC call + * @param parallelism Number of concurrent requests this stream should send to the sink. Note + * that having a higher number of requests concurrently being pulled will + * result in this stream using more threads + * @param storageLevel Storage level to use for storing the received objects + */ + @Experimental + def createPollingStream( + jssc: JavaStreamingContext, + addresses: Array[InetSocketAddress], + storageLevel: StorageLevel, + maxBatchSize: Int, + parallelism: Int + ): JavaReceiverInputDStream[SparkFlumeEvent] = { + createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism) + } } diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java new file mode 100644 index 0000000000000..79c5b91654b42 --- /dev/null +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume; + +import java.net.InetSocketAddress; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; + +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.junit.Test; + +public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext { + @Test + public void testFlumeStream() { + // tests the API, does not actually test data receiving + InetSocketAddress[] addresses = new InetSocketAddress[] { + new InetSocketAddress("localhost", 12345) + }; + JavaReceiverInputDStream test1 = + FlumeUtils.createPollingStream(ssc, "localhost", 12345); + JavaReceiverInputDStream test2 = FlumeUtils.createPollingStream( + ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream test3 = FlumeUtils.createPollingStream( + ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream test4 = FlumeUtils.createPollingStream( + ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5); + } +} diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala new file mode 100644 index 0000000000000..47071d0cc4714 --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.streaming.flume + +import java.net.InetSocketAddress +import java.util.concurrent.{Callable, ExecutorCompletionService, Executors} + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} + +import org.apache.flume.Context +import org.apache.flume.channel.MemoryChannel +import org.apache.flume.conf.Configurables +import org.apache.flume.event.EventBuilder + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext} +import org.apache.spark.streaming.flume.sink._ + +class FlumePollingStreamSuite extends TestSuiteBase { + + val testPort = 9999 + val batchCount = 5 + val eventsPerBatch = 100 + val totalEventsPerChannel = batchCount * eventsPerBatch + val channelCapacity = 5000 + + test("flume polling test") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(conf, batchDuration) + val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = + FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), + StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] + with SynchronizedBuffer[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputBuffer) + outputStream.register() + + // Start the channel and sink. + val context = new Context() + context.put("capacity", channelCapacity.toString) + context.put("transactionCapacity", "1000") + context.put("keep-alive", "0") + val channel = new MemoryChannel() + Configurables.configure(channel, context) + + val sink = new SparkSink() + context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") + context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort)) + Configurables.configure(sink, context) + sink.setChannel(channel) + sink.start() + ssc.start() + + writeAndVerify(Seq(channel), ssc, outputBuffer) + assertChannelIsEmpty(channel) + sink.stop() + channel.stop() + } + + test("flume polling test multiple hosts") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(conf, batchDuration) + val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _)) + val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = + FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, + eventsPerBatch, 5) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] + with SynchronizedBuffer[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputBuffer) + outputStream.register() + + // Start the channel and sink. + val context = new Context() + context.put("capacity", channelCapacity.toString) + context.put("transactionCapacity", "1000") + context.put("keep-alive", "0") + val channel = new MemoryChannel() + Configurables.configure(channel, context) + + val channel2 = new MemoryChannel() + Configurables.configure(channel2, context) + + val sink = new SparkSink() + context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") + context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort)) + Configurables.configure(sink, context) + sink.setChannel(channel) + sink.start() + + val sink2 = new SparkSink() + context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") + context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1)) + Configurables.configure(sink2, context) + sink2.setChannel(channel2) + sink2.start() + ssc.start() + writeAndVerify(Seq(channel, channel2), ssc, outputBuffer) + assertChannelIsEmpty(channel) + assertChannelIsEmpty(channel2) + sink.stop() + channel.stop() + } + + def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext, + outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) { + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val executor = Executors.newCachedThreadPool() + val executorCompletion = new ExecutorCompletionService[Void](executor) + channels.map(channel => { + executorCompletion.submit(new TxnSubmitter(channel, clock)) + }) + for (i <- 0 until channels.size) { + executorCompletion.take() + } + val startTime = System.currentTimeMillis() + while (outputBuffer.size < batchCount * channels.size && + System.currentTimeMillis() - startTime < 15000) { + logInfo("output.size = " + outputBuffer.size) + Thread.sleep(100) + } + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + val flattenedBuffer = outputBuffer.flatten + assert(flattenedBuffer.size === totalEventsPerChannel * channels.size) + var counter = 0 + for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) { + val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " + + String.valueOf(i)).getBytes("utf-8"), + Map[String, String]("test-" + i.toString -> "header")) + var found = false + var j = 0 + while (j < flattenedBuffer.size && !found) { + val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8") + if (new String(eventToVerify.getBody, "utf-8") == strToCompare && + eventToVerify.getHeaders.get("test-" + i.toString) + .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) { + found = true + counter += 1 + } + j += 1 + } + } + assert(counter === totalEventsPerChannel * channels.size) + } + + def assertChannelIsEmpty(channel: MemoryChannel) = { + val queueRemaining = channel.getClass.getDeclaredField("queueRemaining"); + queueRemaining.setAccessible(true) + val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits") + assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000) + } + + private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] { + override def call(): Void = { + var t = 0 + for (i <- 0 until batchCount) { + val tx = channel.getTransaction + tx.begin() + for (j <- 0 until eventsPerBatch) { + channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes( + "utf-8"), + Map[String, String]("test-" + t.toString -> "header"))) + t += 1 + } + tx.commit() + tx.close() + Thread.sleep(500) // Allow some time for the events to reach + clock.addToTime(batchDuration.milliseconds) + } + null + } + } +} diff --git a/pom.xml b/pom.xml index 93ef3b91b5bce..8b1435cfe5d19 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ external/twitter external/kafka external/flume + external/flume-sink external/zeromq external/mqtt examples diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 1629bc2cba8ba..0a6326e72297a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -30,11 +30,12 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile - val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, spark, sql, - streaming, streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq) = + val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, spark, + sql, streaming, streamingFlumeSink, streamingFlume, streamingKafka, streamingMqtt, + streamingTwitter, streamingZeromq) = Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", - "spark", "sql", "streaming", "streaming-flume", "streaming-kafka", "streaming-mqtt", - "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _)) + "spark", "sql", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka", + "streaming-mqtt", "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _)) val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl) = Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl") @@ -156,10 +157,9 @@ object SparkBuild extends PomBuild { /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) - /* Enable Mima for all projects except spark, hive, catalyst, sql and repl */ // TODO: Add Sql to mima checks - allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl).contains(x)). - foreach (x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)) + allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl, + streamingFlumeSink).contains(x)).foreach(x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)) /* Enable Assembly for all assembly projects */ assemblyProjects.foreach(enable(Assembly.settings)) @@ -173,6 +173,8 @@ object SparkBuild extends PomBuild { /* Hive console settings */ enable(Hive.settings)(hive) + enable(Flume.settings)(streamingFlumeSink) + // TODO: move this to its upstream project. override def projectDefinitions(baseDirectory: File): Seq[Project] = { super.projectDefinitions(baseDirectory).map { x => @@ -183,6 +185,10 @@ object SparkBuild extends PomBuild { } +object Flume { + lazy val settings = sbtavro.SbtAvro.avroSettings +} + object SQL { lazy val settings = Seq( diff --git a/project/plugins.sbt b/project/plugins.sbt index d3ac4bf335e87..06d18e193076e 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -24,3 +24,5 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6") addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.1") addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.0") + +addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")