Skip to content

Commit

Permalink
Do not call Thread.yield() until necessary (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndkoval authored Jun 13, 2023
1 parent 6613cd8 commit a2d8ca4
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.io.*
import java.lang.*
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.math.*

/**
* This executor maintains the specified number of threads and is used by
Expand All @@ -26,37 +25,20 @@ import kotlin.math.*
* possible, so that they should not be parked and unparked between invocations.
*/
internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash: Int) : Closeable {
// Threads used in this runner.
/**
* Threads used in this runner.
*/
val threads: List<TestThread>

/**
* null, waiting TestThread, Runnable task, or SHUTDOWN
*/
private val tasks = atomicArrayOfNulls<Any>(nThreads)

/**
* null, waiting in [submitAndAwait] thread, DONE, or exception
*/
private val results = atomicArrayOfNulls<Any>(nThreads)
/**
* Specifies the number of loop cycles for threads
* active waiting, after that they should be parked
*/
private var spinCount = 40000
/**
* An adaptive active waiting strategy is used for the case when
* the number of threads is greater than the number of cores.
* This flag is set to `true` when any of the threads was parked
* during the previous [submitAndAwait] call.
*/
@Volatile
private var wasParked: Boolean = false
/**
* This balance is either increased or decreased by 1 at the
* end of each invocation when [wasParked] is `true` or `false`
* correspondingly. When the balance achieves [WAS_PARK_BALANCE_THRESHOLD],
* [spinCount] is doubled, and when the balance achieves
* -[WAS_PARK_BALANCE_THRESHOLD], [spinCount] is halved.
*/
private var wasParkedBalance: Int = 0

/**
* This flag is set to `true` when [await] detects a hang.
Expand All @@ -83,7 +65,6 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
require(tasks.size == nThreads)
submitTasks(tasks)
await(timeoutMs)
updateAdaptiveSpinCount()
}

private fun submitTasks(tasks: Array<out Any>) {
Expand All @@ -93,23 +74,6 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
}
}

private fun updateAdaptiveSpinCount() {
if (wasParked) {
wasParked = false
wasParkedBalance++
if (wasParkedBalance >= WAS_PARK_BALANCE_THRESHOLD) {
spinCount /= 2
wasParkedBalance = 0
}
} else {
wasParkedBalance--
if (wasParkedBalance <= -WAS_PARK_BALANCE_THRESHOLD) {
spinCount = min(spinCount * 2, MAX_SPIN_COUNT)
wasParkedBalance = 0
}
}
}

private fun submitTask(iThread: Int, task: Any) {
if (tasks[iThread].compareAndSet(null, task)) return
// CAS failed => a test thread is parked.
Expand Down Expand Up @@ -192,12 +156,11 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
}

private inline fun spinWait(getter: () -> Any?): Any? {
repeat(spinCount) {
repeat(SPINNING_LOOP_ITERATIONS_BEFORE_PARK) {
getter()?.let {
return it
}
}
wasParked = true
return null
}

Expand All @@ -212,11 +175,9 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
class TestThread(val iThread: Int, val runnerHash: Int, r: Runnable) : Thread(r, "FixedActiveThreadsExecutor@$runnerHash-$iThread") {
var cont: CancellableContinuation<*>? = null
}

companion object {
private val SHUTDOWN = Any()
private val DONE = Any()
private const val MAX_SPIN_COUNT = 1_000_000
private const val WAS_PARK_BALANCE_THRESHOLD = 20
}
}

private const val SPINNING_LOOP_ITERATIONS_BEFORE_PARK = 1000_000

private val SHUTDOWN = Any()
private val DONE = Any()
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import kotlinx.coroutines.*
import org.jetbrains.kotlinx.lincheck.*
import org.jetbrains.kotlinx.lincheck.CancellationResult.*
import org.jetbrains.kotlinx.lincheck.execution.*
import org.jetbrains.kotlinx.lincheck.runner.FixedActiveThreadsExecutor.TestThread
import org.jetbrains.kotlinx.lincheck.runner.FixedActiveThreadsExecutor.*
import org.jetbrains.kotlinx.lincheck.runner.ParallelThreadsRunner.Completion.*
import org.jetbrains.kotlinx.lincheck.runner.UseClocks.*
import org.jetbrains.kotlinx.lincheck.strategy.*
import org.objectweb.asm.*
Expand Down Expand Up @@ -64,7 +65,6 @@ internal open class ParallelThreadsRunner(
private fun trySetCancelledStatus(iThread: Int, actorId: Int) = completionStatuses[iThread].compareAndSet(actorId, null, CompletionStatus.CANCELLED)

private val uninitializedThreads = AtomicInteger(scenario.threads) // for threads synchronization
private var spinningTimeBeforeYield = 1000 // # of loop cycles
private var yieldInvokedInOnStart = false

override fun initialize() {
Expand Down Expand Up @@ -148,13 +148,6 @@ internal open class ParallelThreadsRunner(
AtomicReferenceArray<CompletionStatus>(scenario.parallelExecution[t].size)
}
uninitializedThreads.set(scenario.threads)
// update `spinningTimeBeforeYield` adaptively
if (yieldInvokedInOnStart) {
spinningTimeBeforeYield = (spinningTimeBeforeYield + 1) / 2
yieldInvokedInOnStart = false
} else {
spinningTimeBeforeYield = (spinningTimeBeforeYield * 2).coerceAtMost(MAX_SPINNING_TIME_BEFORE_YIELD)
}
// reset stored continuations
executor.threads.forEach { it.cont = null }
}
Expand Down Expand Up @@ -202,7 +195,7 @@ internal open class ParallelThreadsRunner(
suspensionPointResults[iThread][actorId] = NoResult
return Suspended
}
if (i++ % spinningTimeBeforeYield == 0) Thread.yield()
if (i++ % SPINNING_LOOP_ITERATIONS_BEFORE_YIELD == 0) Thread.yield()
}
// Coroutine will be resumed. Call method so that strategy can learn it.
afterCoroutineResumed(iThread)
Expand Down Expand Up @@ -308,7 +301,7 @@ internal open class ParallelThreadsRunner(
// wait for other threads to start
var i = 1
while (uninitializedThreads.get() != 0) {
if (i % spinningTimeBeforeYield == 0) {
if (i % SPINNING_LOOP_ITERATIONS_BEFORE_YIELD == 0) {
yieldInvokedInOnStart = true
Thread.yield()
}
Expand All @@ -332,4 +325,4 @@ internal enum class UseClocks { ALWAYS, RANDOM }

internal enum class CompletionStatus { CANCELLED, RESUMED }

private const val MAX_SPINNING_TIME_BEFORE_YIELD = 2_000_000
private const val SPINNING_LOOP_ITERATIONS_BEFORE_YIELD = 100_000
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,11 @@ abstract class ManagedStrategy(
*/
private fun awaitTurn(iThread: Int) {
// Wait actively until the thread is allowed to continue
var i = 0
while (currentThread != iThread) {
// Finish forcibly if an error occurred and we already have an `InvocationResult`.
if (suddenInvocationResult != null) throw ForcibleExecutionFinishException
Thread.yield()
if (++i % SPINNING_LOOP_ITERATIONS_BEFORE_YIELD == 0) Thread.yield()
}
}

Expand Down Expand Up @@ -916,3 +917,5 @@ internal object ForcibleExecutionFinishException : RuntimeException() {
}

private const val COROUTINE_SUSPENSION_CODE_LOCATION = -1 // currently the exact place of coroutine suspension is not known

private const val SPINNING_LOOP_ITERATIONS_BEFORE_YIELD = 100_000

0 comments on commit a2d8ca4

Please sign in to comment.