Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
mcovarr committed Aug 25, 2021
2 parents 5f8a277 + 0aff353 commit a4567f6
Show file tree
Hide file tree
Showing 67 changed files with 1,222 additions and 468 deletions.
13 changes: 9 additions & 4 deletions .github/workflows/combine_scalasteward_prs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ jobs:
git config user.name "broadbot"
echo "Fetching repo state..."
git fetch --quiet
git fetch origin develop:origin_develop
echo "Bringing in PRs:"
gh pr list --limit 1000 | grep 'scala-steward:' | tac | head -n${PR_LIMIT}
Expand All @@ -54,8 +55,8 @@ jobs:
NEW_BRANCH="consolidated-scala-steward-prs-${FIRST_PR}-${LAST_PR}-$(date +'%Y-%m-%d_%H-%M')"
echo "NEW_BRANCH=${NEW_BRANCH}"
git checkout develop
git pull
git checkout origin_develop
echo "develop is currently at: $(git rev-parse --verify HEAD)"
git checkout -B ${NEW_BRANCH}
SUCCESSFUL_PRS=()
Expand All @@ -64,8 +65,12 @@ jobs:
for pr in ${NEXT_SLICE[@]}
do
echo "Bringing in: $pr"
git fetch origin pull/${pr}/head:pr_${pr}_temp
git checkout ${NEW_BRANCH}
echo "${NEW_BRANCH} is currently at: $(git rev-parse --verify HEAD)"
git checkout pr_${pr}_temp
echo "pr_${pr}_temp is currently at: $(git rev-parse --verify HEAD)"
git rebase "${NEW_BRANCH}" "pr_${pr}_temp" && EXIT_CODE=$? || EXIT_CODE=$?
if [ "${EXIT_CODE}" == "0" ]
Expand Down Expand Up @@ -95,7 +100,7 @@ jobs:
Consolidates scala-steward PRs: ${SUCCESSFUL_PRS[*]}
Merge conflicts during consolidation (might be empty): ${UNSUCCESSFUL_PRS[*]}"
Merge conflicts during consolidation (if non-empty, this list of PRs will need to be manually applied): ${UNSUCCESSFUL_PRS[*]}"
fi
NEXT_INDEX=$(( NEXT_INDEX + PR_GROUP_SIZE ))
Expand Down
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
# Cromwell Change Log

## 67 Release Notes

### Configuration updates for improved scaling

Some configuration changes were introduced in Cromwell 67 to support improved scaling. See Cromwell's `reference.conf` for details on new parameters.

* I/O throttling moved from `io` to its own `io.throttle` stanza; config updates may be required if these values are currently being overridden in local deployments.

* The default `system.job-rate-control` has been changed from 50 per second to 20 per 10 seconds.

* New configuration parameters have been introduced for values which were previously hardcoded constants:
  * `system.file-hash-batch-size`, value updated from `100` to `50`.
  * `io.gcs.max-batch-size`, value stays the same at `100`.
  * `io.gcs.max-batch-duration`, value stays the same at `5 seconds`.

* New configuration parameters which should not require updating:
  * `io.command-backpressure-staleness`
  * `io.backpressure-extension-log-threshold`
  * `load-control.io-normal-window-minimum`
  * `load-control.io-normal-window-maximum`

* `io.nio.parallelism` was previously misspelled in `reference.conf` but not in Cromwell's configuration reading code. Only the correctly spelled key has ever had, or will have, any effect.

## 66 Release Notes

### Google Artifact Registry Support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package cromwell.backend

import akka.actor.ActorLogging
import akka.event.LoggingReceive
import common.validation.Validation.{MemoryRetryMultiplier, MemoryRetryMultiplierRefined}
import cromwell.backend.BackendJobExecutionActor._
import cromwell.backend.BackendLifecycleActor._
import cromwell.backend.OutputEvaluator.EvaluatedJobOutputs
import cromwell.core._
import cromwell.core.path.Path
import eu.timepit.refined.refineMV
import wom.expression.IoFunctionSet
import wom.values.WomValue

Expand Down Expand Up @@ -48,8 +46,7 @@ object BackendJobExecutionActor {
case class JobFailedNonRetryableResponse(jobKey: JobKey, throwable: Throwable, returnCode: Option[Int]) extends BackendJobFailedResponse
case class JobFailedRetryableResponse(jobKey: BackendJobDescriptorKey,
throwable: Throwable,
returnCode: Option[Int],
memoryMultiplier: MemoryRetryMultiplierRefined = refineMV[MemoryRetryMultiplier](1.0)) extends BackendJobFailedResponse
returnCode: Option[Int]) extends BackendJobFailedResponse

// Reconnection Exceptions
case class JobReconnectionNotSupportedException(jobKey: BackendJobDescriptorKey) extends Exception(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ trait BackendLifecycleActorFactory {
/**
* A set of KV store keys that are requested and looked up on behalf of all backends before running each job.
*/
def defaultKeyValueStoreKeys: Seq[String] = Seq(BackendLifecycleActorFactory.FailedRetryCountKey)
def defaultKeyValueStoreKeys: Seq[String] = Seq(BackendLifecycleActorFactory.FailedRetryCountKey, BackendLifecycleActorFactory.MemoryMultiplierKey)

/*
* Returns credentials that can be used to authenticate to a docker registry server
Expand All @@ -137,4 +137,5 @@ trait BackendLifecycleActorFactory {

/**
  * Names of the key-value store entries that are looked up on behalf of all backends before each job runs.
  */
object BackendLifecycleActorFactory {

  /** Key under which the number of failed retries for a job is stored. */
  val FailedRetryCountKey: String = "FailedRetryCount"

  /** Key under which the memory multiplier for a job attempt is stored. */
  val MemoryMultiplierKey: String = "MemoryMultiplier"
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ trait AsyncBackendJobExecutionActor { this: Actor with ActorLogging with SlowJob
case Finish(FailedNonRetryableExecutionHandle(throwable, returnCode, _)) =>
completionPromise.success(JobFailedNonRetryableResponse(jobDescriptor.key, throwable, returnCode))
context.stop(self)
case Finish(FailedRetryableExecutionHandle(throwable, returnCode, memoryMultiplier, _)) =>
completionPromise.success(JobFailedRetryableResponse(jobDescriptor.key, throwable, returnCode, memoryMultiplier))
case Finish(FailedRetryableExecutionHandle(throwable, returnCode, _)) =>
completionPromise.success(JobFailedRetryableResponse(jobDescriptor.key, throwable, returnCode))
context.stop(self)
case Finish(cromwell.backend.async.AbortedExecutionHandle) =>
completionPromise.success(JobAbortedResponse(jobDescriptor.key))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package cromwell.backend.async

import common.validation.Validation.{MemoryRetryMultiplier, MemoryRetryMultiplierRefined}
import cromwell.backend.BackendJobDescriptor
import cromwell.backend.async.AsyncBackendJobExecutionActor.JobId
import cromwell.core.path.Path
import cromwell.core.{CallOutputs, ExecutionEvent}
import cromwell.services.keyvalue.KeyValueServiceActor.KvPair
import eu.timepit.refined.refineMV

/**
* Trait to encapsulate whether an execution is complete and if so provide a result. Useful in conjunction
Expand Down Expand Up @@ -51,7 +49,6 @@ final case class FailedNonRetryableExecutionHandle(throwable: Throwable,

final case class FailedRetryableExecutionHandle(throwable: Throwable,
returnCode: Option[Int] = None,
memoryMultiplier: MemoryRetryMultiplierRefined = refineMV[MemoryRetryMultiplier](1.0),
override val kvPairsToSave: Option[Seq[KvPair]]) extends FailedExecutionHandle {

override val isDone = true
Expand Down
4 changes: 1 addition & 3 deletions backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import cromwell.core.path.{DefaultPathBuilderFactory, PathBuilderFactory}
import cromwell.core.{CallKey, HogGroup, WorkflowId, WorkflowOptions}
import cromwell.docker.DockerInfoActor.DockerSize
import cromwell.services.keyvalue.KeyValueServiceActor.KvResponse
import eu.timepit.refined.refineMV
import net.ceedubs.ficus.Ficus._
import wom.callable.{ExecutableCallable, MetaValueElement}
import wom.graph.CommandCallNode
Expand All @@ -29,8 +28,7 @@ import scala.util.Try
*/
case class BackendJobDescriptorKey(call: CommandCallNode,
index: Option[Int],
attempt: Int,
memoryMultiplier: MemoryRetryMultiplierRefined = refineMV[MemoryRetryMultiplier](1.0)) extends CallKey {
attempt: Int) extends CallKey {
def node = call
private val indexString = index map { _.toString } getOrElse "NA"
lazy val tag = s"${call.fullyQualifiedName}:$indexString:$attempt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import common.validation.IOChecked._
import common.validation.Validation._
import cromwell.backend.BackendJobExecutionActor.{BackendJobExecutionResponse, JobAbortedResponse, JobReconnectionNotSupportedException}
import cromwell.backend.BackendLifecycleActor.AbortJobCommand
import cromwell.backend.BackendLifecycleActorFactory.{FailedRetryCountKey, MemoryMultiplierKey}
import cromwell.backend.OutputEvaluator._
import cromwell.backend.SlowJobWarning.{WarnAboutSlownessAfter, WarnAboutSlownessIfNecessary}
import cromwell.backend._
Expand All @@ -23,11 +24,10 @@ import cromwell.backend.standard.StandardAdHocValue._
import cromwell.backend.validation._
import cromwell.core.io.{AsyncIoActorClient, DefaultIoCommandBuilder, IoCommandBuilder}
import cromwell.core.path.Path
import cromwell.core.{CromwellAggregatedException, CromwellFatalExceptionMarker, ExecutionEvent, StandardPaths, WorkflowOptions}
import cromwell.core._
import cromwell.services.keyvalue.KeyValueServiceActor._
import cromwell.services.keyvalue.KvClient
import cromwell.services.metadata.CallMetadataKeys
import eu.timepit.refined.api._
import eu.timepit.refined.refineV
import mouse.all._
import net.ceedubs.ficus.Ficus._
Expand Down Expand Up @@ -715,6 +715,20 @@ trait StandardAsyncExecutionActor
case _ => 0
}

/**
  * The memory multiplier stored for the previous attempt, if one was prefetched from the KV store.
  *
  * Evaluates to `None` when no `KvPair` is present under `MemoryMultiplierKey`, or when the stored
  * value cannot be converted to a `Double` (that case is also logged as an error).
  */
lazy val previousMemoryMultiplier: Option[Double] =
  jobDescriptor.prefetchedKvStoreEntries.get(BackendLifecycleActorFactory.MemoryMultiplierKey) flatMap {
    case KvPair(_, storedValue) =>
      Try(storedValue.toDouble) match {
        case Success(multiplier) => Option(multiplier)
        case Failure(conversionError) =>
          // should not happen as Cromwell itself had written the value as a Double
          log.error(conversionError, s"Programmer error: unexpected failure attempting to convert value of MemoryMultiplierKey from JOB_KEY_VALUE_ENTRY table to Double.")
          None
      }
    // Any other kind of prefetched response carries no usable multiplier.
    case _ => None
  }

/**
* Execute the job specified in the params. Should return a `StandardAsyncPendingExecutionHandle`, or a
* `FailedExecutionHandle`.
Expand Down Expand Up @@ -1006,12 +1020,13 @@ trait StandardAsyncExecutionActor
val maxRetriesNotReachedYet = previousFailedRetries < maxRetries
failedRetryableOrNonRetryable match {
case failed: FailedNonRetryableExecutionHandle if maxRetriesNotReachedYet =>
val currentMemoryMultiplier = jobDescriptor.key.memoryMultiplier
(retryWithMoreMemory, memoryRetryFactor) match {
case (true, Some(retryFactor)) =>
val newMultiplier = Refined.unsafeApply[Double, MemoryRetryMultiplier](currentMemoryMultiplier.value * retryFactor.value)
saveAttrsAndRetry(failed, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true, Option(newMultiplier))
case (_, _) => saveAttrsAndRetry(failed, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true)
(retryWithMoreMemory, memoryRetryFactor, previousMemoryMultiplier) match {
case (true, Some(retryFactor), Some(previousMultiplier)) =>
val nextMemoryMultiplier = previousMultiplier * retryFactor.value
saveAttrsAndRetry(failed, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true, Option(nextMemoryMultiplier))
case (true, Some(retryFactor), None) =>
saveAttrsAndRetry(failed, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true, Option(retryFactor.value))
case (_, _, _) => saveAttrsAndRetry(failed, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true)
}
case failedNonRetryable: FailedNonRetryableExecutionHandle => Future.successful(failedNonRetryable)
case failedRetryable: FailedRetryableExecutionHandle => saveAttrsAndRetry(failedRetryable, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = false)
Expand All @@ -1024,15 +1039,14 @@ trait StandardAsyncExecutionActor
kvPrev: Map[String, KvPair],
kvNext: Map[String, KvPair],
incFailedCount: Boolean,
memoryMultiplier: Option[MemoryRetryMultiplierRefined] = None): Future[FailedRetryableExecutionHandle] = {
nextMemoryMultiplier: Option[Double] = None): Future[FailedRetryableExecutionHandle] = {
failedExecHandle match {
case failedNonRetryable: FailedNonRetryableExecutionHandle =>
saveKvPairsForNextAttempt(kvPrev, kvNext, incFailedCount) map { _ =>
val currentMemoryMultiplier = jobDescriptor.key.memoryMultiplier
FailedRetryableExecutionHandle(failedNonRetryable.throwable, failedNonRetryable.returnCode, memoryMultiplier.getOrElse(currentMemoryMultiplier), None)
saveKvPairsForNextAttempt(kvPrev, kvNext, incFailedCount, nextMemoryMultiplier) map { _ =>
FailedRetryableExecutionHandle(failedNonRetryable.throwable, failedNonRetryable.returnCode, None)
}
case failedRetryable: FailedRetryableExecutionHandle =>
saveKvPairsForNextAttempt(kvPrev, kvNext, incFailedCount) map (_ => failedRetryable)
saveKvPairsForNextAttempt(kvPrev, kvNext, incFailedCount, nextMemoryMultiplier) map (_ => failedRetryable)
}
}

Expand All @@ -1044,18 +1058,29 @@ trait StandardAsyncExecutionActor
*/
private def saveKvPairsForNextAttempt(kvsFromPreviousAttempt: Map[String, KvPair],
kvsForNextAttempt: Map[String, KvPair],
incrementFailedRetryCount: Boolean): Future[Seq[KvResponse]] = {
incrementFailedRetryCount: Boolean,
nextMemoryMultiplierOption: Option[Double]): Future[Seq[KvResponse]] = {
val nextKvJobKey = KvJobKey(jobDescriptor.key.call.fullyQualifiedName, jobDescriptor.key.index, jobDescriptor.key.attempt + 1)

// Builds a single-entry map holding `value` under `key`, with the KvPair scoped to this workflow
// and to `nextKvJobKey` (the job key of the next attempt). Returning a Map lets callers merge the
// result directly into the other key/value maps with `++`.
// The former type parameter `[A]` was never used in the signature or body and has been dropped.
def getNextKvPair(key: String, value: String): Map[String, KvPair] = {
  val nextScopedKey = ScopedKey(jobDescriptor.workflowDescriptor.id, nextKvJobKey, key)
  Map(key -> KvPair(nextScopedKey, value))
}

val kvsFromPreviousAttemptUpd = kvsFromPreviousAttempt.mapValues(kvPair => kvPair.copy(key = kvPair.key.copy(jobKey = nextKvJobKey)))
val mergedKvs = if (incrementFailedRetryCount) {
val failedRetryCountScopedKey = ScopedKey(jobDescriptor.workflowDescriptor.id, nextKvJobKey, BackendLifecycleActorFactory.FailedRetryCountKey)
val failedRetryCountKvPair = KvPair(failedRetryCountScopedKey, (previousFailedRetries + 1).toString)

kvsFromPreviousAttemptUpd ++ kvsForNextAttempt + (BackendLifecycleActorFactory.FailedRetryCountKey -> failedRetryCountKvPair)
} else {
kvsFromPreviousAttemptUpd ++ kvsForNextAttempt
val failedRetryCountKvPair: Map[String, KvPair] =
if (incrementFailedRetryCount) getNextKvPair(FailedRetryCountKey, (previousFailedRetries + 1).toString)
else Map.empty[String, KvPair]

val memoryMultiplierKvPair = nextMemoryMultiplierOption match {
case Some(memoryMultiplier) => getNextKvPair(MemoryMultiplierKey, memoryMultiplier.toString)
case None => Map.empty[String, KvPair]
}

val mergedKvs = kvsFromPreviousAttemptUpd ++ kvsForNextAttempt ++ failedRetryCountKvPair ++ memoryMultiplierKvPair

makeKvRequest(mergedKvs.values.map(KvPut).toSeq) map { respSeq =>
val failures = respSeq.filter(_.isInstanceOf[KvFailure])
if (failures.isEmpty) {
Expand Down Expand Up @@ -1287,20 +1312,20 @@ trait StandardAsyncExecutionActor
retryElseFail(executionHandle)
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
Future.successful(AbortedExecutionHandle)
case Success(returnCodeAsInt) if !continueOnReturnCode.continueFor(returnCodeAsInt) =>
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
case Success(returnCodeAsInt) if retryWithMoreMemory =>
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
case Success(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
case Failure(_) =>
Future.successful(FailedNonRetryableExecutionHandle(ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption), kvPairsToSave = None))
}
} else {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt) if retryWithMoreMemory =>
case Success(returnCodeAsInt) if retryWithMoreMemory && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
case _ =>
Expand Down
2 changes: 1 addition & 1 deletion backend/src/test/scala/cromwell/backend/BackendSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito with ScaledTim
case (JobFailedNonRetryableResponse(_, failure, _), JobFailedNonRetryableResponse(_, expectedFailure, _)) =>
failure.getClass shouldBe expectedFailure.getClass
concatenateCauseMessages(failure) should include(expectedFailure.getMessage)
case (JobFailedRetryableResponse(_, failure, _, _), JobFailedRetryableResponse(_, expectedFailure, _, _)) =>
case (JobFailedRetryableResponse(_, failure, _), JobFailedRetryableResponse(_, expectedFailure, _)) =>
failure.getClass shouldBe expectedFailure.getClass
case (response, expectation) =>
fail(s"Execution response $response wasn't conform to expectation $expectation")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: lots_of_inputs_scattered_papiv2
testFormat: workflowsuccess
# This test usually runs successfully but its scale presents a lot of challenges for CI.
# Too much log output to run on Travis, > 1 million metadata rows in the db, Akka HTTP response size nearly 1 GiB
# (after numerous failed retry attempts), and "java.lang.OutOfMemoryError: Java heap space" on Jenkins workers.
# `ignore`ing for now but hopefully can be re-enabled in Jenkins if full metadata fetches can be sidestepped (BT-380)
# or perhaps migrated to the Cromwell perf environment.
ignore: true
backends: [ Papiv2 ]

files {
workflow: scale/lots_of_inputs_scattered/lots_of_inputs_scattered.wdl
}

metadata {
workflowName: lots_of_inputs_scattered
status: Succeeded
}
Loading

0 comments on commit a4567f6

Please sign in to comment.