From a042921cbf8c81f8b931388db428d9333be8f96a Mon Sep 17 00:00:00 2001
From: Dan Sebastian Thrane
Date: Wed, 10 May 2023 14:21:42 +0200
Subject: [PATCH] Version 2023.2.8

---
 backend/version.txt                           |   2 +-
 .../compute/ucloud/ContainerRuntime.kt        |   1 +
 .../plugins/compute/ucloud/JobManagement.kt   |   1 -
 .../compute/ucloud/KubernetesResources.kt     |   8 +-
 .../kotlin/plugins/compute/ucloud/Plugin.kt   |  99 +++++++--
 .../plugins/compute/ucloud/Pod2Runtime.kt     | 208 ++++++++++--------
 .../plugins/compute/ucloud/Scheduler.kt       |  95 ++++++--
 7 files changed, 291 insertions(+), 123 deletions(-)

diff --git a/backend/version.txt b/backend/version.txt
index 57c93bd327..5fac5b1027 100644
--- a/backend/version.txt
+++ b/backend/version.txt
@@ -1 +1 @@
-2023.2.3
+2023.2.8
diff --git a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/ContainerRuntime.kt b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/ContainerRuntime.kt
index 450081b595..e31b8b0438 100644
--- a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/ContainerRuntime.kt
+++ b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/ContainerRuntime.kt
@@ -76,6 +76,7 @@ interface ContainerRuntime {
     }
 
     fun requiresReschedulingOfInQueueJobsOnStartup(): Boolean = false
+    fun notifyReschedulingComplete() {}
 }
 
 data class Tunnel(val hostnameOrIpAddress: String, val port: Int, val close: suspend () -> Unit)
diff --git a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/JobManagement.kt b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/JobManagement.kt
index 9a4bd7ee54..017360ca25 100644
--- a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/JobManagement.kt
+++ b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/JobManagement.kt
@@ -105,7 +105,6 @@ class JobManagement(
         val actualJob = JobsControl.retrieve.call(
             ResourceRetrieveRequest(
                 JobIncludeFlags(
-                    includeProduct = true,
                     includeApplication = true,
                     includeParameters = true,
                 ),
diff --git a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/KubernetesResources.kt b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/KubernetesResources.kt
index 0754dcdb0e..2e4f8ae25f 100644
--- a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/KubernetesResources.kt
+++ b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/KubernetesResources.kt
@@ -681,7 +681,8 @@ data class Node(
     var apiVersion: String = "v1",
     var kind: String = "Node",
     var metadata: ObjectMeta? = null,
-    var status: Status? = null
+    var status: Status? = null,
+    var spec: Spec? = null,
 ) {
     @Serializable
     data class Status(
@@ -704,6 +705,11 @@ data class Node(
         @SerialName("nvidia.com/gpu")
         var nvidiaGpu: String? = null,
     )
+
+    @Serializable
+    data class Spec(
+        var unschedulable: Boolean? = null
+    )
 }
 
 @Serializable
diff --git a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Plugin.kt b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Plugin.kt
index 0c7c8ed46f..a5a544d7ed 100644
--- a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Plugin.kt
+++ b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Plugin.kt
@@ -19,6 +19,7 @@ import dk.sdu.cloud.controllers.ComputeSessionIpc
 import dk.sdu.cloud.controllers.RequestContext
 import dk.sdu.cloud.dbConnection
 import dk.sdu.cloud.ipc.IpcContainer
+import dk.sdu.cloud.ipc.IpcHandler
 import dk.sdu.cloud.ipc.handler
 import dk.sdu.cloud.ipc.sendRequest
 import dk.sdu.cloud.logThrowable
@@ -34,6 +35,7 @@ import dk.sdu.cloud.plugins.ipcServer
 import dk.sdu.cloud.plugins.rpcClient
 import dk.sdu.cloud.plugins.storage.ucloud.UCloudFilePlugin
 import dk.sdu.cloud.utils.forEachGraal
+import dk.sdu.cloud.utils.sendTerminalMessage
 import dk.sdu.cloud.utils.sendTerminalTable
 import dk.sdu.cloud.utils.whileGraal
 import kotlinx.coroutines.DelicateCoroutinesApi
@@ -42,6 +44,7 @@ import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.isActive
 import kotlinx.coroutines.selects.select
+import kotlinx.serialization.Serializable
 import kotlinx.serialization.builtins.serializer
 import kotlin.coroutines.coroutineContext
 
@@ -71,12 +74,15 @@ class UCloudComputePlugin : ComputePlugin, SyncthingPlugin {
 
     @OptIn(DelicateCoroutinesApi::class)
     override suspend fun PluginContext.initialize() {
+        registerCli()
         if (!config.shouldRunServerCode()) return
 
         files = config.plugins.files[pluginName] as? UCloudFilePlugin
             ?: run {
-                error("Must have storage configured for the UCloud compute plugin ($pluginName). " +
-                    "It appears that '${pluginName}' does not point to a valid UCloud storage plugin.")
+                error(
+                    "Must have storage configured for the UCloud compute plugin ($pluginName). " +
+                            "It appears that '${pluginName}' does not point to a valid UCloud storage plugin."
+                )
             }
 
         jobCache = VerifiedJobCache(rpcClient)
@@ -212,10 +218,50 @@ class UCloudComputePlugin : ComputePlugin, SyncthingPlugin {
         files.driveLocator.onEnteringMaintenanceMode {
             killJobsEnteringMaintenanceMode()
         }
+    }
 
-        if (runtime.requiresReschedulingOfInQueueJobsOnStartup()) {
-            scheduleInQueueJobs()
-        }
+    private suspend fun PluginContext.registerCli() {
+        commandLineInterface?.addHandler(CliHandler("ucloud-compute") { args ->
+            val client = ipcClient
+
+            fun sendHelp(): Nothing = sendCommandLineUsage("ucloud-compute", "UCloud Compute") {
+                subcommand(
+                    "status",
+                    "Print status of the cluster (if available). Unstable format, please submit an issue before " +
+                            "parsing this format for production use."
+                )
+            }
+
+            genericCommandLineHandler {
+                when (args.getOrNull(0)) {
+                    "status" -> {
+                        val resp = client.sendRequest(Ipc.status, Unit)
+                        sendTerminalMessage { line(resp.text) }
+                    }
+
+                    else -> {
+                        sendHelp()
+                    }
+                }
+            }
+        })
+
+        ipcServerOptional?.addHandler(Ipc.status.handler { user, request ->
+            if (user.uid != 0) throw RPCException("Not root", HttpStatusCode.Forbidden)
+
+            val rt = runtime
+            if (rt is Pod2Runtime) {
+                StringWrapper(rt.dumpState())
+            } else {
+                StringWrapper("Status unavailable for this scheduler")
+            }
+        })
+    }
+
+    @Serializable
+    private data class StringWrapper(val text: String)
+    private object Ipc : IpcContainer("ucloud_compute_cli") {
+        val status = updateHandler("status", Unit.serializer(), StringWrapper.serializer())
+    }
 
     private suspend fun scheduleInQueueJobs() {
@@ -239,6 +285,11 @@ class UCloudComputePlugin : ComputePlugin, SyncthingPlugin {
     }
 
     override suspend fun PluginContext.runMonitoringLoopInServerMode() {
+        if (runtime.requiresReschedulingOfInQueueJobsOnStartup()) {
+            scheduleInQueueJobs()
+            runtime.notifyReschedulingComplete()
+        }
+
         while (coroutineContext.isActive) {
             try {
                 jobManagement.runMonitoring()
@@ -436,7 +487,10 @@ class UCloudComputePlugin : ComputePlugin, SyncthingPlugin {
             ?: throw RPCException("Not supported by provider", HttpStatusCode.BadRequest)
     }
 
-    suspend fun killJobsEnteringMaintenanceMode(dryRun: Boolean = false, dryMaintenanceMode: List = emptyList()): List {
+    suspend fun killJobsEnteringMaintenanceMode(
+        dryRun: Boolean = false,
+        dryMaintenanceMode: List = emptyList()
+    ): List {
         val result = ArrayList()
         for (job in runtime.list()) {
             val internalMounts = job.mountedDirectories().mapNotNull { dir ->
@@ -562,22 +616,29 @@ class UCloudPublicIPPlugin : PublicIPPlugin {
         commandLineInterface?.addHandler(CliHandler("uc-ip-pool") { args ->
             val ipcClient = ipcClient
 
-            fun sendHelp(): Nothing = sendCommandLineUsage("uc-ip-pool", "View information about IPs in the public IP pool") {
-                subcommand("ls", "List the available ranges in the pool")
+            fun sendHelp(): Nothing =
+                sendCommandLineUsage("uc-ip-pool", "View information about IPs in the public IP pool") {
+                    subcommand("ls", "List the available ranges in the pool")
 
-                subcommand("rm", "Delete an entry in the pool") {
-                    arg(
-                        "externalSubnet",
-                        description = "The external IP subnet to remove from the pool. " +
-                            "This will _not_ invalidate already allocated IP addresses."
-                    )
-                }
+                    subcommand("rm", "Delete an entry in the pool") {
+                        arg(
+                            "externalSubnet",
+                            description = "The external IP subnet to remove from the pool. " +
+                                    "This will _not_ invalidate already allocated IP addresses."
+                        )
+                    }
 
-                subcommand("add", "Adds a new range to the pool") {
-                    arg("externalSubnet", description = "The external IP subnet to add to the pool. For example: 10.0.0.0/24.")
-                    arg("internalSubnet", description = "The internal IP subnet to add to the pool. For example: 10.0.0.0/24.")
+                    subcommand("add", "Adds a new range to the pool") {
+                        arg(
+                            "externalSubnet",
+                            description = "The external IP subnet to add to the pool. For example: 10.0.0.0/24."
+                        )
+                        arg(
+                            "internalSubnet",
+                            description = "The internal IP subnet to add to the pool. For example: 10.0.0.0/24."
+                        )
+                    }
                 }
-            }
 
             genericCommandLineHandler {
                 when (args.getOrNull(0)) {
diff --git a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Pod2Runtime.kt b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Pod2Runtime.kt
index 020491beb6..ccbf7a8017 100644
--- a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Pod2Runtime.kt
+++ b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Pod2Runtime.kt
@@ -24,8 +24,23 @@ import kotlinx.serialization.json.JsonObject
 import kotlinx.serialization.json.JsonPrimitive
 import kotlinx.serialization.json.contentOrNull
 import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.atomic.AtomicReference
+import kotlin.random.Random
+
+private object PodCache {
+    private val allPods = AtomicReference<List<Pod>>(emptyList())
+
+    fun insertPods(pods: List<Pod>) {
+        allPods.set(pods)
+    }
+
+    fun findPod(jobId: Long, rank: Int): Pod? {
+        val expectedName = idAndRankToPodName(jobId, rank)
+        return allPods.get().find { it.metadata?.name == expectedName }
+    }
+}
 
 private object LogCache {
     private val emptyBuffer = ByteBuffer.allocateDirect(0)
@@ -141,31 +156,14 @@ private data class Pod2Container(
     // This object contains a reference to a job which might not be running yet, but most likely has a pod available.
     // The only scenario in which the pod is gone, is if the job has terminated.
     override val jobId: String = internalJobId.toString()
-    private var cachedPod: Pod? = null
     private val cacheMutex = Mutex()
 
-    override val pod: Pod
-        get() = runBlocking { pod() }
-
     var volcanoDoesNotExist: Boolean? = null
     var cachedVolcano: VolcanoJob? = null
 
-    suspend fun pod(): Pod {
-        val cached = cachedPod
-        if (cached != null) return cached
-        cacheMutex.withLock {
-            val cached2 = cachedPod
-            if (cached2 != null) return cached2
-
-            val pod = k8Client.getResource(
-                Pod.serializer(),
-                KubernetesResources.pod.withNameAndNamespace(name, namespace)
-            )
-
-            cachedPod = pod
-            return pod
-        }
-    }
+    override val pod: Pod by lazy { PodCache.findPod(internalJobId, rank) ?: error("no such container") }
+
+    fun pod(): Pod = pod
 
     private suspend fun volcano(): VolcanoJob? {
        val cached = cachedVolcano
@@ -385,8 +383,7 @@ class Pod2Runtime(
     private val mutex = Mutex()
     private val scheduler = Scheduler()
 
-    private var nextNodeScan = 0L
-    private var nextPodScan = 0L
+    private var nextResourceScan = 0L
 
     private data class JobToSchedule(
         val id: Long,
@@ -410,15 +407,19 @@
     private val cachedList = AtomicReference>>(null)
     private val cachedQueue = AtomicReference>(emptyList())
+    private val didCompleteRescheduling = AtomicBoolean(false)
 
     override fun requiresReschedulingOfInQueueJobsOnStartup(): Boolean = true
+    override fun notifyReschedulingComplete() {
+        didCompleteRescheduling.set(true)
+    }
 
     fun start() {
         ProcessingScope.launch {
             while (isActive) {
                 try {
                     mutex.withLock { scheduleLoop() }
-                    delay(20)
+                    delay(1000)
                 } catch (ex: Throwable) {
                     debugSystem.logThrowable("Error while running scheduleLoop", ex)
                     log.warn(ex.toReadableStacktrace().toString())
@@ -428,6 +429,7 @@
     }
 
     private suspend fun scheduleLoop() {
+        val sectionAStart = System.currentTimeMillis()
         jobsToScheduleMutex.withLock {
             jobsToSchedule.forEach { job ->
                 scheduler.addJobToQueue(
@@ -445,6 +447,7 @@ class Pod2Runtime(
             jobsToSchedule.clear()
         }
 
+        val sectionBStart = System.currentTimeMillis()
         jobsToCancelMutex.withLock {
             jobsToCancel.forEach { job ->
                 scheduler.removeJobFromQueue(job.id)
@@ -455,77 +458,88 @@ class Pod2Runtime(
         }
 
         val now = Time.now()
-        if (now >= nextNodeScan) {
-            val allNodes = k8Client.listResources(
-                Node.serializer(),
-                KubernetesResources.node.withNamespace(NAMESPACE_ANY)
-            ).items
-
-            fun label(node: Node, label: String): String? =
-                (node.metadata?.labels?.get(label) as? JsonPrimitive)?.contentOrNull
-
-            for (node in allNodes) {
-                val machineType = label(node, "ucloud.dk/machine") ?: defaultNodeType ?: continue
-                val allocatable = node.status?.allocatable ?: continue
-
-                // NOTE(Dan): We do not respect the "pods" capacity. The provider must ensure that we can schedule a
-                // full machine using the smallest product configuration without hitting this limit.
-                scheduler.registerNode(
-                    node.metadata!!.name!!,
-                    machineType,
-                    cpuStringToMilliCpus(allocatable.cpu),
-                    memoryStringToBytes(allocatable.memory),
-                    gpuStringToFullGpus(allocatable.nvidiaGpu)
-                )
-            }
-
-            scheduler.pruneNodes()
-            nextNodeScan = now + (1000 * 60 * 15L)
-        }
-        if (now >= nextPodScan) {
-            val allPods = k8Client.listResources(
-                Pod.serializer(),
-                KubernetesResources.pod.withNamespace(namespace)
-            ).items
-
-            for (pod in allPods) {
-                val podName = pod.metadata!!.name!!
-                val jobAndRank = podNameToIdAndRank(podName)
-                if (jobAndRank == null) {
-                    log.warn("Killing unknown pod: $pod")
-                    k8Client.deleteResource(KubernetesResources.pod.withNameAndNamespace(podName, namespace))
-                    continue
-                }
-
-                val (jobId, rank) = jobAndRank
-                if (scheduler.findRunningReplica(jobId, rank, touch = true) == null) {
-                    val podLimits = pod.spec?.containers?.firstOrNull()?.resources?.limits
-                    if (podLimits == null) {
-                        log.warn("Pod without resource limits: $pod")
-                        k8Client.deleteResource(KubernetesResources.pod.withNameAndNamespace(podName, namespace))
-                        continue
-                    }
-
-                    fun limit(name: String): String? = (podLimits.get(name) as? JsonPrimitive)?.contentOrNull
-
-                    scheduler.addRunningReplica(
-                        jobId,
-                        rank,
-                        cpuStringToMilliCpus(limit("cpu")),
-                        memoryStringToBytes(limit("memory")),
-                        gpuStringToFullGpus(limit("nvidia.com/gpu")),
-                        pod.spec!!.nodeName!!,
-                        Pod2Data.AlreadyScheduled
-                    )
-                }
-            }
-
-            scheduler.pruneJobs()
-            nextPodScan = now + (1000 * 30L)
-        }
+        val sectionCStart = System.currentTimeMillis()
+        var sectionDStart = sectionCStart
+        if (now >= nextResourceScan) {
+            run {
+                // Scan nodes
+                val allNodes = k8Client.listResources(
+                    Node.serializer(),
+                    KubernetesResources.node.withNamespace(NAMESPACE_ANY)
+                ).items
+
+                fun label(node: Node, label: String): String? =
+                    (node.metadata?.labels?.get(label) as? JsonPrimitive)?.contentOrNull
+
+                for (node in allNodes) {
+                    val machineType = label(node, "ucloud.dk/machine") ?: defaultNodeType ?: continue
+                    val allocatable = node.status?.allocatable ?: continue
+
+                    // NOTE(Dan): We do not respect the "pods" capacity. The provider must ensure that we can schedule a
+                    // full machine using the smallest product configuration without hitting this limit.
+                    scheduler.registerNode(
+                        node.metadata!!.name!!,
+                        machineType,
+                        cpuStringToMilliCpus(allocatable.cpu),
+                        memoryStringToBytes(allocatable.memory),
+                        gpuStringToFullGpus(allocatable.nvidiaGpu),
+                        node.spec?.unschedulable == true
+                    )
+                }
+
+                scheduler.pruneNodes()
+            }
+
+            sectionDStart = System.currentTimeMillis()
+            run {
+                // Scan pods
+                val allPods = k8Client.listResources(
+                    Pod.serializer(),
+                    KubernetesResources.pod.withNamespace(namespace)
+                ).items
+
+                for (pod in allPods) {
+                    val podName = pod.metadata!!.name!!
+                    val jobAndRank = podNameToIdAndRank(podName)
+                    if (jobAndRank == null) {
+                        log.warn("Killing unknown pod: $pod")
+                        k8Client.deleteResource(KubernetesResources.pod.withNameAndNamespace(podName, namespace))
+                        continue
+                    }
+
+                    val (jobId, rank) = jobAndRank
+                    if (scheduler.findRunningReplica(jobId, rank, touch = true) == null) {
+                        val podLimits = pod.spec?.containers?.firstOrNull()?.resources?.limits
+                        if (podLimits == null) {
+                            log.warn("Pod without resource limits: $pod")
+                            k8Client.deleteResource(KubernetesResources.pod.withNameAndNamespace(podName, namespace))
+                            continue
+                        }
+
+                        fun limit(name: String): String? = (podLimits.get(name) as? JsonPrimitive)?.contentOrNull
+
+                        scheduler.addRunningReplica(
+                            jobId,
+                            rank,
+                            cpuStringToMilliCpus(limit("cpu")),
+                            memoryStringToBytes(limit("memory")),
+                            gpuStringToFullGpus(limit("nvidia.com/gpu")),
+                            pod.spec!!.nodeName!!,
+                            Pod2Data.AlreadyScheduled
+                        )
+                    }
+                }
+
+                scheduler.pruneJobs()
+                PodCache.insertPods(allPods)
+                nextResourceScan = now + (1000 * 10L)
+            }
+        }
 
+        val sectionEStart = System.currentTimeMillis()
         val scheduledJobs = scheduler.schedule()
+        val sectionFStart = System.currentTimeMillis()
 
         val scheduledToNodes = HashMap>()
         for (job in scheduledJobs) {
             val data = job.data
@@ -604,6 +618,7 @@
             }
         }
 
+        val sectionGStart = System.currentTimeMillis()
         for ((jobId, nodeSet) in scheduledToNodes) {
             val compacted = ArrayList()
@@ -648,8 +663,24 @@
             )
         }
 
-        cachedQueue.set(scheduler.jobsInQueue().asSequence().toList())
+        val sectionHStart = System.currentTimeMillis()
         cachedList.set(scheduler.runningReplicas().asSequence().toList())
+        cachedQueue.set(scheduler.jobsInQueue().asSequence().toList())
+        val sectionIStart = System.currentTimeMillis()
+
+        if ((sectionIStart - sectionAStart > 500 && Random.nextInt(100) <= 10) || Random.nextInt(1000) == 1) {
+            log.info(buildString {
+                appendLine("Schedule loop complete")
+                appendLine("  Section A: ${sectionBStart - sectionAStart}")
+                appendLine("  Section B: ${sectionCStart - sectionBStart}")
+                appendLine("  Section C: ${sectionDStart - sectionCStart}")
+                appendLine("  Section D: ${sectionEStart - sectionDStart}")
+                appendLine("  Section E: ${sectionFStart - sectionEStart}")
+                appendLine("  Section F: ${sectionGStart - sectionFStart}")
+                appendLine("  Section G: ${sectionHStart - sectionGStart}")
+                appendLine("  Section H: ${sectionIStart - sectionHStart}")
sectionHStart}") + }) + } } private val notNumbers = Regex("[^0-9]") @@ -679,11 +710,11 @@ class Pod2Runtime( return when { memory.isNullOrBlank() -> 0 - memory.contains("Ki") -> notNumbers.replace(memory, "").toLong() * ki - memory.contains("Mi") -> notNumbers.replace(memory, "").toLong() * mi - memory.contains("Gi") -> notNumbers.replace(memory, "").toLong() * gi - memory.contains("Ti") -> notNumbers.replace(memory, "").toLong() * ti - memory.contains("Pi") -> notNumbers.replace(memory, "").toLong() * pi + memory.contains("K") || memory.contains("Ki") -> notNumbers.replace(memory, "").toLong() * ki + memory.contains("M") || memory.contains("Mi") -> notNumbers.replace(memory, "").toLong() * mi + memory.contains("G") || memory.contains("Gi") -> notNumbers.replace(memory, "").toLong() * gi + memory.contains("T") || memory.contains("Ti") -> notNumbers.replace(memory, "").toLong() * ti + memory.contains("P") || memory.contains("Pi") -> notNumbers.replace(memory, "").toLong() * pi else -> notNumbers.replace(memory, "").toLong() } } @@ -796,6 +827,7 @@ class Pod2Runtime( override suspend fun isJobKnown(jobId: String): Boolean { val jobIdLong = jobId.toLongOrNull() ?: return false + if (!didCompleteRescheduling.get()) return true val isKnownInScheduleQueue = jobsToScheduleMutex.withLock { jobsToSchedule.any { it.id == jobIdLong } @@ -811,6 +843,10 @@ class Pod2Runtime( } } + suspend fun dumpState(): String { + return mutex.withLock { scheduler.dumpState() } + } + companion object : Loggable { override val log = logger() diff --git a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Scheduler.kt b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Scheduler.kt index d33dfab694..f6838b2015 100644 --- a/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Scheduler.kt +++ b/provider-integration/integration-module/src/main/kotlin/plugins/compute/ucloud/Scheduler.kt @@ -22,7 +22,9 @@ import dk.sdu.cloud.app.store.api.WebDescription import dk.sdu.cloud.app.store.api.WordInvocationParameter import dk.sdu.cloud.defaultMapper import dk.sdu.cloud.provider.api.ResourceOwner +import dk.sdu.cloud.service.Loggable import dk.sdu.cloud.utils.sendTerminalMessage +import org.slf4j.Logger import java.lang.IllegalStateException import java.util.* import kotlin.collections.ArrayList @@ -50,7 +52,11 @@ class Scheduler { @JvmField var cpu: Int = 0, @JvmField var memory: Long = 0, @JvmField var nvidiaGpu: Int = 0, + @JvmField var initialCpu: Int = 0, + @JvmField var initialMemory: Long = 0, + @JvmField var initialNvidiaGpu: Int = 0, @JvmField var lastSeen: Long = 0, + @JvmField var unscheduable: Boolean = false, ) private var nextQueueIdx = 0 @@ -78,7 +84,17 @@ class Scheduler { @JvmField var node: Int = 0, @JvmField var lastSeen: Long = 0, @JvmField var data: UserData? 
-    )
+    ) {
+        override fun toString(): String {
+            return buildString {
+                append("rank = $rank, ")
+                append("cpu = $cpu, ")
+                append("memory = $memory, ")
+                append("nvidiaGpu = $nvidiaGpu, ")
+                append("lastSeen = $lastSeen, ")
+            }
+        }
+    }
 
     private var activeReplicas = 0
     private var activeNodes = 0
@@ -91,11 +107,13 @@ class Scheduler {
         type: String,
         virtualCpuMillis: Int,
         memoryInBytes: Long,
-        nvidiaGpus: Int = 0
+        nvidiaGpus: Int,
+        isUnschedulable: Boolean,
     ) {
         val existing = nodeNames.indexOf(name)
         if (existing != -1) {
             nodeEntries[existing].lastSeen = time
+            nodeEntries[existing].unschedulable = isUnschedulable
             return
         }
 
@@ -117,7 +135,11 @@
             this.cpu = virtualCpuMillis
             this.memory = memoryInBytes
             this.nvidiaGpu = nvidiaGpus
+            this.initialCpu = virtualCpuMillis
+            this.initialMemory = memoryInBytes
+            this.initialNvidiaGpu = nvidiaGpus
             this.lastSeen = time
+            this.unschedulable = isUnschedulable
 
         activeNodes++
     }
@@ -144,6 +166,38 @@
             }
 
             activeNodes--
+        } else {
+            // NOTE(Dan): Recalculate node state and warn if we have drifted (once we are confident that we
+            // don't drift, we can disable this code in production mode)
+            val cpuWas = entry.cpu
+            val memoryWas = entry.memory
+            val nvidiaGpuWas = entry.nvidiaGpu
+
+            entry.cpu = entry.initialCpu
+            entry.memory = entry.initialMemory
+            entry.nvidiaGpu = entry.initialNvidiaGpu
+
+            for (rIdx in replicaEntries.indices) {
+                val replica = replicaEntries[rIdx]
+                if (replicaJobId[rIdx] != 0L && replica.node == idx) {
+                    entry.cpu -= replica.cpu
+                    entry.memory -= replica.memory
+                    entry.nvidiaGpu -= replica.nvidiaGpu
+                }
+            }
+
+            if (entry.cpu != cpuWas) {
+                log.warn("CPU state has drifted for ${nodeNames[idx]} was $cpuWas should be ${entry.cpu}")
+            }
+
+            if (entry.memory != memoryWas) {
+                log.warn("Memory state has drifted for ${nodeNames[idx]} was $memoryWas should be ${entry.memory}")
+            }
+
+            if (entry.nvidiaGpu != nvidiaGpuWas) {
+                log.warn("NVIDIA GPU state has drifted for ${nodeNames[idx]} was " +
+                        "$nvidiaGpuWas should be ${entry.nvidiaGpu}")
+            }
         }
     }
     return result
@@ -354,6 +408,7 @@
     private fun nodeSatisfiesRequest(node: Int, request: Int): Boolean {
         val nodeEntry = nodeEntries[node]
         val queueEntry = queueEntries[request]
+        if (nodeEntry.unschedulable) return false
         if (nodeEntry.type != queueEntry.type) return false
         if (nodeEntry.cpu < queueEntry.cpu) return false
         if (nodeEntry.memory < queueEntry.memory) return false
@@ -503,27 +558,37 @@
         return scheduledJobs
     }
 
-    fun dumpState(replicas: Boolean = true, nodes: Boolean = true) {
-        if (replicas) {
+    fun dumpState(replicas: Boolean = true, nodes: Boolean = true): String {
+        return buildString {
+            appendLine("Current time: $time")
+            appendLine()
+
+            if (replicas) {
+                appendLine("Running jobs (one line per rank):")
                 for (i in 0 until replicaJobId.size) {
                     val jobId = replicaJobId[i]
                     if (jobId == 0L) continue
-                    sendTerminalMessage { line("$jobId[${replicaEntries[i]}] ${nodeNames[replicaEntries[i].node]}") }
+                    appendLine("Job $jobId on ${nodeNames[replicaEntries[i].node]}: ${replicaEntries[i]} ")
                 }
             }
 
             if (nodes) {
+                if (replicas) appendLine()
+                appendLine("Nodes:")
                 for (i in 0 until nodeNames.size) {
                     val name = nodeNames[i]
                     if (name == null) continue
-                    sendTerminalMessage { line("${name} ${nodeEntries[i]}") }
+                    appendLine("${name} ${nodeEntries[i]}")
${nodeEntries[i]}") + } } } } - companion object { + companion object : Loggable { const val MAX_NODES = 1024 const val MAX_JOBS_IN_QUEUE = MAX_NODES * 8 const val MAX_NODES_TO_CONSIDER = 128 + + override val log = logger() } }