Binary serde issue. Type inconsistency with Spark Catalyst
comphead committed May 20, 2020
1 parent b73ddb0 commit b66a30c
Showing 4 changed files with 39 additions and 13 deletions.
src/main/scala/com/redislabs/provider/redis/util/Logging.scala

@@ -29,6 +29,12 @@ trait Logging {
     }
   }
 
+  def logWarn(msg: => String): Unit = {
+    if (logger.isWarnEnabled) {
+      _logger.warn(msg)
+    }
+  }
+
   def logDebug(msg: => String): Unit = {
     if (logger.isDebugEnabled) {
       _logger.debug(msg)
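Note that the new logWarn follows the same pattern as logDebug below it: the message is a by-name parameter (msg: => String), so the string is only built when warn logging is actually enabled. A minimal illustration of that laziness, with a hypothetical call site that is not part of this commit:

import com.redislabs.provider.redis.util.Logging

object ByNameDemo extends App with Logging {
  // imagine an expensive diagnostic string
  def expensiveDetail(): String = (1 to 1000000).sum.toString

  // if warn logging is disabled, expensiveDetail() is never invoked,
  // because msg is passed by name and forced only inside logWarn
  logWarn(s"decode fallback, detail: ${expensiveDetail()}")
}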
17 changes: 17 additions & 0 deletions src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala
@@ -0,0 +1,17 @@
+package com.redislabs.provider.redis.util
+
+import org.apache.spark.sql.types.StructType
+
+object SparkUtils {
+  /**
+    * Reorders the schema columns into the same order as requiredColumns.
+    * @param schema          the current schema
+    * @param requiredColumns the column order expected by Catalyst
+    */
+  def alignSchemaWithCatalyst(schema: StructType, requiredColumns: Seq[String]): StructType = {
+    val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
+    StructType(requiredColumns.map { c =>
+      fieldsMap(c)
+    })
+  }
+}
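To make the helper's behavior concrete, here is a small standalone sketch (field names invented, not part of the commit). Given a schema ordered (a, b, c) and Catalyst requesting (c, a), the result keeps only c and a, in that order. Note the lookup assumes every required column exists in the schema; an unknown name would throw a NoSuchElementException.

import com.redislabs.provider.redis.util.SparkUtils
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object AlignDemo extends App {
  val schema = StructType(Seq(
    StructField("a", IntegerType),
    StructField("b", StringType),
    StructField("c", StringType)))

  // Catalyst may request a subset of columns in an arbitrary order
  val aligned = SparkUtils.alignSchemaWithCatalyst(schema, Seq("c", "a"))
  println(aligned.fieldNames.mkString(", ")) // prints: c, a
}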
src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
@@ -2,6 +2,7 @@ package org.apache.spark.sql.redis
 
 import java.nio.charset.StandardCharsets.UTF_8
 
+import com.redislabs.provider.redis.util.SparkUtils
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -34,6 +35,10 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] {
   override def decodeRow(keyMap: (String, String), value: Array[Byte], schema: StructType,
                          requiredColumns: Seq[String]): Row = {
     val valuesArray: Array[Any] = SerializationUtils.deserialize(value)
-    new GenericRowWithSchema(valuesArray, schema)
+    // align column positions with what Catalyst expects
+    val alignedSchema = SparkUtils.alignSchemaWithCatalyst(schema, requiredColumns)
+    val names = schema.fieldNames
+    val alignedValuesArray = requiredColumns.toArray.map(f => valuesArray(names.indexOf(f)))
+    new GenericRowWithSchema(alignedValuesArray, alignedSchema)
   }
 }
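The reason for the realignment above: values were serialized positionally in schema order, while Catalyst hands requiredColumns over in its own order. A minimal sketch of the same index arithmetic with invented data (not part of the commit):

object DecodeAlignDemo extends App {
  // values deserialized in schema order (id, name, score)
  val names = Array("id", "name", "score")
  val valuesArray: Array[Any] = Array(1, "alice", 9.5)

  // Catalyst requests a different order
  val requiredColumns = Seq("score", "id")

  // the same reordering decodeRow performs
  val aligned = requiredColumns.toArray.map(f => valuesArray(names.indexOf(f)))
  println(aligned.mkString(", ")) // prints: 9.5, 1
}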
22 changes: 10 additions & 12 deletions src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
@@ -5,7 +5,7 @@ import java.util.{List => JList}
 
 import com.redislabs.provider.redis.rdd.Keys
 import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
-import com.redislabs.provider.redis.util.Logging
+import com.redislabs.provider.redis.util.{Logging, SparkUtils}
 import com.redislabs.provider.redis.util.PipelineUtils._
 import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisDataTypeHash, RedisDataTypeString, RedisEndpoint, RedisNode, toRedisContext}
 import org.apache.commons.lang3.SerializationUtils
@@ -161,19 +161,17 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
         new GenericRow(Array[Any]())
       }
     } else {
-      // filter schema columns, it should be in the same order as given 'requiredColumns'
-      val requiredSchema = {
-        val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
-        val requiredFields = requiredColumns.map { c =>
-          fieldsMap(c)
-        }
-        StructType(requiredFields)
-      }
-      val keyType =
+      /*
+       For binary it's crucial to have a schema, as we can't infer it and Catalyst's requiredColumns don't guarantee
+       the same order. Thus the schema is the only place where we can read the correct attribute positions for binary.
+       */
+      val (keyType, requiredSchema) =
         if (persistenceModel == SqlOptionModelBinary) {
-          RedisDataTypeString
+          if (this.schema == null)
+            logWarn("Unable to identify the schema when reading a dataframe in Binary mode. It can cause type inconsistency!")
+          (RedisDataTypeString, this.schema)
        } else {
-          RedisDataTypeHash
+          (RedisDataTypeHash, SparkUtils.alignSchemaWithCatalyst(this.schema, requiredColumns))
        }
       keysRdd.mapPartitions { partition =>
         // grouped iterator to only allocate memory for a portion of rows
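With this change, providing an explicit schema when reading a binary-persisted dataframe is effectively required, since it cannot be inferred and is now the source of attribute positions. A hedged usage sketch, assuming the standard spark-redis read options ("table", "model") and an invented table and schema:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object BinaryReadDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("binary-read").getOrCreate()

  val personSchema = StructType(Seq(
    StructField("name", StringType),
    StructField("age", IntegerType)))

  val df = spark.read
    .format("org.apache.spark.sql.redis")
    .option("table", "person")
    .option("model", "binary")
    .schema(personSchema) // omit this and the schema is null here, triggering the new logWarn
    .load()

  df.show()
}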
