Skip to content

Commit

Permalink
[HUDI-4214] improve repeat init write schema in ExpressionPayload (ap…
Browse files Browse the repository at this point in the history
…ache#5820)

* [HUDI-4214] improve repeat init write schema in ExpressionPayload
  • Loading branch information
KnightChess authored and Forus0322 committed Jun 22, 2022
1 parent 70d9724 commit 5e4cdcb
Showing 1 changed file with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.hudi.sql.IExpressionEvaluator
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema}
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, setWriteSchema, getMergedSchema}
import org.apache.spark.sql.types.{StructField, StructType}

import java.util.concurrent.Callable
Expand Down Expand Up @@ -215,9 +215,7 @@ class ExpressionPayload(record: GenericRecord,
*/
private def initWriteSchemaIfNeed(properties: Properties): Unit = {
  // Resolve the schema at most once per payload instance. The lookup is delegated to
  // ExpressionPayload.setWriteSchema, which checks that the write-schema property is present
  // and returns the parsed schema from the shared cache. Parsing the property here as well
  // would repeat that check and redo the parse the cache exists to avoid, only for the result
  // to be overwritten immediately.
  if (writeSchema == null) {
    writeSchema = setWriteSchema(properties)
  }
}

Expand Down Expand Up @@ -276,6 +274,18 @@ object ExpressionPayload {
.maximumSize(1024)
.build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()

// Parsed write schemas, keyed by the raw string value of the write-schema property.
// The cache is bounded so that it cannot grow without limit over the lifetime of the JVM;
// an evicted entry is simply re-parsed on its next lookup.
private val writeSchemaCache = CacheBuilder.newBuilder()
  .maximumSize(1024)
  .build[String, Schema]()

/**
 * Look up the parsed write schema for the given properties, parsing it only when the
 * cache has no entry for the schema string.
 *
 * The call fails through `ValidationUtils.checkArgument` when the properties do not contain
 * the `HoodieWriteConfig.WRITE_SCHEMA` key.
 *
 * @param properties the properties carrying the write schema string
 * @return the schema parsed from the write-schema property
 */
def setWriteSchema(properties: Properties): Schema = {
  ValidationUtils.checkArgument(
    properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key),
    s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}")

  // The schema string serves both as the cache key and as the parser input.
  val schemaJson = properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key)

  // Loader handed to the cache; it is expected to run only when the key is not cached yet.
  val parseOnMiss = new Callable[Schema] {
    override def call(): Schema = new Schema.Parser().parse(schemaJson)
  }

  writeSchemaCache.get(schemaJson, parseOnMiss)
}

/**
* Do the CodeGen for each condition and assignment expression. We will cache it to reduce
* the compile time for each method call.
Expand Down

0 comments on commit 5e4cdcb

Please sign in to comment.