Skip to content

Commit

Permalink
[SPARK-4882] Register PythonBroadcast with Kryo so that PySpark works…
Browse files Browse the repository at this point in the history
… with KryoSerializer

This PR fixes an issue where PySpark broadcast variables caused NullPointerExceptions when KryoSerializer was used. The fix is to register PythonBroadcast with Kryo using a KryoJavaSerializer, so that it is serialized and deserialized through Java serialization rather than Kryo's default field-based serializer.

Author: Josh Rosen <[email protected]>

Closes apache#3831 from JoshRosen/SPARK-4882 and squashes the following commits:

0466c7a [Josh Rosen] Register PythonBroadcast with Kryo.
d5b409f [Josh Rosen] Enable registrationRequired, which would have caught this bug.
069d8a7 [Josh Rosen] Add failing test for SPARK-4882

(cherry picked from commit efa80a5)
Signed-off-by: Josh Rosen <[email protected]>
  • Loading branch information
JoshRosen committed Dec 30, 2014
1 parent d5e0a45 commit 822a0b4
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializ
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
Expand Down Expand Up @@ -79,6 +80,7 @@ class KryoSerializer(conf: SparkConf)
// Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

// Allow the user to register their own classes by setting spark.kryo.registrator
for (regCls <- registrator) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.python

import scala.io.Source

import java.io.{PrintWriter, File}

import org.scalatest.{Matchers, FunSuite}

import org.apache.spark.{SharedSparkContext, SparkConf}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

/**
 * Checks that a [[PythonBroadcast]] still points at readable, unchanged data after a
 * round trip through a Kryo serializer instance (SPARK-4882).
 *
 * This suite uses SharedSparkContext because we need a SparkEnv in order to deserialize
 * a PythonBroadcast.
 */
class PythonBroadcastSuite extends FunSuite with Matchers with SharedSparkContext {
  test("PythonBroadcast can be serialized with Kryo (SPARK-4882)") {
    val tempDir = Utils.createTempDir()
    val broadcastedString = "Hello, world!"

    // Reads the file referenced by `broadcast.path` and checks that it holds the payload.
    def assertBroadcastIsValid(broadcast: PythonBroadcast): Unit = {
      val source = Source.fromFile(broadcast.path)
      // Close the handle even when reading throws, so a failing run does not leak it.
      val contents = try {
        source.mkString
      } finally {
        source.close()
      }
      contents should be (broadcastedString)
    }

    try {
      // Write the payload into a file under the temp directory; the broadcast is built
      // from that file's absolute path.
      val broadcastDataFile: File = {
        val file = new File(tempDir, "broadcastData")
        val printWriter = new PrintWriter(file)
        try {
          printWriter.write(broadcastedString)
        } finally {
          // Always release the writer, even if the write fails.
          printWriter.close()
        }
        file
      }
      val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath)
      // Sanity check before serialization: the freshly created broadcast must be readable.
      assertBroadcastIsValid(broadcast)
      // registrationRequired is switched on so that a missing Kryo registration for
      // PythonBroadcast shows up as a failure in this test.
      val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
      val deserializedBroadcast =
        Utils.clone[PythonBroadcast](broadcast, new KryoSerializer(conf).newInstance())
      assertBroadcastIsValid(deserializedBroadcast)
    } finally {
      // Remove the temp directory whether or not the assertions passed.
      Utils.deleteRecursively(tempDir)
    }
  }
}

0 comments on commit 822a0b4

Please sign in to comment.