[HUDI-5681] Fixing Kryo being instantiated w/ invalid SparkConf #7821

Merged · 2 commits · Feb 2, 2023
This file was deleted (the SerDeUtils helper that the new Serializer replaces; its contents are collapsed in this view).

@@ -42,7 +42,7 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
 import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
-import org.apache.spark.sql.hudi.{ProvidesHoodieConfig, SerDeUtils}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.types.{BooleanType, StructType}

 import java.util.Base64
@@ -328,7 +328,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       }).toMap
       // Serialize the Map[UpdateCondition, UpdateAssignments] to base64 string
       val serializedUpdateConditionAndExpressions = Base64.getEncoder
-        .encodeToString(SerDeUtils.toBytes(updateConditionToAssignments))
+        .encodeToString(Serializer.toBytes(updateConditionToAssignments))
Contributor: Does this work for all Spark versions?

Contributor (Author): What exactly are you referring to?

       writeParams += (PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS ->
         serializedUpdateConditionAndExpressions)

@@ -338,7 +338,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         .getOrElse(Literal.create(true, BooleanType))
       // Serialize the Map[DeleteCondition, empty] to base64 string
       val serializedDeleteCondition = Base64.getEncoder
-        .encodeToString(SerDeUtils.toBytes(Map(deleteCondition -> Seq.empty[Assignment])))
+        .encodeToString(Serializer.toBytes(Map(deleteCondition -> Seq.empty[Assignment])))
       writeParams += (PAYLOAD_DELETE_CONDITION -> serializedDeleteCondition)
     }

@@ -414,7 +414,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         rewriteCondition -> formatAssignments
       }).toMap
       Base64.getEncoder.encodeToString(
-        SerDeUtils.toBytes(insertConditionAndAssignments))
+        Serializer.toBytes(insertConditionAndAssignments))
     }

   /**
@@ -29,18 +29,19 @@ import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
 import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord}
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
+import org.apache.hudi.common.util.{BinaryUtil, ValidationUtils, Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.io.HoodieWriteHandle
 import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, Projection, SafeProjection}
-import org.apache.spark.sql.hudi.SerDeUtils
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.{SparkConf, SparkEnv}

+import java.nio.ByteBuffer
 import java.util.function.{Function, Supplier}
 import java.util.{Base64, Properties}
 import scala.collection.JavaConverters._
@@ -420,7 +421,7 @@ object ExpressionPayload {
     override def apply(key: (String, Schema)): Seq[(Projection, Projection)] = {
       val (encodedConditionalAssignments, _) = key
       val serializedBytes = Base64.getDecoder.decode(encodedConditionalAssignments)
-      val conditionAssignments = SerDeUtils.toObject(serializedBytes)
+      val conditionAssignments = Serializer.toObject(serializedBytes)
         .asInstanceOf[Map[Expression, Seq[Expression]]]
       conditionAssignments.toSeq.map {
         case (condition, assignments) =>
@@ -455,5 +456,50 @@
         field.schema, field.doc, field.defaultVal, field.order))
     Schema.createRecord(a.getName, a.getDoc, a.getNamespace, a.isError, mergedFields.asJava)
   }
+
+
+  /**
+   * This object differs from Hudi's generic [[SerializationUtils]] in its ability to serialize
+   * Spark's internal structures (various [[Expression]]s).
+   *
+   * For that purpose we re-use Spark's [[KryoSerializer]], sharing its configuration with the
+   * enclosing [[SparkEnv]]. This is necessary to make sure that this particular instance of Kryo,
+   * used for serialization of Spark's internal structures (like [[Expression]]s), is configured
+   * appropriately (class-loading, custom serializers, etc.)
+   *
+   * TODO rebase on Spark's SerializerSupport
+   */
+  private[hudi] object Serializer {
+
Comment on lines +472 to +473:

Contributor: Have you tested this on all Spark versions (Spark 2.4, 3.1, 3.2, 3.3) in a cluster environment (multiple nodes)?

Contributor (Author): Will check.

Contributor (Author): Checked Spark 3.1, 3.2 and 3.3, working fine.

+    // NOTE: This config is only available in Spark >= 3.0
+    private val KRYO_USE_POOL_CONFIG_KEY = "spark.kryo.pool"
+
+    private lazy val conf = {
+      val conf = Option(SparkEnv.get)
+        // To make sure we're not modifying the existing environment's [[SparkConf]],
+        // we clone it here
+        .map(_.conf.clone)
+        .getOrElse(new SparkConf)
+      // This serializer is configured as thread-local, hence there's no need for
+      // pooling
+      conf.set(KRYO_USE_POOL_CONFIG_KEY, "false")
+      conf
+    }
+
+    private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] {
+      override protected def initialValue: SerializerInstance = {
+        new KryoSerializer(conf).newInstance()
+      }
+    }
+
+    def toBytes(o: Any): Array[Byte] = {
+      val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o)
+      BinaryUtil.toBytes(buf)
+    }
+
+    def toObject(bytes: Array[Byte]): Any = {
+      SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes))
+    }
+  }
 }
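
For illustration, here is a minimal, self-contained sketch (not part of the PR) of the round trip the two changed files perform through the new Serializer: Kryo-serializing an Expression map on the write side (MergeIntoHoodieTableCommand) and decoding it on the read side (ExpressionPayload). It assumes the fallback path where no live SparkEnv exists, so a fresh SparkConf is built with pooling disabled; the object name SerializerRoundTripSketch and the map contents are placeholders, not real merge conditions.

```scala
import java.nio.ByteBuffer
import java.util.Base64

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.BooleanType

// Hypothetical illustration object, not part of the PR
object SerializerRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Fallback path from the PR: no live SparkEnv, so build a fresh SparkConf;
    // pooling is disabled since the serializer instance is confined to one thread here
    val conf = new SparkConf().set("spark.kryo.pool", "false")
    val kryo = new KryoSerializer(conf).newInstance()

    // Placeholder for the Map[UpdateCondition, UpdateAssignments] that
    // MergeIntoHoodieTableCommand serializes
    val conditionToAssignments: Map[Expression, Seq[Expression]] =
      Map(Literal.create(true, BooleanType) -> Seq.empty[Expression])

    // Write side: Kryo bytes wrapped in base64
    // (Serializer.toBytes + Base64.getEncoder in the PR)
    val encoded = Base64.getEncoder.encodeToString(
      kryo.serialize(conditionToAssignments).array())

    // Read side: base64 back to bytes, then Kryo deserialization
    // (Base64.getDecoder + Serializer.toObject in the PR)
    val decoded = kryo.deserialize[Map[Expression, Seq[Expression]]](
      ByteBuffer.wrap(Base64.getDecoder.decode(encoded)))

    assert(decoded.size == conditionToAssignments.size)
  }
}
```

The behavioral point this mirrors is the one in the PR's scaladoc: the Kryo instance must be built from a SparkConf consistent with the enclosing SparkEnv (class-loading, custom serializers), which is why Serializer clones SparkEnv.get.conf when one is available instead of constructing a default SparkConf.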