Merge branch 'develop'
cjllanwarne committed Jun 16, 2021
2 parents e90facf + 7ea3fd3 commit ed067b8
Showing 19 changed files with 190 additions and 83 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Cromwell Change Log

## 65 Release Notes

* An additional set of metrics relating to metadata age was added.
* No user-facing changes in Cromwell 65.

## 64 Release Notes

### Intel Cascade Lake support on PAPI v2
45 changes: 39 additions & 6 deletions core/src/main/scala/cromwell/core/retry/Retry.scala
@@ -1,15 +1,23 @@
package cromwell.core.retry

import java.sql.SQLTransactionRollbackException

import akka.actor.ActorSystem
import akka.pattern.after
import com.typesafe.scalalogging.StrictLogging
import common.util.Backoff
import cromwell.core.CromwellFatalException

import scala.concurrent.duration._
import scala.language.postfixOps
import scala.concurrent.{ExecutionContext, Future}
import akka.pattern.after
import common.util.Backoff
import scala.language.postfixOps

object Retry extends StrictLogging {

def throwableToFalse(t: Throwable) = false

def noopOnRetry(t: Throwable) = {}

object Retry {
/**
* Retries a Future on a designated backoff strategy until either a designated number of retries or a fatal error
* is reached.
@@ -49,7 +57,32 @@ object Retry {
}
}

def throwableToFalse(t: Throwable) = false
def noopOnRetry(t: Throwable) = {}
/**
* Retries a Future if a 'SQLTransactionRollbackException' (i.e. deadlock exception) occurs on a designated
* backoff strategy until a designated number of retries is reached.
*
* @param f A function () => Future[A] which will be executed once per attempt.
* @param maxRetries Number of times to retry the future
* @param backoff An exponential backoff strategy to use between retries.
* @tparam A The return type of the Future
* @return The final completed Future[A]
*/
def withRetryForTransactionRollback[A](f: () => Future[A],
maxRetries: Int = 5,
backoff: Backoff = SimpleExponentialBackoff(5 seconds, 10 seconds, 1.1D))
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[A] = {
val delay = backoff.backoffMillis.millis

f() recoverWith {
case throwable if throwable.isInstanceOf[SQLTransactionRollbackException] =>
val retriesLeft = maxRetries - 1
if (retriesLeft > 0) {
logger.info(s"Received 'SQLTransactionRollbackException'. Retries left $retriesLeft. Will retry now...")
after(delay, actorSystem.scheduler)(withRetryForTransactionRollback(f, retriesLeft, backoff.next))
} else {
Future.failed(new CromwellFatalException(throwable))
}
}
}
}
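
A minimal usage sketch (not part of this commit) showing how a caller might wrap a deadlock-prone database call with the new helper; the ActorSystem name and writeHeartbeatsToDb are hypothetical stand-ins:

import akka.actor.ActorSystem
import cromwell.core.retry.Retry.withRetryForTransactionRollback

import scala.concurrent.{ExecutionContext, Future}

object RetryUsageSketch {
  // The new signature requires an ActorSystem and an ExecutionContext in implicit scope.
  implicit val system: ActorSystem = ActorSystem("retry-sketch")
  implicit val ec: ExecutionContext = system.dispatcher

  // Hypothetical stand-in for any () => Future[A] database call that may hit a deadlock.
  def writeHeartbeatsToDb(): Future[Int] = Future.successful(42)

  // Retries only on SQLTransactionRollbackException, using the default maxRetries (5) and
  // SimpleExponentialBackoff(5 seconds, 10 seconds, 1.1); other failures propagate unchanged.
  val result: Future[Int] = withRetryForTransactionRollback(() => writeHeartbeatsToDb())
}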

2 changes: 1 addition & 1 deletion cromwell.example.backends/cromwell.examples.conf
@@ -485,7 +485,7 @@ services {
# config {
# # For the standard MetadataService implementation, cromwell.services.metadata.impl.MetadataServiceActor:
# # Set this value to "Inf" to turn off metadata summary refresh. The default value is currently "1 second".
# metadata-summary-refresh-interval = "Inf"
# metadata-summary-refresh-interval = "1 second"
#
# # Set this value to the maximum number of metadata rows to be considered per summarization cycle.
# metadata-summary-refresh-limit = 5000
@@ -486,9 +486,9 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
countSummaryQueueEntries()
)

override def getMetadataArchiveStatus(workflowId: String)(implicit ec: ExecutionContext): Future[Option[String]] = {
val action = dataAccess.metadataArchiveStatusByWorkflowId(workflowId).result.headOption
runTransaction(action).map(_.flatten)
override def getMetadataArchiveStatusAndEndTime(workflowId: String)(implicit ec: ExecutionContext): Future[(Option[String], Option[Timestamp])] = {
val action = dataAccess.metadataArchiveStatusAndEndTimeByWorkflowId(workflowId).result.headOption
runTransaction(action).map(_.getOrElse((None, None)))
}

override def queryWorkflowsToArchiveThatEndedOnOrBeforeThresholdTimestamp(workflowStatuses: List[String],
@@ -115,6 +115,12 @@ trait WorkflowMetadataSummaryEntryComponent {
if workflowMetadataSummaryEntry.workflowExecutionUuid === workflowExecutionUuid
} yield workflowMetadataSummaryEntry.metadataArchiveStatus)

val metadataArchiveStatusAndEndTimeByWorkflowId = Compiled(
(workflowExecutionUuid: Rep[String]) => for {
workflowMetadataSummaryEntry <- workflowMetadataSummaryEntries
if workflowMetadataSummaryEntry.workflowExecutionUuid === workflowExecutionUuid
} yield (workflowMetadataSummaryEntry.metadataArchiveStatus, workflowMetadataSummaryEntry.endTimestamp))

private def fetchAllWorkflowsToArchiveThatEndedOnOrBeforeThresholdTimestamp(workflowStatuses: List[String],
workflowEndTimestampThreshold: Timestamp): Query[WorkflowMetadataSummaryEntries, WorkflowMetadataSummaryEntry, Seq] = {
for {
@@ -185,7 +185,7 @@ trait MetadataSqlDatabase extends SqlDatabase {

def getSummaryQueueSize()(implicit ec: ExecutionContext): Future[Int]

def getMetadataArchiveStatus(workflowId: String)(implicit ec: ExecutionContext): Future[Option[String]]
def getMetadataArchiveStatusAndEndTime(workflowId: String)(implicit ec: ExecutionContext): Future[(Option[String], Option[Timestamp])]

def queryWorkflowsToArchiveThatEndedOnOrBeforeThresholdTimestamp(workflowStatuses: List[String],
workflowEndTimestampThreshold: Timestamp,
@@ -2,11 +2,12 @@ package cromwell.engine.workflow.workflowstore

import java.time.OffsetDateTime

import akka.actor.ActorRef
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.ask
import akka.util.Timeout
import cats.data.NonEmptyVector
import cromwell.core.WorkflowId
import cromwell.core.retry.Retry.withRetryForTransactionRollback
import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.WorkflowStoreAbortResponse.WorkflowStoreAbortResponse

import scala.concurrent.duration.FiniteDuration
@@ -24,16 +25,16 @@ import scala.concurrent.{ExecutionContext, Future}
sealed trait WorkflowStoreAccess {
def writeWorkflowHeartbeats(workflowIds: NonEmptyVector[(WorkflowId, OffsetDateTime)],
heartbeatDateTime: OffsetDateTime)
(implicit ec: ExecutionContext): Future[Int]
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int]

def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
(implicit ec: ExecutionContext): Future[List[WorkflowToStart]]
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]]

def abort(workflowId: WorkflowId)
(implicit ec: ExecutionContext): Future[WorkflowStoreAbortResponse]
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[WorkflowStoreAbortResponse]

def deleteFromStore(workflowId: WorkflowId)
(implicit ec: ExecutionContext): Future[Int]
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int]

}

@@ -45,20 +46,20 @@ case class UncoordinatedWorkflowStoreAccess(store: WorkflowStore) extends Workfl

override def writeWorkflowHeartbeats(workflowIds: NonEmptyVector[(WorkflowId, OffsetDateTime)],
heartbeatDateTime: OffsetDateTime)
(implicit ec: ExecutionContext): Future[Int] = {
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int] = {
store.writeWorkflowHeartbeats(workflowIds.toVector.toSet, heartbeatDateTime)
}

override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]] = {
store.fetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl)
}

override def deleteFromStore(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[Int] = {
override def deleteFromStore(workflowId: WorkflowId)(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int] = {
store.deleteFromStore(workflowId)
}

override def abort(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowStoreAbortResponse] = {
override def abort(workflowId: WorkflowId)(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[WorkflowStoreAbortResponse] = {
store.abort(workflowId)
}
}
@@ -72,22 +73,30 @@ case class CoordinatedWorkflowStoreAccess(coordinatedWorkflowStoreAccessActor: A

override def writeWorkflowHeartbeats(workflowIds: NonEmptyVector[(WorkflowId, OffsetDateTime)],
heartbeatDateTime: OffsetDateTime)
(implicit ec: ExecutionContext): Future[Int] = {
coordinatedWorkflowStoreAccessActor.ask(WorkflowStoreCoordinatedAccessActor.WriteHeartbeats(workflowIds, heartbeatDateTime)).mapTo[Int]
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int] = {
withRetryForTransactionRollback(
() => coordinatedWorkflowStoreAccessActor.ask(WorkflowStoreCoordinatedAccessActor.WriteHeartbeats(workflowIds, heartbeatDateTime)).mapTo[Int]
)
}

override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]] = {
val message = WorkflowStoreCoordinatedAccessActor.FetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl)
coordinatedWorkflowStoreAccessActor.ask(message).mapTo[List[WorkflowToStart]]
withRetryForTransactionRollback(
() => coordinatedWorkflowStoreAccessActor.ask(message).mapTo[List[WorkflowToStart]]
)
}

override def deleteFromStore(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[Int] = {
coordinatedWorkflowStoreAccessActor.ask(WorkflowStoreCoordinatedAccessActor.DeleteFromStore(workflowId)).mapTo[Int]
override def deleteFromStore(workflowId: WorkflowId)(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int] = {
withRetryForTransactionRollback(
() => coordinatedWorkflowStoreAccessActor.ask(WorkflowStoreCoordinatedAccessActor.DeleteFromStore(workflowId)).mapTo[Int]
)
}

override def abort(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[WorkflowStoreAbortResponse] = {
coordinatedWorkflowStoreAccessActor.ask(WorkflowStoreCoordinatedAccessActor.Abort(workflowId)).mapTo[WorkflowStoreAbortResponse]
override def abort(workflowId: WorkflowId)(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[WorkflowStoreAbortResponse] = {
withRetryForTransactionRollback(
() => coordinatedWorkflowStoreAccessActor.ask(WorkflowStoreCoordinatedAccessActor.Abort(workflowId)).mapTo[WorkflowStoreAbortResponse]
)
}
}

@@ -2,7 +2,7 @@ package cromwell.engine.workflow.workflowstore

import java.time.OffsetDateTime

import akka.actor.{Actor, Props, Status}
import akka.actor.{Actor, ActorSystem, Props, Status}
import cats.data.NonEmptyVector
import cromwell.core.{Dispatcher, WorkflowId}
import cromwell.engine.workflow.workflowstore.WorkflowStoreCoordinatedAccessActor._
@@ -20,6 +20,7 @@ import scala.util.{Failure, Success, Try}
*/
class WorkflowStoreCoordinatedAccessActor(workflowStore: WorkflowStore) extends Actor {
implicit val ec: ExecutionContext = context.system.dispatcher
implicit val actorSystem: ActorSystem = context.system

def run[A](future: Future[A]): Unit = {
val result = Try(Await.result(future, Timeout)) match {
@@ -1,6 +1,6 @@
package cromwell.engine.workflow.workflowstore

import akka.actor.{ActorLogging, ActorRef, LoggingFSM, PoisonPill, Props, Timers}
import akka.actor.{ActorLogging, ActorRef, ActorSystem, LoggingFSM, PoisonPill, Props, Timers}
import cats.data.NonEmptyList
import cromwell.core.Dispatcher._
import cromwell.core.WorkflowProcessingEvents.DescriptionEventValue.PickedUp
@@ -27,6 +27,7 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore,
workflowHeartbeatConfig: WorkflowHeartbeatConfig)
extends LoggingFSM[WorkflowStoreActorState, WorkflowStoreActorData] with ActorLogging with WorkflowInstrumentation with CromwellInstrumentationScheduler with WorkflowMetadataHelper with Timers {

implicit val actorSystem: ActorSystem = context.system
implicit val ec: ExecutionContext = context.dispatcher

startWith(Unstarted, WorkflowStoreActorData(None, List.empty))
@@ -3,7 +3,7 @@ package cromwell.engine.workflow.workflowstore
import java.time.{OffsetDateTime, Duration => JDuration}
import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, CoordinatedShutdown, Props}
import akka.actor.{ActorRef, ActorSystem, CoordinatedShutdown, Props}
import cats.data.{NonEmptyList, NonEmptyVector}
import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.core.WorkflowId
@@ -26,6 +26,8 @@ case class WorkflowStoreHeartbeatWriteActor(workflowStoreAccess: WorkflowStoreAc
flushRate = workflowHeartbeatConfig.heartbeatInterval,
batchSize = workflowHeartbeatConfig.writeBatchSize) {

implicit val actorSystem: ActorSystem = context.system

override val threshold = workflowHeartbeatConfig.writeThreshold

private val failureShutdownDuration = workflowHeartbeatConfig.failureShutdownDuration
@@ -1,6 +1,6 @@
package cromwell.jobstore

import akka.actor.{ActorRef, Props}
import akka.actor.{ActorRef, ActorSystem, Props}
import cats.data.{NonEmptyList, NonEmptyVector}
import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.core.LoadConfig
@@ -27,6 +27,8 @@ case class JobStoreWriterActor(jsd: JobStore,
extends EnhancedBatchActor[CommandAndReplyTo[JobStoreWriterCommand]](flushRate, batchSize) {

override protected def process(nonEmptyData: NonEmptyVector[CommandAndReplyTo[JobStoreWriterCommand]]): Future[Int] = instrumentedProcess {
implicit val actorSystem: ActorSystem = context.system

val data = nonEmptyData.toVector
log.debug("Flushing {} job store commands to the DB", data.length)
val completions = data.collect({ case CommandAndReplyTo(c: JobStoreWriterCommand, _) => c.completion })
@@ -50,9 +52,9 @@ case class JobStoreWriterActor(jsd: JobStore,
combinedAction onComplete {
case Success(_) =>
data foreach { case CommandAndReplyTo(c: JobStoreWriterCommand, r) => r ! JobStoreWriteSuccess(c) }
case Failure(regerts) =>
log.error(regerts, "Failed to write job store entries to database")
data foreach { case CommandAndReplyTo(_, r) => r ! JobStoreWriteFailure(regerts) }
case Failure(error) =>
log.error(error, "Failed to write job store entries to database")
data foreach { case CommandAndReplyTo(_, r) => r ! JobStoreWriteFailure(error) }
}

combinedAction.map(_ => 1)

