[HUDI-5681] Fixing Kryo being instantiated w/ invalid SparkConf
#7821
Conversation
```diff
@@ -188,7 +188,6 @@ trait ProvidesHoodieConfig extends Logging {
       PRECOMBINE_FIELD.key -> preCombineField,
       PARTITIONPATH_FIELD.key -> partitionFieldsStr,
       PAYLOAD_CLASS_NAME.key -> payloadClassName,
-      HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
```
Why do we change this file in this PR?
Stacked on top of another PR (for testing); will be cleaned up.
```scala
private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] {
  private lazy val conf = {
    val conf = Option(SparkEnv.get)
      // TODO elaborate
```
fix comment
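For orientation, here is a minimal sketch of the thread-local pattern the excerpt above implements; the enclosing object name and the fallback behavior are assumptions for illustration, not the exact PR code:

```scala
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}

object SerializerHolder { // hypothetical enclosing object, for illustration only
  // One Kryo SerializerInstance per thread (Kryo is not thread-safe),
  // configured from the running application's SparkConf when SparkEnv is up
  private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] {
    override protected def initialValue(): SerializerInstance = {
      val conf = Option(SparkEnv.get)
        .map(_.conf)              // conf of the live Spark application
        .getOrElse(new SparkConf) // assumed fallback, e.g. in local unit tests
      new KryoSerializer(conf).newInstance()
    }
  }

  def instance: SerializerInstance = SERIALIZER_THREAD_LOCAL.get()
}
```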
Force-pushed (…ts proliferation elsewhere; Tidying up) from eef9f03 to e99119a
Looks ok to me. @alexeykudinkin we need to test the PR thoroughly before merging it. @xushiyan @YannByron could you also take another look?
```diff
@@ -328,7 +328,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     }).toMap
     // Serialize the Map[UpdateCondition, UpdateAssignments] to base64 string
     val serializedUpdateConditionAndExpressions = Base64.getEncoder
-      .encodeToString(SerDeUtils.toBytes(updateConditionToAssignments))
+      .encodeToString(Serializer.toBytes(updateConditionToAssignments))
```
Does this work for all Spark versions?
What exactly are you referring to?
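For context, a hedged sketch of the encode/decode round-trip this change participates in; `Serializer.toObject`, the type parameter, and the receiving-side placement are assumptions mirroring the excerpt, not confirmed API:

```scala
import java.util.Base64
import org.apache.spark.sql.catalyst.expressions.Expression

// Driver side: Kryo-serialize the map, then Base64-encode it for transport
val encoded: String =
  Base64.getEncoder.encodeToString(Serializer.toBytes(updateConditionToAssignments))

// Receiving side: decode and Kryo-deserialize back into the map
// (assumed counterpart method; the type parameter is illustrative)
val decoded =
  Serializer.toObject[Map[Expression, Seq[Expression]]](Base64.getDecoder.decode(encoded))
```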
```scala
private[hudi] object Serializer {
```
Have you tested this on all Spark versions (Spark 2.4, 3.1, 3.2, 3.3) in a cluster environment (multiple nodes)?
Will check
Checked Spark 3.1, 3.2, and 3.3; working fine.
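For reference, a hedged sketch of what a helper like the `Serializer` object excerpted above could look like; the `toBytes`/`toObject` names mirror this PR's excerpts, while the package and method bodies are assumptions:

```scala
package org.apache.hudi // assumed package, only so that private[hudi] compiles

import java.nio.ByteBuffer
import scala.reflect.ClassTag

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.serializer.KryoSerializer

private[hudi] object Serializer {

  def toBytes[T: ClassTag](value: T): Array[Byte] =
    newSerializerInstance().serialize(value).array()

  def toObject[T: ClassTag](bytes: Array[Byte]): T =
    newSerializerInstance().deserialize[T](ByteBuffer.wrap(bytes))

  // The crux of the fix: configure Kryo from the SparkConf that SparkEnv
  // holds for the running application, not from a default SparkConf
  private def newSerializerInstance() = {
    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
    new KryoSerializer(conf).newInstance()
  }
}
```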
Change Logs

This is addressing misconfiguration of the Kryo object used specifically to serialize Spark's internal structures (like `Expression`s): previously we were using the default `SparkConf` instance to configure it, while instead we should have used the one provided by `SparkEnv`.
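In code terms, the shift described above amounts to the following minimal sketch (the wrapper object is hypothetical, added only for illustration):

```scala
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.serializer.KryoSerializer

object KryoConfContrast { // hypothetical wrapper, for illustration only
  // Before (the bug): a default SparkConf, which knows nothing about the
  // settings of the application that is actually running
  def misconfigured = new KryoSerializer(new SparkConf())

  // After (the fix): the SparkConf carried by SparkEnv for the live application
  def fixed = new KryoSerializer(SparkEnv.get.conf)
}
```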
Impact
Addresses NPE/ClassCastException errors occurring when trying to run MERGE INTO statements in Spark SQL.
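For illustration, a hypothetical statement of the affected kind; table and column names are made up, and `spark` is assumed to be a `SparkSession`:

```scala
spark.sql(
  """MERGE INTO hudi_target t
    |USING source_updates s
    |ON t.id = s.id
    |WHEN MATCHED THEN UPDATE SET *
    |WHEN NOT MATCHED THEN INSERT *
    |""".stripMargin)
```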
Risk level
Low
Documentation Update
N/A
Contributor's checklist