[SPARK-12525] Fix fatal compiler warnings in Kinesis ASL due to @transient annotations

The Scala 2.11 SBT build currently fails for Spark 1.6.0 and master because warnings about the `@transient` annotation are treated as fatal:

```
[error] [warn] /Users/joshrosen/Documents/spark/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala:73: no valid targets for annotation on value sc - it is discarded unused. You may specify targets with meta-annotations, e.g. @(transient @param)
[error] [warn]     @transient sc: SparkContext,
```

The fix implemented here is the same as the one in #8433: remove the `@transient` annotations where they are not necessary, and use `@transient private val` in the remaining cases.
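As a rough illustration of the two cases (a hypothetical `Example` class, not code from this patch): a bare `@transient` on a plain constructor parameter has no field to attach to, which is what the Scala 2.11 warning above complains about. When the parameter is only consumed during construction, the annotation can simply be dropped; when it must be kept as a driver-side field excluded from serialization, declaring it `@transient private val` gives the annotation a valid target.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.BlockId

// Hypothetical class illustrating the two fixes, not code from the patch:
class Example(
    // `sc` is only passed along during construction and never stored, so the
    // old `@transient` annotation was meaningless here and is dropped:
    sc: SparkContext,
    // This parameter is kept as a real field but must not be serialized with
    // the object, so it becomes an explicit `@transient private val`:
    @transient private val blockIds: Array[BlockId]) extends Serializable {

  // Safe on the driver; after deserialization on an executor `blockIds`
  // would be null, which is exactly what `@transient` is signalling.
  def numBlocks: Int = blockIds.length
}
```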

Author: Josh Rosen <[email protected]>

Closes #10479 from JoshRosen/fix-sbt-2.11.
JoshRosen committed Dec 28, 2015
1 parent a6d3853 commit fb572c6
Showing 2 changed files with 8 additions and 8 deletions.
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala

```diff
@@ -70,26 +70,26 @@ class KinesisBackedBlockRDDPartition(
  */
 private[kinesis]
 class KinesisBackedBlockRDD[T: ClassTag](
-    @transient sc: SparkContext,
+    sc: SparkContext,
     val regionName: String,
     val endpointUrl: String,
-    @transient blockIds: Array[BlockId],
+    @transient private val _blockIds: Array[BlockId],
     @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
-    @transient isBlockIdValid: Array[Boolean] = Array.empty,
+    @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
     val retryTimeoutMs: Int = 10000,
     val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
     val awsCredentialsOption: Option[SerializableAWSCredentials] = None
-  ) extends BlockRDD[T](sc, blockIds) {
+  ) extends BlockRDD[T](sc, _blockIds) {
 
-  require(blockIds.length == arrayOfseqNumberRanges.length,
+  require(_blockIds.length == arrayOfseqNumberRanges.length,
     "Number of blockIds is not equal to the number of sequence number ranges")
 
   override def isValid(): Boolean = true
 
   override def getPartitions: Array[Partition] = {
-    Array.tabulate(blockIds.length) { i =>
+    Array.tabulate(_blockIds.length) { i =>
       val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
-      new KinesisBackedBlockRDDPartition(i, blockIds(i), isValid, arrayOfseqNumberRanges(i))
+      new KinesisBackedBlockRDDPartition(i, _blockIds(i), isValid, arrayOfseqNumberRanges(i))
     }
   }
 
```
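The rename from `blockIds` to `_blockIds` is needed because `BlockRDD` itself exposes the block ids as a `blockIds` val, so the subclass cannot introduce a private val under the same name. A minimal sketch of the pattern, with hypothetical `Base`/`Derived` classes and `String` ids standing in for Spark's `BlockId`:

```scala
// Hypothetical classes sketching the underscore-rename pattern:
abstract class Base(@transient val ids: Array[String]) extends Serializable

class Derived(
    // Renamed with a leading underscore so this private val does not clash
    // with the `ids` val inherited from Base:
    @transient private val _ids: Array[String]) extends Base(_ids) {

  // Driver-side accessor; after deserialization the transient field is null,
  // so tasks must not rely on it.
  def numIds: Int = _ids.length
}
```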
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala

```diff
@@ -30,7 +30,7 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
 import org.apache.spark.streaming.{Duration, StreamingContext, Time}
 
 private[kinesis] class KinesisInputDStream[T: ClassTag](
-    @transient _ssc: StreamingContext,
+    _ssc: StreamingContext,
     streamName: String,
     endpointUrl: String,
     regionName: String,
```

2 comments on commit fb572c6

@yssharma
@JoshRosen - Could this fix be causing the runtime exception while running the KinesisTestSuite?

```
- Read data available partially in block manager, rest in Kinesis *** FAILED ***
  org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2284)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2058)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  ...
  Cause: java.io.NotSerializableException: org.apache.spark.SparkContext
  Serialization stack:
  - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@764b14b8)
  - field (class: org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD, name: org$apache$spark$streaming$kinesis$KinesisBackedBlockRDD$$sc, type: class org.apache.spark.SparkContext)
  - object (class org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD, KinesisBackedBlockRDD[0] at BlockRDD at KinesisBackedBlockRDD.scala:87)
  - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
  - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@41bfa9e9)
  - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@359066bc)
  - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@41bfa9e9))
  - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
  - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at map at KinesisBackedBlockRDDSuite.scala:218)
  - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD)
  - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, <function0>)
  - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
  - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13, <function1>)
```
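For context on the trace: `Task not serializable` happens when something in the RDD object graph holds a live `SparkContext` reference, and the serialization stack above shows exactly such a field (`...KinesisBackedBlockRDD$$sc`), which typically appears when `sc` is referenced outside the constructor and therefore gets promoted to a non-transient field. A quick way to surface this eagerly in a test is to round-trip the object through plain Java serialization, roughly what Spark does before shipping a task. A minimal sketch (a hypothetical helper, not Spark API):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical test helper: attempts plain Java serialization so that a
// java.io.NotSerializableException (like the one above) surfaces immediately.
def assertSerializable(obj: AnyRef): Unit = {
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  try oos.writeObject(obj) // throws NotSerializableException on failure
  finally oos.close()
}
```

For example, `assertSerializable(rdd)` on an RDD that captured a `SparkContext` would fail with the same `NotSerializableException: org.apache.spark.SparkContext` seen in the trace.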

@yssharma
@JoshRosen All good mate. Was trying some test cases for #17467 and suspected this change might have affected the PR. Able to get the tests working now. Cheers
