Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a job's run time and attempt as environment variables exposed to the job #629

Merged
merged 1 commit into from
Apr 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import java.util.logging.Logger
import javax.inject.Inject

import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.jobs.{Fetch, BaseJob}
import org.apache.mesos.chronos.scheduler.jobs.{TaskUtils, EnvironmentVariable, Fetch, BaseJob}

import com.google.common.base.Charsets
import com.google.protobuf.ByteString
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
Expand Down Expand Up @@ -50,38 +51,42 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
builder.build()
}

def envs(taskIdStr: String, job: BaseJob, offer: Offer): Environment.Builder = {
val (_, start, attempt, _) = TaskUtils.parseTaskId(taskIdStr)
val baseEnv = Map(
"mesos_task_id" -> taskIdStr,
"CHRONOS_JOB_OWNER" -> job.owner,
"CHRONOS_JOB_NAME" -> job.name,
"HOST" -> offer.getHostname,
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
"CHRONOS_JOB_RUN_TIME" -> start.toString,
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
)

// If the job defines custom environment variables, add them to the builder
// Don't add them if they already exist to prevent overwriting the defaults
val finalEnv =
if (job.environmentVariables != null && job.environmentVariables.nonEmpty) {
job.environmentVariables.foldLeft(baseEnv)((envs, env) =>
if (envs.contains(env.name)) envs else envs + (env.name -> env.value)
)
} else {
baseEnv
}

finalEnv.foldLeft(Environment.newBuilder())((builder, env) =>
builder.addVariables(Variable.newBuilder().setName(env._1).setValue(env._2)))
}

def getMesosTaskInfoBuilder(taskIdStr: String, job: BaseJob, offer: Offer): TaskInfo.Builder = {
//TODO(FL): Allow adding more fine grained resource controls.
val taskId = TaskID.newBuilder().setValue(taskIdStr).build()
val taskInfo = TaskInfo.newBuilder()
.setName(taskNameTemplate.format(job.name))
.setTaskId(taskId)
val environment = Environment.newBuilder()
.addVariables(Variable.newBuilder()
.setName("mesos_task_id").setValue(taskIdStr))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_JOB_OWNER").setValue(job.owner))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_JOB_NAME").setValue(job.name))
.addVariables(Variable.newBuilder()
.setName("HOST").setValue(offer.getHostname))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_RESOURCE_MEM").setValue(job.mem.toString))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_RESOURCE_CPU").setValue(job.cpus.toString))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_RESOURCE_DISK").setValue(job.disk.toString))

// If the job defines custom environment variables, add them to the builder
// Don't add them if they already exist to prevent overwriting the defaults
val builtinEnvNames = environment.getVariablesList.asScala.map(_.getName).toSet
if (job.environmentVariables != null && job.environmentVariables.nonEmpty) {
job.environmentVariables.foreach(env =>
if (!builtinEnvNames.contains(env.name)) {
environment.addVariables(Variable.newBuilder().setName(env.name).setValue(env.value))
}
)
}
val environment = envs(taskIdStr, job, offer)

val fetch = job.fetch ++ job.uris.map { Fetch(_) }
val uriCommand = fetch.map { f =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.apache.mesos.chronos.scheduler.mesos

import scala.collection.JavaConversions._
import org.apache.mesos.Protos._
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.jobs.Parameter
import org.apache.mesos.chronos.scheduler.jobs.Volume
import org.apache.mesos.chronos.scheduler.jobs._
import org.apache.mesos.chronos.scheduler.jobs.constraints.{LikeConstraint, EqualsConstraint}
import org.joda.time.Minutes
import org.specs2.mock.Mockito
import org.specs2.mutable.SpecificationWithJUnit


class MesosTaskBuilderSpec extends SpecificationWithJUnit with Mockito {

val taskId = "ct:1454467003926:0:test2Execution:run"

val (_, start, attempt, _) = TaskUtils.parseTaskId(taskId)

val offer = Offer.newBuilder().mergeFrom(Offer.getDefaultInstance)
.setHostname("localport")
.setId(OfferID.newBuilder().setValue("123").build())
.setFrameworkId(FrameworkID.newBuilder().setValue("123").build())
.setSlaveId(SlaveID.newBuilder().setValue("123").build())
.build()

val job = {
val volumes = Seq(
Volume(Option("/host/dir"), "container/dir", Option(VolumeMode.RW)),
Volume(None, "container/dir", None)
)

var parameters = scala.collection.mutable.ListBuffer[Parameter]()

val container = DockerContainer("dockerImage", volumes, parameters, NetworkMode.HOST, true)

val constraints = Seq(
EqualsConstraint("rack", "rack-1"),
LikeConstraint("rack", "rack-[1-3]")
)

new ScheduleBasedJob("FOO/BAR/BAM", "AJob", "noop", Minutes.minutes(5).toPeriod, 10L, 20L,
"fooexec", "fooflags", "none", 7, "[email protected]", "Foo", "Test schedule based job", "TODAY",
"YESTERDAY", true, cpus = 2, disk = 3, mem = 5, container = container, environmentVariables = Seq(),
shell = true, arguments = Seq(), softError = true, constraints = constraints)
}

val defaultEnv = Map(
"mesos_task_id" -> taskId,
"CHRONOS_JOB_OWNER" -> job.owner,
"CHRONOS_JOB_NAME" -> job.name,
"HOST" -> offer.getHostname,
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
"CHRONOS_JOB_RUN_TIME" -> start.toString,
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
)

def toMap(envs: Environment): Map[String, String] =
envs.getVariablesList.foldLeft(Map[String, String]())((m, v) => m + (v.getName -> v.getValue))

"MesosTaskBuilder" should {
"Setup all the default environment variables" in {
val target = new MesosTaskBuilder(mock[SchedulerConfiguration])

defaultEnv must_== toMap(target.envs(taskId, job, offer).build())
}
}

"MesosTaskBuilder" should {
"Setup all the default environment variables and job environment variables" in {
val target = new MesosTaskBuilder(mock[SchedulerConfiguration])

val testJob = job.copy(environmentVariables = Seq(
EnvironmentVariable("FOO", "BAR"),
EnvironmentVariable("TOM", "JERRY")
))

val finalEnv = defaultEnv ++ Map("FOO" -> "BAR", "TOM" -> "JERRY")

finalEnv must_== toMap(target.envs(taskId, testJob, offer).build())
}
}

"MesosTaskBuilder" should {
"Should not allow job environment variables to overwrite any default environment variables" in {
val target = new MesosTaskBuilder(mock[SchedulerConfiguration])

val testJob = job.copy(environmentVariables = Seq(
EnvironmentVariable("CHRONOS_RESOURCE_MEM", "10000"),
EnvironmentVariable("CHRONOS_RESOURCE_DISK", "40000")
))

defaultEnv must_== toMap(target.envs(taskId, testJob, offer).build())
}
}
}