Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed binary serde issue #252

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

comphead
Copy link

Fixed serde issue when reading/writing a dataframe in binary mode. Please consider next example

case class Outer(
                  arr0: Array[Inner],
                  str0: String,
                  str1: String,
                  arr1: Array[String],
                  str2: String)
case class Inner(str0: String, id0: Int)
    
  def testDF[T](df: Dataset[T]): Unit = {
    df.printSchema()
    val schema = df.schema
    df.write
      .mode(SaveMode.Overwrite)
      .format("org.apache.spark.sql.redis")
      .option("table", "t")
      .option("model", "binary")
      .save()


    val df0 = session.read.format("org.apache.spark.sql.redis")
      .schema(schema)
      .option("table", "t")
      .option("model", "binary")
      .load()

    df0.printSchema()

    df0.show(false)
  }

testDF(Seq(
      Outer(
        arr0 = Array(Inner("str0", 0)),
        str0 = "str0",
        str1 = "str1",
        arr1 = Array("arr1"),
        str2 = "str2"
      )
    ).toDS())

That fails with Caused by: java.lang.IllegalArgumentException: The value (1) of the type (java.lang.String) cannot be converted to an array of structstr0:string,id0:int

The reason of that is:

  • In Redis we already have an object stored with attrs order arr0, str0, str1, arr1, str2
  • buildScan however gets the requiredColumns in another order str0, arr1, str1, arr0, str2
  • binary decoder didn't apply attrs position, just set the updated schema which is not enough
    The proposed fix makes correct attr order for binary deserialized value

Also please note, without provided schema its difficult to deserealize the binary value as we dont have an initial order. Added warning for that

Copy link
Collaborator

@fe2s fe2s left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @comphead ,
Thanks a lot for this PR, sorry for the delay reviewing it.
The fix makes sense. Could you please take a look at my comment? Also it will be good to add the test that fails now.

@@ -34,6 +35,10 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] {
override def decodeRow(keyMap: (String, String), value: Array[Byte], schema: StructType,
requiredColumns: Seq[String]): Row = {
val valuesArray: Array[Any] = SerializationUtils.deserialize(value)
new GenericRowWithSchema(valuesArray, schema)
// Aligning column positions with what Catalyst expecting
val alignedSchema = SparkUtils.alignSchemaWithCatalyst(schema, requiredColumns)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can optimize it by creating alignedSchema once for all rows. To achieve it I guess we will need to change the decodeRow() signature by adding a new parameter, it will have two schemas: requiredSchema and persistedSchema.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants