Skip to content

Commit

Permalink
Version 2023.2.8
Browse files Browse the repository at this point in the history
  • Loading branch information
DanThrane committed May 10, 2023
1 parent 3a89659 commit a042921
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 123 deletions.
2 changes: 1 addition & 1 deletion backend/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2023.2.3
2023.2.8
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ interface ContainerRuntime {
}

fun requiresReschedulingOfInQueueJobsOnStartup(): Boolean = false
fun notifyReschedulingComplete() {}
}

data class Tunnel(val hostnameOrIpAddress: String, val port: Int, val close: suspend () -> Unit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ class JobManagement(
val actualJob = JobsControl.retrieve.call(
ResourceRetrieveRequest(
JobIncludeFlags(
includeProduct = true,
includeApplication = true,
includeParameters = true,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,8 @@ data class Node(
var apiVersion: String = "v1",
var kind: String = "Node",
var metadata: ObjectMeta? = null,
var status: Status? = null
var status: Status? = null,
var spec: Spec? = null,
) {
@Serializable
data class Status(
Expand All @@ -704,6 +705,11 @@ data class Node(
@SerialName("nvidia.com/gpu")
var nvidiaGpu: String? = null,
)

@Serializable
data class Spec(
var unschedulable: Boolean? = null
)
}

@Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import dk.sdu.cloud.controllers.ComputeSessionIpc
import dk.sdu.cloud.controllers.RequestContext
import dk.sdu.cloud.dbConnection
import dk.sdu.cloud.ipc.IpcContainer
import dk.sdu.cloud.ipc.IpcHandler
import dk.sdu.cloud.ipc.handler
import dk.sdu.cloud.ipc.sendRequest
import dk.sdu.cloud.logThrowable
Expand All @@ -34,6 +35,7 @@ import dk.sdu.cloud.plugins.ipcServer
import dk.sdu.cloud.plugins.rpcClient
import dk.sdu.cloud.plugins.storage.ucloud.UCloudFilePlugin
import dk.sdu.cloud.utils.forEachGraal
import dk.sdu.cloud.utils.sendTerminalMessage
import dk.sdu.cloud.utils.sendTerminalTable
import dk.sdu.cloud.utils.whileGraal
import kotlinx.coroutines.DelicateCoroutinesApi
Expand All @@ -42,6 +44,7 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.selects.select
import kotlinx.serialization.Serializable
import kotlinx.serialization.builtins.serializer
import kotlin.coroutines.coroutineContext

Expand Down Expand Up @@ -71,12 +74,15 @@ class UCloudComputePlugin : ComputePlugin, SyncthingPlugin {

@OptIn(DelicateCoroutinesApi::class)
override suspend fun PluginContext.initialize() {
registerCli()
if (!config.shouldRunServerCode()) return

files = config.plugins.files[pluginName] as? UCloudFilePlugin
?: run {
error("Must have storage configured for the UCloud compute plugin ($pluginName). " +
"It appears that '${pluginName}' does not point to a valid UCloud storage plugin.")
error(
"Must have storage configured for the UCloud compute plugin ($pluginName). " +
"It appears that '${pluginName}' does not point to a valid UCloud storage plugin."
)
}

jobCache = VerifiedJobCache(rpcClient)
Expand Down Expand Up @@ -212,10 +218,50 @@ class UCloudComputePlugin : ComputePlugin, SyncthingPlugin {
files.driveLocator.onEnteringMaintenanceMode {
killJobsEnteringMaintenanceMode()
}
}

if (runtime.requiresReschedulingOfInQueueJobsOnStartup()) {
scheduleInQueueJobs()
}
private suspend fun PluginContext.registerCli() {
commandLineInterface?.addHandler(CliHandler("ucloud-compute") { args ->
val client = ipcClient

fun sendHelp(): Nothing = sendCommandLineUsage("ucloud-compute", "UCloud Compute") {
subcommand(
"status",
"Print status of the cluster (if available). Unstable format, please submit an issue before " +
"parsing this format for production use."
)
}

genericCommandLineHandler {
when (args.getOrNull(0)) {
"status" -> {
val resp = client.sendRequest(Ipc.status, Unit)
sendTerminalMessage { line(resp.text) }
}

else -> {
sendHelp()
}
}
}
})

ipcServerOptional?.addHandler(Ipc.status.handler { user, request ->
if (user.uid != 0) throw RPCException("Not root", HttpStatusCode.Forbidden)

val rt = runtime
if (rt is Pod2Runtime) {
StringWrapper(rt.dumpState())
} else {
StringWrapper("Status unavailable for this scheduler")
}
})
}

@Serializable
private data class StringWrapper(val text: String)
private object Ipc : IpcContainer("ucloud_compute_cli") {
val status = updateHandler("status", Unit.serializer(), StringWrapper.serializer())
}

private suspend fun scheduleInQueueJobs() {
Expand All @@ -239,6 +285,11 @@ class UCloudComputePlugin : ComputePlugin, SyncthingPlugin {
}

override suspend fun PluginContext.runMonitoringLoopInServerMode() {
if (runtime.requiresReschedulingOfInQueueJobsOnStartup()) {
scheduleInQueueJobs()
runtime.notifyReschedulingComplete()
}

while (coroutineContext.isActive) {
try {
jobManagement.runMonitoring()
Expand Down Expand Up @@ -436,7 +487,10 @@ class UCloudComputePlugin : ComputePlugin, SyncthingPlugin {
?: throw RPCException("Not supported by provider", HttpStatusCode.BadRequest)
}

suspend fun killJobsEnteringMaintenanceMode(dryRun: Boolean = false, dryMaintenanceMode: List<Long> = emptyList()): List<String> {
suspend fun killJobsEnteringMaintenanceMode(
dryRun: Boolean = false,
dryMaintenanceMode: List<Long> = emptyList()
): List<String> {
val result = ArrayList<String>()
for (job in runtime.list()) {
val internalMounts = job.mountedDirectories().mapNotNull { dir ->
Expand Down Expand Up @@ -562,22 +616,29 @@ class UCloudPublicIPPlugin : PublicIPPlugin {
commandLineInterface?.addHandler(CliHandler("uc-ip-pool") { args ->
val ipcClient = ipcClient

fun sendHelp(): Nothing = sendCommandLineUsage("uc-ip-pool", "View information about IPs in the public IP pool") {
subcommand("ls", "List the available ranges in the pool")
fun sendHelp(): Nothing =
sendCommandLineUsage("uc-ip-pool", "View information about IPs in the public IP pool") {
subcommand("ls", "List the available ranges in the pool")

subcommand("rm", "Delete an entry in the pool") {
arg(
"externalSubnet",
description = "The external IP subnet to remove from the pool. " +
"This will _not_ invalidate already allocated IP addresses."
)
}
subcommand("rm", "Delete an entry in the pool") {
arg(
"externalSubnet",
description = "The external IP subnet to remove from the pool. " +
"This will _not_ invalidate already allocated IP addresses."
)
}

subcommand("add", "Adds a new range to the pool") {
arg("externalSubnet", description = "The external IP subnet to add to the pool. For example: 10.0.0.0/24.")
arg("internalSubnet", description = "The internal IP subnet to add to the pool. For example: 10.0.0.0/24.")
subcommand("add", "Adds a new range to the pool") {
arg(
"externalSubnet",
description = "The external IP subnet to add to the pool. For example: 10.0.0.0/24."
)
arg(
"internalSubnet",
description = "The internal IP subnet to add to the pool. For example: 10.0.0.0/24."
)
}
}
}

genericCommandLineHandler {
when (args.getOrNull(0)) {
Expand Down
Loading

0 comments on commit a042921

Please sign in to comment.