Merge branch 'develop'
kshakir committed Jan 11, 2021
2 parents 0963109 + a49e1fc commit 768221e
Showing 264 changed files with 6,738 additions and 1,700 deletions.
5 changes: 2 additions & 3 deletions .scala-steward.conf
@@ -25,8 +25,7 @@
#
# Default: @asap
#
#pullRequests.frequency = "0 0 ? * 3" # every thursday on midnight
pullRequests.frequency = "@asap"
pullRequests.frequency = "0 0 1 1,4,7,10 ?" # Run at 00:00 on the 1st day of Jan,Apr,Jul,Oct (whatever day that is)

# Only these dependencies which match the given patterns are updated.
#
@@ -51,7 +50,7 @@ pullRequests.frequency = "@asap"
# If set, Scala Steward will only attempt to create or update `n` PRs.
# Useful if running frequently and/or CI build are costly
# Default: None
updates.limit = 5
# updates.limit = 5

# By default, Scala Steward does not update scala version since its tricky, error-prone
# and results in bad PRs and/or failed builds
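
The new `pullRequests.frequency` value limits Scala Steward to quarterly runs. As a rough illustration of what the comment on that line describes (and not of how Scala Steward itself evaluates cron expressions), the schedule can be read as a predicate over `java.time` values. The object and method names below are hypothetical.

```scala
import java.time.LocalDateTime

// Hypothetical helper: mirrors the comment "00:00 on the 1st day of Jan,Apr,Jul,Oct".
// This is an illustration only, not Scala Steward's own cron handling.
object QuarterlySchedule {
  private val months: Set[Int] = Set(1, 4, 7, 10)

  // True only at midnight on the first day of a quarter-starting month.
  def matches(t: LocalDateTime): Boolean =
    t.getMinute == 0 &&
      t.getHour == 0 &&
      t.getDayOfMonth == 1 &&
      months.contains(t.getMonthValue)
}
```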
3 changes: 3 additions & 0 deletions .travis.yml
@@ -32,6 +32,9 @@ env:
- >-
BUILD_TYPE=centaurBcs
BUILD_MYSQL=5.7
- >-
BUILD_TYPE=centaurDummy
BUILD_MYSQL=5.7
- >-
BUILD_TYPE=centaurEngineUpgradeLocal
BUILD_MYSQL=5.7
33 changes: 32 additions & 1 deletion CHANGELOG.md
@@ -1,10 +1,41 @@
# Cromwell Change Log

## 55 Release Notes

### Apple Silicon support statement

Users with access to the new Mac hardware should review [important information provided here](https://cromwell.readthedocs.io/en/stable/Releases).

### Bug Fixes

* Fixed a bug that prevented `read_json()` from working with arrays and primitives. The function now works as expected for all valid JSON data inputs.
More information on JSON Type to WDL Type conversion can be found [here](https://github.com/openwdl/wdl/blob/main/versions/1.0/SPEC.md#mixed-read_jsonstringfile).

* Now retries HTTP 408 responses as well as HTTP 429 responses during DOS/DRS resolution requests.

* Fixed a bug that prevented the call caching diff endpoint from working with scatters in workflows with archived metadata.

### New Features

#### Reference disk support on PAPI v2

Cromwell now offers support for the use of reference disks on the PAPI v2 backend as an alternative to localizing
reference inputs. More details [here](https://cromwell.readthedocs.io/en/develop/backends/Google#reference-disk-support).

#### Docker image cache support on PAPI v2 lifesciences beta

Cromwell now offers support for the use of Docker image caches on the PAPI v2 lifesciences beta backend. More details [here](https://cromwell.readthedocs.io/en/develop/backends/Google#docker-image-cache-support).

#### Preemptible Recovery via Checkpointing

* Cromwell can now help tasks recover from preemption by allowing them to specify a 'checkpoint' file which will be restored
to the worker VM on the next attempt if the task is interrupted. More details [here](https://cromwell.readthedocs.io/en/develop/optimizations/CheckpointFiles)

## 54 Release Notes

### Bug Fixes

* Fixed a bug where `write_json()` failed for `Array[_]` inputs. It should now work for `Boolean`, `String`, `Integer`, `Float`,
* Fixed a bug that prevented `write_json()` from working with arrays and primitives. The function now works as expected for `Boolean`, `String`, `Integer`, `Float`,
`Pair[_, _]`, `Object`, `Map[_, _]` and `Array[_]` (including array of objects) type inputs. More information on WDL Type to JSON Type
conversion can be found [here](https://github.com/openwdl/wdl/blob/main/versions/1.0/SPEC.md#mixed-read_jsonstringfile).
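
The release 55 entry about DOS/DRS resolution says HTTP 408 responses are now retried alongside HTTP 429. A minimal sketch of that rule follows. It covers only the two status codes named in the entry; the real resolver may treat other responses as retryable too, and the names here are hypothetical rather than Cromwell's own.

```scala
// Hypothetical helper, not Cromwell's actual resolver code.
object RetryableStatus {
  // The two status codes the changelog entry names: Request Timeout and Too Many Requests.
  private val retryable: Set[Int] = Set(408, 429)

  def shouldRetry(statusCode: Int): Boolean = retryable.contains(statusCode)
}
```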

35 changes: 29 additions & 6 deletions README.md
@@ -3,20 +3,43 @@

## Welcome to Cromwell

Cromwell is a Workflow Management System geared towards scientific workflows. Cromwell is open sourced under the [BSD 3-Clause license](LICENSE.txt).
Cromwell is an open-source Workflow Management System for bioinformatics. Licensing is [BSD 3-Clause](LICENSE.txt).

The Cromwell documentation has a new home, [click here to check it out](https://cromwell.readthedocs.io/en/stable)!
The [Cromwell documentation has a dedicated site](https://cromwell.readthedocs.io/en/stable).

First time to Cromwell? Get started with [Tutorials](https://cromwell.readthedocs.io/en/stable/tutorials/FiveMinuteIntro/)!
First time to Cromwell? Get started with [Tutorials](https://cromwell.readthedocs.io/en/stable/tutorials/FiveMinuteIntro/).

### Community

Thinking about contributing to Cromwell? Get started by reading our [Contributor Guide](CONTRIBUTING.md).

Cromwell has a growing ecosystem of community-backed projects to make your experience even better! Check out our [Ecosystem](https://cromwell.readthedocs.io/en/stable/Ecosystem/) page to learn more.

### Issue tracking is now on JIRA
Talk to us:
- [Join the Cromwell Slack workspace](https://join.slack.com/t/cromwellhq/shared_invite/zt-dxmmrtye-JHxwKE53rfKE_ZWdOHIB4g) to discuss the Cromwell workflow engine.
- [Join the OpenWDL Slack workspace](https://join.slack.com/t/openwdl/shared_invite/zt-ctmj4mhf-cFBNxIiZYs6SY9HgM9UAVw) to discuss the evolution of the WDL language itself.
- More information about WDL is available in [that project's repository](https://github.com/openwdl/wdl).

### Capabilities and roadmap

A majority of Cromwell users today run their workflows in [Terra](https://app.terra.bio/), a fully-managed cloud-native bioinformatics computing platform. See [here](https://support.terra.bio/hc/en-us/articles/360036379771-Get-started-running-workflows) for a quick-start guide.

Users with specialized needs who wish to install and maintain their own Cromwell instances can [download](https://github.com/broadinstitute/cromwell/releases) a JAR or Docker image. The development team accepts reproducible bug reports from self-managed instances, but cannot feasibly provide direct support.

[Cromwell's backends](https://cromwell.readthedocs.io/en/stable/backends/Backends/) receive development resources proportional to customer demand. The team is actively developing for Google Cloud and AWS. Maintenance of other backends is primarily community-based.

Cromwell [supports](https://cromwell.readthedocs.io/en/stable/LanguageSupport/) the WDL and CWL workflow languages. The Cromwell team is actively developing WDL, while maintenance for CWL is primarily community-based.

### Issue tracking in JIRA

<!--
AEN external issue filing tested 2020-12-08 with `[email protected]` / `https://broadworkbench.atlassian.net/browse/CROM-6681`
-->

Need to file an issue? Head over to [our JIRA](https://broadworkbench.atlassian.net/jira/software/c/projects/CROM/issues). You must create a free profile to view or create.

Need to file an issue? Head over to [our JIRA](https://broadworkbench.atlassian.net/projects/BA/issues). You can sign in with any Google account.
[Issues in Github](https://github.com/broadinstitute/cromwell/issues) remain available for discussion among community members but are not actively monitored by the development team.

As of May 2019, we are in the process of migrating all issues from Github to JIRA. At a later date to be announced, submitting new Github issues will be disabled.
![Cromwell JIRA](docs/img/cromwell_jira.png)

![Jamie, the Cromwell pig](docs/jamie_the_cromwell_pig.png)
@@ -0,0 +1,127 @@
package cromwell.backend.dummy

import java.time.OffsetDateTime
import java.util.UUID

import cats.data.NonEmptyList
import cats.data.Validated.{Invalid, Valid}
import cats.implicits._
import common.exception.AggregatedMessageException
import common.validation.ErrorOr.ErrorOr
import cromwell.backend.BackendJobLifecycleActor
import cromwell.backend.async.{ExecutionHandle, FailedNonRetryableExecutionHandle, PendingExecutionHandle, SuccessfulExecutionHandle}
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
import cromwell.core.CallOutputs
import cromwell.core.retry.SimpleExponentialBackoff
import cromwell.services.instrumentation.CromwellInstrumentation
import wom.expression.NoIoFunctionSet
import wom.graph.GraphNodePort.{ExpressionBasedOutputPort, OutputPort}
import wom.values.WomValue

import scala.concurrent.Future
import scala.concurrent.duration._

class DummyAsyncExecutionActor(override val standardParams: StandardAsyncExecutionActorParams)
extends BackendJobLifecycleActor
with StandardAsyncExecutionActor
with CromwellInstrumentation {

/** The type of the run info when a job is started. */
override type StandardAsyncRunInfo = String
/** The type of the run status returned during each poll. */
override type StandardAsyncRunState = String

/** Should return true if the status contained in `thiz` is equivalent to `that`, delta any other data that might be carried around
* in the state type.
*/
override def statusEquivalentTo(thiz: String)(that: String): Boolean = thiz == that

/**
* Returns true when a job is complete, either successfully or unsuccessfully.
*
* @param runStatus The run status.
* @return True if the job has completed.
*/
override def isTerminal(runStatus: String): Boolean = runStatus == "DummyDone"

override def dockerImageUsed: Option[String] = None

override def pollBackOff: SimpleExponentialBackoff = SimpleExponentialBackoff(initialInterval = 1.second, maxInterval = 300.seconds, multiplier = 1.1)

override def executeOrRecoverBackOff: SimpleExponentialBackoff = SimpleExponentialBackoff(initialInterval = 1.second, maxInterval = 300.seconds, multiplier = 1.1)

override val logJobIds: Boolean = false

val singletonActor = standardParams.backendSingletonActorOption.getOrElse(
throw new RuntimeException("Dummy Backend actor cannot exist without its singleton actor"))

var finishTime: Option[OffsetDateTime] = None

override def executeAsync(): Future[ExecutionHandle] = {
finishTime = Option(OffsetDateTime.now().plusMinutes(3))
increment(NonEmptyList("jobs", List("dummy", "executing", "starting")))
singletonActor ! DummySingletonActor.PlusOne
Future.successful(
PendingExecutionHandle[StandardAsyncJob, StandardAsyncRunInfo, StandardAsyncRunState](
jobDescriptor = jobDescriptor,
pendingJob = StandardAsyncJob(UUID.randomUUID().toString),
runInfo = Option("pending"),
previousState = None
)
)
}

override def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[String] = {
finishTime match {
case Some(ft) if (ft.isBefore(OffsetDateTime.now)) => Future.successful("done")
case Some(_) => Future.successful("running")
case None => Future.failed(new Exception("Dummy backend polling for status before finishTime is established(!!?)"))
}

}

override def handlePollSuccess(oldHandle: StandardAsyncPendingExecutionHandle, state: String): Future[ExecutionHandle] = {

if (state == "done") {

increment(NonEmptyList("jobs", List("dummy", "executing", "done")))
singletonActor ! DummySingletonActor.MinusOne

val outputsValidation: ErrorOr[Map[OutputPort, WomValue]] = jobDescriptor.taskCall.outputPorts.toList.traverse {
case expressionBasedOutputPort: ExpressionBasedOutputPort =>
expressionBasedOutputPort.expression.evaluateValue(Map.empty, NoIoFunctionSet).map(expressionBasedOutputPort -> _)
case other => s"Unknown output port type for Dummy backend output evaluator: ${other.getClass.getSimpleName}".invalidNel
}.map(_.toMap)

outputsValidation match {
case Valid(outputs) =>
Future.successful(SuccessfulExecutionHandle(
outputs = CallOutputs(outputs.toMap),
returnCode = 0,
jobDetritusFiles = Map.empty,
executionEvents = Seq.empty,
resultsClonedFrom = None
))
case Invalid(errors) =>
Future.successful(FailedNonRetryableExecutionHandle(
throwable = AggregatedMessageException("Evaluate outputs from dummy job", errors.toList),
returnCode = None,
kvPairsToSave = None
))
}
}
else if (state == "running") {
Future.successful(
PendingExecutionHandle[StandardAsyncJob, StandardAsyncRunInfo, StandardAsyncRunState](
jobDescriptor = jobDescriptor,
pendingJob = StandardAsyncJob(UUID.randomUUID().toString),
runInfo = Option("pending"),
previousState = Option(state)
)
)
}
else {
Future.failed(new Exception(s"Unexpected Dummy state in handlePollSuccess: $state"))
}
}
}
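
The timing behaviour of `DummyAsyncExecutionActor` can be explored without Akka or Cromwell on the classpath. The sketch below is an approximation under two assumptions: the clock is passed in explicitly (the actor calls `OffsetDateTime.now` itself), and the backoff simply multiplies the previous interval and caps it at the maximum, ignoring any randomization the real `SimpleExponentialBackoff` may apply. All names are hypothetical.

```scala
import java.time.OffsetDateTime
import scala.concurrent.duration._

object DummyTimingSketch {
  // Same decision as pollStatusAsync, with the current time supplied by the caller.
  // Left models the failed Future returned when finishTime has not been set.
  def status(finishTime: Option[OffsetDateTime], now: OffsetDateTime): Either[String, String] =
    finishTime match {
      case Some(ft) if ft.isBefore(now) => Right("done")
      case Some(_)                      => Right("running")
      case None                         => Left("polled before finishTime was established")
    }

  // Assumed backoff shape: each interval is the previous one times `multiplier`, capped at `max`.
  def intervals(initial: FiniteDuration, max: FiniteDuration, multiplier: Double): Iterator[FiniteDuration] =
    Iterator.iterate(initial) { d =>
      val next = (d.toMillis * multiplier).toLong.millis
      if (next > max) max else next
    }
}
```

With the values used in the actor (`1.second`, `300.seconds`, `1.1`), `DummyTimingSketch.intervals` grows slowly from one second and eventually stays at the five-minute cap, while `status` flips from `"running"` to `"done"` once the three-minute `finishTime` has passed.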
@@ -0,0 +1,29 @@
package cromwell.backend.dummy

import cats.syntax.validated._
import common.validation.ErrorOr.ErrorOr
import cromwell.backend.standard.{StandardInitializationActor, StandardInitializationActorParams, StandardValidatedRuntimeAttributesBuilder}
import cromwell.backend.validation.RuntimeAttributesValidation
import wom.expression.WomExpression
import wom.types.{WomStringType, WomType}
import wom.values.{WomString, WomValue}

class DummyInitializationActor(pipelinesParams: StandardInitializationActorParams)
extends StandardInitializationActor(pipelinesParams) {

override protected lazy val runtimeAttributeValidators: Map[String, Option[WomExpression] => Boolean] = Map("backend" -> { _ => true } )

// Specific validator for "backend" to let me specify it in test cases (to avoid accidentally submitting the workflow to real backends!)
val backendAttributeValidation: RuntimeAttributesValidation[String] = new RuntimeAttributesValidation[String] {
override def key: String = "backend"

override def coercion: Traversable[WomType] = Vector(WomStringType)

override protected def validateValue: PartialFunction[WomValue, ErrorOr[String]] = {
case WomString("Dummy") => "Dummy".validNel
case other => s"Unexpected dummy backend value: $other".invalidNel
}
}

override def runtimeAttributesBuilder: StandardValidatedRuntimeAttributesBuilder = super.runtimeAttributesBuilder.withValidation(backendAttributeValidation)
}
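
The `backend` validator in `DummyInitializationActor` accepts exactly one value. As a dependency-free illustration, the same rule can be written with `Either` from the standard library standing in for the cats-based `ErrorOr`; the object and method names are hypothetical.

```scala
// Hypothetical stand-in for the ErrorOr validation above, using only the standard library.
object BackendAttributeSketch {
  // Accepts only "Dummy", so a test workflow cannot be routed to a real backend by accident.
  def validateBackend(value: String): Either[String, String] = value match {
    case "Dummy" => Right("Dummy")
    case other   => Left(s"Unexpected dummy backend value: $other")
  }
}
```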
@@ -0,0 +1,30 @@
package cromwell.backend.dummy

import akka.actor.{ActorRef, Props}
import cromwell.backend.BackendConfigurationDescriptor
import cromwell.backend.standard.callcaching.{StandardCacheHitCopyingActor, StandardFileHashingActor}
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardInitializationActor, StandardLifecycleActorFactory}

class DummyLifecycleActorFactory(override val name: String, override val configurationDescriptor: BackendConfigurationDescriptor) extends StandardLifecycleActorFactory {

/**
* @return the key to use for storing and looking up the job id.
*/
override def jobIdKey: String = "__dummy_operation_id"

/**
* @return the asynchronous executor class.
*/
override def asyncExecutionActorClass: Class[_ <: StandardAsyncExecutionActor] = classOf[DummyAsyncExecutionActor]

// Don't cache-hit copy
override lazy val cacheHitCopyingActorClassOption: Option[Class[_ <: StandardCacheHitCopyingActor]] = None

// Don't hash files
override lazy val fileHashingActorClassOption: Option[Class[_ <: StandardFileHashingActor]] = None

override def backendSingletonActorProps(serviceRegistryActor: ActorRef): Option[Props] = Option(Props(new DummySingletonActor()))

override lazy val initializationActorClass: Class[_ <: StandardInitializationActor] = classOf[DummyInitializationActor]

}
@@ -0,0 +1,63 @@
package cromwell.backend.dummy

import java.io.File
import java.time.OffsetDateTime
import java.util.UUID

import akka.actor.Actor
import com.typesafe.scalalogging.StrictLogging
import cromwell.backend.dummy.DummySingletonActor._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

final class DummySingletonActor() extends Actor with StrictLogging {

implicit val ec: ExecutionContext = context.dispatcher
var count: Int = 0

var countHistory: Vector[(OffsetDateTime, Int)] = Vector.empty

override def receive: Receive = {
case PlusOne => count = count + 1
case MinusOne => count = count - 1
case PrintCount =>
if(countHistory.lastOption.exists(_._2 != count)) {
countHistory = countHistory :+ (OffsetDateTime.now() -> count)
logger.info("The current count is now: " + count)
if (count == 0) {
outputCountHistory()
countHistory = Vector.empty
}
} else {
countHistory = countHistory :+ (OffsetDateTime.now() -> count)
}

}

private def outputCountHistory() = {
import java.io.BufferedWriter
import java.io.FileOutputStream
import java.io.OutputStreamWriter
val fout = new File(s"timestamps-${UUID.randomUUID().toString}.tsv")
val fos = new FileOutputStream(fout)

val bw = new BufferedWriter(new OutputStreamWriter(fos))

for ((timestamp, count) <- countHistory) {
bw.write(s"$timestamp\t$count")
bw.newLine()
}
bw.flush()
bw.close()
}

context.system.scheduler.schedule(10.seconds, 1.second) { self ! PrintCount }
}

object DummySingletonActor {
case object PlusOne
case object MinusOne
case object PrintCount
}
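
`DummySingletonActor` records `(timestamp, count)` samples and writes them out as tab-separated lines once the count returns to zero. The following sketch separates the formatting from the file I/O so the former can be checked on its own. It is an illustrative variant, not the committed code: it uses `java.nio.file.Files.write` instead of a `BufferedWriter`, and all names are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.time.OffsetDateTime

object CountHistorySketch {
  // Render each (timestamp, count) sample as one tab-separated line.
  def toTsvLines(history: Vector[(OffsetDateTime, Int)]): Vector[String] =
    history.map { case (timestamp, count) => s"$timestamp\t$count" }

  // Write the samples to `path`, one newline-terminated line per sample; returns the path written.
  def write(path: Path, history: Vector[(OffsetDateTime, Int)]): Path =
    Files.write(path, toTsvLines(history).map(_ + "\n").mkString.getBytes(StandardCharsets.UTF_8))
}
```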

@@ -131,6 +131,8 @@ trait StandardAsyncExecutionActor
default = s"""$$(mkdir -p "${runtimeEnvironment.tempPath}" && echo "${runtimeEnvironment.tempPath}")"""
)

val logJobIds: Boolean = true

/** Used to convert cloud paths into local paths. */
protected def preProcessWomFile(womFile: WomFile): WomFile = womFile

@@ -1105,7 +1107,7 @@ trait StandardAsyncExecutionActor
configurationDescriptor.slowJobWarningAfter foreach { duration => self ! WarnAboutSlownessAfter(handle.pendingJob.jobId, duration) }

tellKvJobId(handle.pendingJob) map { _ =>
jobLogger.info(s"job id: ${handle.pendingJob.jobId}")
if (logJobIds) jobLogger.info(s"job id: ${handle.pendingJob.jobId}")
tellMetadata(Map(CallMetadataKeys.JobId -> handle.pendingJob.jobId))
/*
NOTE: Because of the async nature of the Scala Futures, there is a point in time where we have submitted this or
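
The `logJobIds` change follows a common pattern: the trait supplies a default, and a backend that wants quieter logs overrides it, as the Dummy backend does. A reduced sketch of that pattern, with hypothetical trait and class names and a returned list in place of the real `jobLogger`:

```scala
trait JobIdLogging {
  // Default matches the diff: job ids are logged unless a backend opts out.
  val logJobIds: Boolean = true

  // Returns the messages that would be logged for a newly submitted job.
  def messages(jobId: String): List[String] =
    if (logJobIds) List(s"job id: $jobId") else Nil
}

// Keeps the default and logs job ids.
class ChattyBackend extends JobIdLogging

// Opts out, as DummyAsyncExecutionActor does with `override val logJobIds: Boolean = false`.
class QuietBackend extends JobIdLogging {
  override val logJobIds: Boolean = false
}
```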