[SPARK-6428] Turn on explicit type checking for public methods.
This builds on my earlier pull requests and turns on the explicit type checking in scalastyle.

Author: Reynold Xin <[email protected]>

Closes apache#5342 from rxin/SPARK-6428 and squashes the following commits:

7b531ab [Reynold Xin] import ordering
2d9a8a5 [Reynold Xin] jl
e668b1c [Reynold Xin] override
9b9e119 [Reynold Xin] Parenthesis.
82e0cf5 [Reynold Xin] [SPARK-6428] Turn on explicit type checking for public methods.
rxin committed Apr 3, 2015
1 parent c42c3fc commit 82701ee
Showing 46 changed files with 170 additions and 142 deletions.
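
The scalastyle configuration change itself is not expanded in this view; the bulk of the patch is the mechanical work of adding explicit result types to public and overridden members so the checker can be enabled. A minimal sketch of the pattern, modeled on the Stats class in LogQuery below (the rule is presumably scalastyle's public-methods-have-type check; this snippet is illustrative, not part of the patch):

// Sketch only: with explicit-type checking on, public members must
// declare their result types instead of relying on inference.
class Stats(val count: Int, val numBytes: Int) extends Serializable {
  // Before: def merge(other: Stats) = new Stats(...)   // inferred, rejected
  def merge(other: Stats): Stats =
    new Stats(count + other.count, numBytes + other.numBytes)

  // Before: override def toString = "..."               // inferred, rejected
  override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
}

Explicit result types on public members also keep the compiled API stable when an implementation detail changes, which is the usual motivation for enforcing the rule.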
@@ -661,7 +661,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.call(x).asScala
def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
}
@@ -192,7 +192,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
*/
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x)
def fn: (T) => S = (x: T) => f.call(x)
import com.google.common.collect.Ordering // shadows scala.math.Ordering
implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
implicit val ctag: ClassTag[S] = fakeClassTag
53 changes: 34 additions & 19 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,8 +17,9 @@

package org.apache.spark.api.java

import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.{lang => jl}
import java.lang.{Iterable => JIterable, Long => JLong}
import java.util.{Comparator, List => JList, Iterator => JIterator}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
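
The java.{lang => jl} rename added above lets the boxed Java types used throughout this file be written as jl.Integer, jl.Double, and jl.Long. A self-contained sketch of the idiom (the Boxed object is illustrative, not part of the patch):

import java.{lang => jl}

object Boxed {
  // jl.Long is java.lang.Long; the alias keeps explicitly typed signatures
  // short and avoids confusion with scala.Long.
  def toJavaLong(x: Long): jl.Long = jl.Long.valueOf(x)
  def toJavaDouble(x: Double): jl.Double = jl.Double.valueOf(x)
}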
@@ -93,7 +94,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of the original partition.
*/
def mapPartitionsWithIndex[R](
f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]],
f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -109,7 +110,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
def cm = implicitly[ClassTag[(K2, V2)]]
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}

@@ -119,7 +120,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x).asScala
def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}

@@ -129,8 +130,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}

/**
@@ -139,16 +140,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x).asScala
def cm = implicitly[ClassTag[(K2, V2)]]
def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}

/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}

@@ -157,7 +160,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
JavaRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -166,16 +171,20 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
}

/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}

@@ -184,7 +193,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
}
@@ -194,7 +205,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
JavaPairRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -277,8 +290,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def zipPartitions[U, V](
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
(x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
}
JavaRDD.fromRDD(
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
}
@@ -441,8 +456,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): java.util.Map[T, java.lang.Long] =
mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new java.lang.Long(x._2)))))
def countByValue(): java.util.Map[T, jl.Long] =
mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2)))))

/**
* (Experimental) Approximate version of countByValue().
@@ -40,8 +40,8 @@ object LocalKMeans
val convergeDist = 0.001
val rand = new Random(42)

def generateData = {
def generatePoint(i: Int) = {
def generateData: Array[DenseVector[Double]] = {
def generatePoint(i: Int): DenseVector[Double] = {
DenseVector.fill(D){rand.nextDouble * R}
}
Array.tabulate(N)(generatePoint)
@@ -37,8 +37,8 @@ object LocalLR

case class DataPoint(x: Vector[Double], y: Double)

def generateData = {
def generatePoint(i: Int) = {
def generateData: Array[DataPoint] = {
def generatePoint(i: Int): DataPoint = {
val y = if(i % 2 == 0) -1 else 1
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
@@ -54,8 +54,8 @@ object LogQuery
// scalastyle:on
/** Tracks the total query count and number of aggregate bytes for a particular group. */
class Stats(val count: Int, val numBytes: Int) extends Serializable {
def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
override def toString = "bytes=%s\tn=%s".format(numBytes, count)
def merge(other: Stats): Stats = new Stats(count + other.count, numBytes + other.numBytes)
override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
}

def extractKey(line: String): (String, String, String) = {
@@ -42,8 +42,8 @@ object SparkLR

case class DataPoint(x: Vector[Double], y: Double)

def generateData = {
def generatePoint(i: Int) = {
def generateData: Array[DataPoint] = {
def generatePoint(i: Int): DataPoint = {
val y = if(i % 2 == 0) -1 else 1
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
@@ -31,7 +31,7 @@ object SparkTC
val numVertices = 100
val rand = new Random(42)

def generateGraph = {
def generateGraph: Seq[(Int, Int)] = {
val edges: mutable.Set[(Int, Int)] = mutable.Set.empty
while (edges.size < numEdges) {
val from = rand.nextInt(numVertices)
@@ -90,7 +90,7 @@ class PRMessage() extends Message[String] with Serializable {
}

class CustomPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions
def numPartitions: Int = partitions

def getPartition(key: Any): Int = {
val hash = key match {
@@ -178,7 +178,9 @@ object MovieLensALS {
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean)
: Double = {

def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
def mapPredictedRating(r: Double): Double = {
if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
}

val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionsAndRatings = predictions.map{ x =>
@@ -85,13 +85,13 @@ extends Actor with ActorHelper {

lazy private val remotePublisher = context.actorSelection(urlOfPublisher)

override def preStart = remotePublisher ! SubscribeReceiver(context.self)
override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self)

def receive = {
def receive: PartialFunction[Any, Unit] = {
case msg => store(msg.asInstanceOf[T])
}

override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self)

}
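
The receive change above is typical for abstract members: once explicit types are required, the pattern-matching block gets its PartialFunction[Any, Unit] type spelled out. A stand-alone sketch of the same shape, without the Akka dependency (names are illustrative, not from the patch):

object ReceiveSketch {
  // Writing the PartialFunction type out makes the public signature
  // explicit, mirroring the receive method above.
  val handler: PartialFunction[Any, Unit] = {
    case msg: String => println(s"string message: $msg")
    case other       => println(s"other message: $other")
  }

  def main(args: Array[String]): Unit = handler("hello")
}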

@@ -55,7 +55,8 @@ import org.apache.spark.util.IntParam
*/
object RecoverableNetworkWordCount {

def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {
def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
: StreamingContext = {

// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
@@ -35,7 +35,7 @@ import org.apache.spark.SparkConf
*/
object SimpleZeroMQPublisher {

def main(args: Array[String]) = {
def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
System.exit(1)
@@ -45,7 +45,7 @@ object SimpleZeroMQPublisher {
val acs: ActorSystem = ActorSystem()

val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
implicit def stringToByteString(x: String) = ByteString(x)
implicit def stringToByteString(x: String): ByteString = ByteString(x)
val messages: List[ByteString] = List("words ", "may ", "count ")
while (true) {
Thread.sleep(1000)
@@ -86,7 +86,7 @@ object ZeroMQWordCount {
// Create the context and set the batch size
val ssc = new StreamingContext(sparkConf, Seconds(2))

def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator

// For this stream, a zeroMQ publisher should be running.
val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
@@ -94,7 +94,7 @@ object PageViewGenerator {
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
override def run(): Unit = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)

@@ -37,8 +37,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver

import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.Channels
import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.compression._

@@ -187,8 +186,8 @@ class FlumeReceiver(
logInfo("Flume receiver stopped")
}

override def preferredLocation = Some(host)
override def preferredLocation: Option[String] = Option(host)

/** A Netty Pipeline factory that will decompress incoming data from
* and the Netty client and compress data going back to the client.
*
@@ -198,13 +197,12 @@
*/
private[streaming]
class CompressionChannelPipelineFactory extends ChannelPipelineFactory {

def getPipeline() = {
def getPipeline(): ChannelPipeline = {
val pipeline = Channels.pipeline()
val encoder = new ZlibEncoder(6)
pipeline.addFirst("deflater", encoder)
pipeline.addFirst("inflater", new ZlibDecoder())
pipeline
}
}
}
}
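
One detail in the preferredLocation change above goes slightly beyond adding a type: Some(host) became Option(host). Option(...) maps a null argument to None, while Some(null) would not, so the new form is also null-safe. A tiny illustration (hypothetical values, not from the patch):

object OptionVsSome {
  def main(args: Array[String]): Unit = {
    val host: String = null
    println(Option(host)) // None
    println(Some(host))   // Some(null)
  }
}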
@@ -129,8 +129,9 @@ class DirectKafkaInputDStream[

private[streaming]
class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
def batchForTime = data.asInstanceOf[mutable.HashMap[
Time, Array[OffsetRange.OffsetRangeTuple]]]
def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
}

override def update(time: Time) {
batchForTime.clear()
@@ -155,7 +155,7 @@ class KafkaRDD[
.dropWhile(_.offset < requestOffset)
}

override def close() = consumer.close()
override def close(): Unit = consumer.close()

override def getNext(): R = {
if (iter == null || !iter.hasNext) {
@@ -207,7 +207,7 @@ object KafkaRDD {
fromOffsets: Map[TopicAndPartition, Long],
untilOffsets: Map[TopicAndPartition, LeaderOffset],
messageHandler: MessageAndMetadata[K, V] => R
): KafkaRDD[K, V, U, T, R] = {
): KafkaRDD[K, V, U, T, R] = {
val leaders = untilOffsets.map { case (tp, lo) =>
tp -> (lo.host, lo.port)
}.toMap
@@ -70,7 +70,7 @@ class TwitterReceiver(
try {
val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
newTwitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
def onStatus(status: Status): Unit = {
store(status)
}
// Unimplemented
(Remaining changed files not shown.)
