Skip to content

Commit

Permalink
Allow Chronos jobs to override taskInfo data
Browse files Browse the repository at this point in the history
- Add 'taskInfoData' field to Chronos jobs
- If specified on a job, this data is placed in the
  data field of the taskInfo object, which is passed
  to the mesos executor.
- If not specified, then default Chronos task data
  is used (as per previous behaviour).
  • Loading branch information
jamesmulcahy committed Mar 1, 2016
1 parent 9d1d46b commit c2ae724
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 82 deletions.
1 change: 1 addition & 0 deletions docs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ When specifying the `command` field in your job hash, use `url-runner.bash` (mak
| `epsilon` | If, for any reason, a job can't be started at the scheduled time, this is the window in which Chronos will attempt to run the job again | `PT60S` or `--task_epsilon` |
| `executor` | Mesos executor. By default Chronos uses the Mesos command executor. | - |
| `executorFlags` | Flags to pass to Mesos executor. | - |
| `taskInfoData` | Data to pass to the taskInfo data field. If set, this overrides the default data set by Chronos. | - |
| `retries` | Number of retries to attempt if a command returns a non-zero status | `2` |
| `owner` | Email address(es) to send job failure notifications. Use comma-separated list for multiple addresses. | - |
| `ownerName` | Name of the individual responsible for the job. | - |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
errorCount = childJob.errorCount,
executor = childJob.executor,
executorFlags = childJob.executorFlags,
taskInfoData = childJob.taskInfoData,
retries = childJob.retries,
owner = childJob.owner,
lastError = childJob.lastError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,73 +160,11 @@ object JobUtils {
}
}

def getJobWithArguments(job: BaseJob, arguments: String): BaseJob = {
def getJobWithArguments(job : BaseJob, arguments: String): BaseJob = {
val commandWithArgs = job.command + " " + arguments
val jobWithArguments = job match {
case j: DependencyBasedJob =>
new DependencyBasedJob(
parents = Set(),
name = job.name,
command = commandWithArgs,
epsilon = job.epsilon,
successCount = job.successCount,
errorCount = job.errorCount,
executor = job.executor,
executorFlags = job.executorFlags,
retries = job.retries,
owner = job.owner,
lastError = job.lastError,
lastSuccess = job.lastSuccess,
async = job.async,
cpus = job.cpus,
disk = job.disk,
mem = job.mem,
disabled = job.disabled,
errorsSinceLastSuccess = job.errorsSinceLastSuccess,
softError = job.softError,
uris = job.uris,
fetch = job.fetch,
highPriority = job.highPriority,
runAsUser = job.runAsUser,
container = job.container,
environmentVariables = job.environmentVariables,
shell = job.shell,
arguments = job.arguments,
constraints = job.constraints
)
case j: ScheduleBasedJob =>
new ScheduleBasedJob(
schedule = j.schedule,
scheduleTimeZone = j.scheduleTimeZone,
name = job.name,
command = commandWithArgs,
epsilon = job.epsilon,
successCount = job.successCount,
errorCount = job.errorCount,
executor = job.executor,
executorFlags = job.executorFlags,
retries = job.retries,
owner = job.owner,
lastError = job.lastError,
lastSuccess = job.lastSuccess,
async = job.async,
cpus = job.cpus,
disk = job.disk,
mem = job.mem,
disabled = job.disabled,
errorsSinceLastSuccess = job.errorsSinceLastSuccess,
softError = job.softError,
uris = job.uris,
fetch = job.fetch,
highPriority = job.highPriority,
runAsUser = job.runAsUser,
container = job.container,
environmentVariables = job.environmentVariables,
shell = job.shell,
arguments = job.arguments,
constraints = job.constraints
)
job match {
case j: DependencyBasedJob => j.copy(command = commandWithArgs)
case j: ScheduleBasedJob => j.copy(command = commandWithArgs)
}
jobWithArguments
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ trait BaseJob {

def executorFlags: String = ""

def taskInfoData: String = ""

def retries: Int = 2

def owner: String = ""
Expand Down Expand Up @@ -95,6 +97,7 @@ case class ScheduleBasedJob(
@JsonProperty override val errorCount: Long = 0L,
@JsonProperty override val executor: String = "",
@JsonProperty override val executorFlags: String = "",
@JsonProperty override val taskInfoData: String = "",
@JsonProperty override val retries: Int = 2,
@JsonProperty override val owner: String = "",
@JsonProperty override val ownerName: String = "",
Expand Down Expand Up @@ -132,6 +135,7 @@ case class DependencyBasedJob(
@JsonProperty override val errorCount: Long = 0L,
@JsonProperty override val executor: String = "",
@JsonProperty override val executorFlags: String = "",
@JsonProperty override val taskInfoData: String = "",
@JsonProperty override val retries: Int = 2,
@JsonProperty override val owner: String = "",
@JsonProperty override val ownerName: String = "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
.build()).build
}

def appendExecutorData(taskInfo: TaskInfo.Builder, job: BaseJob, environment: Environment.Builder, uriProtos: Seq[CommandInfo.URI]) {
private def appendExecutorData(taskInfo: TaskInfo.Builder, job: BaseJob, environment: Environment.Builder, uriProtos: Seq[CommandInfo.URI]) {
log.info("Appending executor:" + job.executor + ", flags:" + job.executorFlags + ", command:" + job.command)
val command = CommandInfo.newBuilder()
.setValue(job.executor)
Expand All @@ -194,14 +194,15 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
if (job.container != null) {
executor.setContainer(createContainerInfo(job))
}
taskInfo.setExecutor(executor)
.setData(getDataBytes(job.executorFlags, job.command))
taskInfo.setExecutor(executor).setData(getDataBytes(job))
}

def getDataBytes(executorFlags: String, executorArgs: String) = {
val dataStr = executorArgsPattern.format(executorFlags, executorArgs)
ByteString.copyFrom(dataStr.getBytes(Charsets.UTF_8))
private def getDataBytes(job : BaseJob) : ByteString = {
val string = if (job.taskInfoData != "") {
job.taskInfoData
} else {
executorArgsPattern.format(job.executorFlags, job.command)
}
ByteString.copyFrom(string.getBytes(Charsets.UTF_8))
}

def getExecutorName(x: String) = "%s".format(x)
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
if (node.has("executor") && node.get("executor") != null) node.get("executor").asText
else ""


val executorFlags =
if (node.has("executorFlags") && node.get("executorFlags") != null) node.get("executorFlags").asText
else ""

val taskInfoData =
if (node.has("taskInfoData") && node.get("taskInfoData") != null) node.get("taskInfoData").asText
else ""

val retries =
if (node.has("retries") && node.get("retries") != null) node.get("retries").asInt
else 2
Expand Down Expand Up @@ -216,7 +219,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
}
new DependencyBasedJob(parents = parentList.toSet,
name = name, command = command, epsilon = epsilon, successCount = successCount, errorCount = errorCount,
executor = executor, executorFlags = executorFlags, retries = retries, owner = owner,
executor = executor, executorFlags = executorFlags, taskInfoData = taskInfoData, retries = retries, owner = owner,
ownerName = ownerName, description = description, lastError = lastError, lastSuccess = lastSuccess,
async = async, cpus = cpus, disk = disk, mem = mem, disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess, fetch = fetch, uris = uris, highPriority = highPriority,
Expand All @@ -227,7 +230,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
val scheduleTimeZone = if (node.has("scheduleTimeZone")) node.get("scheduleTimeZone").asText else ""
new ScheduleBasedJob(node.get("schedule").asText, name = name, command = command,
epsilon = epsilon, successCount = successCount, errorCount = errorCount, executor = executor,
executorFlags = executorFlags, retries = retries, owner = owner, ownerName = ownerName,
executorFlags = executorFlags, taskInfoData = taskInfoData, retries = retries, owner = owner, ownerName = ownerName,
description = description, lastError = lastError, lastSuccess = lastSuccess, async = async,
cpus = cpus, disk = disk, mem = mem, disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess, fetch = fetch, uris = uris, highPriority = highPriority,
Expand All @@ -237,7 +240,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
} else {
/* schedule now */
new ScheduleBasedJob("R1//PT24H", name = name, command = command, epsilon = epsilon, successCount = successCount,
errorCount = errorCount, executor = executor, executorFlags = executorFlags, retries = retries, owner = owner,
errorCount = errorCount, executor = executor, executorFlags = executorFlags, taskInfoData = taskInfoData, retries = retries, owner = owner,
ownerName = ownerName, description = description, lastError = lastError, lastSuccess = lastSuccess,
async = async, cpus = cpus, disk = disk, mem = mem, disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess, fetch = fetch, uris = uris, highPriority = highPriority,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class JobSerializer extends JsonSerializer[BaseJob] {
json.writeFieldName("executorFlags")
json.writeString(baseJob.executorFlags)

json.writeFieldName("taskInfoData")
json.writeString(baseJob.taskInfoData)

json.writeFieldName("retries")
json.writeNumber(baseJob.retries)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SerDeTest extends SpecificationWithJUnit {
val fetch = Seq(Fetch("https://mesos.github.io/chronos/", true, false, true))

val a = new DependencyBasedJob(Set("B", "C", "D", "E"), "A", "noop", Minutes.minutes(5).toPeriod, 10L,
20L, "fooexec", "fooflags", 7, "[email protected]", "Foo", "Test dependency-based job", "TODAY",
20L, "fooexec", "fooflags", "", 7, "[email protected]", "Foo", "Test dependency-based job", "TODAY",
"YESTERDAY", true, container = container, environmentVariables = environmentVariables,
shell = false, arguments = arguments, softError = true, constraints = constraints, fetch = fetch)

Expand Down Expand Up @@ -90,7 +90,7 @@ class SerDeTest extends SpecificationWithJUnit {
val fetch = Seq(Fetch("https://mesos.github.io/chronos/", true, false, true))

val a = new ScheduleBasedJob("FOO/BAR/BAM", "A", "noop", Minutes.minutes(5).toPeriod, 10L, 20L,
"fooexec", "fooflags", 7, "[email protected]", "Foo", "Test schedule-based job", "TODAY",
"fooexec", "fooflags", "", 7, "[email protected]", "Foo", "Test schedule-based job", "TODAY",
"YESTERDAY", true, container = container, environmentVariables = environmentVariables,
shell = true, arguments = arguments, softError = true, constraints = constraints, fetch = fetch)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ class PersistenceStoreSpec extends SpecificationWithJUnit with Mockito {
val startTime = "R1/2012-01-01T00:00:01.000Z/PT1M"
val job = new ScheduleBasedJob(schedule = startTime, name = "sample-name",
command = "sample-command", successCount = 1L, epsilon = Hours.hours(1).toPeriod,
executor = "fooexecutor", executorFlags = "args")
executor = "fooexecutor", executorFlags = "args", taskInfoData = "SomeData")

store.persistJob(job)
val job2 = store.getJob(job.name)

job2.name must_== job.name
job2.executor must_== job.executor
job2.taskInfoData must_== job.taskInfoData
job2.successCount must_== job.successCount
job2.command must_== job.command

Expand All @@ -36,7 +37,7 @@ class PersistenceStoreSpec extends SpecificationWithJUnit with Mockito {
name = "sample-dep", command = "sample-command",
epsilon = epsilon, softError = true,
successCount = 1L, errorCount = 0L,
executor = "fooexecutor", executorFlags = "-w",
executor = "fooexecutor", executorFlags = "-w", taskInfoData="SomeData",
retries = 1, disabled = false)

store.persistJob(job)
Expand All @@ -49,6 +50,7 @@ class PersistenceStoreSpec extends SpecificationWithJUnit with Mockito {
job2.errorCount must_== job.errorCount
job2.executor must_== job.executor
job2.executorFlags must_== job.executorFlags
job2.taskInfoData must_== job.taskInfoData
job2.retries must_== job.retries
job2.disabled must_== job.disabled
}
Expand Down

0 comments on commit c2ae724

Please sign in to comment.