From 2e6ab295059d729ebe6f46168a974827b18e0fc7 Mon Sep 17 00:00:00 2001 From: Scott Frazer Date: Wed, 10 Jun 2015 14:07:59 -0400 Subject: [PATCH] Better logging, more docs --- README.md | 14 +--- build.sbt | 4 +- src/main/resources/application.conf | 12 ++++ src/main/resources/logback.xml | 30 ++++++++ src/main/resources/reference.conf | 4 +- src/main/scala/cromwell/Main.scala | 43 +++++++++--- .../binding/formatter/SyntaxFormatter.scala | 12 ++-- src/main/scala/cromwell/binding/package.scala | 1 + .../scala/cromwell/engine/CallActor.scala | 21 +++--- .../cromwell/engine/ExecutionStore.scala | 17 +++-- .../engine/SingleWorkflowRunnerActor.scala | 28 ++++---- .../scala/cromwell/engine/StoreActor.scala | 17 +++-- .../scala/cromwell/engine/SymbolStore.scala | 1 - .../scala/cromwell/engine/WorkflowActor.scala | 68 ++++++++++++------- .../cromwell/logging/CromwellLogger.scala | 31 +++++++++ .../cromwell/server/CromwellServer.scala | 2 +- ...ala => DefaultWorkflowManagerSystem.scala} | 11 ++- .../scala/cromwell/util/TerminalUtil.scala | 5 ++ .../scala/cromwell/CromwellTestkitSpec.scala | 25 +++++-- .../scala/cromwell/HelloWorldActorSpec.scala | 17 +---- src/test/scala/cromwell/MainSpec.scala | 20 ++++++ .../scala/cromwell/ThreeStepActorSpec.scala | 37 ++++------ .../engine/ActorWorkflowManagerSpec.scala | 14 ++-- .../SingleWorkflowRunnerActorSpec.scala | 17 ++--- 24 files changed, 291 insertions(+), 160 deletions(-) create mode 100644 src/main/resources/application.conf create mode 100644 src/main/resources/logback.xml create mode 100644 src/main/scala/cromwell/logging/CromwellLogger.scala rename src/main/scala/cromwell/server/{WorkflowManagerSystem.scala => DefaultWorkflowManagerSystem.scala} (57%) create mode 100644 src/main/scala/cromwell/util/TerminalUtil.scala diff --git a/README.md b/README.md index 387d004d0ad..67b8478cac3 100644 --- a/README.md +++ b/README.md @@ -370,18 +370,6 @@ Contains implementations of an interface to launch jobs. `cromwell.engine` will ### cromwell.engine -![Engine Actors](http://i.imgur.com/ByFUakW.png) +![Engine Actors](http://i.imgur.com/sF9vMt2.png) Contains the Akka code and actor system to execute a workflow. This layer should operate entirely on objects returned from the `cromwell.binding` layer. - -|Start|End|Message|Parameters|State Change| -|-----|---|-------|----------|------------| -|![WMA](http://i.imgur.com/98ugXkZ.png)|![WA](http://i.imgur.com/eJxn3wu.png)|Start|| -|![WMA](http://i.imgur.com/98ugXkZ.png)|![WA](http://i.imgur.com/eJxn3wu.png)|SubscribeTransitionCallback|WorkflowManagerActor| -|![WA](http://i.imgur.com/eJxn3wu.png)|![SA](http://i.imgur.com/JzNNe64.png)|UpdateStatus|Call,status| -|![WA](http://i.imgur.com/eJxn3wu.png)|![SA](http://i.imgur.com/JzNNe64.png)|CallCompleted|Call,outputs| -|![WA](http://i.imgur.com/eJxn3wu.png)|![SA](http://i.imgur.com/JzNNe64.png)|GetOutputs|| -|![WA](http://i.imgur.com/eJxn3wu.png)|![CA](http://i.imgur.com/Nyoln74.png)|Start|| - -> **TODO**: This table is not complete. - diff --git a/build.sbt b/build.sbt index 28ca585dc2f..57a481826af 100644 --- a/build.sbt +++ b/build.sbt @@ -21,9 +21,11 @@ libraryDependencies ++= Seq( "io.spray" %% "spray-http" % sprayV, "io.spray" %% "spray-json" % DowngradedSprayV, "com.typesafe.akka" %% "akka-actor" % akkaV, + "com.typesafe.akka" %% "akka-slf4j" % akkaV, "commons-codec" % "commons-codec" % "1.10", "ch.qos.logback" % "logback-classic" % "1.1.3", - //---------- Test libraries -------------------// + "ch.qos.logback" % "logback-access" % "1.1.3", + "org.codehaus.janino" % "janino" % "2.7.8", "io.spray" %% "spray-testkit" % sprayV % Test, "org.scalatest" %% "scalatest" % "2.2.5" % Test, "com.typesafe.akka" %% "akka-testkit" % akkaV % Test diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf new file mode 100644 index 00000000000..399ad9a0300 --- /dev/null +++ b/src/main/resources/application.conf @@ -0,0 +1,12 @@ +akka { + event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "DEBUG" + actor { + debug { + receive = on + event-stream = on + fsm = on + } + } +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 00000000000..63d4cdb6d1b --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,30 @@ + + + + + + true + + logFile.%d{yyyy-MM-dd}.log + 30 + + + %-4relative [%thread] %-5level %logger{35} - %msg%n + + + + + + + + + + + + + + + + + + diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index efb9a5199cd..a5acfebaec1 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -1,8 +1,6 @@ webservice.port = 8000 webservice.interface = 0.0.0.0 instance.name = "reference" -#vault.url = "https://api-ci.vault.broadinstitute.org/api" -#ces.url = "https://snoop-dev.broadinstitute.org" swagger { apiDocs = "api-docs" @@ -25,4 +23,4 @@ spray.can { request-timeout = 40s connecting-timeout = 40s } -} \ No newline at end of file +} diff --git a/src/main/scala/cromwell/Main.scala b/src/main/scala/cromwell/Main.scala index 89422e661c7..21f4b797b69 100644 --- a/src/main/scala/cromwell/Main.scala +++ b/src/main/scala/cromwell/Main.scala @@ -4,22 +4,35 @@ import java.io.File import java.nio.file.Paths import cromwell.binding._ -import cromwell.engine.{WorkflowManagerActor, SingleWorkflowRunnerActor} import cromwell.binding.formatter.{AnsiSyntaxHighlighter, SyntaxFormatter} +import cromwell.engine.SingleWorkflowRunnerActor import cromwell.parser.WdlParser.SyntaxError -import cromwell.server.{WorkflowManagerSystem, CromwellServer} +import cromwell.server.{CromwellServer, DefaultWorkflowManagerSystem, WorkflowManagerSystem} import cromwell.util.FileUtil +import org.slf4j.LoggerFactory import spray.json._ -import scala.util.{Failure, Success} - object Actions extends Enumeration { val Parse, Validate, Highlight, Run, Inputs, Server = Value } object Main extends App { - getAction(args.headOption map { _.capitalize }) match { + val props = System.getProperties + val loggerProperty = "CROMWELL_LOGGER" + Option(props.getProperty(loggerProperty)) match { + case None => args.headOption.map {_.capitalize}.find {_ == "SERVER"} match { + case Some(x) => props.setProperty(loggerProperty, "SERVER") + case _ => + } + case _ => + } + + lazy val log = LoggerFactory.getLogger("main") + + getAction(args.headOption map { + _.capitalize + }) match { case Some(x) if x == Actions.Validate => validate(args.tail) case Some(x) if x == Actions.Highlight => highlight(args.tail) case Some(x) if x == Actions.Inputs => inputs(args.tail) @@ -35,7 +48,7 @@ object Main extends App { try { WdlNamespace.load(new File(args(0))) } catch { - case e:SyntaxError => println(e) + case e: SyntaxError => println(e) } } @@ -52,13 +65,17 @@ object Main extends App { val namespace = WdlNamespace.load(new File(args(0))) println(namespace.workflows.head.inputs.toJson.prettyPrint) } catch { - case e:SyntaxError => println(e) + case e: SyntaxError => println(e) } } - def run(args: Array[String]): Unit = { + def run(args: Array[String], workflowManagerSystem: WorkflowManagerSystem = new DefaultWorkflowManagerSystem): Unit = { if (args.length != 2) usageAndExit() + log.info(s"RUN sub-command") + log.info(s" WDL file: ${args(0)}") + log.info(s" Inputs: ${args(1)}") + try { val wdl = FileUtil.slurp(Paths.get(args(0))) val jsValue = FileUtil.slurp(Paths.get(args(1))).parseJson @@ -68,9 +85,11 @@ object Main extends App { case _ => throw new RuntimeException("Expecting a JSON object") } - val workflowManagerSystem = new WorkflowManagerSystem {} + inputs foreach { case (k, v) => log.info(s"input: $k => $v") } val singleWorkflowRunner = SingleWorkflowRunnerActor.props(wdl, inputs, workflowManagerSystem.workflowManagerActor) + val actor = workflowManagerSystem.actorSystem.actorOf(singleWorkflowRunner) + workflowManagerSystem.actorSystem.awaitTermination() // And now we just wait for the magic to happen } catch { case e: Exception => @@ -121,11 +140,13 @@ object Main extends App { | Starts a web server on port 8000. See the web server | documentation for more details about the API endpoints. """.stripMargin) - if(exit) System.exit(-1) + if (exit) System.exit(-1) } def getAction(firstArg: Option[String]): Option[Actions.Value] = for { arg <- firstArg - a <- Actions.values find { _.toString == arg } + a <- Actions.values find { + _.toString == arg + } } yield a } diff --git a/src/main/scala/cromwell/binding/formatter/SyntaxFormatter.scala b/src/main/scala/cromwell/binding/formatter/SyntaxFormatter.scala index 58f61b91917..292b8e6c78f 100644 --- a/src/main/scala/cromwell/binding/formatter/SyntaxFormatter.scala +++ b/src/main/scala/cromwell/binding/formatter/SyntaxFormatter.scala @@ -4,6 +4,7 @@ import cromwell.binding._ import cromwell.binding.command.{Command, ParameterCommandPart, StringCommandPart} import cromwell.binding.types.WdlType import cromwell.parser.WdlParser.{Ast, AstList, AstNode, Terminal} +import cromwell.util.TerminalUtil import scala.collection.JavaConverters._ @@ -21,15 +22,14 @@ trait SyntaxHighlighter { object NullSyntaxHighlighter extends SyntaxHighlighter object AnsiSyntaxHighlighter extends SyntaxHighlighter { - def highlight(string: String, color: Int) = s"\033[38;5;${color}m${string}\033[0m" - override def keyword(s: String): String = highlight(s, 214) - override def name(s: String): String = highlight(s, 253) + override def keyword(s: String): String = TerminalUtil.highlight(214, s) + override def name(s: String): String = TerminalUtil.highlight(253, s) override def section(s: String): String = s - override def wdlType(t: WdlType): String = highlight(t.toWdlString, 33) - override def variable(s: String): String = highlight(s, 112) + override def wdlType(t: WdlType): String = TerminalUtil.highlight(33, t.toWdlString) + override def variable(s: String): String = TerminalUtil.highlight(112, s) override def alias(s: String): String = s override def command(s: String): String = s - override def function(s: String): String = highlight(s, 13) + override def function(s: String): String = TerminalUtil.highlight(13, s) } object HtmlSyntaxHighlighter extends SyntaxHighlighter { diff --git a/src/main/scala/cromwell/binding/package.scala b/src/main/scala/cromwell/binding/package.scala index cf9778c81c7..5fe51407e3b 100644 --- a/src/main/scala/cromwell/binding/package.scala +++ b/src/main/scala/cromwell/binding/package.scala @@ -40,6 +40,7 @@ package object binding { */ case class WorkflowDescriptor(namespace: WdlNamespace, actualInputs: WorkflowCoercedInputs) { val id = UUID.randomUUID() + val shortId = id.toString.split("-")(0) val name = namespace.workflows.head.name } } diff --git a/src/main/scala/cromwell/engine/CallActor.scala b/src/main/scala/cromwell/engine/CallActor.scala index 69c2fd7938e..e61d45ffd37 100644 --- a/src/main/scala/cromwell/engine/CallActor.scala +++ b/src/main/scala/cromwell/engine/CallActor.scala @@ -12,25 +12,24 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.language.postfixOps - object CallActor { sealed trait CallActorMessage case object Start extends CallActorMessage - def props(call: Call, backend: Backend, workflowDescriptor: WorkflowDescriptor, storeActor: ActorRef, name: String): Props = + def props(call: Call, backend: Backend, workflowDescriptor: WorkflowDescriptor, storeActor: ActorRef): Props = Props(new CallActor(call, backend, workflowDescriptor, storeActor)) } /** Actor to manage the execution of a single call. */ class CallActor(call: Call, backend: Backend, workflowDescriptor: WorkflowDescriptor, storeActor: ActorRef) extends Actor with CromwellActor { - - private val log = Logging(context.system, this) + private val log = Logging(context.system, classOf[CallActor]) + val tag = s"CallActor [UUID(${workflowDescriptor.shortId}):${call.name}]" override def receive = LoggingReceive { case CallActor.Start => handleStart() case badMessage => - val diagnostic = s"Received unexpected message $badMessage." + val diagnostic = s"$tag: unexpected message $badMessage." log.error(diagnostic) context.parent ! WorkflowActor.CallFailed(call, diagnostic) } @@ -58,6 +57,7 @@ class CallActor(call: Call, backend: Backend, workflowDescriptor: WorkflowDescri backendInputs = backend.adjustInputPaths(call, inputs) commandLine <- Future.fromTry(call.instantiateCommandLine(backendInputs)) } yield { + log.info(s"$tag: launching `$commandLine`") originalSender ! WorkflowActor.CallStarted(call) val tryOutputs = backend.executeCommand(commandLine, workflowDescriptor, call, s => inputs.get(s).get) val (successes, failures) = tryOutputs.partition { @@ -67,12 +67,17 @@ class CallActor(call: Call, backend: Backend, workflowDescriptor: WorkflowDescri if (failures.isEmpty) { // Materialize the Successes. val outputs = successes.map { case (key, value) => key -> value.get } + log.info(s"$tag: success") context.parent ! WorkflowActor.CallCompleted(call, outputs) } else { - val errorMessages = TryUtil.stringifyFailures(failures.values).mkString("\n") + val errorMessages = TryUtil.stringifyFailures(failures.values) + log.error(s"$tag: failed") + errorMessages foreach {m => + log.error(s"$tag: $m") + } - log.error(errorMessages) - context.parent ! WorkflowActor.CallFailed(call, errorMessages) + log.error(errorMessages.mkString("\n")) + context.parent ! WorkflowActor.CallFailed(call, errorMessages.mkString("\n")) } } } diff --git a/src/main/scala/cromwell/engine/ExecutionStore.scala b/src/main/scala/cromwell/engine/ExecutionStore.scala index eed7f5dd4ce..bb527017718 100644 --- a/src/main/scala/cromwell/engine/ExecutionStore.scala +++ b/src/main/scala/cromwell/engine/ExecutionStore.scala @@ -1,6 +1,8 @@ package cromwell.engine -import cromwell.binding.{Call, WdlNamespace} +import akka.event.Logging +import cromwell.binding.{WorkflowDescriptor, Call, WdlNamespace} +import org.slf4j.LoggerFactory object ExecutionStatus extends Enumeration { type ExecutionStatus = Value @@ -11,13 +13,16 @@ object ExecutionStatus extends Enumeration { /** * Corresponds to the "execution table" of our discussions. */ -class ExecutionStore(namespace: WdlNamespace) { - +class ExecutionStore(workflow: WorkflowDescriptor) { + val log = LoggerFactory.getLogger("ExecutionStore") + val tag = s"ExecutionStore [UUID(${workflow.shortId})]" + var table = workflow.namespace.workflows.head.calls.map { call => call -> ExecutionStatus.NotStarted }.toMap def isWorkflowDone: Boolean = table.forall(_._2 == ExecutionStatus.Done) - def updateStatus(call: Call, status: ExecutionStatus.Value): Unit = table += (call -> status) - - var table = namespace.workflows.head.calls.map { call => call -> ExecutionStatus.NotStarted }.toMap + def updateStatus(call: Call, status: ExecutionStatus.Value): Unit = { + log.info(s"$tag: ${call.name} update to $status") + table += (call -> status) + } /** * Start all calls which are currently in state `NotStarted` and whose prerequisites are all `Done`, diff --git a/src/main/scala/cromwell/engine/SingleWorkflowRunnerActor.scala b/src/main/scala/cromwell/engine/SingleWorkflowRunnerActor.scala index e158fc337a4..a799a11f3b5 100644 --- a/src/main/scala/cromwell/engine/SingleWorkflowRunnerActor.scala +++ b/src/main/scala/cromwell/engine/SingleWorkflowRunnerActor.scala @@ -1,7 +1,7 @@ package cromwell.engine -import akka.actor.FSM.{Transition, CurrentState} -import akka.actor.{ActorRef, Props, Actor} +import akka.actor.FSM.{CurrentState, Transition} +import akka.actor.{Actor, ActorRef, Props} import akka.event.Logging import akka.pattern.ask import akka.util.Timeout @@ -9,11 +9,12 @@ import cromwell.binding import cromwell.binding.WdlSource import cromwell.engine.WorkflowManagerActor._ import spray.json._ -import scala.concurrent.duration._ + import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.{Success, Failure} +import scala.util.{Failure, Success} object SingleWorkflowRunnerActor { def props(wdl: WdlSource, inputs: binding.WorkflowRawInputs, workflowManager: ActorRef): Props = { @@ -29,17 +30,19 @@ object SingleWorkflowRunnerActor { case class SingleWorkflowRunnerActor(wdl: WdlSource, inputs: binding.WorkflowRawInputs, workflowManager: ActorRef) extends Actor { - private val log = Logging(context.system, this) + val log = Logging(context.system, classOf[SingleWorkflowRunnerActor]) + val tag = "SingleWorkflowRunnerActor" private implicit val timeout = Timeout(5 seconds) // Note that id isn't used until *after* the submitWorkflow Future is complete private var id: WorkflowId = _ override def preStart(): Unit = { + log.info(s"$tag: launching workflow") val eventualId = workflowManager.ask(SubmitWorkflow(wdl, inputs)).mapTo[WorkflowId] eventualId onComplete { case Success(x) => subscribeToWorkflow(x) case Failure(e) => - log.error(e.getMessage) + log.error(s"$tag: ${e.getMessage}") terminate() } } @@ -47,12 +50,14 @@ case class SingleWorkflowRunnerActor(wdl: WdlSource, def receive = { case CurrentState(_, state: WorkflowState) if state.isTerminal => handleTermination(state) case Transition(_, _, state: WorkflowState) if state.isTerminal => handleTermination(state) - case m => log.debug(s"Received unexpected message: $m") + case CurrentState(_, state: WorkflowState) => + log.info(s"$tag: received CurrentState($state)") + case m => + log.warning(s"$tag: received unexpected message: $m") } private def handleTermination(state: WorkflowState): Unit = { - log.info(s"Workflow complete: $state") - + log.info(s"$tag: workflow finished with status '$state'") // If this is a successful termination, retrieve & print out the outputs if (state == WorkflowSucceeded) { val eventualOutputs = workflowManager.ask(WorkflowOutputs(id)).mapTo[binding.WorkflowOutputs] @@ -60,7 +65,6 @@ case class SingleWorkflowRunnerActor(wdl: WdlSource, import cromwell.binding.values.WdlValueJsonFormatter._ println(outputs.toJson.prettyPrint) } - terminate() } @@ -72,11 +76,11 @@ case class SingleWorkflowRunnerActor(wdl: WdlSource, */ private def subscribeToWorkflow(workflowId: WorkflowId): Unit = { id = workflowId - log.info(s"Workflow ID: $id") + log.info(s"SingleWorkflowRunnerActor: workflow ID UUID($id)") workflowManager ! SubscribeToWorkflow(id) } private def terminate(): Unit = { context.system.shutdown() } -} \ No newline at end of file +} diff --git a/src/main/scala/cromwell/engine/StoreActor.scala b/src/main/scala/cromwell/engine/StoreActor.scala index a777c31551d..76a67702f04 100644 --- a/src/main/scala/cromwell/engine/StoreActor.scala +++ b/src/main/scala/cromwell/engine/StoreActor.scala @@ -4,6 +4,7 @@ import akka.actor.{Actor, Props} import akka.event.{Logging, LoggingReceive} import cromwell.binding._ import cromwell.binding.values.WdlValue +import cromwell.engine.ExecutionStatus.ExecutionStatus import cromwell.engine.StoreActor._ import cromwell.util.TryUtil @@ -11,8 +12,7 @@ import scala.language.postfixOps import scala.util.Try object StoreActor { - def props(namespace: WdlNamespace, hostInputs: HostInputs) = Props(new StoreActor(namespace, hostInputs)) - + def props(workflow: WorkflowDescriptor, hostInputs: HostInputs) = Props(new StoreActor(workflow, hostInputs)) sealed trait StoreActorMessage case class CallCompleted(call: Call, callOutputs: Map[String, WdlValue]) extends StoreActorMessage case object StartRunnableCalls extends StoreActorMessage @@ -24,10 +24,11 @@ object StoreActor { /** * Actor to hold symbol and execution status data for a single workflow. This actor * guards mutable state over the symbol and execution stores. */ -class StoreActor(namespace: WdlNamespace, hostInputs: HostInputs) extends Actor with CromwellActor { - private val symbolStore = new SymbolStore(namespace, hostInputs) - private val executionStore = new ExecutionStore(namespace) +class StoreActor(workflow: WorkflowDescriptor, hostInputs: HostInputs) extends Actor with CromwellActor { + private val symbolStore = new SymbolStore(workflow.namespace, hostInputs) + private val executionStore = new ExecutionStore(workflow) private val log = Logging(context.system, this) + val tag = s"StoreActor [UUID(${workflow.shortId})]" override def receive: Receive = LoggingReceive { case CallCompleted(call, callOutputs) => @@ -44,11 +45,15 @@ class StoreActor(namespace: WdlNamespace, hostInputs: HostInputs) extends Actor private def symbolStoreEntryToMapEntry(e: SymbolStoreEntry): (String, WdlValue) = { e.key.scope + "." + e.key.name -> e.wdlValue.get } - + private def updateOutputs(call: Call, callOutputs: Map[String, WdlValue]): Unit = { def addOutputValueToSymbolStore(callOutput: (String, WdlValue)): Try[Unit] = symbolStore.addOutputValue(call.fullyQualifiedName, callOutput._1, Some(callOutput._2), callOutput._2.wdlType) + callOutputs foreach {case (k, v) => + log.info(s"$tag: set ${call.fullyQualifiedName}.$k => $v") + } + val addedEntries = callOutputs map addOutputValueToSymbolStore val failureMessages = TryUtil.stringifyFailures(addedEntries) diff --git a/src/main/scala/cromwell/engine/SymbolStore.scala b/src/main/scala/cromwell/engine/SymbolStore.scala index 061674210de..2a3c633a146 100644 --- a/src/main/scala/cromwell/engine/SymbolStore.scala +++ b/src/main/scala/cromwell/engine/SymbolStore.scala @@ -20,7 +20,6 @@ case class SymbolStoreEntry(key: SymbolStoreKey, wdlType: WdlType, wdlValue: Opt } class SymbolStore(namespace: WdlNamespace, inputs: HostInputs) { - private val store = mutable.Set[SymbolStoreEntry]() inputs.foreach { case (fullyQualifiedName, value) => diff --git a/src/main/scala/cromwell/engine/WorkflowActor.scala b/src/main/scala/cromwell/engine/WorkflowActor.scala index 7f43f2d8c9c..6c899cc0c39 100644 --- a/src/main/scala/cromwell/engine/WorkflowActor.scala +++ b/src/main/scala/cromwell/engine/WorkflowActor.scala @@ -1,6 +1,7 @@ package cromwell.engine import akka.actor.{FSM, LoggingFSM, Props} +import akka.event.Logging import akka.pattern.{ask, pipe} import cromwell.binding._ import cromwell.engine.WorkflowActor._ @@ -27,9 +28,11 @@ object WorkflowActor { case class FailureMessage(msg: String) extends WorkflowFailure with WorkflowActorMessage } -case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) extends LoggingFSM[WorkflowState, WorkflowFailure] with CromwellActor { +case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) extends FSM[WorkflowState, WorkflowFailure] with CromwellActor { - private val storeActor = context.actorOf(StoreActor.props(workflow.namespace, backend.initializeForWorkflow(workflow))) + private val storeActor = context.actorOf(StoreActor.props(workflow, backend.initializeForWorkflow(workflow))) + val tag: String = s"WorkflowActor [UUID(${workflow.shortId})]" + override val log = Logging(context.system, classOf[WorkflowActor]) startWith(WorkflowSubmitted, NoFailureMessage) @@ -38,22 +41,11 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) extends } when(WorkflowRunning) { - case Event(CallStarted(call), NoFailureMessage) => - storeActor ! StoreActor.UpdateStatus(call, ExecutionStatus.Running) - stay() - case Event(CallCompleted(call, callOutputs), NoFailureMessage) => - storeActor ! StoreActor.CallCompleted(call, callOutputs) - stay() - case Event(RunnableCalls(runnableCalls), NoFailureMessage) => - if (runnableCalls.nonEmpty) { - log.info("Starting calls: " + runnableCalls.map {_.name}.toSeq.sorted.mkString(", ")) - } - runnableCalls foreach startCallActor - stay() - case Event(CallFailed(call, failure), NoFailureMessage) => - storeActor ! StoreActor.UpdateStatus(call, ExecutionStatus.Failed) - goto(WorkflowFailed) using FailureMessage(failure) - case Event(Complete, NoFailureMessage) => goto(WorkflowSucceeded) + case Event(CallStarted(call), NoFailureMessage) => updateCallStatusToRunning(call) + case Event(CallCompleted(completedCall, callOutputs), NoFailureMessage) => updateCallStatusToCompleted(completedCall, callOutputs) + case Event(RunnableCalls(runnableCalls), NoFailureMessage) => receiveRunnableCalls(runnableCalls) + case Event(CallFailed(call, failure), NoFailureMessage) => updateCallStatusToFailed(call, failure) + case Event(WorkflowActor.Complete, NoFailureMessage) => goto(WorkflowSucceeded) } when(WorkflowFailed) { @@ -78,10 +70,38 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) extends case WorkflowSubmitted -> WorkflowRunning => storeActor ! StoreActor.StartRunnableCalls } - /** Create a per-call `CallActor` for the specified `Call` and send it a `Start` message to - * begin execution. */ - private def startCallActor(call: Call): Unit = { - val callActorProps = CallActor.props(call, backend, workflow, storeActor, "CallActor-" + call.name) - context.actorOf(callActorProps) ! CallActor.Start + private def updateCallStatusToRunning(call: Call) = { + log.info(s"$tag: call '${call.name}' running") + storeActor ! StoreActor.UpdateStatus(call, ExecutionStatus.Running) + stay() + } + + private def updateCallStatusToCompleted(call: Call, outputs: WorkflowOutputs) = { + log.info(s"$tag: call '${call.name}' completed") + storeActor ! StoreActor.CallCompleted(call, outputs) + stay() + } + + private def updateCallStatusToFailed(call: Call, failure: String) = { + log.info(s"$tag: call '${call.name}' failed ($failure)") + storeActor ! StoreActor.UpdateStatus(call, ExecutionStatus.Failed) + goto(WorkflowFailed) using FailureMessage(failure) + } + + private def unknownMessage(e: Any) = { + log.warning(s"$tag: Unexpected message: $e") + stay() } -} \ No newline at end of file + + private def receiveRunnableCalls(calls: Iterable[Call]) = { + if (calls.nonEmpty) { + log.info(s"$tag: starting " + calls.map {_.name}.toSeq.sorted.mkString(", ")) + } + calls foreach { call => + log.info(s"$tag: launching CallActor for '${call.name}'") + val callActorProps = CallActor.props(call, backend, workflow, storeActor) + context.actorOf(callActorProps) ! CallActor.Start + } + stay() + } +} diff --git a/src/main/scala/cromwell/logging/CromwellLogger.scala b/src/main/scala/cromwell/logging/CromwellLogger.scala new file mode 100644 index 00000000000..3ee068476a7 --- /dev/null +++ b/src/main/scala/cromwell/logging/CromwellLogger.scala @@ -0,0 +1,31 @@ +package cromwell.logging + +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.{ConsoleAppender, LayoutBase} +import cromwell.util.TerminalUtil + +import scala.collection.JavaConverters._ + +class TerminalLayout extends LayoutBase[ILoggingEvent] { + def doLayout(event: ILoggingEvent): String = { + val level = event.getLevel match { + case Level.WARN => TerminalUtil.highlight(220, "warn") + case Level.ERROR => TerminalUtil.highlight(1, "error") + case x => x.toString.toLowerCase + } + + val highlightedMessage = event.getFormattedMessage + .replaceAll("UUID\\((.*?)\\)", TerminalUtil.highlight(2, "$1")) + .replaceAll("`(.*?)`", TerminalUtil.highlight(9, "$1")) + + /* For some reason a '{}' is the value of getMessage only for Actors. + This prepends a highlighted asterisk to messages that come from actors. + */ + val prefix = if (event.getMessage == "{}") s"[${TerminalUtil.highlight(129, "*")}] " else "" + + s"$prefix[$level] $highlightedMessage\n" + } +} + +class TerminalAppender extends ConsoleAppender diff --git a/src/main/scala/cromwell/server/CromwellServer.scala b/src/main/scala/cromwell/server/CromwellServer.scala index 10d34c45df5..cc2a0683fba 100644 --- a/src/main/scala/cromwell/server/CromwellServer.scala +++ b/src/main/scala/cromwell/server/CromwellServer.scala @@ -15,7 +15,7 @@ import scala.reflect.runtime.universe._ import scala.util.{Failure, Success} // Note that as per the language specification, this is instiated lazily and only used when necessary (i.e. server mode) -object CromwellServer extends WorkflowManagerSystem { +object CromwellServer extends DefaultWorkflowManagerSystem { val conf = ConfigFactory.parseFile(new File("/etc/cromwell.conf")) val swaggerConfig = conf.getConfig("swagger") diff --git a/src/main/scala/cromwell/server/WorkflowManagerSystem.scala b/src/main/scala/cromwell/server/DefaultWorkflowManagerSystem.scala similarity index 57% rename from src/main/scala/cromwell/server/WorkflowManagerSystem.scala rename to src/main/scala/cromwell/server/DefaultWorkflowManagerSystem.scala index e0655c256a1..731de8b1b24 100644 --- a/src/main/scala/cromwell/server/WorkflowManagerSystem.scala +++ b/src/main/scala/cromwell/server/DefaultWorkflowManagerSystem.scala @@ -1,13 +1,18 @@ package cromwell.server -import akka.actor.ActorSystem +import akka.actor.{ActorRef, ActorSystem} +import com.typesafe.config.ConfigFactory import cromwell.engine.WorkflowManagerActor trait WorkflowManagerSystem { + val systemName: String + implicit val actorSystem: ActorSystem + val workflowManagerActor: ActorRef +} + +case class DefaultWorkflowManagerSystem() extends WorkflowManagerSystem { val systemName = "cromwell-system" implicit val actorSystem = ActorSystem(systemName) - actorSystem.registerOnTermination {actorSystem.log.info(s"$systemName shutting down")} - val workflowManagerActor = actorSystem.actorOf(WorkflowManagerActor.props) } diff --git a/src/main/scala/cromwell/util/TerminalUtil.scala b/src/main/scala/cromwell/util/TerminalUtil.scala new file mode 100644 index 00000000000..992dff25474 --- /dev/null +++ b/src/main/scala/cromwell/util/TerminalUtil.scala @@ -0,0 +1,5 @@ +package cromwell.util + +object TerminalUtil { + def highlight(colorCode:Int, string:String) = s"\033[38;5;${colorCode}m${string}\033[0m" +} diff --git a/src/test/scala/cromwell/CromwellTestkitSpec.scala b/src/test/scala/cromwell/CromwellTestkitSpec.scala index 10c34459331..18d922d6279 100644 --- a/src/test/scala/cromwell/CromwellTestkitSpec.scala +++ b/src/test/scala/cromwell/CromwellTestkitSpec.scala @@ -2,14 +2,31 @@ package cromwell import akka.actor.ActorSystem import akka.testkit.{DefaultTimeout, EventFilter, ImplicitSender, TestKit} +import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} -abstract class CromwellTestkitSpec(actorSystem: ActorSystem) extends TestKit(actorSystem) with DefaultTimeout -with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures { +object CromwellTestkitSpec { + val akkaConfigString = + """ + |akka { + | loggers = ["akka.event.slf4j.Slf4jLogger", "akka.testkit.TestEventListener"] + | loglevel = "DEBUG" + | actor { + | debug { + | receive = on + | } + | } + |} + """.stripMargin +} + +abstract class CromwellTestkitSpec(name: String) + extends TestKit(ActorSystem(name, ConfigFactory.parseString(CromwellTestkitSpec.akkaConfigString))) + with DefaultTimeout with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures { def startingCallsFilter(callNames: String*): EventFilter = - EventFilter.info(message = s"Starting calls: ${callNames.mkString(", ")}", occurrences = 1) + EventFilter.info(pattern = s"starting ${callNames.mkString(", ")}", occurrences = 1) def waitForHandledMessage[T](named: String)(block: => T): T = { waitForHandledMessagePattern(s"^received handled message $named")(block) @@ -20,6 +37,4 @@ with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with block } } - - protected def getActorSystem = actorSystem } diff --git a/src/test/scala/cromwell/HelloWorldActorSpec.scala b/src/test/scala/cromwell/HelloWorldActorSpec.scala index 3b15bf96286..1e7ba2e6d15 100644 --- a/src/test/scala/cromwell/HelloWorldActorSpec.scala +++ b/src/test/scala/cromwell/HelloWorldActorSpec.scala @@ -5,8 +5,6 @@ import java.util.UUID import akka.actor.ActorSystem import akka.testkit._ import akka.pattern.ask -import com.typesafe.config.ConfigFactory -import cromwell.HelloWorldActorSpec._ import cromwell.binding.values.WdlString import cromwell.binding.{WorkflowDescriptor, WorkflowOutputs, UnsatisfiedInputsException, WdlNamespace} import cromwell.engine._ @@ -19,19 +17,8 @@ import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps -object HelloWorldActorSpec { - val Config = - """ - |akka { - | loggers = ["akka.testkit.TestEventListener"] - | loglevel = "DEBUG" - | actor.debug.receive = on - |} - """.stripMargin -} - // Copying from http://doc.akka.io/docs/akka/snapshot/scala/testkit-example.html#testkit-example -class HelloWorldActorSpec extends CromwellTestkitSpec(ActorSystem("HelloWorldActorSpec", ConfigFactory.parseString(Config))) { +class HelloWorldActorSpec extends CromwellTestkitSpec("HelloWorldActorSpec") { private def buildWorkflowActor(name: String = UUID.randomUUID().toString, rawInputs: binding.WorkflowRawInputs = HelloWorld.RawInputs): TestActorRef[WorkflowActor] = { val namespace = WdlNamespace.load(HelloWorld.WdlSource) @@ -59,8 +46,8 @@ class HelloWorldActorSpec extends CromwellTestkitSpec(ActorSystem("HelloWorldAct val fsm = TestFSMRef(new WorkflowActor(descriptor, new LocalBackend)) assert(fsm.stateName == WorkflowSubmitted) startingCallsFilter("hello").intercept { - fsm ! Start within(TestExecutionTimeout) { + fsm ! Start awaitCond(fsm.stateName == WorkflowRunning) awaitCond(fsm.stateName == WorkflowSucceeded) val outputName = "hello.hello.salutation" diff --git a/src/test/scala/cromwell/MainSpec.scala b/src/test/scala/cromwell/MainSpec.scala index c0a073ef217..f9dddddfc39 100644 --- a/src/test/scala/cromwell/MainSpec.scala +++ b/src/test/scala/cromwell/MainSpec.scala @@ -1,9 +1,20 @@ package cromwell +import akka.actor.ActorSystem +import akka.testkit.EventFilter +import com.typesafe.config.ConfigFactory +import cromwell.engine.WorkflowManagerActor +import cromwell.server.{WorkflowManagerSystem, DefaultWorkflowManagerSystem} import cromwell.util.FileUtil import cromwell.util.SampleWdl.ThreeStep import org.scalatest.{FlatSpec, Matchers} +trait TestWorkflowManagerSystem extends WorkflowManagerSystem { + val systemName = "cromwell-system" + implicit val actorSystem = ActorSystem(systemName, ConfigFactory.parseString(CromwellTestkitSpec.akkaConfigString)) + val workflowManagerActor = actorSystem.actorOf(WorkflowManagerActor.props) +} + class MainSpec extends FlatSpec with Matchers { val wdlFilePathAndWriter = FileUtil.tempFileAndWriter("wdl") val inputsJsonPathAndWriter = FileUtil.tempFileAndWriter("inputs") @@ -89,6 +100,15 @@ class MainSpec extends FlatSpec with Matchers { assert(stream.toString.contains("\"three_step.cgrep.pattern\"")) } + it should "run" in { + val stream = new java.io.ByteArrayOutputStream() + val workflowManagerSystem = new TestWorkflowManagerSystem {} + implicit val system = workflowManagerSystem.actorSystem + EventFilter.info(pattern = s"workflow finished", occurrences = 1).intercept { + Main.run(Array(wdlFilePathAndWriter._1.toAbsolutePath.toString, inputsJsonPathAndWriter._1.toAbsolutePath.toString), workflowManagerSystem) + } + } + it should "print usage" in { val stream = new java.io.ByteArrayOutputStream() Console.withOut(stream) { diff --git a/src/test/scala/cromwell/ThreeStepActorSpec.scala b/src/test/scala/cromwell/ThreeStepActorSpec.scala index 50f1c219ee5..67e03f02ce8 100644 --- a/src/test/scala/cromwell/ThreeStepActorSpec.scala +++ b/src/test/scala/cromwell/ThreeStepActorSpec.scala @@ -2,10 +2,8 @@ package cromwell import java.io.{File, FileWriter} -import akka.actor.ActorSystem import akka.pattern.ask import akka.testkit.TestFSMRef -import com.typesafe.config.ConfigFactory import cromwell.CromwellSpec.DockerTest import cromwell.binding._ import cromwell.binding.values.{WdlInteger, WdlValue} @@ -18,15 +16,6 @@ import scala.concurrent.duration._ import scala.language.postfixOps object ThreeStepActorSpec { - val Config = - """ - |akka { - | loggers = ["akka.testkit.TestEventListener"] - | loglevel = "DEBUG" - | actor.debug.receive = on - |} - """.stripMargin - def wdlSource(runtime: String): WdlSource = """ |task ps { @@ -82,7 +71,14 @@ object ThreeStepActorSpec { |joeblaux 6440 1.4 2.2 4496164 362136 ?? S Sun09PM 74:29.40 /Applications/Google Chrome.app/Contents/MacOS/Google Chrome """.stripMargin.trim - val TestExecutionTimeout = 5000 milliseconds + def createDummyPsFile: File = { + val file = File.createTempFile("dummy_ps", ".out") + val writer = new FileWriter(file) + writer.write(DummyProcessOutput) + writer.flush() + writer.close() + file + } object Inputs { val Pattern = "three_step.cgrep.pattern" @@ -93,26 +89,17 @@ object ThreeStepActorSpec { val DummyPs3File = "three_step.ps3.dummy_ps_file" } - def createDummyPsFile: File = { - val file = File.createTempFile("dummy_ps", ".out") - val writer = new FileWriter(file) - writer.write(DummyProcessOutput) - writer.flush() - writer.close() - file - } } -class ThreeStepActorSpec extends CromwellTestkitSpec(ActorSystem("ThreeStepActorSpec", ConfigFactory.parseString(ThreeStepActorSpec.Config))) { +class ThreeStepActorSpec extends CromwellTestkitSpec("ThreeStepActorSpec") { import ThreeStepActorSpec._ - private def buildWorkflowActorFsm(runtime: String) = { - import ThreeStepActorSpec._ val workflowInputs = Map( Inputs.Pattern -> "joeblaux", Inputs.DummyPsFile -> createDummyPsFile.getAbsolutePath, Inputs.DummyPs2File -> createDummyPsFile.getAbsolutePath, - Inputs.DummyPs3File -> createDummyPsFile.getAbsolutePath) + Inputs.DummyPs3File -> createDummyPsFile.getAbsolutePath + ) val namespace = WdlNamespace.load(ThreeStepActorSpec.wdlSource(runtime)) // This is a test and is okay with just throwing if coerceRawInputs returns a Failure. @@ -133,7 +120,7 @@ class ThreeStepActorSpec extends CromwellTestkitSpec(ActorSystem("ThreeStepActor assert(fsm.stateName == WorkflowSubmitted) startingCallsFilter("cgrep", "wc").intercept { fsm ! Start - within(TestExecutionTimeout) { + within(5000 milliseconds) { awaitCond(fsm.stateName == WorkflowRunning) awaitCond(fsm.stateName == WorkflowSucceeded) val outputs = Await.result(fsm.ask(GetOutputs).mapTo[WorkflowOutputs], 5 seconds) diff --git a/src/test/scala/cromwell/engine/ActorWorkflowManagerSpec.scala b/src/test/scala/cromwell/engine/ActorWorkflowManagerSpec.scala index 651ac2c2522..5d91cbc2962 100644 --- a/src/test/scala/cromwell/engine/ActorWorkflowManagerSpec.scala +++ b/src/test/scala/cromwell/engine/ActorWorkflowManagerSpec.scala @@ -1,21 +1,16 @@ package cromwell.engine -import akka.actor.ActorSystem import akka.testkit.TestActorRef -import com.typesafe.config.ConfigFactory -import cromwell.{binding, CromwellTestkitSpec} -import cromwell.HelloWorldActorSpec._ -import cromwell.binding.FullyQualifiedName -import cromwell.binding.values.{WdlString, WdlValue} +import cromwell.CromwellTestkitSpec +import cromwell.binding.values.WdlString import cromwell.engine.WorkflowManagerActor.{SubmitWorkflow, WorkflowOutputs, WorkflowStatus} import cromwell.util.ActorTestUtil import cromwell.util.SampleWdl.HelloWorld +import cromwell.{CromwellSpec, binding} import scala.language.{higherKinds, postfixOps, reflectiveCalls} - -class ActorWorkflowManagerSpec extends CromwellTestkitSpec(ActorSystem("ActorWorkflowManagerSpec", ConfigFactory.parseString(Config))) { - +class ActorWorkflowManagerSpec extends CromwellTestkitSpec("ActorWorkflowManagerSpec") { "An ActorWorkflowManager" should { "run the Hello World workflow" in { implicit val workflowManagerActor = TestActorRef(WorkflowManagerActor.props, self, "Test the ActorWorkflowManager") @@ -28,7 +23,6 @@ class ActorWorkflowManagerSpec extends CromwellTestkitSpec(ActorSystem("ActorWor status shouldEqual WorkflowSucceeded val outputs = ActorTestUtil.messageAndWait(WorkflowOutputs(workflowId), _.mapTo[binding.WorkflowOutputs]) - val actual = outputs.map { case (k, WdlString(string)) => k -> string } actual shouldEqual Map(HelloWorld.OutputKey -> HelloWorld.OutputValue) } diff --git a/src/test/scala/cromwell/engine/SingleWorkflowRunnerActorSpec.scala b/src/test/scala/cromwell/engine/SingleWorkflowRunnerActorSpec.scala index 972cf147317..f1f78626185 100644 --- a/src/test/scala/cromwell/engine/SingleWorkflowRunnerActorSpec.scala +++ b/src/test/scala/cromwell/engine/SingleWorkflowRunnerActorSpec.scala @@ -1,23 +1,20 @@ package cromwell.engine -import cromwell.HelloWorldActorSpec._ -import cromwell.util.SampleWdl.ThreeStep -import akka.actor.ActorSystem import akka.testkit.EventFilter -import com.typesafe.config.ConfigFactory import cromwell.CromwellTestkitSpec +import cromwell.util.SampleWdl.ThreeStep + import scala.language.postfixOps -class SingleWorkflowRunnerActorSpec extends CromwellTestkitSpec(ActorSystem("ActorWorkflowManagerSpec", ConfigFactory.parseString(Config))) { - val actorSystem = super.getActorSystem - val workflowManagerActor = actorSystem.actorOf(WorkflowManagerActor.props) +class SingleWorkflowRunnerActorSpec extends CromwellTestkitSpec("SingleWorkflowRunnerActorSpec") { + val workflowManagerActor = this.system.actorOf(WorkflowManagerActor.props) val props = SingleWorkflowRunnerActor.props(ThreeStep.WdlSource, ThreeStep.RawInputs, workflowManagerActor) "A SingleWorkflowRunnerActor" should { "successfully run a workflow" in { - EventFilter.info(message = "Workflow complete: Succeeded", occurrences = 1) intercept { - actorSystem.actorOf(props) + EventFilter.info(message = "SingleWorkflowRunnerActor: workflow finished with status 'Succeeded'", occurrences = 1) intercept { + system.actorOf(props) } } } -} \ No newline at end of file +}