Skip to content

Commit

Permalink
WX-1252 Runtime attributes cleanup – CWL runtime environment (#7369)
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored Feb 26, 2024
1 parent 84f5595 commit 3713d16
Show file tree
Hide file tree
Showing 13 changed files with 29 additions and 145 deletions.
10 changes: 2 additions & 8 deletions backend/src/main/scala/cromwell/backend/Command.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cromwell.backend
import common.validation.ErrorOr._
import common.validation.Validation._
import wom.InstantiatedCommand
import wom.callable.RuntimeEnvironment
import wom.expression.IoFunctionSet
import wom.values.{WomEvaluatedCallInputs, WomValue}

Expand All @@ -26,14 +25,9 @@ object Command {
callEngineFunction: IoFunctionSet,
inputsPreProcessor: WomEvaluatedCallInputs => Try[WomEvaluatedCallInputs] =
(i: WomEvaluatedCallInputs) => Success(i),
valueMapper: WomValue => WomValue,
runtimeEnvironment: RuntimeEnvironment
valueMapper: WomValue => WomValue
): ErrorOr[InstantiatedCommand] =
inputsPreProcessor(jobDescriptor.evaluatedTaskInputs).toErrorOr flatMap { mappedInputs =>
jobDescriptor.taskCall.callable.instantiateCommand(mappedInputs,
callEngineFunction,
valueMapper,
runtimeEnvironment
)
jobDescriptor.taskCall.callable.instantiateCommand(mappedInputs, callEngineFunction, valueMapper)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cromwell.backend

import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric.Positive
import eu.timepit.refined.refineMV
import wdl4s.parser.MemoryUnit
import wom.format.MemorySize

case class MinimumRuntimeSettings(cores: Int Refined Positive = refineMV(1),
ram: MemorySize = MemorySize(4, MemoryUnit.GB),
outputPathSize: Long = Long.MaxValue,
tempPathSize: Long = Long.MaxValue
)
64 changes: 0 additions & 64 deletions backend/src/main/scala/cromwell/backend/RuntimeEnvironment.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import net.ceedubs.ficus.Ficus._
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import shapeless.Coproduct
import wom.callable.{AdHocValue, CommandTaskDefinition, ContainerizedInputExpression, RuntimeEnvironment}
import wom.callable.{AdHocValue, CommandTaskDefinition, ContainerizedInputExpression}
import wom.expression.WomExpression
import wom.graph.LocalName
import wom.values._
Expand Down Expand Up @@ -141,7 +141,7 @@ trait StandardAsyncExecutionActor

lazy val temporaryDirectory: String = configurationDescriptor.backendConfig.getOrElse(
path = "temporary-directory",
default = s"""$$(mkdir -p "${runtimeEnvironment.tempPath}" && echo "${runtimeEnvironment.tempPath}")"""
default = """$(mktemp -d "$PWD"/tmp.XXXXXX)"""
)

val logJobIds: Boolean = true
Expand Down Expand Up @@ -431,7 +431,6 @@ trait StandardAsyncExecutionActor
s"""export $k="$v""""
} mkString ("", "\n", "\n")

val home = jobDescriptor.taskCall.callable.homeOverride.map(_(runtimeEnvironment)).getOrElse("$HOME")
val shortId = jobDescriptor.workflowDescriptor.id.shortString
// Give the out and error FIFO variables names that are unlikely to conflict with anything the user is doing.
val (out, err) = (s"out$shortId", s"err$shortId")
Expand Down Expand Up @@ -479,7 +478,6 @@ trait StandardAsyncExecutionActor
|$tmpDirPermissionsAdjustment
|export _JAVA_OPTIONS=-Djava.io.tmpdir="$$tmpDir"
|export TMPDIR="$$tmpDir"
|export HOME="$home"
|
|SCRIPT_PREAMBLE
|
Expand Down Expand Up @@ -512,16 +510,6 @@ trait StandardAsyncExecutionActor
)
}

def runtimeEnvironmentPathMapper(env: RuntimeEnvironment): RuntimeEnvironment = {
def localize(path: String): String = (WomSingleFile(path) |> commandLineValueMapper).valueString
env.copy(outputPath = env.outputPath |> localize, tempPath = env.tempPath |> localize)
}

lazy val runtimeEnvironment: RuntimeEnvironment =
RuntimeEnvironmentBuilder(jobDescriptor.runtimeAttributes, jobPaths)(
standardParams.minimumRuntimeSettings
) |> runtimeEnvironmentPathMapper

/**
* Turns WomFiles into relative paths. These paths are relative to the working disk.
*
Expand Down Expand Up @@ -685,8 +673,7 @@ trait StandardAsyncExecutionActor
jobDescriptor,
backendEngineFunctions,
mutatingPreProcessor,
commandLineValueMapper,
runtimeEnvironment
commandLineValueMapper
)

def makeStringKeyedMap(list: List[(LocalName, WomValue)]): Map[String, WomValue] = list.toMap map { case (k, v) =>
Expand Down
2 changes: 2 additions & 0 deletions docs/tutorials/HPCSlurmWithLocalScratch.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# Installing the Cromwell To Use Local Scratch Device
#### These instructions are a community contribution

### In the process of being updated as of 2024-02

In this section we will install the Cromwell Workflow Management system and configure it, so it can use the local scratch device on the compute nodes.
(these installations are done in a ```centos 8``` enviornment)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import org.apache.commons.csv.{CSVFormat, CSVPrinter}
import org.apache.commons.io.output.ByteArrayOutputStream
import wom.callable.Callable.OutputDefinition
import wom.callable.MetaValueElement.{MetaValueElementBoolean, MetaValueElementObject}
import wom.callable.{AdHocValue, RuntimeEnvironment}
import wom.callable.AdHocValue
import wom.core.FullyQualifiedName
import wom.expression.{FileEvaluation, NoIoFunctionSet}
import wom.values._
Expand Down Expand Up @@ -992,12 +992,6 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
override lazy val executeOrRecoverBackOff: SimpleExponentialBackoff =
SimpleExponentialBackoff(initialInterval = 5.seconds, maxInterval = 20.seconds, multiplier = 1.1)

override lazy val runtimeEnvironment: RuntimeEnvironment =
RuntimeEnvironmentBuilder(jobDescriptor.runtimeAttributes,
GcpBatchWorkingDisk.MountPoint,
GcpBatchWorkingDisk.MountPoint
)(standardParams.minimumRuntimeSettings)

protected def sendIncrementMetricsForReferenceFiles(referenceInputFilesOpt: Option[Set[GcpBatchInput]]): Unit =
referenceInputFilesOpt match {
case Some(referenceInputFiles) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import mouse.all._
import shapeless.Coproduct
import wom.callable.Callable.OutputDefinition
import wom.callable.MetaValueElement.{MetaValueElementBoolean, MetaValueElementObject}
import wom.callable.{AdHocValue, RuntimeEnvironment}
import wom.callable.AdHocValue
import wom.core.FullyQualifiedName
import wom.expression.{FileEvaluation, NoIoFunctionSet}
import wom.types.{WomArrayType, WomSingleFileType}
Expand Down Expand Up @@ -146,12 +146,6 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
override lazy val executeOrRecoverBackOff: SimpleExponentialBackoff =
SimpleExponentialBackoff(initialInterval = 3 seconds, maxInterval = 20 seconds, multiplier = 1.1)

override lazy val runtimeEnvironment: RuntimeEnvironment =
RuntimeEnvironmentBuilder(jobDescriptor.runtimeAttributes,
PipelinesApiWorkingDisk.MountPoint,
PipelinesApiWorkingDisk.MountPoint
)(standardParams.minimumRuntimeSettings)

protected lazy val cmdInput: PipelinesApiFileInput =
PipelinesApiFileInput(PipelinesApiJobPaths.JesExecParamName,
pipelinesApiCallPaths.script,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import java.nio.file.FileAlreadyExistsException
import java.time.Instant

import common.validation.Validation._
import cromwell.backend.RuntimeEnvironmentBuilder
import cromwell.backend.impl.sfs.config.ConfigConstants._
import cromwell.backend.sfs._
import cromwell.backend.standard.{StandardAsyncExecutionActorParams, StandardAsyncJob}
Expand Down Expand Up @@ -104,10 +103,8 @@ sealed trait ConfigAsyncJobExecutionActor extends SharedFileSystemAsyncJobExecut
if !inputs.contains(optional.localName.value)
} yield optional -> WomOptionalValue.none(optional.womType.memberType)

val runtimeEnvironment =
RuntimeEnvironmentBuilder(jobDescriptor.runtimeAttributes, jobPaths)(standardParams.minimumRuntimeSettings)
val allInputs = providedWomInputs ++ optionalsForciblyInitializedToNone
val womInstantiation = taskDefinition.instantiateCommand(allInputs, NoIoFunctionSet, identity, runtimeEnvironment)
val womInstantiation = taskDefinition.instantiateCommand(allInputs, NoIoFunctionSet, identity)

val command = womInstantiation.toTry.get.commandString
jobLogger.info(s"executing: $command")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import common.validation.ErrorOr.ErrorOr
import wdl.draft2.model.Declaration
import wdl.draft2.model.expression.{WdlFunctions, WdlStandardLibraryFunctions}
import wdl.shared.FileSizeLimitationConfig
import wom.callable.RuntimeEnvironment
import wom.expression.IoFunctionSet
import wom.graph.LocalName
import wom.values.WomValue
Expand All @@ -20,8 +19,7 @@ trait WdlCommandPart extends CommandPart {

override def instantiate(inputsMap: Map[LocalName, WomValue],
functions: IoFunctionSet,
valueMapper: WomValue => WomValue,
runtimeEnvironment: RuntimeEnvironment
valueMapper: WomValue => WomValue
): ErrorOr[List[InstantiatedCommand]] = {
val wdlFunctions =
WdlStandardLibraryFunctions.fromIoFunctionSet(functions, FileSizeLimitationConfig.fileSizeLimitationConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import wdl.model.draft3.graph.{ExpressionValueConsumer, GeneratedValueHandle}
import wdl.model.draft3.graph.expression._
import wdl.transforms.base.linking.graph.LinkedGraphMaker
import wdl.transforms.base.wdlom2wom.expression.WdlomWomExpression
import wom.callable.RuntimeEnvironment
import wom.expression.{IoFunctionSet, WomExpression}
import wom.graph.LocalName
import wom.types.{WomArrayType, WomPrimitiveType, WomType}
Expand Down Expand Up @@ -52,8 +51,7 @@ case class WdlomWomStringCommandPart(stringCommandPartElement: StringCommandPart
override def toString: String = stringCommandPartElement.value
override def instantiate(inputsMap: Map[LocalName, WomValue],
functions: IoFunctionSet,
valueMapper: WomValue => WomValue,
runtimeEnvironment: RuntimeEnvironment
valueMapper: WomValue => WomValue
): ErrorOr[List[InstantiatedCommand]] = List(InstantiatedCommand(stringCommandPartElement.value)).validNel
}

Expand All @@ -65,8 +63,7 @@ case class WdlomWomPlaceholderCommandPart(attributes: PlaceholderAttributeSet, e

override def instantiate(inputsMap: Map[LocalName, WomValue],
functions: IoFunctionSet,
valueMapper: WomValue => WomValue,
runtimeEnvironment: RuntimeEnvironment
valueMapper: WomValue => WomValue
): ErrorOr[List[InstantiatedCommand]] = {
val inputsValues = inputsMap map { case (localName, value) => localName.value -> value }
expression.evaluateValueForPlaceholder(inputsValues,
Expand Down
4 changes: 1 addition & 3 deletions wom/src/main/scala/wom/CommandPart.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package wom

import common.validation.ErrorOr.ErrorOr
import wom.callable.RuntimeEnvironment
import wom.expression.IoFunctionSet
import wom.graph.LocalName
import wom.values.WomValue

trait CommandPart {
def instantiate(inputsMap: Map[LocalName, WomValue],
functions: IoFunctionSet,
valueMapper: WomValue => WomValue,
runtimeEnvironment: RuntimeEnvironment
valueMapper: WomValue => WomValue
): ErrorOr[List[InstantiatedCommand]]
}
8 changes: 2 additions & 6 deletions wom/src/main/scala/wom/callable/CommandTaskDefinition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ sealed trait CommandTaskDefinition extends TaskDefinition {
def adHocFileCreation: Set[ContainerizedInputExpression]
def environmentExpressions: Map[String, WomExpression]
def additionalGlob: Option[WomGlobFile]
def homeOverride: Option[RuntimeEnvironment => String]

/**
* Provides a custom way to evaluate outputs of the task definition.
Expand All @@ -104,8 +103,7 @@ sealed trait CommandTaskDefinition extends TaskDefinition {

def instantiateCommand(taskInputs: WomEvaluatedCallInputs,
functions: IoFunctionSet,
valueMapper: WomValue => WomValue,
runtimeEnvironment: RuntimeEnvironment
valueMapper: WomValue => WomValue
): ErrorOr[InstantiatedCommand] = {

val inputsByLocalName = taskInputs map { case (k, v) => k.localName -> v }
Expand All @@ -115,7 +113,7 @@ sealed trait CommandTaskDefinition extends TaskDefinition {
// Just raw command parts, no separators.
val rawCommandParts: List[ErrorOr[InstantiatedCommand]] =
commandTemplate(taskInputs).toList.flatMap { commandPart =>
commandPart.instantiate(inputsByLocalName, functions, valueMapper, runtimeEnvironment).sequence
commandPart.instantiate(inputsByLocalName, functions, valueMapper).sequence
}

// Add separator command parts and monoid smash down to one `ErrorOr[InstantiatedCommand]`.
Expand Down Expand Up @@ -163,7 +161,6 @@ final case class CallableTaskDefinition(name: String,
additionalGlob: Option[WomGlobFile] = None,
private[wom] val customizedOutputEvaluation: OutputEvaluationFunction =
OutputEvaluationFunction.none,
homeOverride: Option[RuntimeEnvironment => String] = None,
dockerOutputDirectory: Option[String] = None,
override val sourceLocation: Option[SourceFileLocation]
) extends CommandTaskDefinition {
Expand Down Expand Up @@ -198,7 +195,6 @@ final case class ExecutableTaskDefinition private (callableTaskDefinition: Calla
override def additionalGlob = callableTaskDefinition.additionalGlob
override private[wom] def customizedOutputEvaluation = callableTaskDefinition.customizedOutputEvaluation
override def toExecutable = this.validNel
override def homeOverride = callableTaskDefinition.homeOverride
override def dockerOutputDirectory = callableTaskDefinition.dockerOutputDirectory
}

Expand Down
22 changes: 0 additions & 22 deletions wom/src/main/scala/wom/callable/RuntimeEnvironment.scala

This file was deleted.

0 comments on commit 3713d16

Please sign in to comment.