From 54045f756fb4180f3f971e118935829c846f8984 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 26 Mar 2014 16:34:45 -0700
Subject: [PATCH] Add a custom serializer for maps since they do not have a
 no-arg constructor.

---
 .../sql/execution/SparkSqlSerializer.scala    | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 1c3196ae2e7b6..915f551fb2f01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -32,6 +32,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
     kryo.setRegistrationRequired(false)
     kryo.register(classOf[MutablePair[_, _]])
     kryo.register(classOf[Array[Any]])
+    kryo.register(classOf[scala.collection.immutable.Map$Map1], new MapSerializer)
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
     kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
@@ -70,3 +71,20 @@ class BigDecimalSerializer extends Serializer[BigDecimal] {
     BigDecimal(input.readString())
   }
 }
+
+/**
+ * Maps do not have a no-arg constructor and so cannot be serialized by default. So, we serialize
+ * them as a flat `Array[Any]` of alternating keys and values.
+ */
+class MapSerializer extends Serializer[Map[_, _]] {
+  def write(kryo: Kryo, output: Output, map: Map[_, _]) {
+    kryo.writeObject(output, map.flatMap(e => Seq(e._1, e._2)).toArray)
+  }
+
+  def read(kryo: Kryo, input: Input, tpe: Class[Map[_, _]]): Map[_, _] = {
+    kryo.readObject(input, classOf[Array[Any]])
+      .sliding(2, 2)
+      .map { case Array(k, v) => (k, v) }
+      .toMap
+  }
+}
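
For reference, below is a minimal standalone sketch of the round trip this serializer
performs. It assumes Kryo 2.x on the classpath; the object name MapRoundTrip and the
4 KB buffer size are illustrative, and the serializer body mirrors the one added by
the patch. Registration is against the concrete runtime class of the sample map
(scala.collection.immutable.Map.Map1 for a one-entry map), matching the Map$Map1
registration above, because that is the class Kryo sees at write time.

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Mirror of the serializer added by the patch: the map is flattened into an
// Array[Any] of alternating keys and values, sidestepping the missing
// no-arg constructor, and rebuilt pairwise on the read side.
class MapSerializer extends Serializer[Map[_, _]] {
  override def write(kryo: Kryo, output: Output, map: Map[_, _]): Unit =
    kryo.writeObject(output, map.flatMap(e => Seq(e._1, e._2)).toArray)

  override def read(kryo: Kryo, input: Input, tpe: Class[Map[_, _]]): Map[_, _] =
    kryo.readObject(input, classOf[Array[Any]])
      .sliding(2, 2)
      .map { case Array(k, v) => (k, v) }
      .toMap
}

// Hypothetical driver showing a write/read round trip.
object MapRoundTrip {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)
    val sample = Map("a" -> 1)
    // Register against the concrete class (Map.Map1 for one entry).
    kryo.register(sample.getClass, new MapSerializer)

    val output = new Output(4096)  // illustrative 4 KB buffer
    kryo.writeObject(output, sample)
    output.close()

    val input = new Input(output.toBytes)
    val restored = kryo.readObject(input, sample.getClass)
    input.close()

    assert(restored == sample)     // entries survive the round trip
    println(restored)              // Map(a -> 1)
  }
}

Note that Scala's immutable Map uses specialized classes Map1 through Map4 for small
maps, which is why the patch registers the serializer for Map$Map1 rather than for
the Map trait itself.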