Skip to content

Commit

Permalink
Reference disk usage metric + refactoring [BT-20] (#6145)
Browse files Browse the repository at this point in the history
  • Loading branch information
Grigoriy Sterin authored Jan 11, 2021
1 parent eebe0aa commit a49e1fc
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.net.SocketTimeoutException
import _root_.io.grpc.Status
import akka.actor.ActorRef
import akka.http.scaladsl.model.{ContentType, ContentTypes}
import cats.data.NonEmptyList
import cats.data.Validated.{Invalid, Valid}
import cats.syntax.validated._
import com.google.api.client.googleapis.json.GoogleJsonResponseException
Expand Down Expand Up @@ -53,7 +54,7 @@ import wom.values._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Success, Try}
import scala.util.{Failure, Success, Try}

object PipelinesApiAsyncBackendJobExecutionActor {
val JesOperationIdKey = "__jes_operation_id"
Expand Down Expand Up @@ -543,13 +544,25 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
cloudPath: Path,
transferLibraryContainerPath: Path,
gcsTransferConfiguration: GcsTransferConfiguration,
referenceFileToDiskImageMapping: Option[Map[String, PipelinesApiReferenceFilesDisk]]): Future[Unit] = Future.successful(())
referenceInputsToMountedPathsOpt: Option[Map[PipelinesApiInput, String]]): Future[Unit] = Future.successful(())

protected def uploadGcsDelocalizationScript(createPipelineParameters: CreatePipelineParameters,
cloudPath: Path,
transferLibraryContainerPath: Path,
gcsTransferConfiguration: GcsTransferConfiguration): Future[Unit] = Future.successful(())

protected val useReferenceDisks: Boolean = {
val optionName = WorkflowOptions.UseReferenceDisks.name
workflowDescriptor.workflowOptions.getBoolean(optionName) match {
case Success(value) => value
case Failure(OptionNotFoundException(_)) => false
case Failure(f) =>
// Should not happen, this case should have been screened for and fast-failed during workflow materialization.
log.error(f, s"Programmer error: unexpected failure attempting to read value for workflow option '$optionName' as a Boolean")
false
}
}

private def createNewJob(): Future[ExecutionHandle] = {
// Want to force runtimeAttributes to evaluate so we can fail quickly now if we need to:
def evaluateRuntimeAttributes = Future.fromTry(Try(runtimeAttributes))
Expand Down Expand Up @@ -606,6 +619,16 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
tellMetadata(backendLabelEvents)
}

def getReferenceInputsToMountedPathsOpt(createPipelinesParameters: CreatePipelineParameters): Option[Map[PipelinesApiInput, String]] = {
if (useReferenceDisks) {
jesAttributes
.referenceFileToDiskImageMappingOpt
.map(getReferenceInputsToMountedPathMappings(_, createPipelinesParameters.inputOutputParameters.fileInputParameters))
} else {
None
}
}

val runPipelineResponse = for {
_ <- evaluateRuntimeAttributes
_ <- uploadScriptFile
Expand All @@ -617,12 +640,14 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
transferLibraryContainerPath = createParameters.commandScriptContainerPath.sibling(GcsTransferLibraryName)
_ <- uploadGcsTransferLibrary(createParameters, gcsTransferLibraryCloudPath, gcsTransferConfiguration)
gcsLocalizationScriptCloudPath = jobPaths.callExecutionRoot / PipelinesApiJobPaths.GcsLocalizationScriptName
_ <- uploadGcsLocalizationScript(createParameters, gcsLocalizationScriptCloudPath, transferLibraryContainerPath, gcsTransferConfiguration, jesAttributes.referenceFileToDiskImageMappingOpt)
referenceInputsToMountedPathsOpt = getReferenceInputsToMountedPathsOpt(createParameters)
_ <- uploadGcsLocalizationScript(createParameters, gcsLocalizationScriptCloudPath, transferLibraryContainerPath, gcsTransferConfiguration, referenceInputsToMountedPathsOpt)
gcsDelocalizationScriptCloudPath = jobPaths.callExecutionRoot / PipelinesApiJobPaths.GcsDelocalizationScriptName
_ <- uploadGcsDelocalizationScript(createParameters, gcsDelocalizationScriptCloudPath, transferLibraryContainerPath, gcsTransferConfiguration)
_ = this.hasDockerCredentials = createParameters.privateDockerKeyAndEncryptedToken.isDefined
runId <- runPipeline(workflowId, createParameters, jobLogger)
_ = sendGoogleLabelsToMetadata(customLabels)
_ = sendIncrementMetricsForReferenceFiles(referenceInputsToMountedPathsOpt.map(_.keySet))
} yield runId

runPipelineResponse map { runId =>
Expand All @@ -632,6 +657,17 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
}
}

protected def sendIncrementMetricsForReferenceFiles(referenceInputFilesOpt: Option[Set[PipelinesApiInput]]): Unit = {
referenceInputFilesOpt match {
case Some(referenceInputFiles) =>
referenceInputFiles.foreach { referenceInputFile =>
increment(NonEmptyList.of("referencefiles", referenceInputFile.relativeHostPath.pathAsString))
}
case _ =>
// do nothing - reference disks feature is either not configured in Cromwell or disabled in workflow options
}
}

override def pollStatusAsync(handle: JesPendingExecutionHandle): Future[RunStatus] = {
super[PipelinesApiStatusRequestClient].pollStatus(workflowId, handle.pendingJob)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.data.{NonEmptyList, Validated}
import cats.data.Validated._
import cats.implicits._
import com.typesafe.config.{Config, ConfigValue}
import com.typesafe.scalalogging.StrictLogging
import common.exception.MessageAggregation
import common.validation.ErrorOr._
import common.validation.Validation._
Expand Down Expand Up @@ -55,7 +56,8 @@ case class PipelinesApiConfigurationAttributes(project: String,

object PipelinesApiConfigurationAttributes
extends PipelinesApiDockerCacheMappingOperations
with PipelinesApiReferenceFilesMappingOperations {
with PipelinesApiReferenceFilesMappingOperations
with StrictLogging {

/**
* @param transferAttempts This is the number of attempts, not retries, hence it is positive.
Expand Down Expand Up @@ -221,7 +223,7 @@ object PipelinesApiConfigurationAttributes
(memoryRetryKeys, memoryRetryMultiplier) flatMapN validateMemoryRetryConfig
}

val referenceDiskLocalizationManifestFiles: ErrorOr[Option[List[ValidFullGcsPath]]] = validateGcsPathsToReferenceDiskManifestFiles(backendConfig)
val referenceDiskLocalizationManifestFiles: ErrorOr[Option[List[ValidFullGcsPath]]] = validateGcsPathsToReferenceDiskManifestFiles(backendConfig, backendName)

val dockerImageCacheManifestFile: ErrorOr[Option[ValidFullGcsPath]] = validateGcsPathToDockerImageCacheManifestFile(backendConfig)

Expand Down Expand Up @@ -321,15 +323,18 @@ object PipelinesApiConfigurationAttributes
}
}

def validateGcsPathsToReferenceDiskManifestFiles(backendConfig: Config): ErrorOr[Option[List[ValidFullGcsPath]]] = {
def validateGcsPathsToReferenceDiskManifestFiles(backendConfig: Config, backendName: String): ErrorOr[Option[List[ValidFullGcsPath]]] = {
backendConfig.getAs[List[String]]("reference-disk-localization-manifest-files") match {
case Some(gcsPaths) =>
logger.info(s"Reference disks feature for $backendName backend is configured with the following manifest files: ${gcsPaths.mkString(",")}.")
gcsPaths.map(validateSingleGcsPath)
.sequence
.contextualizeErrors("parse paths to file as valid GCS paths")
.map(Option.apply)

case None => None.validNel
case None =>
logger.info(s"Reference disks feature for $backendName backend is not configured.")
None.validNel
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package cromwell.backend.google.pipelines.common

import java.nio.file.Paths
import java.util.UUID

import _root_.io.grpc.Status
import _root_.wdl.draft2.model._
import akka.actor.{ActorRef, Props}
import akka.testkit.{ImplicitSender, TestActorRef, TestDuration, TestProbe}
import cats.data.NonEmptyList
import com.google.api.client.http.HttpRequest
import com.google.cloud.NoCredentials
import common.collections.EnhancedCollections._
Expand All @@ -27,6 +29,8 @@ import cromwell.core.labels.Labels
import cromwell.core.logging.JobLogger
import cromwell.core.path.{DefaultPathBuilder, PathBuilder}
import cromwell.filesystems.gcs.{GcsPath, GcsPathBuilder, MockGcsPathBuilder}
import cromwell.services.instrumentation.{CromwellBucket, CromwellIncrement}
import cromwell.services.instrumentation.InstrumentationService.InstrumentationServiceMessage
import cromwell.services.keyvalue.InMemoryKvServiceActor
import cromwell.services.keyvalue.KeyValueServiceActor.{KvGet, KvJobKey, KvPair, ScopedKey}
import cromwell.util.JsonFormatting.WomValueJsonFormatter._
Expand Down Expand Up @@ -127,12 +131,13 @@ class PipelinesApiAsyncBackendJobExecutionActorSpec extends TestKitSuite
jesConfiguration: PipelinesApiConfiguration,
functions: PipelinesApiExpressionFunctions = TestableJesExpressionFunctions,
jesSingletonActor: ActorRef = emptyActor,
ioActor: ActorRef = mockIoActor) = {
ioActor: ActorRef = mockIoActor,
serviceRegistryActor: ActorRef = kvService) = {

this(
DefaultStandardAsyncExecutionActorParams(
jobIdKey = PipelinesApiAsyncBackendJobExecutionActor.JesOperationIdKey,
serviceRegistryActor = kvService,
serviceRegistryActor = serviceRegistryActor,
ioActor = ioActor,
jobDescriptor = jobDescriptor,
configurationDescriptor = jesConfiguration.configurationDescriptor,
Expand Down Expand Up @@ -215,15 +220,19 @@ class PipelinesApiAsyncBackendJobExecutionActorSpec extends TestKitSuite
private def executionActor(jobDescriptor: BackendJobDescriptor,
promise: Promise[BackendJobExecutionResponse],
jesSingletonActor: ActorRef,
shouldBePreemptible: Boolean): ActorRef = {
shouldBePreemptible: Boolean,
serviceRegistryActor: ActorRef = kvService,
referenceInputFilesOpt: Option[Set[PipelinesApiInput]] = None): ActorRef = {

val job = StandardAsyncJob(UUID.randomUUID().toString)
val run = Run(job)
val handle = new JesPendingExecutionHandle(jobDescriptor, run.job, Option(run), None)

class ExecuteOrRecoverActor extends TestablePipelinesApiJobExecutionActor(jobDescriptor, promise, papiConfiguration, jesSingletonActor = jesSingletonActor) {
class ExecuteOrRecoverActor extends TestablePipelinesApiJobExecutionActor(jobDescriptor, promise, papiConfiguration, jesSingletonActor = jesSingletonActor, serviceRegistryActor = serviceRegistryActor) {
override def executeOrRecover(mode: ExecutionMode)(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
if(preemptible == shouldBePreemptible) Future.successful(handle)
sendIncrementMetricsForReferenceFiles(referenceInputFilesOpt)

if (preemptible == shouldBePreemptible) Future.successful(handle)
else Future.failed(new Exception(s"Test expected preemptible to be $shouldBePreemptible but got $preemptible"))
}
}
Expand Down Expand Up @@ -316,6 +325,40 @@ class PipelinesApiAsyncBackendJobExecutionActorSpec extends TestKitSuite
}
}

it should "send proper value for \"number of reference files used gauge\" metric, or don't send anything if reference disks feature is disabled" in {
val expectedInput1 = PipelinesApiFileInput(name = "testfile1", relativeHostPath = DefaultPathBuilder.build(Paths.get(s"test/reference/path/file1")), mount = null, cloudPath = null)
val expectedInput2 = PipelinesApiFileInput(name = "testfile2", relativeHostPath = DefaultPathBuilder.build(Paths.get(s"test/reference/path/file2")), mount = null, cloudPath = null)
val expectedReferenceInputFiles = Set[PipelinesApiInput](expectedInput1, expectedInput2)

val expectedMsg1 = InstrumentationServiceMessage(CromwellIncrement(CromwellBucket(List.empty, NonEmptyList.of("referencefiles", expectedInput1.relativeHostPath.pathAsString))))
val expectedMsg2 = InstrumentationServiceMessage(CromwellIncrement(CromwellBucket(List.empty, NonEmptyList.of("referencefiles", expectedInput2.relativeHostPath.pathAsString))))

val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0)
val serviceRegistryProbe = TestProbe()

val backend1 = executionActor(
jobDescriptor,
Promise[BackendJobExecutionResponse](),
TestProbe().ref,
shouldBePreemptible = false,
serviceRegistryActor = serviceRegistryProbe.ref,
referenceInputFilesOpt = Option(expectedReferenceInputFiles)
)
backend1 ! Execute
serviceRegistryProbe.expectMsgAllOf(expectedMsg1, expectedMsg2)

val backend2 = executionActor(
jobDescriptor,
Promise[BackendJobExecutionResponse](),
TestProbe().ref,
shouldBePreemptible = false,
serviceRegistryActor = serviceRegistryProbe.ref,
referenceInputFilesOpt = None
)
backend2 ! Execute
serviceRegistryProbe.expectNoMessage(timeout)
}

it should "not restart 2 of 1 unexpected shutdowns without another preemptible VM" in {
val actorRef = buildPreemptibleTestActorRef(2, 1)
val jesBackend = actorRef.underlyingActor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ class PipelinesApiConfigurationAttributesSpec extends AnyFlatSpec with CromwellT
val referenceDiskManifestConfigStr = s"""reference-disk-localization-manifest-files = ["$referenceDiskManifest1Path", "$referenceDiskManifest2Path"]"""
val backendConfig = ConfigFactory.parseString(configString(referenceDiskManifestConfigStr))

val validatedGcsPathsToReferenceDiskManifestFilesErrorOr = PipelinesApiConfigurationAttributes.validateGcsPathsToReferenceDiskManifestFiles(backendConfig)
val validatedGcsPathsToReferenceDiskManifestFilesErrorOr = PipelinesApiConfigurationAttributes.validateGcsPathsToReferenceDiskManifestFiles(backendConfig, "unit-test-backend")
validatedGcsPathsToReferenceDiskManifestFilesErrorOr match {
case Valid(validatedGcsPathsToReferenceDiskManifestFilesOpt) =>
validatedGcsPathsToReferenceDiskManifestFilesOpt match {
Expand All @@ -434,7 +434,7 @@ class PipelinesApiConfigurationAttributesSpec extends AnyFlatSpec with CromwellT
it should "parse correct missing reference-disk-localization-manifest-files config" in {
val backendConfig = ConfigFactory.parseString(configString())

val validatedGcsPathsToReferenceDiskManifestFilesErrorOr = PipelinesApiConfigurationAttributes.validateGcsPathsToReferenceDiskManifestFiles(backendConfig)
val validatedGcsPathsToReferenceDiskManifestFilesErrorOr = PipelinesApiConfigurationAttributes.validateGcsPathsToReferenceDiskManifestFiles(backendConfig, "unit-test-backend")
validatedGcsPathsToReferenceDiskManifestFilesErrorOr match {
case Valid(validatedGcsPathsToReferenceDiskManifestFilesOpt) =>
validatedGcsPathsToReferenceDiskManifestFilesOpt shouldBe None
Expand All @@ -447,7 +447,7 @@ class PipelinesApiConfigurationAttributesSpec extends AnyFlatSpec with CromwellT
val referenceDiskManifestConfigStr = "reference-disk-localization-manifest-files = []"
val backendConfig = ConfigFactory.parseString(configString(referenceDiskManifestConfigStr))

val validatedGcsPathsToReferenceDiskManifestFilesErrorOr = PipelinesApiConfigurationAttributes.validateGcsPathsToReferenceDiskManifestFiles(backendConfig)
val validatedGcsPathsToReferenceDiskManifestFilesErrorOr = PipelinesApiConfigurationAttributes.validateGcsPathsToReferenceDiskManifestFiles(backendConfig, "unit-test-backend")
validatedGcsPathsToReferenceDiskManifestFilesErrorOr match {
case Valid(validatedGcsPathsToReferenceDiskManifestFilesOpt) =>
validatedGcsPathsToReferenceDiskManifestFilesOpt match {
Expand Down Expand Up @@ -483,7 +483,7 @@ class PipelinesApiConfigurationAttributesSpec extends AnyFlatSpec with CromwellT
it should "parse correct missing docker-image-cache-manifest-file config" in {
val backendConfig = ConfigFactory.parseString(configString())

val validatedGcsPathsToDockerImageCacheManifestFilesErrorOr = PipelinesApiConfigurationAttributes.validateGcsPathsToReferenceDiskManifestFiles(backendConfig)
val validatedGcsPathsToDockerImageCacheManifestFilesErrorOr = PipelinesApiConfigurationAttributes.validateGcsPathsToReferenceDiskManifestFiles(backendConfig, "unit-test-backend")
validatedGcsPathsToDockerImageCacheManifestFilesErrorOr match {
case Valid(validatedGcsPathsToDockerImageCacheManifestFilesOpt) =>
validatedGcsPathsToDockerImageCacheManifestFilesOpt shouldBe None
Expand Down
Loading

0 comments on commit a49e1fc

Please sign in to comment.