fixed binary serde issue #252

Open · wants to merge 1 commit into master
src/main/scala/com/redislabs/provider/redis/util/Logging.scala

@@ -29,6 +29,12 @@ trait Logging {
     }
   }
 
+  def logWarn(msg: => String): Unit = {
+    if (logger.isWarnEnabled) {
+      _logger.warn(msg)
+    }
+  }
+
   def logDebug(msg: => String): Unit = {
     if (logger.isDebugEnabled) {
       _logger.debug(msg)
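
Note that msg is a by-name parameter, so the message string is only evaluated when the warn level is actually enabled. A minimal usage sketch (the key variable is hypothetical):

    // The interpolation below is skipped entirely unless isWarnEnabled is true.
    logWarn(s"could not decode value for key $key")
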
17 changes: 17 additions & 0 deletions src/main/scala/com/redislabs/provider/redis/util/SparkUtils.scala
@@ -0,0 +1,17 @@
+package com.redislabs.provider.redis.util
+
+import org.apache.spark.sql.types.StructType
+
+object SparkUtils {
+  /**
+    * Reorders the schema columns into the same order as requiredColumns.
+    * @param schema current schema
+    * @param requiredColumns column names in the order expected by Catalyst
+    */
+  def alignSchemaWithCatalyst(schema: StructType, requiredColumns: Seq[String]): StructType = {
+    val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
+    StructType(requiredColumns.map { c =>
+      fieldsMap(c)
+    })
+  }
+}
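
For illustration, a hedged sketch of what the helper does (the field names here are made up):

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Schema as persisted: (name, age)
    val persisted = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))

    // Catalyst requests the columns in a different order
    val aligned = SparkUtils.alignSchemaWithCatalyst(persisted, Seq("age", "name"))
    // aligned.fieldNames: Array("age", "name")
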
src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala

@@ -2,6 +2,7 @@ package org.apache.spark.sql.redis
 
 import java.nio.charset.StandardCharsets.UTF_8
 
+import com.redislabs.provider.redis.util.SparkUtils
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -34,6 +35,10 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] {
   override def decodeRow(keyMap: (String, String), value: Array[Byte], schema: StructType,
                          requiredColumns: Seq[String]): Row = {
     val valuesArray: Array[Any] = SerializationUtils.deserialize(value)
-    new GenericRowWithSchema(valuesArray, schema)
+    // Align column positions with what Catalyst expects
+    val alignedSchema = SparkUtils.alignSchemaWithCatalyst(schema, requiredColumns)
Collaborator (inline comment on the alignedSchema line): We can optimize it by creating alignedSchema once for all rows. To achieve it, I guess we will need to change the decodeRow() signature by adding a new parameter, so it will have two schemas: requiredSchema and persistedSchema.

+    val names = schema.fieldNames
+    val alignedValuesArray = requiredColumns.toArray.map(f => valuesArray(names.indexOf(f)))
+    new GenericRowWithSchema(alignedValuesArray, alignedSchema)
   }
 }
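
To see why the realignment matters, a small standalone sketch (hypothetical values) of the added logic:

    // Values were persisted in (name, age) order, but Catalyst asks for Seq("age", "name").
    // Reading by position without realignment would return "alice" for "age".
    val valuesArray: Array[Any] = Array("alice", 30)   // as deserialized
    val names = Array("name", "age")                   // persisted field order
    val requiredColumns = Seq("age", "name")           // order expected by Catalyst
    val aligned = requiredColumns.toArray.map(f => valuesArray(names.indexOf(f)))
    // aligned: Array(30, "alice")
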
22 changes: 10 additions & 12 deletions src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
@@ -5,7 +5,7 @@ import java.util.{List => JList}
 
 import com.redislabs.provider.redis.rdd.Keys
 import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
-import com.redislabs.provider.redis.util.Logging
+import com.redislabs.provider.redis.util.{Logging, SparkUtils}
 import com.redislabs.provider.redis.util.PipelineUtils._
 import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisDataTypeHash, RedisDataTypeString, RedisEndpoint, RedisNode, toRedisContext}
 import org.apache.commons.lang3.SerializationUtils
@@ -159,19 +159,17 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
         new GenericRow(Array[Any]())
       }
     } else {
-      // filter schema columns, it should be in the same order as given 'requiredColumns'
-      val requiredSchema = {
-        val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
-        val requiredFields = requiredColumns.map { c =>
-          fieldsMap(c)
-        }
-        StructType(requiredFields)
-      }
-      val keyType =
+      /*
+        For binary it's crucial to have a schema, as we can't infer it and Catalyst's requiredColumns doesn't
+        guarantee the same order. Thus the schema is the only place where we can read correct attribute positions for binary.
+       */
+      val (keyType, requiredSchema) =
         if (persistenceModel == SqlOptionModelBinary) {
-          RedisDataTypeString
+          if (this.schema == null)
+            logWarn("Unable to identify the schema when reading a dataframe in Binary mode. It can cause type inconsistency!")
+          (RedisDataTypeString, this.schema)
         } else {
-          RedisDataTypeHash
+          (RedisDataTypeHash, SparkUtils.alignSchemaWithCatalyst(this.schema, requiredColumns))
         }
       keysRdd.mapPartitions { partition =>
         // grouped iterator to only allocate memory for a portion of rows
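
For context, a hedged sketch of the read path this change affects, assuming an existing SparkSession spark (the table name and columns are hypothetical; the format and option names follow the spark-redis documentation):

    // In binary mode rows are stored as serialized value arrays, so the data
    // source cannot infer field positions and must rely on the persisted schema.
    val df = spark.read
      .format("org.apache.spark.sql.redis")
      .option("model", "binary")
      .option("table", "person")
      .load()

    // A projection like this makes Catalyst pass requiredColumns in query order,
    // which may differ from the order in which the values were serialized.
    df.select("age", "name").show()
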