[HUDI-2706] refactor spark-sql to make consistent with DataFrame api #3936

Merged · 6 commits · Nov 14, 2021
Changes from all commits
@@ -113,7 +113,7 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true);
Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), false);
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true);
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}
}
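Note on the hunk above: the incoming record's ordering field is now looked up with the same lenient flag as the persisted record's (presumably returnNullIfNotFound = true, i.e. a missing field yields null instead of an exception). A minimal sketch of the resulting comparison rule, using Option[Long] in place of the Avro field lookups — both the types and the missing-incoming case are assumptions for illustration, not the Hudi implementation:

```scala
object OrderingRuleSketch {
  // Sketch of the "needUpdatingPersistedRecord" decision: the persisted record is
  // replaced unless it carries a strictly greater ordering value than the incoming one.
  def needUpdating(persisted: Option[Long], incoming: Option[Long]): Boolean =
    (persisted, incoming) match {
      case (None, _)          => true   // no persisted ordering value -> always update
      case (Some(p), Some(i)) => p <= i // keep whichever side has the higher ordering value
      case (Some(_), None)    => false  // incoming lacks the field (behavior assumed for this sketch)
    }

  def main(args: Array[String]): Unit = {
    println(needUpdating(Some(3L), Some(5L))) // true  -> incoming wins
    println(needUpdating(Some(5L), Some(3L))) // false -> persisted record is kept
    println(needUpdating(None, Some(1L)))     // true
  }
}
```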
@@ -19,11 +19,13 @@ package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceOptionsHelper.{allAlternatives, translateConfigurations}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
@@ -42,19 +44,21 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner

import org.apache.log4j.LogManager

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkContext}

import java.util
import java.util.Properties

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ListBuffer

object HoodieSparkSqlWriter {
@@ -141,7 +145,7 @@ object HoodieSparkSqlWriter {
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS_NAME))
.setKeyGeneratorClassProp(HoodieWriterUtils.getOriginKeyGenerator(parameters))
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.initTable(sparkContext.hadoopConfiguration, path)
@@ -713,22 +717,6 @@ object HoodieSparkSqlWriter {
}
}

private def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieTableConfig): Unit = {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
if (null != existingValue && !resolver(existingValue, value)) {
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
}
}
if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
}
}

private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
val mergedParams = mutable.Map.empty ++
@@ -745,16 +733,4 @@
val params = mergedParams.toMap
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}

private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieTableConfig, key: String): String = {
if (null == tableConfig) {
null
} else {
if (allAlternatives.contains(key)) {
tableConfig.getString(allAlternatives(key))
} else {
tableConfig.getString(key)
}
}
}
}
@@ -17,15 +17,19 @@

package org.apache.hudi

import java.util.Properties

import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.command.SqlKeyGenerator

import java.util.Properties
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters.{mapAsScalaMapConverter, _}
import scala.collection.JavaConverters.mapAsScalaMapConverter
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import scala.collection.JavaConverters._

/**
* WriterUtils to assist in write path in Datasource and tests.
@@ -102,4 +106,68 @@ object HoodieWriterUtils {
properties.putAll(mapAsJavaMap(parameters))
new HoodieConfig(properties)
}

def getOriginKeyGenerator(parameters: Map[String, String]): String = {
val kg = parameters.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
Review comment (Contributor): Are there unit tests covering this logic?

Reply (Author): ok

parameters.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
} else {
kg
}
}
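For context, a hedged REPL-style sketch of getOriginKeyGenerator: the option keys and the SqlKeyGenerator wrapping come from the code above, while the concrete key generator class names in the map are illustrative assumptions.

```scala
import org.apache.hudi.HoodieWriterUtils
import org.apache.hudi.DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME
import org.apache.spark.sql.hudi.command.SqlKeyGenerator

// Spark SQL wraps the user's key generator in SqlKeyGenerator and stashes the original
// class under ORIGIN_KEYGEN_CLASS_NAME, so the original class is what should end up in
// the table config (see the setKeyGeneratorClassProp change in HoodieSparkSqlWriter).
val sqlWriteParams = Map(
  KEYGENERATOR_CLASS_NAME.key()            -> classOf[SqlKeyGenerator].getCanonicalName,
  SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> "org.apache.hudi.keygen.SimpleKeyGenerator"
)
HoodieWriterUtils.getOriginKeyGenerator(sqlWriteParams)
// -> "org.apache.hudi.keygen.SimpleKeyGenerator"

// A plain DataFrame write passes the key generator through unchanged:
HoodieWriterUtils.getOriginKeyGenerator(
  Map(KEYGENERATOR_CLASS_NAME.key() -> "org.apache.hudi.keygen.ComplexKeyGenerator"))
// -> "org.apache.hudi.keygen.ComplexKeyGenerator"
```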

/**
* Detects conflicts between new parameters and existing table configurations
*/
def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieConfig): Unit = {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
if (null != existingValue && !resolver(existingValue, value)) {
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
}
}

if (null != tableConfig) {
val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
if (null != datasourceRecordKey && null != tableConfigRecordKey
&& datasourceRecordKey != tableConfigRecordKey) {
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
}

val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
&& datasourcePreCombineKey != tableConfigPreCombineKey) {
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
}

val datasourceKeyGen = getOriginKeyGenerator(params)
val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
if (null != datasourceKeyGen && null != tableConfigKeyGen
&& datasourceKeyGen != tableConfigKeyGen) {
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
}
}

if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
}
}
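A hedged usage sketch of the relocated validateTableConfig (spark-shell style, local SparkSession): the config keys are the constants referenced above, while the record-key values are made up to force a conflict.

```scala
import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("validate-table-config-sketch").getOrCreate()

// Existing table config says the record key is "id" ...
val tableConfig = HoodieWriterUtils.convertMapToHoodieConfig(
  Map(HoodieTableConfig.RECORDKEY_FIELDS.key() -> "id"))

// ... while the incoming write asks for "uuid".
val incomingParams = Map(DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "uuid")

try {
  HoodieWriterUtils.validateTableConfig(spark, incomingParams, tableConfig)
} catch {
  case e: HoodieException =>
    // Expected here: a "Config conflict" message listing RecordKey uuid vs id.
    println(e.getMessage)
}
```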

private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieConfig, key: String): String = {
if (null == tableConfig) {
null
} else {
if (allAlternatives.contains(key)) {
tableConfig.getString(allAlternatives(key))
} else {
tableConfig.getString(key)
}
}
}
}
@@ -20,6 +20,10 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ValidationUtils

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType


/**
@@ -43,6 +47,7 @@ object HoodieOptionConfig {
.withSqlKey("primaryKey")
.withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD.key)
.withTableConfigKey(HoodieTableConfig.RECORDKEY_FIELDS.key)
.defaultValue(DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
.build()

val SQL_KEY_TABLE_TYPE: HoodieOption[String] = buildConf()
@@ -102,6 +107,8 @@

private lazy val reverseValueMapping = valueMapping.map(f => f._2 -> f._1)

def withDefaultSqlOptions(options: Map[String, String]): Map[String, String] = defaultSqlOptions ++ options
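A small usage sketch of withDefaultSqlOptions, assuming options coming from a CREATE TABLE clause; the concrete default values mentioned in the comments are assumptions.

```scala
import org.apache.spark.sql.hudi.HoodieOptionConfig

// Options as they might come from a CREATE TABLE ... TBLPROPERTIES clause:
val userOptions = Map("primaryKey" -> "id", "preCombineField" -> "ts")

// Defaults are applied first (defaultSqlOptions ++ options), so explicit values always win.
val resolved = HoodieOptionConfig.withDefaultSqlOptions(userOptions)
// resolved("primaryKey") == "id"; resolved also carries defaulted entries such as the
// table "type" (assumed "cow" here) derived from the HoodieOption definitions above.
```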

/**
* Mapping the sql's short name key/value in the options to the hoodie's config key/value.
* @param options
@@ -119,14 +126,13 @@
* @return
*/
def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, String] = {
defaultTableConfig ++
options.map { case (k, v) =>
if (keyTableConfigMapping.contains(k)) {
keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
} else {
k -> v
}
options.map { case (k, v) =>
if (keyTableConfigMapping.contains(k)) {
keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
} else {
k -> v
}
}
}

/**
@@ -136,16 +142,19 @@
options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))
}

private lazy val defaultTableConfig: Map[String, String] = {
private lazy val defaultSqlOptions: Map[String, String] = {
HoodieOptionConfig.getClass.getDeclaredFields
.filter(f => f.getType == classOf[HoodieOption[_]])
.map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]})
.filter(option => option.tableConfigKey.isDefined && option.defaultValue.isDefined)
.map(option => option.tableConfigKey.get ->
valueMapping.getOrElse(option.defaultValue.get.toString, option.defaultValue.get.toString))
.map(option => option.sqlKeyName -> option.defaultValue.get.toString)
.toMap
}

private lazy val defaultTableConfig: Map[String, String] = {
mappingSqlOptionToHoodieParam(defaultSqlOptions)
}

/**
* Get the primary key from the table options.
* @param options
@@ -154,7 +163,7 @@
def getPrimaryColumns(options: Map[String, String]): Array[String] = {
val params = mappingSqlOptionToHoodieParam(options)
params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
.map(_.split(",").filter(_.length > 0))
.map(_.split(",").filter(_.nonEmpty))
.getOrElse(Array.empty)
}

@@ -171,7 +180,47 @@

def getPreCombineField(options: Map[String, String]): Option[String] = {
val params = mappingSqlOptionToHoodieParam(options)
params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)
params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key).filter(_.nonEmpty)
}

def deleteHooideOptions(options: Map[String, String]): Map[String, String] = {
options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => keyMapping.contains(kv._1))
}
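A hedged sketch of what deleteHooideOptions filters out; the sample keys and values are illustrative only.

```scala
import org.apache.spark.sql.hudi.HoodieOptionConfig

val mixedOptions = Map(
  "primaryKey" -> "id",                            // SQL-mapped Hudi key -> dropped (assuming it is in keyMapping)
  "hoodie.datasource.write.operation" -> "upsert", // hoodie.* key        -> dropped
  "orc.compress" -> "ZLIB"                         // unrelated option    -> kept
)
HoodieOptionConfig.deleteHooideOptions(mixedOptions)
// -> Map("orc.compress" -> "ZLIB")
```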

// extract primaryKey, preCombineField, type options
def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName)
options.filterKeys(targetOptions.contains)
}

// validate primaryKey, preCombineField and type options
def validateTable(spark: SparkSession, schema: StructType, options: Map[String, String]): Unit = {
val resolver = spark.sessionState.conf.resolver

// validate primary key
val primaryKeys = options.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
.map(_.split(",").filter(_.length > 0))
ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
primaryKeys.get.foreach { primaryKey =>
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, primaryKey)),
s"Can't find primary key `$primaryKey` in ${schema.treeString}.")
}

// validate precombine key
val precombineKey = options.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
if (precombineKey.isDefined && precombineKey.get.nonEmpty) {
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, precombineKey.get)),
s"Can't find precombine key `${precombineKey.get}` in ${schema.treeString}.")
}

// validate table type
val tableType = options.get(SQL_KEY_TABLE_TYPE.sqlKeyName)
ValidationUtils.checkArgument(tableType.nonEmpty, "No `type` is specified.")
ValidationUtils.checkArgument(
tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW) ||
tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR),
s"'type' must be '${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW}' or " +
s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
}
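A hedged usage sketch of validateTable with a local SparkSession and a made-up schema; the SQL option key names ("primaryKey", "preCombineField", "type") follow the constants defined in this file.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

val spark = SparkSession.builder().master("local[1]").appName("validate-table-sketch").getOrCreate()

val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("ts", TimestampType),
  StructField("price", StringType)
))

// Passes: the primary key and precombine field exist in the schema, and the type is known.
HoodieOptionConfig.validateTable(spark, schema,
  Map("primaryKey" -> "id", "preCombineField" -> "ts", "type" -> "cow"))

// Would throw an IllegalArgumentException (via ValidationUtils.checkArgument),
// because "uuid" is not a column of the schema:
// HoodieOptionConfig.validateTable(spark, schema, Map("primaryKey" -> "uuid", "type" -> "cow"))
```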

def buildConf[T](): HoodieOptions[T] = {
@@ -90,7 +90,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val metadataConfig = {
val properties = new Properties()
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava)
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++ table.properties).asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}
FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
@@ -202,8 +202,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
val targetTableId = getMergeIntoTargetTableId(mergeInto)
val targetTable =
sparkSession.sessionState.catalog.getTableMetadata(targetTableId)
val targetTableType = HoodieOptionConfig.getTableType(targetTable.storage.properties)
val preCombineField = HoodieOptionConfig.getPreCombineField(targetTable.storage.properties)
val tblProperties = targetTable.storage.properties ++ targetTable.properties
val targetTableType = HoodieOptionConfig.getTableType(tblProperties)
val preCombineField = HoodieOptionConfig.getPreCombineField(tblProperties)

// Get the map of target attribute to value of the update assignments.
val target2Values = resolvedAssignments.map {
@@ -105,8 +105,13 @@ object AlterHoodieTableAddColumnsCommand {
val path = getTableLocation(table, sparkSession)

val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties).asJava)
val client = DataSourceUtils.createHoodieClient(
jsc,
schema.toString,
path,
table.identifier.table,
HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties ++ table.properties).asJava
)

val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
@@ -92,7 +92,7 @@ extends RunnableCommand {
.build()
val tableConfig = metaClient.getTableConfig

val optParams = withSparkConf(sparkSession, table.storage.properties) {
withSparkConf(sparkSession, table.storage.properties) {
Map(
"path" -> path,
TBL_NAME.key -> tableIdentifier.table,
@@ -104,10 +104,6 @@
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp
)
}

val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
translatedOptions
}

def normalizePartitionSpec[T](