From 91bfa72cba1cdc3b12e732303e72c43a52f3918c Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 17 Apr 2014 11:07:31 -0700 Subject: [PATCH] Fixed bugs. --- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 6 +++++- .../org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 1 - .../spark/streaming/receiver/NetworkReceiverExecutor.scala | 2 +- .../scala/org/apache/spark/streaming/ui/StreamingPage.scala | 4 ++-- .../org/apache/spark/streaming/StreamingContextSuite.scala | 1 + 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 6a2d652528d8a..c17688b021084 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -121,7 +121,11 @@ private[spark] object UIUtils extends Logging { (records, "") } } - "%.1f%s".formatLocal(Locale.US, value, unit) + if (unit.isEmpty) { + "%d".formatLocal(Locale.US, value) + } else { + "%.1f%s".formatLocal(Locale.US, value, unit) + } } // Yarn has to go through a proxy so the base uri is provided and has to be on all links diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 0dc5318e8bfae..b959f9e1dbaf1 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -37,7 +37,6 @@ class KafkaStreamSuite extends TestSuiteBase { val test3: NetworkInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) - assert(test1.isInstanceOf) // TODO: Actually test receiving data ssc.stop() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/NetworkReceiverExecutor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/NetworkReceiverExecutor.scala index a22d93e6a04be..e502ff57a4728 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/NetworkReceiverExecutor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/NetworkReceiverExecutor.scala @@ -174,7 +174,7 @@ private[streaming] abstract class NetworkReceiverExecutor( } /** Check if receiver has been marked for stopping */ - def isReceiverStarted() = { + def isReceiverStarted() = synchronized { logDebug("state = " + receiverState) receiverState == Started } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 6607437db560a..1cadde7100ea5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -90,9 +90,9 @@ private[ui] class StreamingPage(parent: StreamingTab) val receiverInfo = listener.receiverInfo(receiverId) val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId") val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell) - val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId)) + val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId)) val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => - d.getQuantiles().map(r => formatDurationVerbose(r.toLong)) + d.getQuantiles().map(r => formatNumber(r.toLong)) }.getOrElse { Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index aeaaf3bbae404..960980c773f2b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -283,6 +283,7 @@ class TestReceiver extends NetworkReceiver[Int](StorageLevel.MEMORY_ONLY) with L def onStart() { val thread = new Thread() { override def run() { + logInfo("Receiving started") while (!isStopped) { store(TestReceiver.counter.getAndIncrement) }