From ff036d31c6adbc2cd5f2c9347c267073b673167b Mon Sep 17 00:00:00 2001
From: Kalpit Shah
Date: Fri, 25 Apr 2014 10:44:30 -0700
Subject: [PATCH 1/6] SPARK-1630: Make PythonRDD handle Null elements and strings gracefully

---
 .../apache/spark/api/python/PythonRDD.scala   | 20 ++++++++++++++-----
 .../spark/api/python/PythonRDDSuite.scala     |  5 +++++
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 672c344a56597..3f5b9e66698bd 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -245,7 +245,7 @@ private object SpecialLengths {
   val TIMING_DATA = -3
 }
 
-private[spark] object PythonRDD {
+private[spark] object PythonRDD extends Logging {
   val UTF8 = Charset.forName("UTF-8")
 
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
@@ -301,15 +301,25 @@ private[spark] object PythonRDD {
               throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
           }
         case other =>
-          throw new SparkException("Unexpected element type " + first.getClass)
+          Option(other) match {
+            case None =>
+              logDebug("Encountered NULL element from iterator. We skip writing NULL to stream.")
+            case Some(x) =>
+              throw new SparkException("Unexpected element type " + first.getClass)
+          }
       }
     }
   }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
-    val bytes = str.getBytes(UTF8)
-    dataOut.writeInt(bytes.length)
-    dataOut.write(bytes)
+    Option(str) match {
+      case None =>
+        logDebug("Encountered NULL string. We skip writing NULL to stream.")
+      case Some(x) =>
+        val bytes = x.getBytes(UTF8)
+        dataOut.writeInt(bytes.length)
+        dataOut.write(bytes)
+    }
   }
 
   def writeToFile[T](items: java.util.Iterator[T], filename: String) {
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
index 7b866f08a0e9f..d47fcf0111037 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
@@ -29,5 +29,10 @@ class PythonRDDSuite extends FunSuite {
       val buffer = new DataOutputStream(new ByteArrayOutputStream)
       PythonRDD.writeIteratorToStream(input.iterator, buffer)
     }
 
+    test("Handle nulls gracefully") {
+      val input: List[String] = List("a",null)
+      val buffer = new DataOutputStream(new ByteArrayOutputStream)
+      PythonRDD.writeIteratorToStream(input.iterator, buffer)
+    }
 }

From 8a4a0f94d34b76b44b590ca741b438393b803106 Mon Sep 17 00:00:00 2001
From: Kalpit Shah
Date: Fri, 25 Apr 2014 11:24:41 -0700
Subject: [PATCH 2/6] SPARK-1630: Incorporated code-review feedback

---
 .../apache/spark/api/python/PythonRDD.scala   | 18 ++++++++----------
 .../spark/api/python/PythonRDDSuite.scala     |  2 +-
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 3f5b9e66698bd..29a0f7889659c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -301,22 +301,20 @@ private[spark] object PythonRDD extends Logging {
               throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
           }
         case other =>
-          Option(other) match {
-            case None =>
-              logDebug("Encountered NULL element from iterator. We skip writing NULL to stream.")
-            case Some(x) =>
-              throw new SparkException("Unexpected element type " + first.getClass)
+          if (other == null) {
+            logDebug("Encountered NULL element from iterator. We skip writing NULL to stream.")
+          } else {
+            throw new SparkException("Unexpected element type " + first.getClass)
          }
      }
    }
  }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
-    Option(str) match {
-      case None =>
-        logDebug("Encountered NULL string. We skip writing NULL to stream.")
-      case Some(x) =>
-        val bytes = x.getBytes(UTF8)
+    if (str == null) {
+      logDebug("Encountered NULL string. We skip writing NULL to stream.")
+    } else {
+      val bytes = str.getBytes(UTF8)
         dataOut.writeInt(bytes.length)
         dataOut.write(bytes)
     }
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
index d47fcf0111037..3304384432c70 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
@@ -30,7 +30,7 @@ class PythonRDDSuite extends FunSuite {
     }
 
     test("Handle nulls gracefully") {
-      val input: List[String] = List("a",null)
+      val input: List[String] = List("a", null)
       val buffer = new DataOutputStream(new ByteArrayOutputStream)
       PythonRDD.writeIteratorToStream(input.iterator, buffer)
     }
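At this stage the series simply drops nulls on the JVM side. Two consequences are visible in the diffs themselves: writeUTF skips interior nulls, so the Python reader sees fewer elements than the Scala writer was given, and writeIteratorToStream dispatches on the runtime type of the first element, so an iterator that begins with null falls straight into `case other` and nothing at all is written. A minimal standalone sketch of that dispatch behavior (hypothetical names, not the actual PythonRDD code):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object SkipNullSketch {
  // Same skip-on-null framing as patches 1-2: nulls vanish from the stream.
  def writeUTF(str: String, out: DataOutputStream): Unit = {
    if (str != null) {
      val bytes = str.getBytes("UTF-8")
      out.writeInt(bytes.length)   // length-prefixed frame
      out.write(bytes)
    } // else: skipped entirely; the reader never learns an element was here
  }

  // Same dispatch-on-first-element shape as writeIteratorToStream.
  def writeAll(iter: Iterator[Any], out: DataOutputStream): Unit = {
    if (iter.hasNext) {
      val first = iter.next()
      val all = Seq(first).iterator ++ iter
      first match {
        case _: String => all.foreach(s => writeUTF(s.asInstanceOf[String], out))
        case _         => // a null first element lands here: nothing is written
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val a = new ByteArrayOutputStream()
    writeAll(Iterator("a", null, "b"), new DataOutputStream(a))
    println(a.size())  // 10: two frames, the interior null silently dropped

    val b = new ByteArrayOutputStream()
    writeAll(Iterator(null, "a", "b"), new DataOutputStream(b))
    println(b.size())  // 0: a leading null discards the entire iterator
  }
}
```

Both problems resurface later in the series: patch 4 replaces skipping with an explicit sentinel, and patch 5 adds a test for a list that starts with nulls.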
From dddda9e2858d518c916b60972a2ba0a025b38855 Mon Sep 17 00:00:00 2001
From: Kalpit Shah
Date: Fri, 25 Apr 2014 11:59:50 -0700
Subject: [PATCH 3/6] SPARK-1630: Fixed indentation

---
 .../spark/api/python/PythonRDDSuite.scala     | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
index 3304384432c70..c62f341441e53 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
@@ -23,16 +23,16 @@ import org.scalatest.FunSuite
 
 class PythonRDDSuite extends FunSuite {
 
-    test("Writing large strings to the worker") {
-      val input: List[String] = List("a"*100000)
-      val buffer = new DataOutputStream(new ByteArrayOutputStream)
-      PythonRDD.writeIteratorToStream(input.iterator, buffer)
-    }
+  test("Writing large strings to the worker") {
+    val input: List[String] = List("a"*100000)
+    val buffer = new DataOutputStream(new ByteArrayOutputStream)
+    PythonRDD.writeIteratorToStream(input.iterator, buffer)
+  }
 
-    test("Handle nulls gracefully") {
-      val input: List[String] = List("a", null)
-      val buffer = new DataOutputStream(new ByteArrayOutputStream)
-      PythonRDD.writeIteratorToStream(input.iterator, buffer)
-    }
+  test("Handle nulls gracefully") {
+    val input: List[String] = List("a", null)
+    val buffer = new DataOutputStream(new ByteArrayOutputStream)
+    PythonRDD.writeIteratorToStream(input.iterator, buffer)
+  }
 
 }

From 3af8b4d4e7152bf5857201febd6e223aab4587fe Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 23 Jul 2014 12:47:33 -0700
Subject: [PATCH 4/6] turn Null of Java into None of Python

---
 .../scala/org/apache/spark/api/python/PythonRDD.scala | 8 +++++---
 python/pyspark/serializers.py                         | 5 ++++-
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 25b76398c2290..f1e65e8dbd162 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -267,9 +267,10 @@ private object SpecialLengths {
   val END_OF_DATA_SECTION = -1
   val PYTHON_EXCEPTION_THROWN = -2
   val TIMING_DATA = -3
+  val NULL = -4
 }
 
-private[spark] object PythonRDD extends Logging {
+private[spark] object PythonRDD {
   val UTF8 = Charset.forName("UTF-8")
 
   /**
@@ -345,7 +346,8 @@ private[spark] object PythonRDD extends Logging {
           }
         case other =>
           if (other == null) {
-            logDebug("Encountered NULL element from iterator. We skip writing NULL to stream.")
+            dataOut.writeInt(SpecialLengths.NULL)
+            writeIteratorToStream(iter, dataOut)
           } else {
             throw new SparkException("Unexpected element type " + first.getClass)
           }
@@ -529,7 +531,7 @@ private[spark] object PythonRDD {
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
     if (str == null) {
-      logDebug("Encountered NULL string. We skip writing NULL to stream.")
+      dataOut.writeInt(SpecialLengths.NULL)
     } else {
       val bytes = str.getBytes(UTF8)
         dataOut.writeInt(bytes.length)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 9be78b39fbc21..d90523800b084 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -75,6 +75,7 @@ class SpecialLengths(object):
     END_OF_DATA_SECTION = -1
     PYTHON_EXCEPTION_THROWN = -2
     TIMING_DATA = -3
+    NULL = -4
 
 
 class Serializer(object):
@@ -193,7 +194,7 @@ def load_stream(self, stream):
         return chain.from_iterable(self._load_stream_without_unbatching(stream))
 
     def _load_stream_without_unbatching(self, stream):
-        return self.serializer.load_stream(stream) 
+        return self.serializer.load_stream(stream)
 
     def __eq__(self, other):
         return (isinstance(other, BatchedSerializer) and
@@ -309,6 +310,8 @@ class UTF8Deserializer(Serializer):
 
     def loads(self, stream):
         length = read_int(stream)
+        if length == SpecialLengths.NULL:
+            return None
         return stream.read(length).decode('utf8')
 
     def load_stream(self, stream):
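Patch 4 turns null handling into a wire-level convention: in the length-prefixed framing between the JVM and the Python worker, a length of -4 (SpecialLengths.NULL, defined identically on both sides) marks a null element, and UTF8Deserializer.loads maps it to None. A minimal round-trip sketch of that framing against plain java.io streams (standalone names, not the real worker plumbing):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object NullFramingSketch {
  val NULL = -4  // mirrors SpecialLengths.NULL on both sides of the pipe

  def writeUTF(str: String, out: DataOutputStream): Unit = {
    if (str == null) {
      out.writeInt(NULL)             // sentinel instead of a length
    } else {
      val bytes = str.getBytes("UTF-8")
      out.writeInt(bytes.length)     // ordinary length-prefixed frame
      out.write(bytes)
    }
  }

  def readUTF(in: DataInputStream): String = {
    val length = in.readInt()
    if (length == NULL) {
      null                           // becomes None in the Python reader
    } else {
      val bytes = new Array[Byte](length)
      in.readFully(bytes)
      new String(bytes, "UTF-8")
    }
  }

  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    Seq("a", null, "b").foreach(writeUTF(_, out))
    val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
    println(Seq(readUTF(in), readUTF(in), readUTF(in)))  // List(a, null, b)
  }
}
```

Because every null now produces a frame, the element count survives the round trip, which the skip-based patches 1-3 could not guarantee.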
From 00fa7f0508d9fb06389b6cf1e05d7f9eaf1c9222 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 23 Jul 2014 15:04:44 -0700
Subject: [PATCH 5/6] fix style, add new test case for list starts with nulls

---
 .../scala/org/apache/spark/api/python/PythonRDD.scala |  6 +++---
 .../org/apache/spark/api/python/PythonRDDSuite.scala  |  9 +++++++--
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f1e65e8dbd162..697b25fa7fbfe 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -533,9 +533,9 @@ private[spark] object PythonRDD {
     if (str == null) {
       dataOut.writeInt(SpecialLengths.NULL)
     } else {
-        val bytes = str.getBytes(UTF8)
-        dataOut.writeInt(bytes.length)
-        dataOut.write(bytes)
+      val bytes = str.getBytes(UTF8)
+      dataOut.writeInt(bytes.length)
+      dataOut.write(bytes)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
index c62f341441e53..d345115e327bd 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.FunSuite
 class PythonRDDSuite extends FunSuite {
 
   test("Writing large strings to the worker") {
-    val input: List[String] = List("a"*100000)
+    val input: List[String] = List("a" * 100000)
     val buffer = new DataOutputStream(new ByteArrayOutputStream)
     PythonRDD.writeIteratorToStream(input.iterator, buffer)
   }
@@ -34,5 +34,10 @@ class PythonRDDSuite extends FunSuite {
     val buffer = new DataOutputStream(new ByteArrayOutputStream)
     PythonRDD.writeIteratorToStream(input.iterator, buffer)
   }
-}
 
+  test("Handle list starts with nulls gracefully") {
+    val input: List[String] = List(null, null, "a", null)
+    val buffer = new DataOutputStream(new ByteArrayOutputStream)
+    PythonRDD.writeIteratorToStream(input.iterator, buffer)
+  }
+}
\ No newline at end of file
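The tests above only verify that writing does not throw; they never inspect the bytes produced. A sketch of a stronger assertion for PythonRDDSuite that decodes the frames, assuming SpecialLengths.NULL == -4 as defined in patch 4 (a hypothetical extra test, not part of the series):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical follow-up test body: decode the frames instead of only
// checking that nothing throws.
test("null elements are written as NULL frames") {
  val baos = new ByteArrayOutputStream()
  PythonRDD.writeIteratorToStream(List(null, null, "a", null).iterator,
    new DataOutputStream(baos))

  val in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
  assert(in.readInt() === -4)   // frame for the 1st leading null
  assert(in.readInt() === -4)   // frame for the 2nd
  val len = in.readInt()        // ordinary length-prefixed frame for "a"
  val data = new Array[Byte](len)
  in.readFully(data)
  assert(new String(data, "UTF-8") === "a")
  assert(in.readInt() === -4)   // frame for the trailing null
}
```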
From 12c00d7efbc233c1ac59f915c1191e1454c60828 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 29 Jul 2014 11:50:11 -0700
Subject: [PATCH 6/6] handle null in bytes

---
 .../apache/spark/api/python/PythonRDD.scala | 35 +++++++++++++--------
 1 file changed, 20 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 697b25fa7fbfe..02665ed610a52 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -22,6 +22,7 @@ import java.net._
 import java.nio.charset.Charset
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
+import scala.annotation.tailrec
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 import scala.util.Try
@@ -310,36 +311,40 @@ private[spark] object PythonRDD {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
+  @tailrec
   def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
     // The right way to implement this would be to use TypeTags to get the full
     // type of T. Since I don't want to introduce breaking changes throughout the
     // entire Spark API, I have to use this hacky approach:
+    def writeBytes(bytes: Array[Byte]) {
+      if (bytes == null) {
+        dataOut.writeInt(SpecialLengths.NULL)
+      } else {
+        dataOut.writeInt(bytes.length)
+        dataOut.write(bytes)
+      }
+    }
     if (iter.hasNext) {
       val first = iter.next()
       val newIter = Seq(first).iterator ++ iter
       first match {
         case arr: Array[Byte] =>
-          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
-            dataOut.writeInt(bytes.length)
-            dataOut.write(bytes)
-          }
+          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { writeBytes(_) }
         case string: String =>
-          newIter.asInstanceOf[Iterator[String]].foreach { str =>
-            writeUTF(str, dataOut)
-          }
+          newIter.asInstanceOf[Iterator[String]].foreach { writeUTF(_, dataOut) }
         case pair: Tuple2[_, _] =>
           pair._1 match {
             case bytePair: Array[Byte] =>
-              newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
-                dataOut.writeInt(pair._1.length)
-                dataOut.write(pair._1)
-                dataOut.writeInt(pair._2.length)
-                dataOut.write(pair._2)
+              newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach {
+                case (k, v) =>
+                  writeBytes(k)
+                  writeBytes(v)
               }
             case stringPair: String =>
-              newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
-                writeUTF(pair._1, dataOut)
-                writeUTF(pair._2, dataOut)
+              newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach {
+                case (k, v) =>
+                  writeUTF(k, dataOut)
+                  writeUTF(v, dataOut)
               }
             case other =>
               throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
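Patch 6 (truncated above) completes the series: the nested writeBytes helper gives byte arrays and key/value pairs the same NULL-frame treatment as strings, and the @tailrec annotation covers the self-call introduced in patch 4, where a null first element makes writeIteratorToStream recurse on the remaining iterator, one call per leading null. @tailrec enforces that the self-call compiles to a loop, so a long run of leading nulls cannot overflow the stack. A standalone sketch of that dispatch-restart pattern (hypothetical names, not the actual PythonRDD code):

```scala
import scala.annotation.tailrec

object NullPrefixSketch {
  // One NULL frame per leading null, then a single dispatch on the first
  // real element; the self-call is in tail position, so it becomes a loop.
  @tailrec
  def writeAll(iter: Iterator[String], writeNull: () => Unit, writeStr: String => Unit): Unit = {
    if (iter.hasNext) {
      iter.next() match {
        case null =>
          writeNull()                          // frame for this null element
          writeAll(iter, writeNull, writeStr)  // tail call: restart on the rest
        case first =>
          writeStr(first)
          iter.foreach(s => if (s == null) writeNull() else writeStr(s))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    var frames = List.empty[String]
    writeAll(Iterator(null, null, "a", null),
      () => frames :+= "NULL",
      s => frames :+= s)
    println(frames)  // List(NULL, NULL, a, NULL): one frame per element
  }
}
```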