Skip to content

Commit

Permalink
Fix k8s timeline start TRY 3 (#1687)
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Smoot <[email protected]>
  • Loading branch information
mes5k authored Aug 14, 2020
1 parent dc029c0 commit 24dff87
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package nextflow.k8s

import java.nio.file.Files
import java.nio.file.Path
import java.time.Instant
import java.time.format.DateTimeFormatter

import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
Expand Down Expand Up @@ -274,6 +276,36 @@ class K8sTaskHandler extends TaskHandler {
return false
}

long getEpochMilli(String timeString) {
final time = DateTimeFormatter.ISO_INSTANT.parse(timeString)
return Instant.from(time).toEpochMilli()
}

/**
* Update task start and end times based on pod timestamps.
* We update timestamps because it's possible for a task to run so quickly
* (less than 1 second) that it skips right over the RUNNING status.
* If this happens, the startTimeMillis never gets set and remains equal to 0.
* To make sure startTimeMillis is non-zero we update it with the pod start time.
* We update completTimeMillis from the same pod info to be consistent.
*/
void updateTimestamps(Map terminated) {
try {
startTimeMillis = getEpochMilli(terminated.startedAt as String)
completeTimeMillis = getEpochMilli(terminated.finishedAt as String)
} catch( Exception e ) {
log.debug "Failed updating timestamps '${terminated.toString()}'", e
// Only update if startTimeMillis hasn't already been set.
// If startTimeMillis _has_ been set, then both startTimeMillis
// and completeTimeMillis will have been set with the normal
// TaskHandler mechanism, so there's no need to reset them here.
if (!startTimeMillis) {
startTimeMillis = System.currentTimeMillis()
completeTimeMillis = System.currentTimeMillis()
}
}
}

@Override
boolean checkIfCompleted() {
if( !podName ) throw new IllegalStateException("Missing K8s pod name - cannot check if complete")
Expand All @@ -286,6 +318,7 @@ class K8sTaskHandler extends TaskHandler {
status = TaskStatus.COMPLETED
savePodLogOnError(task)
deletePodIfSuccessful(task)
updateTimestamps(state.terminated as Map)
return true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,11 @@ class K8sTaskHandlerTest extends Specification {
def task = new TaskRun()
def client = Mock(K8sClient)
def handler = Spy(K8sTaskHandler)
def termState = [ reason: "Completed",
startedAt: "2018-01-13T10:09:36Z",
finishedAt: "2018-01-13T10:19:36Z",
exitCode: 0 ]
def fullState = [terminated: termState]
handler.task = task
handler.client = client
handler.podName = POD_NAME
Expand All @@ -448,14 +453,17 @@ class K8sTaskHandlerTest extends Specification {
when:
result = handler.checkIfCompleted()
then:
1 * handler.getState() >> [terminated: ["reason": "Completed", "finishedAt": "2018-01-13T10:19:36Z", exitCode: 0]]
1 * handler.getState() >> fullState
1 * handler.updateTimestamps(termState)
1 * handler.readExitFile() >> EXIT_STATUS
1 * handler.deletePodIfSuccessful(task) >> null
1 * handler.savePodLogOnError(task) >> null
handler.task.exitStatus == EXIT_STATUS
handler.task.@stdout == OUT_FILE
handler.task.@stderr == ERR_FILE
handler.status == TaskStatus.COMPLETED
handler.startTimeMillis == 1515838176000
handler.completeTimeMillis == 1515838776000
result == true

}
Expand Down Expand Up @@ -722,4 +730,56 @@ class K8sTaskHandlerTest extends Specification {
1 * k8sConfig.getPodOptions() >> new PodOptions([ [env:'BRAVO', value:'HOTEL'] ])
opts == new PodOptions([[env:'HELLO', value:'WORLD'], [env:'BRAVO', value:'HOTEL']])
}

def 'should update startTimeMillis and completeTimeMillis with terminated state' () {

given:
def handler = Spy(K8sTaskHandler)
def termState = [ startedAt: "2018-01-13T10:09:36Z",
finishedAt: "2018-01-13T10:19:36Z" ]

when:
handler.updateTimestamps(termState)
then:
handler.startTimeMillis == 1515838176000
handler.completeTimeMillis == 1515838776000
}

def 'should update timestamps with current time with missing or malformed time' () {

given:
def handler = Spy(K8sTaskHandler)
def malformedTime = [ startedAt: "2018-01-13 10:09:36",
finishedAt: "2018-01-13T10:19:36Z" ]

def garbage = [ what: "nope" ]

when:
handler.updateTimestamps(malformedTime)
then:
handler.startTimeMillis > 0 // confirms that timestamps have been updated
handler.startTimeMillis <= handler.completeTimeMillis // confirms that order is sane

when:
handler.updateTimestamps(garbage)
then:
handler.startTimeMillis > 0
handler.startTimeMillis <= handler.completeTimeMillis
}

def 'should not update timestamps with malformed time and when startTimeMillis already set' () {

given:
def handler = Spy(K8sTaskHandler)
handler.startTimeMillis = 10
handler.completeTimeMillis = 20
def malformedTime = [ startedAt: "2018-01-13 10:09:36",
finishedAt: "2018-01-13T10:19:36Z" ]

when:
handler.updateTimestamps(malformedTime)
then:
handler.startTimeMillis == 10
handler.completeTimeMillis == 20
}
}

0 comments on commit 24dff87

Please sign in to comment.