[HUDI-5278] Support more conf to cluster procedure #7304

Merged: 1 commit, Nov 30, 2022

Changes from all commits

HoodieClusteringConfig.java
@@ -713,5 +713,57 @@ public static LayoutOptimizationStrategy fromValue(String value) {

return enumValue;
}

public String getValue() {
return value;
}
}

public enum ClusteringOperator {

/**
* only schedule the clustering plan
*/
SCHEDULE("schedule"),

/**
* only execute the pending clustering plans
*/
EXECUTE("execute"),

Contributor: Can we execute a specific pending clustering plan instead of all pending clustering plans?

/**
* schedule clustering first, and then execute all pending clustering plans
*/
SCHEDULE_AND_EXECUTE("scheduleandexecute");

private static final Map<String, ClusteringOperator> VALUE_TO_ENUM_MAP =
TypeUtils.getValueToEnumMap(ClusteringOperator.class, e -> e.value);

private final String value;

ClusteringOperator(String value) {
this.value = value;
}

@Nonnull
public static ClusteringOperator fromValue(String value) {
ClusteringOperator enumValue = VALUE_TO_ENUM_MAP.get(value);
if (enumValue == null) {
throw new HoodieException(String.format("Invalid value (%s)", value));
}
return enumValue;
}

public boolean isSchedule() {
return this != ClusteringOperator.EXECUTE;
}

public boolean isExecute() {
return this != ClusteringOperator.SCHEDULE;
}

public String getValue() {
return value;
}
}
}
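
For illustration, a minimal sketch of how the three operator values answer the isSchedule/isExecute predicates added above (not part of the PR; it only exercises the new enum):

```scala
// Minimal sketch (not part of the PR): how each ClusteringOperator value
// answers the isSchedule/isExecute predicates defined above.
import org.apache.hudi.config.HoodieClusteringConfig.ClusteringOperator

object ClusteringOperatorSketch {
  def main(args: Array[String]): Unit = {
    Seq("schedule", "execute", "scheduleandexecute").foreach { op =>
      val operator = ClusteringOperator.fromValue(op)
      println(s"$op -> schedule=${operator.isSchedule}, execute=${operator.isExecute}")
    }
    // Expected output:
    // schedule -> schedule=true, execute=false
    // execute -> schedule=false, execute=true
    // scheduleandexecute -> schedule=true, execute=true
  }
}
```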
RunClusteringProcedure.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.config.HoodieClusteringConfig.{ClusteringOperator, LayoutOptimizationStrategy}
import org.apache.hudi.exception.HoodieClusteringException
import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
import org.apache.spark.internal.Logging
@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.types._

import java.util.Locale
import java.util.function.Supplier
import scala.collection.JavaConverters._

@@ -50,7 +52,12 @@ class RunClusteringProcedure extends BaseProcedure
ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false)
ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false),
ProcedureParameter.optional(5, "op", DataTypes.StringType, None),
ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, None),
// params => key=value, key2=value2
ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
ProcedureParameter.optional(8, "instants", DataTypes.StringType, None)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
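
To make the new parameters concrete, an illustrative invocation through Spark SQL might look like the following. This is a sketch only: it assumes the procedure is registered as run_clustering (suggested by the class name above), a Hudi table named hudi_table, a sort column ts, and an example config key passed through options.

```scala
// Illustrative only; assumes a SparkSession `spark` with the Hudi extensions
// enabled, a Hudi table named `hudi_table`, a sort column `ts`, and that the
// procedure is registered as `run_clustering`.

// Schedule a plan and execute it right away (the default op), with a sort
// strategy and extra writer configs passed through `options`.
spark.sql(
  """call run_clustering(
    |  table => 'hudi_table',
    |  op => 'scheduleandexecute',
    |  order => 'ts',
    |  order_strategy => 'z-order',
    |  options => 'hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824'
    |)""".stripMargin).show(false)

// Only schedule a clustering plan, leaving execution for later.
spark.sql("call run_clustering(table => 'hudi_table', op => 'schedule')").show(false)
```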
@@ -72,6 +79,10 @@ class RunClusteringProcedure extends BaseProcedure
val predicate = getArgValueOrDefault(args, PARAMETERS(2))
val orderColumns = getArgValueOrDefault(args, PARAMETERS(3))
val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
val op = getArgValueOrDefault(args, PARAMETERS(5))
val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
val options = getArgValueOrDefault(args, PARAMETERS(7))
val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))

val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -100,24 +111,74 @@ class RunClusteringProcedure extends BaseProcedure
logInfo("No order columns")
}

orderStrategy match {
case Some(o) =>
val strategy = LayoutOptimizationStrategy.fromValue(o.asInstanceOf[String])
conf = conf ++ Map(
HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key() -> strategy.getValue
)
case _ =>
logInfo("No order strategy")
}

options match {
case Some(p) =>
val paramPairs = StringUtils.split(p.asInstanceOf[String], ",").asScala
paramPairs.foreach{ pair =>
val values = StringUtils.split(pair, "=")
conf = conf ++ Map(values.get(0) -> values.get(1))
}
case _ =>
logInfo("No options")
}
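
As a rough, standalone approximation of how the options string ("key=value, key2=value2") becomes extra write configs (the real code relies on Hudi's StringUtils.split, so trimming behaviour here is an assumption):

```scala
// Rough standalone approximation (not the PR's code path): turn the `options`
// argument into key/value config entries. Trimming of whitespace around the
// separators is an assumption here.
def parseOptions(options: String): Map[String, String] =
  options.split(",").iterator
    .map(_.trim)
    .filter(_.nonEmpty)
    .map { pair =>
      val Array(k, v) = pair.split("=", 2)
      k.trim -> v.trim
    }
    .toMap

// parseOptions("key=value, key2=value2")
// => Map("key" -> "value", "key2" -> "value2")
```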

// Get all pending clustering instants
var pendingClustering = ClusteringUtils.getAllPendingClusteringPlans(metaClient)
.iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)

var operator: ClusteringOperator = ClusteringOperator.SCHEDULE_AND_EXECUTE
pendingClustering = instantsStr match {
case Some(inst) =>
op match {
case Some(o) =>
if (!ClusteringOperator.EXECUTE.name().equalsIgnoreCase(o.asInstanceOf[String])) {
throw new HoodieClusteringException("specific instants only can be used in 'execute' op or not specific op")
}
case _ =>
logInfo("No op and set it to EXECUTE with instants specified.")
}
operator = ClusteringOperator.EXECUTE

Contributor: Why do we need to set operator to EXECUTE here, but at line #144 we do not?

Contributor Author: Because the user does not specify the instants there.

Contributor: I think we need to check whether users specify the instants together with SCHEDULE or SCHEDULE_AND_EXECUTE; we should throw an exception instead of setting it to EXECUTE when instants are specified.

Contributor: Please put the line operator = ClusteringOperator.EXECUTE below the line logInfo("No op"), and please change logInfo("No op") to logInfo("No op and set it to EXECUTE with instants specified.").

Contributor Author: The line cannot be put below logInfo("No op"); the operator defaults to scheduleAndExecute, and if the user specifies instants, it needs to be set to execute after the check.

checkAndFilterPendingInstants(pendingClustering, inst.asInstanceOf[String])
case _ =>
logInfo("No specific instants")
op match {
case Some(o) =>
operator = ClusteringOperator.fromValue(o.asInstanceOf[String].toLowerCase(Locale.ROOT))
case _ =>
logInfo("No op, use default scheduleAndExecute")
}
pendingClustering
}
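
For the new instants path above, an illustrative call that executes specific pending plans might look like this (instant times are hypothetical; per the check above, any op other than 'execute' combined with instants is rejected):

```scala
// Illustrative only: execute two already-scheduled clustering plans by their
// instant times (hypothetical values). `op` may also be omitted here, in which
// case the procedure forces it to EXECUTE; any other op value raises
// HoodieClusteringException.
spark.sql(
  """call run_clustering(
    |  table => 'hudi_table',
    |  op => 'execute',
    |  instants => '20221130103045123,20221130104512456'
    |)""".stripMargin).show(false)
```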

logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")

var client: SparkRDDWriteClient[_] = null
try {
client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
val instantTime = HoodieActiveTimeline.createNewInstantTime
if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
pendingClustering ++= Seq(instantTime)
if (operator.isSchedule) {
val instantTime = HoodieActiveTimeline.createNewInstantTime
if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
pendingClustering ++= Seq(instantTime)
}
}
logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")

val startTs = System.currentTimeMillis()
pendingClustering.foreach(client.cluster(_, true))
logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
s" time cost: ${System.currentTimeMillis() - startTs}ms.")
if (operator.isExecute) {
val startTs = System.currentTimeMillis()
pendingClustering.foreach(client.cluster(_, true))
logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
s" time cost: ${System.currentTimeMillis() - startTs}ms.")
}

val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
@@ -182,6 +243,16 @@ class RunClusteringProcedure extends BaseProcedure
})
}

private def checkAndFilterPendingInstants(pendingInstants: Seq[String], instantStr: String): Seq[String] = {
val instants = StringUtils.split(instantStr, ",").asScala
val pendingSet = pendingInstants.toSet
val noneInstants = instants.filter(ins => !pendingSet.contains(ins))
if (noneInstants.nonEmpty) {
throw new HoodieClusteringException(s"specific ${noneInstants.mkString(",")} instants is not exist")
}
instants.sortBy(f => f)
}
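
A small self-contained sketch of what the helper above does (validate that every requested instant is pending, then return the requested ones sorted); hypothetical instant times, and a plain exception standing in for HoodieClusteringException:

```scala
// Self-contained sketch of the validation above (not the PR's code): reject
// instants that are not pending, return the requested ones sorted ascending.
def filterPendingSketch(pending: Seq[String], instantStr: String): Seq[String] = {
  val requested = instantStr.split(",").map(_.trim).filter(_.nonEmpty).toSeq
  val missing = requested.filterNot(pending.toSet)
  require(missing.isEmpty, s"Specified instants ${missing.mkString(",")} are not pending")
  requested.sorted
}

// filterPendingSketch(Seq("20221130101010111", "20221130102020222"), "20221130102020222,20221130101010111")
// => Seq("20221130101010111", "20221130102020222")
```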

}

object RunClusteringProcedure {