Skip to content

Commit

Permalink
Revert "make the model table is also respect the keepPartitionNum par…
Browse files Browse the repository at this point in the history
…ameter"

This reverts commit 6a6d649.
  • Loading branch information
allwefantasy committed Jul 2, 2023
1 parent 6a6d649 commit 13a0a9c
Showing 1 changed file with 2 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,7 @@ class MasterSlaveInSpark(name: String, session: SparkSession, _owner: String) ex
if (!keepPartitionNum) {
df.repartition(targetLen)
} else df
} else {
if (!keepPartitionNum) {
df.repartition(1).sortWithinPartitions(f.col("start").asc)
} else {
require(df.rdd.partitions.length == 1,
"""
|The model in Delta Lake is not supported to be consumed by multiple tasks.
|If you set keepPartitionNum true, please make sure the original number of partitions of the model table is 1.
|""".stripMargin)
df
}
}
} else df.repartition(1).sortWithinPartitions(f.col("start").asc)

buildDataSocketServers(tempdf, job)
this
Expand Down Expand Up @@ -118,7 +107,7 @@ class MasterSlaveInSpark(name: String, session: SparkSession, _owner: String) ex
val totalCores = resource.totalCores
val activeCores = jobInfo.resourceSummary(null).activeTasks
val freeCores = totalCores - activeCores
logInfo(s"Detect resources: totalCores:${totalCores} activeCores:${activeCores} freeCores:${freeCores} targetLen:${targetLen}")
logInfo(s"Detect resources: totalCores:${totalCores} activeCores:${activeCores} freeCores:${freeCores}")
logInfo("Compute formula: freeCores / 2 <= targetLen ? targetLen = Math.max(Math.floor(freeCores / 2) - 1, 1).toInt : targetLen = targetLen")
if (freeCores / 2 <= targetLen) {
targetLen = Math.max(Math.floor(freeCores / 2) - 1, 1).toInt
Expand Down

0 comments on commit 13a0a9c

Please sign in to comment.