Skip to content

Commit

Permalink
Better logging, more docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Scott Frazer authored and scottfrazer committed Jun 23, 2015
1 parent 4a03dc3 commit 2e6ab29
Show file tree
Hide file tree
Showing 24 changed files with 291 additions and 160 deletions.
14 changes: 1 addition & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,18 +370,6 @@ Contains implementations of an interface to launch jobs. `cromwell.engine` will

### cromwell.engine

![Engine Actors](http://i.imgur.com/ByFUakW.png)
![Engine Actors](http://i.imgur.com/sF9vMt2.png)

Contains the Akka code and actor system to execute a workflow. This layer should operate entirely on objects returned from the `cromwell.binding` layer.

|Start|End|Message|Parameters|State Change|
|-----|---|-------|----------|------------|
|![WMA](http://i.imgur.com/98ugXkZ.png)|![WA](http://i.imgur.com/eJxn3wu.png)|Start||
|![WMA](http://i.imgur.com/98ugXkZ.png)|![WA](http://i.imgur.com/eJxn3wu.png)|SubscribeTransitionCallback|WorkflowManagerActor|
|![WA](http://i.imgur.com/eJxn3wu.png)|![SA](http://i.imgur.com/JzNNe64.png)|UpdateStatus|Call,status|
|![WA](http://i.imgur.com/eJxn3wu.png)|![SA](http://i.imgur.com/JzNNe64.png)|CallCompleted|Call,outputs|
|![WA](http://i.imgur.com/eJxn3wu.png)|![SA](http://i.imgur.com/JzNNe64.png)|GetOutputs||
|![WA](http://i.imgur.com/eJxn3wu.png)|![CA](http://i.imgur.com/Nyoln74.png)|Start||

> **TODO**: This table is not complete.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ libraryDependencies ++= Seq(
"io.spray" %% "spray-http" % sprayV,
"io.spray" %% "spray-json" % DowngradedSprayV,
"com.typesafe.akka" %% "akka-actor" % akkaV,
"com.typesafe.akka" %% "akka-slf4j" % akkaV,
"commons-codec" % "commons-codec" % "1.10",
"ch.qos.logback" % "logback-classic" % "1.1.3",
//---------- Test libraries -------------------//
"ch.qos.logback" % "logback-access" % "1.1.3",
"org.codehaus.janino" % "janino" % "2.7.8",
"io.spray" %% "spray-testkit" % sprayV % Test,
"org.scalatest" %% "scalatest" % "2.2.5" % Test,
"com.typesafe.akka" %% "akka-testkit" % akkaV % Test
Expand Down
12 changes: 12 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
akka {
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
actor {
debug {
receive = on
event-stream = on
fsm = on
}
}
}
30 changes: 30 additions & 0 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<configuration>
<if condition='property("CROMWELL_LOGGER").equals("SERVER")'>
<then>
<appender name="SERVER_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Support multiple-JVM writing to the same log file -->
<prudent>true</prudent>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logFile.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
</encoder>
</appender>
<root level="TRACE">
<appender-ref ref="SERVER_APPENDER" />
</root>
</then>
<else>
<appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="cromwell.logging.TerminalLayout" />
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE_APPENDER" />
</root>
</else>
</if>
</configuration>
4 changes: 1 addition & 3 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
webservice.port = 8000
webservice.interface = 0.0.0.0
instance.name = "reference"
#vault.url = "https://api-ci.vault.broadinstitute.org/api"
#ces.url = "https://snoop-dev.broadinstitute.org"

swagger {
apiDocs = "api-docs"
Expand All @@ -25,4 +23,4 @@ spray.can {
request-timeout = 40s
connecting-timeout = 40s
}
}
}
43 changes: 32 additions & 11 deletions src/main/scala/cromwell/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,35 @@ import java.io.File
import java.nio.file.Paths

import cromwell.binding._
import cromwell.engine.{WorkflowManagerActor, SingleWorkflowRunnerActor}
import cromwell.binding.formatter.{AnsiSyntaxHighlighter, SyntaxFormatter}
import cromwell.engine.SingleWorkflowRunnerActor
import cromwell.parser.WdlParser.SyntaxError
import cromwell.server.{WorkflowManagerSystem, CromwellServer}
import cromwell.server.{CromwellServer, DefaultWorkflowManagerSystem, WorkflowManagerSystem}
import cromwell.util.FileUtil
import org.slf4j.LoggerFactory
import spray.json._

import scala.util.{Failure, Success}

object Actions extends Enumeration {
val Parse, Validate, Highlight, Run, Inputs, Server = Value
}

object Main extends App {

getAction(args.headOption map { _.capitalize }) match {
val props = System.getProperties
val loggerProperty = "CROMWELL_LOGGER"
Option(props.getProperty(loggerProperty)) match {
case None => args.headOption.map {_.capitalize}.find {_ == "SERVER"} match {
case Some(x) => props.setProperty(loggerProperty, "SERVER")
case _ =>
}
case _ =>
}

lazy val log = LoggerFactory.getLogger("main")

getAction(args.headOption map {
_.capitalize
}) match {
case Some(x) if x == Actions.Validate => validate(args.tail)
case Some(x) if x == Actions.Highlight => highlight(args.tail)
case Some(x) if x == Actions.Inputs => inputs(args.tail)
Expand All @@ -35,7 +48,7 @@ object Main extends App {
try {
WdlNamespace.load(new File(args(0)))
} catch {
case e:SyntaxError => println(e)
case e: SyntaxError => println(e)
}
}

Expand All @@ -52,13 +65,17 @@ object Main extends App {
val namespace = WdlNamespace.load(new File(args(0)))
println(namespace.workflows.head.inputs.toJson.prettyPrint)
} catch {
case e:SyntaxError => println(e)
case e: SyntaxError => println(e)
}
}

def run(args: Array[String]): Unit = {
def run(args: Array[String], workflowManagerSystem: WorkflowManagerSystem = new DefaultWorkflowManagerSystem): Unit = {
if (args.length != 2) usageAndExit()

log.info(s"RUN sub-command")
log.info(s" WDL file: ${args(0)}")
log.info(s" Inputs: ${args(1)}")

try {
val wdl = FileUtil.slurp(Paths.get(args(0)))
val jsValue = FileUtil.slurp(Paths.get(args(1))).parseJson
Expand All @@ -68,9 +85,11 @@ object Main extends App {
case _ => throw new RuntimeException("Expecting a JSON object")
}

val workflowManagerSystem = new WorkflowManagerSystem {}
inputs foreach { case (k, v) => log.info(s"input: $k => $v") }
val singleWorkflowRunner = SingleWorkflowRunnerActor.props(wdl, inputs, workflowManagerSystem.workflowManagerActor)

val actor = workflowManagerSystem.actorSystem.actorOf(singleWorkflowRunner)
workflowManagerSystem.actorSystem.awaitTermination()
// And now we just wait for the magic to happen
} catch {
case e: Exception =>
Expand Down Expand Up @@ -121,11 +140,13 @@ object Main extends App {
| Starts a web server on port 8000. See the web server
| documentation for more details about the API endpoints.
""".stripMargin)
if(exit) System.exit(-1)
if (exit) System.exit(-1)
}

def getAction(firstArg: Option[String]): Option[Actions.Value] = for {
arg <- firstArg
a <- Actions.values find { _.toString == arg }
a <- Actions.values find {
_.toString == arg
}
} yield a
}
12 changes: 6 additions & 6 deletions src/main/scala/cromwell/binding/formatter/SyntaxFormatter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cromwell.binding._
import cromwell.binding.command.{Command, ParameterCommandPart, StringCommandPart}
import cromwell.binding.types.WdlType
import cromwell.parser.WdlParser.{Ast, AstList, AstNode, Terminal}
import cromwell.util.TerminalUtil

import scala.collection.JavaConverters._

Expand All @@ -21,15 +22,14 @@ trait SyntaxHighlighter {
object NullSyntaxHighlighter extends SyntaxHighlighter

object AnsiSyntaxHighlighter extends SyntaxHighlighter {
def highlight(string: String, color: Int) = s"\033[38;5;${color}m${string}\033[0m"
override def keyword(s: String): String = highlight(s, 214)
override def name(s: String): String = highlight(s, 253)
override def keyword(s: String): String = TerminalUtil.highlight(214, s)
override def name(s: String): String = TerminalUtil.highlight(253, s)
override def section(s: String): String = s
override def wdlType(t: WdlType): String = highlight(t.toWdlString, 33)
override def variable(s: String): String = highlight(s, 112)
override def wdlType(t: WdlType): String = TerminalUtil.highlight(33, t.toWdlString)
override def variable(s: String): String = TerminalUtil.highlight(112, s)
override def alias(s: String): String = s
override def command(s: String): String = s
override def function(s: String): String = highlight(s, 13)
override def function(s: String): String = TerminalUtil.highlight(13, s)
}

object HtmlSyntaxHighlighter extends SyntaxHighlighter {
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/cromwell/binding/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ package object binding {
*/
case class WorkflowDescriptor(namespace: WdlNamespace, actualInputs: WorkflowCoercedInputs) {
val id = UUID.randomUUID()
val shortId = id.toString.split("-")(0)
val name = namespace.workflows.head.name
}
}
21 changes: 13 additions & 8 deletions src/main/scala/cromwell/engine/CallActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,24 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.language.postfixOps


object CallActor {
sealed trait CallActorMessage
case object Start extends CallActorMessage

def props(call: Call, backend: Backend, workflowDescriptor: WorkflowDescriptor, storeActor: ActorRef, name: String): Props =
def props(call: Call, backend: Backend, workflowDescriptor: WorkflowDescriptor, storeActor: ActorRef): Props =
Props(new CallActor(call, backend, workflowDescriptor, storeActor))
}


/** Actor to manage the execution of a single call. */
class CallActor(call: Call, backend: Backend, workflowDescriptor: WorkflowDescriptor, storeActor: ActorRef) extends Actor with CromwellActor {

private val log = Logging(context.system, this)
private val log = Logging(context.system, classOf[CallActor])
val tag = s"CallActor [UUID(${workflowDescriptor.shortId}):${call.name}]"

override def receive = LoggingReceive {
case CallActor.Start => handleStart()
case badMessage =>
val diagnostic = s"Received unexpected message $badMessage."
val diagnostic = s"$tag: unexpected message $badMessage."
log.error(diagnostic)
context.parent ! WorkflowActor.CallFailed(call, diagnostic)
}
Expand Down Expand Up @@ -58,6 +57,7 @@ class CallActor(call: Call, backend: Backend, workflowDescriptor: WorkflowDescri
backendInputs = backend.adjustInputPaths(call, inputs)
commandLine <- Future.fromTry(call.instantiateCommandLine(backendInputs))
} yield {
log.info(s"$tag: launching `$commandLine`")
originalSender ! WorkflowActor.CallStarted(call)
val tryOutputs = backend.executeCommand(commandLine, workflowDescriptor, call, s => inputs.get(s).get)
val (successes, failures) = tryOutputs.partition {
Expand All @@ -67,12 +67,17 @@ class CallActor(call: Call, backend: Backend, workflowDescriptor: WorkflowDescri
if (failures.isEmpty) {
// Materialize the Successes.
val outputs = successes.map { case (key, value) => key -> value.get }
log.info(s"$tag: success")
context.parent ! WorkflowActor.CallCompleted(call, outputs)
} else {
val errorMessages = TryUtil.stringifyFailures(failures.values).mkString("\n")
val errorMessages = TryUtil.stringifyFailures(failures.values)
log.error(s"$tag: failed")
errorMessages foreach {m =>
log.error(s"$tag: $m")
}

log.error(errorMessages)
context.parent ! WorkflowActor.CallFailed(call, errorMessages)
log.error(errorMessages.mkString("\n"))
context.parent ! WorkflowActor.CallFailed(call, errorMessages.mkString("\n"))
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions src/main/scala/cromwell/engine/ExecutionStore.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cromwell.engine

import cromwell.binding.{Call, WdlNamespace}
import akka.event.Logging
import cromwell.binding.{WorkflowDescriptor, Call, WdlNamespace}
import org.slf4j.LoggerFactory

object ExecutionStatus extends Enumeration {
type ExecutionStatus = Value
Expand All @@ -11,13 +13,16 @@ object ExecutionStatus extends Enumeration {
/**
* Corresponds to the "execution table" of our discussions.
*/
class ExecutionStore(namespace: WdlNamespace) {

class ExecutionStore(workflow: WorkflowDescriptor) {
val log = LoggerFactory.getLogger("ExecutionStore")
val tag = s"ExecutionStore [UUID(${workflow.shortId})]"
var table = workflow.namespace.workflows.head.calls.map { call => call -> ExecutionStatus.NotStarted }.toMap
def isWorkflowDone: Boolean = table.forall(_._2 == ExecutionStatus.Done)

def updateStatus(call: Call, status: ExecutionStatus.Value): Unit = table += (call -> status)

var table = namespace.workflows.head.calls.map { call => call -> ExecutionStatus.NotStarted }.toMap
def updateStatus(call: Call, status: ExecutionStatus.Value): Unit = {
log.info(s"$tag: ${call.name} update to $status")
table += (call -> status)
}

/**
* Start all calls which are currently in state `NotStarted` and whose prerequisites are all `Done`,
Expand Down
Loading

0 comments on commit 2e6ab29

Please sign in to comment.