From 3ee8277adec0ba89d8e0e67468be20874ba492f1 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 20 Sep 2024 15:53:34 -0700 Subject: [PATCH 1/7] Initial attempt --- project/TestParallelization.scala | 342 ++++++++++++++++++++++-------- 1 file changed, 258 insertions(+), 84 deletions(-) diff --git a/project/TestParallelization.scala b/project/TestParallelization.scala index aa973ae6605..c4837f90b21 100644 --- a/project/TestParallelization.scala +++ b/project/TestParallelization.scala @@ -1,26 +1,29 @@ import sbt.Keys._ import sbt._ +// scalastyle:off println + object TestParallelization { - lazy val numShards = sys.env.get("NUM_SHARDS").map(_.toInt) + lazy val numShardsOpt = sys.env.get("NUM_SHARDS").map(_.toInt) + lazy val shardIdOpt = sys.env.get("SHARD_ID").map(_.toInt) + lazy val testParallelismOpt = sys.env.get("TEST_PARALLELISM_COUNT").map(_.toInt) lazy val settings = { - val parallelismCount = sys.env.get("TEST_PARALLELISM_COUNT") - if (parallelismCount.exists( _.toInt > 1)) { + if (numShardsOpt.exists(_ > 1) && testParallelismOpt.exists(_ > 1) && + shardIdOpt.exists(_ >= 0)) { customTestGroupingSettings ++ simpleGroupingStrategySettings - } - else { + } else { Seq.empty[Setting[_]] } } /** - Replace the default value for Test / testGrouping settingKey - and set it to a new value calculated by using the custom Task - [[testGroupingStrategy]]. Adding these settings to the build - will require to separately provide a value for the TaskKey - [[testGroupingStrategy]] + * Replace the default value for Test / testGrouping settingKey and set it to a new value + * calculated by using the custom Task [[testGroupingStrategy]]. 
+ * + * Adding these settings to the build will require us to separately provide a value for the + * TaskKey [[testGroupingStrategy]] */ lazy val customTestGroupingSettings = { Seq( @@ -33,34 +36,24 @@ object TestParallelization { val logger = streams.value.log logger.info(s"Tests will be grouped in ${grouping.testGroups.size} groups") val groups = grouping.testGroups - groups.foreach{ - group => - logger.info(s"${group.name} contains ${group.tests.size} tests") - } + groups.foreach { group => logger.info(s"${group.name} contains ${group.tests.size} tests") } + logger.info(groupingStrategy.toString) groups } ) } - - - /** - Sets the Test / testGroupingStrategy Task to an instance of the - SimpleHashStrategy - */ + /** Sets the Test / testGroupingStrategy Task to an instance of the SimpleHashStrategy */ lazy val simpleGroupingStrategySettings = Seq( Test / forkTestJVMCount := { - sys.env.get("TEST_PARALLELISM_COUNT").map(_.toInt) - .getOrElse(java.lang.Runtime.getRuntime.availableProcessors) - }, - Test / shardId := { - sys.env.get("SHARD_ID").map(_.toInt) + testParallelismOpt.getOrElse(java.lang.Runtime.getRuntime.availableProcessors) }, + Test / shardId := { shardIdOpt.get }, Test / testGroupingStrategy := { val groupsCount = (Test / forkTestJVMCount).value val shard = (Test / shardId).value val baseJvmDir = baseDirectory.value - SimpleHashStrategy(groupsCount, baseJvmDir, shard, defaultForkOptions.value) + GreedyHashStrategy(groupsCount, baseJvmDir, shard, defaultForkOptions.value) }, Test / parallelExecution := true, Global / concurrentRestrictions := { @@ -68,18 +61,19 @@ object TestParallelization { } ) - val shardId = SettingKey[Option[Int]]("shard id", - "The shard id assigned" - ) + val shardId = SettingKey[Int]("shard id", "The shard id assigned") - val forkTestJVMCount = SettingKey[Int]("fork test jvm count", + val forkTestJVMCount = SettingKey[Int]( + "fork test jvm count", "The number of separate JVM to use for tests" ) - val testGroupingStrategy = 
TaskKey[GroupingStrategy]("test grouping strategy", + val testGroupingStrategy = TaskKey[GroupingStrategy]( + "test grouping strategy", "The strategy to allocate different tests into groups," + - "potentially using multiple JVMS for their execution" + "potentially using multiple JVMS for their execution" ) + private val defaultForkOptions = Def.task { ForkOptions( javaHome = javaHome.value, @@ -91,98 +85,278 @@ object TestParallelization { envVars = (Test / envVars).value ) } + /** * Base trait to group tests. * - * By default SBT will run all tests as if they belong to a single group, - * but allows tests to be grouped. Setting [[sbt.Keys.testGrouping]] to - * a list of groups replace the default single-group definition. - * - * When creating an instance of [[sbt.Tests.Group]] it is possible to specify - * an [[sbt.Tests.TestRunPolicy]]: this parameter can be used to use multiple - * subprocesses for test execution + * By default, SBT will run all tests as if they belong to a single group, but allows tests to be + * grouped. Setting [[sbt.Keys.testGrouping]] to a list of groups replaces the default + * single-group definition. 
* + * When creating an instance of [[sbt.Tests.Group]] it is possible to specify an + * [[sbt.Tests.TestRunPolicy]]: this parameter can be used to use multiple subprocesses for test + * execution */ sealed trait GroupingStrategy { /** - * Adds an [[sbt.TestDefinition]] to this GroupingStrategy and - * returns an updated Grouping Strategy + * Adds an [[sbt.TestDefinition]] to this GroupingStrategy and returns an updated Grouping + * Strategy */ def add(testDefinition: TestDefinition): GroupingStrategy - /** - * Returns the test groups built from this GroupingStrategy - */ + /** Returns the test groups built from this GroupingStrategy */ def testGroups: List[Tests.Group] } - class SimpleHashStrategy private( - groups: Map[Int, Tests.Group], - shardId: Option[Int]) extends GroupingStrategy { + /** + * GreedyHashStrategy is a grouping strategy used to distribute test suites across multiple shards + * and groups (threads) based on their estimated duration. It aims to balance the test load across + * the shards and groups by utilizing a greedy assignment algorithm that assigns test suites to + * the group with the smallest estimated runtime. + * + * @param groups The current mapping of group indices to their respective [[sbt.Tests.Group]] + * objects, which hold test definitions. + * @param shardId The shard ID that this instance is responsible for. + * @param highDurationTestAssignment Precomputed assignments of high-duration test suites to + * specific groups within the shard. + * @param groupRuntimes Array holding the current total runtime for each group within the shard. + * @param groupCounts Array holding the number of test suites assigned to each group within the + * shard. 
+ */ + class GreedyHashStrategy private( + groups: scala.collection.mutable.Map[Int, Tests.Group], + shardId: Int, + highDurationTestAssignment: Array[List[String]], + var groupRuntimes: Array[Double] + ) extends GroupingStrategy { + import TestParallelization.GreedyHashStrategy._ + + if (shardId < 0 || shardId >= NUM_SHARDS) { + throw new IllegalArgumentException( + s"Assigned shard ID $shardId is not between 0 and ${NUM_SHARDS - 1} inclusive") + } lazy val testGroups = groups.values.toList - val groupCount = groups.size override def add(testDefinition: TestDefinition): GroupingStrategy = { + val testSuiteName = testDefinition.name + val isHighDurationTest = HIGH_DURATION_TEST_SUITES.exists(_._1 == testSuiteName) + val highDurationTestGroupIndex = + highDurationTestAssignment.indexWhere(_.contains(testSuiteName)) - /** - * The way test sharding works is that every action task is assigned a shard ID - * in the range [0, numShards - 1]. - * Tests where the (test name hash % number of shards) is equal to the - * shard ID for this action are added to the set of tests to run. - * All other tests will be assigned to other shards. - * We are guaranteed coverage since the result of the modulo math is guaranteed - * be within [0, numShards - 1] and there will be numShards actions - * that are triggered. - * - * Note: This is a simple grouping strategy which doesn't consider test - * complexity, duration or other factors. - */ - if (shardId.isDefined && numShards.isDefined) { - if (shardId.get < 0 || shardId.get >= numShards.get) { - throw new IllegalArgumentException( - s"Assigned shard ID $shardId is not between 0 and ${numShards.get - 1} inclusive") - } + println(s"Trying to assign test suite: $testSuiteName. 
This is shardId: $shardId.") + println(s"isHighDurationTest: $isHighDurationTest") + println(s"highDurationTestGroupIndex: $highDurationTestGroupIndex") + + if (isHighDurationTest) { + if (highDurationTestGroupIndex >= 0) { + // Case 1: this is a high duration test that belongs to this shard. Assign it. + val duration = HIGH_DURATION_TEST_SUITES.find(_._1 == testSuiteName).get._2 + println(s"[High] Assigning suite $testSuiteName ($duration mins) to shard $shardId") - val testIsAssignedToShard = - math.abs(testDefinition.name.hashCode % numShards.get) == shardId.get - if(!testIsAssignedToShard) { - return new SimpleHashStrategy(groups, shardId) + val currentGroup = groups(highDurationTestGroupIndex) + val updatedGroup = currentGroup.withTests(currentGroup.tests :+ testDefinition) + groups(highDurationTestGroupIndex) = updatedGroup + + // Do NOT update groupRuntimes -- this was already included in the initial value of + // groupRuntimes + + this + } else { + // Case 2: this is a high duration test that does NOT belong to this shard. Skip it. + println(s"[High] NOT assigning suite $testSuiteName to shard $shardId") + this } + } else if (math.abs(testDefinition.name.hashCode % NUM_SHARDS) == shardId) { + // Case 3: this is a normal test that belongs to this shard. Assign it. + println(s"[Low] Assigning suite $testSuiteName to shard $shardId") + + val minDurationGroupIndex = groupRuntimes.zipWithIndex.minBy(_._1)._2 + + println(s"groupRuntimes: ${groupRuntimes.mkString("Array(", ", ", ")")}") + println(s"minDurationGroupIndex: $minDurationGroupIndex") + + val currentGroup = groups(minDurationGroupIndex) + val updatedGroup = currentGroup.withTests(currentGroup.tests :+ testDefinition) + groups(minDurationGroupIndex) = updatedGroup + + groupRuntimes(minDurationGroupIndex) += 1.0 // assume test duration of 1 minute + + this + } else { + // Case 4: this is a normal test that does NOT belong to this shard. Skip it. 
+ println(s"[Low] NOT assigning suite $testSuiteName to shard $shardId") + this } + } - val groupIdx = math.abs(testDefinition.name.hashCode % groupCount) - val currentGroup = groups(groupIdx) - val updatedGroup = currentGroup.withTests( - currentGroup.tests :+ testDefinition - ) - new SimpleHashStrategy(groups + (groupIdx -> updatedGroup), shardId) + override def toString: String = { + val actualDurationsStr = groupRuntimes.zipWithIndex.map { + case (actualDuration, groupIndex) => + f" Group $groupIndex: Estimated Duration = $actualDuration%.2f mins, " + + f"Count = ${groups(groupIndex).tests.size}" + }.mkString("\n") + + s""" + |Shard ID: $shardId + |Suite Group Assignments: + |$actualDurationsStr + """.stripMargin } } - object SimpleHashStrategy { + object GreedyHashStrategy { + + val NUM_SHARDS = numShardsOpt.get + + val HIGH_DURATION_TEST_SUITES: List[(String, Double)] = List( + ("org.apache.spark.sql.delta.MergeIntoDVsWithPredicatePushdownCDCSuite", 36.09), + ("org.apache.spark.sql.delta.MergeIntoDVsSuite", 33.26), + ("org.apache.spark.sql.delta.MergeIntoDVsCDCSuite", 27.39), + ("org.apache.spark.sql.delta.cdc.MergeCDCSuite", 26.24), + ("org.apache.spark.sql.delta.MergeIntoDVsWithPredicatePushdownSuite", 23.58), + ("org.apache.spark.sql.delta.MergeIntoSQLSuite", 23.01), + ("org.apache.spark.sql.delta.MergeIntoScalaSuite", 16.67), + ("org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite", 11.55), + ("org.apache.spark.sql.delta.stats.DataSkippingDeltaV1ParquetCheckpointV2Suite", 8.26), + ("org.apache.spark.sql.delta.DescribeDeltaHistorySuite", 7.16), + ("org.apache.spark.sql.delta.ImplicitMergeCastingSuite", 7.14), + ("org.apache.spark.sql.delta.stats.DataSkippingDeltaV1JsonCheckpointV2Suite", 7.0), + ("org.apache.spark.sql.delta.UpdateSQLWithDeletionVectorsSuite", 6.03), + ("org.apache.spark.sql.delta.commands.backfill.RowTrackingBackfillConflictsDVSuite", 5.97), + ("org.apache.spark.sql.delta.DeltaSourceSuite", 5.86), + 
("org.apache.spark.sql.delta.cdc.UpdateCDCWithDeletionVectorsSuite", 5.67), + ("org.apache.spark.sql.delta.stats.DataSkippingDeltaV1Suite", 5.63), + ("org.apache.spark.sql.delta.DeltaSourceLargeLogSuite", 5.61), + ("org.apache.spark.sql.delta.stats.DataSkippingDeltaV1NameColumnMappingSuite", 5.43), + ("org.apache.spark.sql.delta.GenerateIdentityValuesSuite", 5.4), + ("org.apache.spark.sql.delta.commands.backfill.RowTrackingBackfillConflictsSuite", 5.02), + ("org.apache.spark.sql.delta.ImplicitStreamingMergeCastingSuite", 4.77), + ("org.apache.spark.sql.delta.DeltaVacuumWithCoordinatedCommitsBatch100Suite", 4.73), + ("org.apache.spark.sql.delta.CoordinatedCommitsBatchBackfill1DeltaLogSuite", 4.64), + ("org.apache.spark.sql.delta.DeltaLogSuite", 4.6), + ("org.apache.spark.sql.delta.IdentityColumnIngestionScalaSuite", 4.36), + ("org.apache.spark.sql.delta.DeltaVacuumSuite", 4.22), + ("org.apache.spark.sql.delta.columnmapping.RemoveColumnMappingCDCSuite", 4.12), + ("org.apache.spark.sql.delta.DeltaSuite", 4.05), + ("org.apache.spark.sql.delta.UpdateSQLSuite", 3.99), + ("org.apache.spark.sql.delta.typewidening.TypeWideningInsertSchemaEvolutionSuite", 3.92), + ("org.apache.spark.sql.delta.cdc.DeleteCDCSuite", 3.9), + ("org.apache.spark.sql.delta.CoordinatedCommitsBatchBackfill100DeltaLogSuite", 3.86), + ("org.apache.spark.sql.delta.rowid.UpdateWithRowTrackingCDCSuite", 3.83), + ("org.apache.spark.sql.delta.expressions.HilbertIndexSuite", 3.75), + ("org.apache.spark.sql.delta.DeltaProtocolVersionSuite", 3.71), + ("org.apache.spark.sql.delta.CoordinatedCommitsBatchBackfill2DeltaLogSuite", 3.68), + ("org.apache.spark.sql.delta.CheckpointsWithCoordinatedCommitsBatch100Suite", 3.59), + ("org.apache.spark.sql.delta.ConvertToDeltaScalaSuite", 3.59), + ("org.apache.spark.sql.delta.typewidening.TypeWideningTableFeatureSuite", 3.49), + ("org.apache.spark.sql.delta.cdc.UpdateCDCSuite", 3.42), + ("org.apache.spark.sql.delta.CloneTableScalaDeletionVectorSuite", 3.41), + 
("org.apache.spark.sql.delta.IdentityColumnSyncScalaSuite", 3.33), + ("org.apache.spark.sql.delta.DeleteSQLSuite", 3.31), + ("org.apache.spark.sql.delta.CheckpointsWithCoordinatedCommitsBatch2Suite", 3.19), + ("org.apache.spark.sql.delta.DeltaSourceIdColumnMappingSuite", 3.18), + ("org.apache.spark.sql.delta.rowid.RowTrackingMergeCDFDVSuite", 3.18), + ("org.apache.spark.sql.delta.rowid.UpdateWithRowTrackingTableFeatureCDCSuite", 3.12), + ("org.apache.spark.sql.delta.UpdateSQLWithDeletionVectorsAndPredicatePushdownSuite", 3.01), + ("org.apache.spark.sql.delta.rowid.RowTrackingMergeDVSuite", 2.97) + ) + + /** + * Generate the optimal test assignment across shards and groups. + * + * @param numShards Number of shards + * @param numGroups Number of groups per shard + */ + def generateOptimalAssignment(numShards: Int, numGroups: Int): + (Array[Array[List[String]]], Array[Array[Double]]) = { + val assignment = Array.fill(numShards)(Array.fill(numGroups)(List.empty[String])) + val groupDurations = Array.fill(numShards)(Array.fill(numGroups)(0.0)) + val shardDurations = Array.fill(numShards)(0.0) + val sortedTestSuites = HIGH_DURATION_TEST_SUITES.sortBy(-_._2) + + sortedTestSuites.foreach { case (testSuiteName, duration) => + val (shardIdx, groupIdx) = + findBestShardAndGroup(numShards, numGroups, shardDurations, groupDurations) + + assignment(shardIdx)(groupIdx) = assignment(shardIdx)(groupIdx) :+ testSuiteName + groupDurations(shardIdx)(groupIdx) += duration + shardDurations(shardIdx) += duration + } + + (assignment, groupDurations) + } + + /** + * Finds the best shard and group to assign the next test suite. + * Selects the group with the smallest total duration, and in case of ties, selects the shard + * with the smallest total duration. 
+ * + * @param numShards Number of shards + * @param numGroups Number of groups per shard + * @param shardDurations Total duration per shard + * @param groupDurations Total duration per group in each shard + * @return Tuple of (shard index, group index) for the optimal assignment + */ + def findBestShardAndGroup( + numShards: Int, + numGroups: Int, + shardDurations: Array[Double], + groupDurations: Array[Array[Double]] + ): (Int, Int) = { + var bestShardIdx = -1 + var bestGroupIdx = -1 + var minGroupDuration = Double.MaxValue + var minShardDuration = Double.MaxValue + + for (shardIdx <- 0 until numShards) { + for (groupIdx <- 0 until numGroups) { + val currentGroupDuration = groupDurations(shardIdx)(groupIdx) + val currentShardDuration = shardDurations(shardIdx) + + if (currentGroupDuration < minGroupDuration || + (currentGroupDuration == minGroupDuration && + currentShardDuration < minShardDuration)) { + minGroupDuration = currentGroupDuration + minShardDuration = currentShardDuration + bestShardIdx = shardIdx + bestGroupIdx = groupIdx + } + } + } + + (bestShardIdx, bestGroupIdx) + } def apply( groupCount: Int, baseDir: File, - shard: Option[Int], + shardId: Int, forkOptionsTemplate: ForkOptions): GroupingStrategy = { - val testGroups = (0 until groupCount).map { + val testGroups = scala.collection.mutable.Map((0 until groupCount).map { groupIdx => val forkOptions = forkOptionsTemplate.withRunJVMOptions( runJVMOptions = forkOptionsTemplate.runJVMOptions ++ - Seq(s"-Djava.io.tmpdir=${baseDir}/target/tmp/$groupIdx") + Seq(s"-Djava.io.tmpdir=$baseDir/target/tmp/$groupIdx") ) val group = Tests.Group( - name = s"Test group ${groupIdx}", + name = s"Test group $groupIdx", tests = Nil, runPolicy = Tests.SubProcess(forkOptions) ) groupIdx -> group - } - new SimpleHashStrategy(testGroups.toMap, shard) + }: _*) + + val (allTestAssignments, allGroupDurations) = + generateOptimalAssignment(NUM_SHARDS, groupCount) + + new GreedyHashStrategy( + testGroups, + shardId, + 
allTestAssignments(shardId), + allGroupDurations(shardId) + ) } } - } From 5a3e12cec2a2685344df0a31848182dd72502a7c Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Mon, 23 Sep 2024 12:14:28 -0700 Subject: [PATCH 2/7] remove extra printlns --- project/TestParallelization.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/project/TestParallelization.scala b/project/TestParallelization.scala index c4837f90b21..2b8a57859e6 100644 --- a/project/TestParallelization.scala +++ b/project/TestParallelization.scala @@ -145,15 +145,15 @@ object TestParallelization { val highDurationTestGroupIndex = highDurationTestAssignment.indexWhere(_.contains(testSuiteName)) - println(s"Trying to assign test suite: $testSuiteName. This is shardId: $shardId.") - println(s"isHighDurationTest: $isHighDurationTest") - println(s"highDurationTestGroupIndex: $highDurationTestGroupIndex") + // println(s"Trying to assign test suite: $testSuiteName. This is shardId: $shardId.") + // println(s"isHighDurationTest: $isHighDurationTest") + // println(s"highDurationTestGroupIndex: $highDurationTestGroupIndex") if (isHighDurationTest) { if (highDurationTestGroupIndex >= 0) { // Case 1: this is a high duration test that belongs to this shard. Assign it. val duration = HIGH_DURATION_TEST_SUITES.find(_._1 == testSuiteName).get._2 - println(s"[High] Assigning suite $testSuiteName ($duration mins) to shard $shardId") + // println(s"[High] Assigning suite $testSuiteName ($duration mins) to shard $shardId") val currentGroup = groups(highDurationTestGroupIndex) val updatedGroup = currentGroup.withTests(currentGroup.tests :+ testDefinition) @@ -165,17 +165,17 @@ object TestParallelization { this } else { // Case 2: this is a high duration test that does NOT belong to this shard. Skip it. 
- println(s"[High] NOT assigning suite $testSuiteName to shard $shardId") + // println(s"[High] NOT assigning suite $testSuiteName to shard $shardId") this } } else if (math.abs(testDefinition.name.hashCode % NUM_SHARDS) == shardId) { // Case 3: this is a normal test that belongs to this shard. Assign it. - println(s"[Low] Assigning suite $testSuiteName to shard $shardId") + // println(s"[Low] Assigning suite $testSuiteName to shard $shardId") val minDurationGroupIndex = groupRuntimes.zipWithIndex.minBy(_._1)._2 - println(s"groupRuntimes: ${groupRuntimes.mkString("Array(", ", ", ")")}") - println(s"minDurationGroupIndex: $minDurationGroupIndex") + // println(s"groupRuntimes: ${groupRuntimes.mkString("Array(", ", ", ")")}") + // println(s"minDurationGroupIndex: $minDurationGroupIndex") val currentGroup = groups(minDurationGroupIndex) val updatedGroup = currentGroup.withTests(currentGroup.tests :+ testDefinition) @@ -186,7 +186,7 @@ object TestParallelization { this } else { // Case 4: this is a normal test that does NOT belong to this shard. Skip it. - println(s"[Low] NOT assigning suite $testSuiteName to shard $shardId") + // println(s"[Low] NOT assigning suite $testSuiteName to shard $shardId") this } } From 5adc30220feebfffed750842e06e7c788675c002 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Mon, 23 Sep 2024 13:05:11 -0700 Subject: [PATCH 3/7] use 0.71 min as default instead of 1.0; minor cleanup --- project/TestParallelization.scala | 65 ++++++++++++------------------- 1 file changed, 25 insertions(+), 40 deletions(-) diff --git a/project/TestParallelization.scala b/project/TestParallelization.scala index 2b8a57859e6..4fae135f46d 100644 --- a/project/TestParallelization.scala +++ b/project/TestParallelization.scala @@ -115,19 +115,17 @@ object TestParallelization { * the shards and groups by utilizing a greedy assignment algorithm that assigns test suites to * the group with the smallest estimated runtime. 
* - * @param groups The current mapping of group indices to their respective [[sbt.Tests.Group]] + * @param groups The initial mapping of group indices to their respective [[sbt.Tests.Group]] * objects, which hold test definitions. * @param shardId The shard ID that this instance is responsible for. * @param highDurationTestAssignment Precomputed assignments of high-duration test suites to * specific groups within the shard. * @param groupRuntimes Array holding the current total runtime for each group within the shard. - * @param groupCounts Array holding the number of test suites assigned to each group within the - * shard. */ class GreedyHashStrategy private( groups: scala.collection.mutable.Map[Int, Tests.Group], shardId: Int, - highDurationTestAssignment: Array[List[String]], + highDurationTestAssignment: Array[Set[String]], var groupRuntimes: Array[Double] ) extends GroupingStrategy { import TestParallelization.GreedyHashStrategy._ @@ -141,19 +139,14 @@ object TestParallelization { override def add(testDefinition: TestDefinition): GroupingStrategy = { val testSuiteName = testDefinition.name - val isHighDurationTest = HIGH_DURATION_TEST_SUITES.exists(_._1 == testSuiteName) + val isHighDurationTest = TOP_50_HIGH_DURATION_TEST_SUITES.exists(_._1 == testSuiteName) val highDurationTestGroupIndex = highDurationTestAssignment.indexWhere(_.contains(testSuiteName)) - // println(s"Trying to assign test suite: $testSuiteName. This is shardId: $shardId.") - // println(s"isHighDurationTest: $isHighDurationTest") - // println(s"highDurationTestGroupIndex: $highDurationTestGroupIndex") - if (isHighDurationTest) { if (highDurationTestGroupIndex >= 0) { // Case 1: this is a high duration test that belongs to this shard. Assign it. 
- val duration = HIGH_DURATION_TEST_SUITES.find(_._1 == testSuiteName).get._2 - // println(s"[High] Assigning suite $testSuiteName ($duration mins) to shard $shardId") + val duration = TOP_50_HIGH_DURATION_TEST_SUITES.find(_._1 == testSuiteName).get._2 val currentGroup = groups(highDurationTestGroupIndex) val updatedGroup = currentGroup.withTests(currentGroup.tests :+ testDefinition) @@ -165,28 +158,22 @@ object TestParallelization { this } else { // Case 2: this is a high duration test that does NOT belong to this shard. Skip it. - // println(s"[High] NOT assigning suite $testSuiteName to shard $shardId") this } } else if (math.abs(testDefinition.name.hashCode % NUM_SHARDS) == shardId) { // Case 3: this is a normal test that belongs to this shard. Assign it. - // println(s"[Low] Assigning suite $testSuiteName to shard $shardId") val minDurationGroupIndex = groupRuntimes.zipWithIndex.minBy(_._1)._2 - // println(s"groupRuntimes: ${groupRuntimes.mkString("Array(", ", ", ")")}") - // println(s"minDurationGroupIndex: $minDurationGroupIndex") - val currentGroup = groups(minDurationGroupIndex) val updatedGroup = currentGroup.withTests(currentGroup.tests :+ testDefinition) groups(minDurationGroupIndex) = updatedGroup - groupRuntimes(minDurationGroupIndex) += 1.0 // assume test duration of 1 minute + groupRuntimes(minDurationGroupIndex) += AVG_TEST_SUITE_DURATION_EXCLUDING_SLOWEST_50 this } else { // Case 4: this is a normal test that does NOT belong to this shard. Skip it. - // println(s"[Low] NOT assigning suite $testSuiteName to shard $shardId") this } } @@ -210,7 +197,10 @@ object TestParallelization { val NUM_SHARDS = numShardsOpt.get - val HIGH_DURATION_TEST_SUITES: List[(String, Double)] = List( + val AVG_TEST_SUITE_DURATION_EXCLUDING_SLOWEST_50 = 0.71 + + /** 50 slowest test suites and their durations. 
*/ + val TOP_50_HIGH_DURATION_TEST_SUITES: List[(String, Double)] = List( ("org.apache.spark.sql.delta.MergeIntoDVsWithPredicatePushdownCDCSuite", 36.09), ("org.apache.spark.sql.delta.MergeIntoDVsSuite", 33.26), ("org.apache.spark.sql.delta.MergeIntoDVsCDCSuite", 27.39), @@ -264,28 +254,25 @@ object TestParallelization { ) /** - * Generate the optimal test assignment across shards and groups. - * - * @param numShards Number of shards - * @param numGroups Number of groups per shard + * Generate the optimal test assignment across shards and groups for high duration test suites. */ - def generateOptimalAssignment(numShards: Int, numGroups: Int): - (Array[Array[List[String]]], Array[Array[Double]]) = { - val assignment = Array.fill(numShards)(Array.fill(numGroups)(List.empty[String])) - val groupDurations = Array.fill(numShards)(Array.fill(numGroups)(0.0)) - val shardDurations = Array.fill(numShards)(0.0) - val sortedTestSuites = HIGH_DURATION_TEST_SUITES.sortBy(-_._2) + def highDurationOptimalAssignment(numGroups: Int): + (Array[Array[Set[String]]], Array[Array[Double]]) = { + val assignment = Array.fill(NUM_SHARDS)(Array.fill(numGroups)(List.empty[String])) + val groupDurations = Array.fill(NUM_SHARDS)(Array.fill(numGroups)(0.0)) + val shardDurations = Array.fill(NUM_SHARDS)(0.0) + val sortedTestSuites = TOP_50_HIGH_DURATION_TEST_SUITES.sortBy(-_._2) sortedTestSuites.foreach { case (testSuiteName, duration) => val (shardIdx, groupIdx) = - findBestShardAndGroup(numShards, numGroups, shardDurations, groupDurations) + findShardAndGroupWithLowestDuration(numGroups, shardDurations, groupDurations) assignment(shardIdx)(groupIdx) = assignment(shardIdx)(groupIdx) :+ testSuiteName groupDurations(shardIdx)(groupIdx) += duration shardDurations(shardIdx) += duration } - (assignment, groupDurations) + (assignment.map(_.map(_.toSet)), groupDurations) } /** @@ -299,18 +286,16 @@ object TestParallelization { * @param groupDurations Total duration per group in each shard * @return 
Tuple of (shard index, group index) for the optimal assignment */ - def findBestShardAndGroup( - numShards: Int, + private def findShardAndGroupWithLowestDuration( numGroups: Int, shardDurations: Array[Double], - groupDurations: Array[Array[Double]] - ): (Int, Int) = { + groupDurations: Array[Array[Double]]): (Int, Int) = { var bestShardIdx = -1 var bestGroupIdx = -1 var minGroupDuration = Double.MaxValue var minShardDuration = Double.MaxValue - for (shardIdx <- 0 until numShards) { + for (shardIdx <- 0 until NUM_SHARDS) { for (groupIdx <- 0 until numGroups) { val currentGroupDuration = groupDurations(shardIdx)(groupIdx) val currentShardDuration = shardDurations(shardIdx) @@ -348,14 +333,14 @@ object TestParallelization { groupIdx -> group }: _*) - val (allTestAssignments, allGroupDurations) = - generateOptimalAssignment(NUM_SHARDS, groupCount) + val (allShardsTestAssignments, allShardsGroupDurations) = + highDurationOptimalAssignment(groupCount) new GreedyHashStrategy( testGroups, shardId, - allTestAssignments(shardId), - allGroupDurations(shardId) + allShardsTestAssignments(shardId), + allShardsGroupDurations(shardId) ) } } From 2761e90612dd7d25540e766cb165bd7b8783a393 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Mon, 23 Sep 2024 14:01:12 -0700 Subject: [PATCH 4/7] increase num shards --- .github/workflows/spark_test.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/spark_test.yaml b/.github/workflows/spark_test.yaml index f521673f0f7..3d0b61c3abc 100644 --- a/.github/workflows/spark_test.yaml +++ b/.github/workflows/spark_test.yaml @@ -9,11 +9,11 @@ jobs: # These Scala versions must match those in the build.sbt scala: [2.12.18, 2.13.13] # Important: This list of shards must be [0..NUM_SHARDS - 1] - shard: [0, 1, 2] + shard: [0, 1, 2, 3] env: SCALA_VERSION: ${{ matrix.scala }} # Important: This must be the same as the length of shards in matrix - NUM_SHARDS: 3 + NUM_SHARDS: 4 steps: - uses: actions/checkout@v3 - 
uses: technote-space/get-diff-action@v4 From f27edf23bdaa49e151e81304e12e0271e1d0382e Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Mon, 23 Sep 2024 16:38:33 -0700 Subject: [PATCH 5/7] Rename strategy class; use different hash --- project/TestParallelization.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/project/TestParallelization.scala b/project/TestParallelization.scala index 4fae135f46d..54476df5c4c 100644 --- a/project/TestParallelization.scala +++ b/project/TestParallelization.scala @@ -1,3 +1,5 @@ +import scala.util.hashing.MurmurHash3 + import sbt.Keys._ import sbt._ @@ -43,7 +45,9 @@ object TestParallelization { ) } - /** Sets the Test / testGroupingStrategy Task to an instance of the SimpleHashStrategy */ + /** + * Sets the Test / testGroupingStrategy Task to an instance of the MinShardGroupDurationStrategy + */ lazy val simpleGroupingStrategySettings = Seq( Test / forkTestJVMCount := { testParallelismOpt.getOrElse(java.lang.Runtime.getRuntime.availableProcessors) @@ -53,7 +57,7 @@ object TestParallelization { val groupsCount = (Test / forkTestJVMCount).value val shard = (Test / shardId).value val baseJvmDir = baseDirectory.value - GreedyHashStrategy(groupsCount, baseJvmDir, shard, defaultForkOptions.value) + MinShardGroupDurationStrategy(groupsCount, baseJvmDir, shard, defaultForkOptions.value) }, Test / parallelExecution := true, Global / concurrentRestrictions := { @@ -122,13 +126,13 @@ object TestParallelization { * specific groups within the shard. * @param groupRuntimes Array holding the current total runtime for each group within the shard. 
*/ - class GreedyHashStrategy private( + class MinShardGroupDurationStrategy private( groups: scala.collection.mutable.Map[Int, Tests.Group], shardId: Int, highDurationTestAssignment: Array[Set[String]], var groupRuntimes: Array[Double] ) extends GroupingStrategy { - import TestParallelization.GreedyHashStrategy._ + import TestParallelization.MinShardGroupDurationStrategy._ if (shardId < 0 || shardId >= NUM_SHARDS) { throw new IllegalArgumentException( @@ -160,7 +164,7 @@ object TestParallelization { // Case 2: this is a high duration test that does NOT belong to this shard. Skip it. this } - } else if (math.abs(testDefinition.name.hashCode % NUM_SHARDS) == shardId) { + } else if (math.abs(MurmurHash3.stringHash(testDefinition.name) % NUM_SHARDS) == shardId) { // Case 3: this is a normal test that belongs to this shard. Assign it. val minDurationGroupIndex = groupRuntimes.zipWithIndex.minBy(_._1)._2 @@ -193,7 +197,7 @@ object TestParallelization { } } - object GreedyHashStrategy { + object MinShardGroupDurationStrategy { val NUM_SHARDS = numShardsOpt.get @@ -336,7 +340,7 @@ object TestParallelization { val (allShardsTestAssignments, allShardsGroupDurations) = highDurationOptimalAssignment(groupCount) - new GreedyHashStrategy( + new MinShardGroupDurationStrategy( testGroups, shardId, allShardsTestAssignments(shardId), From e2517f3e08fd0b6b59e3648418bfe64f6deae795 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 24 Sep 2024 15:56:01 -0700 Subject: [PATCH 6/7] Comment cleanup; move variable to smaller scope --- project/TestParallelization.scala | 60 +++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/project/TestParallelization.scala b/project/TestParallelization.scala index 54476df5c4c..769d4fbcec1 100644 --- a/project/TestParallelization.scala +++ b/project/TestParallelization.scala @@ -3,8 +3,6 @@ import scala.util.hashing.MurmurHash3 import sbt.Keys._ import sbt._ -// scalastyle:off println - object 
TestParallelization { lazy val numShardsOpt = sys.env.get("NUM_SHARDS").map(_.toInt) @@ -144,12 +142,14 @@ object TestParallelization { override def add(testDefinition: TestDefinition): GroupingStrategy = { val testSuiteName = testDefinition.name val isHighDurationTest = TOP_50_HIGH_DURATION_TEST_SUITES.exists(_._1 == testSuiteName) - val highDurationTestGroupIndex = - highDurationTestAssignment.indexWhere(_.contains(testSuiteName)) if (isHighDurationTest) { + val highDurationTestGroupIndex = + highDurationTestAssignment.indexWhere(_.contains(testSuiteName)) + if (highDurationTestGroupIndex >= 0) { - // Case 1: this is a high duration test that belongs to this shard. Assign it. + // Case 1: this is a high duration test that was pre-computed in the optimal assignment to + // belong to this shard. Assign it. val duration = TOP_50_HIGH_DURATION_TEST_SUITES.find(_._1 == testSuiteName).get._2 val currentGroup = groups(highDurationTestGroupIndex) @@ -168,7 +168,6 @@ object TestParallelization { // Case 3: this is a normal test that belongs to this shard. Assign it. val minDurationGroupIndex = groupRuntimes.zipWithIndex.minBy(_._1)._2 - val currentGroup = groups(minDurationGroupIndex) val updatedGroup = currentGroup.withTests(currentGroup.tests :+ testDefinition) groups(minDurationGroupIndex) = updatedGroup @@ -258,7 +257,53 @@ object TestParallelization { ) /** - * Generate the optimal test assignment across shards and groups for high duration test suites. + * Generates the optimal test assignment across shards and groups for high duration test suites. + * + * Will assign the high duration test suites in descending order, always assigning to the + * group with the smallest total duration. In case of ties (e.g. early on when some group + * durations are still 0), will assign to the shard with the smallest total duration. 
+ * + * Here's a simple example using 3 shards and 2 groups per shard: + * + * Test 1: MergeIntoDVsWithPredicatePushdownCDCSuite (36.09 mins) --> Shard 0, Group 0 + * - Shard 0: Group 0 = 36.09 mins, Group 1 = 0.0 mins + * - Shard 1: Group 0 = 0.0 mins, Group 1 = 0.0 mins + * - Shard 2: Group 0 = 0.0 mins, Group 1 = 0.0 mins + * + * Test 2: MergeIntoDVsSuite (33.26 mins) --> Shard 1, Group 0 + * - Shard 0: Group 0 = 36.09 mins, Group 1 = 0.0 mins + * - Shard 1: Group 0 = 33.26 mins, Group 1 = 0.0 mins + * - Shard 2: Group 0 = 0.0 mins, Group 1 = 0.0 mins + * + * Test 3: MergeIntoDVsCDCSuite (27.39 mins) --> Shard 2, Group 0 + * - Shard 0: Group 0 = 36.09 mins, Group 1 = 0.0 mins + * - Shard 1: Group 0 = 33.26 mins, Group 1 = 0.0 mins + * - Shard 2: Group 0 = 27.39 mins, Group 1 = 0.0 mins + * + * Test 4: MergeCDCSuite (26.24 mins) --> Shard 2, Group 1 + * - Shard 0: Group 0 = 36.09 mins, Group 1 = 0.0 mins + * - Shard 1: Group 0 = 33.26 mins, Group 1 = 0.0 mins + * - Shard 2: Group 0 = 27.39 mins, Group 1 = 26.24 mins + * + * Test 5: MergeIntoDVsWithPredicatePushdownSuite (23.58 mins) -> Shard 1, Group 1 + * - Shard 0: Group 0 = 36.09 mins, Group 1 = 0.0 mins + * - Shard 1: Group 0 = 33.26 mins, Group 1 = 23.58 mins + * - Shard 2: Group 0 = 27.39 mins, Group 1 = 26.24 mins + * + * Test 6: MergeIntoSQLSuite (23.01 mins) --> Shard 0, Group 1 + * - Shard 0: Group 0 = 36.09 mins, Group 1 = 23.01 mins + * - Shard 1: Group 0 = 33.26 mins, Group 1 = 23.58 mins + * - Shard 2: Group 0 = 27.39 mins, Group 1 = 26.24 mins + * + * Test 7: MergeIntoScalaSuite (16.67 mins) --> Shard 0, Group 1 + * - Shard 0: Group 0 = 36.09 mins, Group 1 = 39.68 mins + * - Shard 1: Group 0 = 33.26 mins, Group 1 = 23.58 mins + * - Shard 2: Group 0 = 27.39 mins, Group 1 = 26.24 mins + * + * Test 8: DeletionVectorsSuite (11.55 mins) --> Shard 1, Group 1 + * - Shard 0: Group 0 = 36.09 mins, Group 1 = 39.68 mins + * - Shard 1: Group 0 = 33.26 mins, Group 1 = 35.13 mins + * - Shard 2: Group 0 = 27.39 
mins, Group 1 = 26.24 mins */ def highDurationOptimalAssignment(numGroups: Int): (Array[Array[Set[String]]], Array[Array[Double]]) = { @@ -281,6 +326,7 @@ object TestParallelization { /** * Finds the best shard and group to assign the next test suite. + * * Selects the group with the smallest total duration, and in case of ties, selects the shard * with the smallest total duration. * From ba1cf84c867648d9504e03e086aaf8721d87c60f Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 24 Sep 2024 15:56:12 -0700 Subject: [PATCH 7/7] Create project/README.md --- project/README.md | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 project/README.md diff --git a/project/README.md b/project/README.md new file mode 100644 index 00000000000..8510aa29724 --- /dev/null +++ b/project/README.md @@ -0,0 +1,30 @@ +# Updating delta-spark TestParallelization Top 50 Slowest Test Suites List + +- Cherry-pick changes from https://github.com/delta-io/delta/pull/3694 +- That PR adds a test report listener to delta-spark that will output csv files containing per-JVM, per-group (thread), and per-test runtimes +- Run the CI and download the generated csv artifacts +- You can use the following pyspark code to get the top 50 slowest test suites +- You can copy and paste that into Chat GPT and ask it to format it as a Scala List + +```python +from pyspark.sql.functions import col, sum +from pyspark.sql.types import StructType, StructField, StringType, LongType + +schema = StructType([ + StructField("test_suite", StringType(), True), + StructField("test_name", StringType(), True), + StructField("execution_time_ms", LongType(), True), + StructField("result", StringType(), True) +]) + +csv_dir = "..." 
+ +spark.read.csv(csv_dir, schema=schema) \ + .filter(col("execution_time_ms") != -1) \ + .groupBy("test_suite") \ + .agg((sum("execution_time_ms") / 60000).alias("execution_time_mins")) \ + .orderBy(col("execution_time_mins").desc()) \ + .limit(50) \ + .select("test_suite", "execution_time_mins") \ + .show(50, truncate=False) +```