From 9fd0da7885fb778edb9a1983d4922d304052fc95 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Mon, 14 Jul 2014 16:29:33 -0700
Subject: [PATCH] SPARK-1729. Use foreach instead of map for all Options.

---
 .../flume/sink/SparkAvroCallbackHandler.scala      | 16 ++++++++++------
 .../org/apache/spark/flume/sink/SparkSink.scala    |  6 +++---
 .../spark/flume/sink/TransactionProcessor.scala    |  6 +++---
 .../flume/FlumePollingInputDStream.scala           |  2 +-
 4 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala
index 1f6e60815dd06..11d4805657803 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala
@@ -59,12 +59,16 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
     val sequenceNumber = seqBase + seqCounter.incrementAndGet()
     val processor = new TransactionProcessor(channel, sequenceNumber, n, transactionTimeout,
       backOffInterval, this)
-    transactionExecutorOpt.map(executor => {
+    transactionExecutorOpt.foreach(executor => {
       executor.submit(processor)
     })
-    processorMap.put(sequenceNumber, processor)
-    // Wait until a batch is available - will be an error if
-    processor.getEventBatch
+    // Wait until a batch is available - will be an error if error message is non-empty
+    val batch = processor.getEventBatch
+    if (batch.getErrorMsg != null && !batch.getErrorMsg.equals("")) {
+      processorMap.put(sequenceNumber, processor)
+    }
+
+    batch
   }
 
   /**
@@ -93,7 +97,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
    * @param success Whether the batch was successful or not.
    */
   private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
-    Option(removeAndGetProcessor(sequenceNumber)).map(processor => {
+    Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
       processor.batchProcessed(success)
     })
   }
@@ -112,7 +116,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
    * Shuts down the executor used to process transactions.
    */
   def shutdown() {
-    transactionExecutorOpt.map(executor => {
+    transactionExecutorOpt.foreach(executor => {
       executor.shutdownNow()
     })
   }
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
index cf968ee39435f..265b37dd0b302 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
@@ -84,7 +84,7 @@ class SparkSink extends AbstractSink with Configurable {
       // dependencies which are being excluded in the build. In practice,
       // Netty dependencies are already available on the JVM as Flume would have pulled them in.
       serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
-      serverOpt.map(server => {
+      serverOpt.foreach(server => {
         LOG.info("Starting Avro server for sink: " + getName)
         server.start()
       })
@@ -93,10 +93,10 @@ class SparkSink extends AbstractSink with Configurable {
 
   override def stop() {
     LOG.info("Stopping Spark Sink: " + getName)
-    handler.map(callbackHandler => {
+    handler.foreach(callbackHandler => {
       callbackHandler.shutdown()
     })
-    serverOpt.map(server => {
+    serverOpt.foreach(server => {
       LOG.info("Stopping Avro Server for sink: " + getName)
       server.close()
       server.join()
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala
index 8c4860d364aee..a4689cca5624a 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala
@@ -106,7 +106,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
       eventBatch.setErrorMsg("Something went wrong. Channel was " +
         "unable to create a transaction!")
     }
-    txOpt.map(tx => {
+    txOpt.foreach(tx => {
       tx.begin()
       val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
       val loop = new Breaks
@@ -145,7 +145,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
       LOG.error("Error while processing transaction.", e)
       eventBatch.setErrorMsg(e.getMessage)
       try {
-        txOpt.map(tx => {
+        txOpt.foreach(tx => {
           rollbackAndClose(tx, close = true)
         })
       } finally {
@@ -163,7 +163,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
    */
   private def processAckOrNack() {
     batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
-    txOpt.map(tx => {
+    txOpt.foreach(tx => {
       if (batchSuccess) {
         try {
           tx.commit()
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index ff6a5b5ce1d04..b8507f0a48a0e 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -151,7 +151,7 @@ private[streaming] class FlumePollingReceiver(
   override def onStop(): Unit = {
     logInfo("Shutting down Flume Polling Receiver")
     receiverExecutor.shutdownNow()
-    connections.map(connection => {
+    connections.foreach(connection => {
       connection.transceiver.close()
     })
     channelFactory.releaseExternalResources()
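Note (not part of the patch): a minimal standalone sketch of the Option
behavior this change relies on. Names here are illustrative, not taken
from the patched files. Option.map wraps the callback's result in a new
Option, so calling it purely for a side effect allocates an Option[Unit]
that is immediately discarded; Option.foreach runs the side effect and
returns Unit, which is what these call sites actually want.

    object MapVsForeach {
      def main(args: Array[String]): Unit = {
        val serverOpt: Option[String] = Some("avro-server")

        // map: the println result (Unit) is boxed into a fresh Option
        // that nothing reads - legal, but misleading about intent.
        val discarded: Option[Unit] =
          serverOpt.map(name => println("starting " + name))

        // foreach: runs the side effect for a defined Option, does
        // nothing for None, and returns Unit - the idiom this patch
        // switches to.
        serverOpt.foreach(name => println("starting " + name))
      }
    }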