add markJobSuccessful api call #595

Merged
merged 5 commits into from
Mar 28, 2016
13 changes: 13 additions & 0 deletions docs/docs/api.md
@@ -96,6 +96,19 @@ You can manually start a job by issuing an HTTP request.
* Example: `curl -L -X PUT chronos-node:8080/scheduler/job/job_name?arguments=-debug`
* Response: HTTP 204

## Marking a job as successful

You can manually mark a job as successful by issuing an HTTP request. If a job is marked successful, the success count
of the job is incremented, the latest successful run time is updated, and all downstream dependencies are handled as if
the job had completed a normal run.

* Endpoint: __/scheduler/job/success/<jobname>__
* Method: __PUT__
* Path parameter: `jobname` - name of the job to mark as successful
* Example: `curl -L -X PUT chronos-node:8080/scheduler/job/success/request_event_counter_hourly`
* Response: boolean (true or false depending on success of request)

The response is actually a string formatted with: "marked job %s as successful: %b"

It is always 200 OK even if success is false, which basically forces clients of the API to parse the returned string. Returning JSON would simplify parsing; using a different status code would be even better.
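
To illustrate the parsing burden described above, here is a minimal client-side sketch. It assumes the response body follows the format string `"marked job %s as successful: %b"` exactly as quoted in this comment. The object name `MarkSuccessResponse` and its `parse` method are hypothetical and not part of Chronos.

```scala
// Hypothetical client-side helper (an assumption for illustration, not Chronos code).
object MarkSuccessResponse {
  // Mirrors the server-side format string "marked job %s as successful: %b".
  // A Scala Regex used as an extractor must match the whole input string.
  private val Pattern = """marked job (.+) as successful: (true|false)""".r

  /** Returns Some((jobName, success)) when the body matches, None otherwise. */
  def parse(body: String): Option[(String, Boolean)] = body.trim match {
    case Pattern(name, flag) => Some((name, flag.toBoolean))
    case _                   => None
  }
}
```

A caller would still have to treat `Some((name, false))` as a failure even though the HTTP status is 200, which is the problem a distinct status code or a JSON body would avoid.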


## Adding a Scheduled Job

The heart of job scheduling is a JSON POST request.
@@ -162,6 +162,28 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
    }
  }

  /**
   * Mark Job successful
   */
  @Path(PathConstants.jobSuccessPath)
  @PUT
  @Timed
  def markJobSuccessful(@PathParam("jobName") jobName: String): Response = {
    try {
      val success = jobScheduler.markJobSuccessAndFireOffDependencies(jobName)
      Response.ok("marked job %s as successful: %b".format(jobName, success)).build()
    } catch {
      case ex: IllegalArgumentException =>
        log.log(Level.INFO, "Bad Request", ex)
        Response.status(Response.Status.BAD_REQUEST).entity(ex.getMessage)
          .build()
      case ex: Exception =>
        log.log(Level.WARNING, "Exception while serving request", ex)
        Response.serverError().build
    }
  }


  /**
   * Allows a user to update the elements processed count for a job that
   * supports data tracking. The processed count has to be non-negative.
@@ -15,6 +15,7 @@ object PathConstants {
  final val allStatsPath = "stats/{percentile}"
  final val jobStatsPatternPath = "job/stat/{jobName}"
  final val jobTaskProgressPath = "job/{jobName}/task/{taskId}/progress"
  final val jobSuccessPath = "job/success/{jobName}"
  final val graphBasePath = "/graph"
  final val jobGraphDotPath = "dot"
  final val jobGraphCsvPath = "csv"
@@ -267,18 +267,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
      val job = jobOption.get
      jobsObserver.apply(JobFinished(job, taskStatus, attempt))

      val newJob = job match {
        case job: ScheduleBasedJob =>
          job.copy(successCount = job.successCount + 1,
            errorsSinceLastSuccess = 0,
            lastSuccess = DateTime.now(DateTimeZone.UTC).toString)
        case job: DependencyBasedJob =>
          job.copy(successCount = job.successCount + 1,
            errorsSinceLastSuccess = 0,
            lastSuccess = DateTime.now(DateTimeZone.UTC).toString)
        case _ =>
          throw new IllegalArgumentException("Cannot handle unknown task type")
      }
      val newJob = getNewSuccessfulJob(job)
      replaceJob(job, newJob)
      processDependencies(jobName, taskDate)

@@ -310,6 +299,43 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
    }
  }

  /**
   * Mark job by job name as successful. Trigger any dependent children jobs that should be run as a result
   */
  def markJobSuccessAndFireOffDependencies(jobName : String): Boolean = {
    val optionalJob = jobGraph.getJobForName(jobName)
    if (optionalJob.isEmpty) {
      log.warning("%s not found in job graph, not marking success".format(jobName))
      return false
    } else {
      val job = optionalJob.get
      jobMetrics.updateJobStatus(jobName, success = true)

Shouldn't jobsObserver be notified here?

      val newJob = getNewSuccessfulJob(job)
      replaceJob(job, newJob)
      log.info("Resetting dependency invocations for %s".format(newJob))
      jobGraph.resetDependencyInvocations(jobName)
      log.info("Processing dependencies for %s".format(jobName))
      processDependencies(jobName, Option(DateTime.parse(newJob.lastSuccess)))
    }
    true
  }

  private def getNewSuccessfulJob(job: BaseJob): BaseJob = {
    val newJob = job match {
      case job: ScheduleBasedJob =>
        job.copy(successCount = job.successCount + 1,
          errorsSinceLastSuccess = 0,
          lastSuccess = DateTime.now(DateTimeZone.UTC).toString)
      case job: DependencyBasedJob =>
        job.copy(successCount = job.successCount + 1,
          errorsSinceLastSuccess = 0,
          lastSuccess = DateTime.now(DateTimeZone.UTC).toString)
      case _ =>
        throw new scala.IllegalArgumentException("Cannot handle unknown task type")
    }
    newJob
  }

  def replaceJob(oldJob: BaseJob, newJob: BaseJob) {
    lock.synchronized {
      jobGraph.replaceVertex(oldJob, newJob)
@@ -70,6 +70,63 @@ class JobSchedulerIntegrationTest extends SpecificationWithJUnit with Mockito {
      there was one(mockJobsObserver).apply(JobFailed(Right(job3), TaskUtils.getTaskStatus(job1, DateTime.parse("2012-01-03T00:00:01.000Z"), 0), 0))
    }

    "Marking a job successful updates the success and error counts and triggers children" in {
      val epsilon = Minutes.minutes(20).toPeriod
      val job1 = new ScheduleBasedJob(schedule = "R/2012-01-01T00:00:00.000Z/PT1D",
        name = "job1", command = "fooo", epsilon = epsilon, retries = 0)
      val dependentJob = new DependencyBasedJob(Set("job1", "job3"), name = "dependentJob", command = "CMD", disabled = false)
      val job3 = new ScheduleBasedJob(schedule = "R/2012-01-01T00:00:00.000Z/PT1D",
        name = "job3", command = "fooo", epsilon = epsilon, retries = 0)

      val horizon = Minutes.minutes(5).toPeriod
      val mockTaskManager = mock[TaskManager]
      val jobGraph = new JobGraph()
      val mockPersistenceStore = mock[PersistenceStore]
      val mockJobsObserver = mockFullObserver

      val scheduler = mockScheduler(horizon, mockTaskManager, jobGraph, mockPersistenceStore, mockJobsObserver)
      val date = DateTime.parse("2011-01-01T00:05:01.000Z")

      val edgeInvocationCount = jobGraph.edgeInvocationCount

      scheduler.leader.set(true)
      scheduler.registerJob(job1, persist = true, date)
      scheduler.registerJob(job3, persist = true, date)
      scheduler.registerJob(dependentJob, persist = true, date)

      scheduler.run(() => {
        date
      })

      val failedDate = date.plusMinutes(1)
      val passingDate = date.plusMinutes(1)

      scheduler.handleFailedTask(TaskUtils.getTaskStatus(job1, failedDate, 0))
      scheduler.handleFinishedTask(TaskUtils.getTaskStatus(job3, passingDate, 0))
      val failedJob = jobGraph.lookupVertex("job1").get
      failedJob.errorCount must_== 1
      failedJob.successCount must_== 0
      failedJob.errorsSinceLastSuccess must_== 1

      scheduler.markJobSuccessAndFireOffDependencies("job1")
      val jobMarkedSuccess = jobGraph.lookupVertex("job1").get
      jobMarkedSuccess.errorCount must_== 1
      jobMarkedSuccess.successCount must_== 1
      jobMarkedSuccess.errorsSinceLastSuccess must_== 0
      val lastSuccess = DateTime.parse(jobMarkedSuccess.lastSuccess)
      there was one(mockTaskManager).enqueue(TaskUtils.getTaskId(dependentJob, lastSuccess, 0),
        highPriority = false)
      scheduler.handleStartedTask(TaskUtils.getTaskStatus(dependentJob, lastSuccess, 0))
      scheduler.handleFinishedTask(TaskUtils.getTaskStatus(dependentJob, lastSuccess, 0))
      edgeInvocationCount.get(jobGraph.dag.getEdge("job1", "dependentJob")) must_== Some(0L)
      jobGraph.lookupVertex("dependentJob").get.successCount must_== 1

      scheduler.handleFinishedTask(TaskUtils.getTaskStatus(job1, passingDate, 0))
      scheduler.markJobSuccessAndFireOffDependencies("dependentJob")
      jobGraph.lookupVertex("dependentJob").get.successCount must_== 2
      edgeInvocationCount.get(jobGraph.dag.getEdge("job1", "dependentJob")) must_== Some(0L)
    }

    "Tests that a disabled job does not run and does not execute dependant children." in {
      val epsilon = Minutes.minutes(20).toPeriod
      val job1 = new ScheduleBasedJob(schedule = "R/2012-01-01T00:00:00.000Z/PT1M",