
Commit

fix: Tighten up status check on delayed launch [DET-9474] (determined-ai#854)

If the synchronous launch takes too long, a job monitoring poll may run
before the launch completes, detect the dispatchID as 404/NOT_FOUND,
and stop monitoring. Status updates would then stop and the job would
show only QUEUED even though it is executing.
Add a launchInProgress flag to the job monitor so that 404/NOT_FOUND is
ignored until the launch has succeeded, avoiding this case.
jerryharrow authored and determined-ci committed Jun 21, 2023
1 parent 64aafed commit dd77ca9
Showing 3 changed files with 45 additions and 9 deletions.
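
As context for the diffs below, here is a minimal, self-contained Go sketch of the launchInProgress idea. The monitor, job, and poll names are hypothetical stand-ins rather than the actual dispatcherrm types; they only illustrate how the flag suppresses 404/NOT_FOUND handling until notifyJobLaunched clears it.

package main

import "fmt"

// Hypothetical stand-ins for the launcher monitor's bookkeeping.
type job struct {
	launchInProgress bool
}

type monitor struct {
	jobs map[string]*job
}

// monitorJob pre-registers a dispatch; launchPending mirrors the new flag.
func (m *monitor) monitorJob(dispatchID string, launchPending bool) {
	m.jobs[dispatchID] = &job{launchInProgress: launchPending}
}

// notifyJobLaunched clears the flag once the synchronous launch has returned.
func (m *monitor) notifyJobLaunched(dispatchID string) {
	if j, ok := m.jobs[dispatchID]; ok {
		j.launchInProgress = false
	}
}

// poll simulates one status check; notFound models an HTTP 404 from the launcher.
// It reports whether monitoring should continue.
func (m *monitor) poll(dispatchID string, notFound bool) bool {
	j, ok := m.jobs[dispatchID]
	if !ok {
		return false
	}
	if notFound {
		// Before this fix a 404 always ended monitoring; with the flag set,
		// the 404 is ignored because the launch is still in progress.
		return j.launchInProgress
	}
	return true
}

func main() {
	m := &monitor{jobs: map[string]*job{}}
	m.monitorJob("dispatch-1", true)
	fmt.Println(m.poll("dispatch-1", true)) // true: keep monitoring despite 404
	m.notifyJobLaunched("dispatch-1")
	fmt.Println(m.poll("dispatch-1", true)) // false: 404 now means the dispatch is gone
}

In the real change, monitorJob gains a launchPending parameter, notifyContainerRunning and notifyJobLaunched clear the flag, and getDispatchStatus takes an ignoreNotFound argument.
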
45 changes: 39 additions & 6 deletions master/internal/rm/dispatcherrm/dispatcher_monitor.go
@@ -67,6 +67,7 @@ type launcherJob struct {
totalContainers int
runningContainers map[int]containerInfo
jobWasTerminated bool
launchInProgress bool // Launch proceeding concurrent with monitoring
}

// launcherMonitor describes the monitoring of jobs created by the launcher.
@@ -98,7 +99,10 @@ func newDispatchWatcher(apiClient *launcherAPIClient) *launcherMonitor {

// monitorJob adds the specified job to the collection of jobs whose status is monitored.
// payload name may be empty (when reconnecting), monitor will retrieve if necessary.
func (m *launcherMonitor) monitorJob(user string, dispatchID string, payloadName string) {
// When launchPending is true, a later call to notifyJobLaunched is required once the
// launcher has initiated the launch and the dispatchID will be valid for status checks.
func (m *launcherMonitor) monitorJob(
user string, dispatchID string, payloadName string, launchPending bool) {
m.newLauncherJob <- launcherJob{
user: user,
dispatcherID: dispatchID,
@@ -107,9 +111,28 @@ func (m *launcherMonitor) monitorJob(user string, dispatchID string, payloadName
totalContainers: 0,
runningContainers: make(map[int]containerInfo),
jobWasTerminated: false,
launchInProgress: launchPending,
}
}

// When monitoring commences with parameter isLaunching:true specified, 404/NOT_FOUND
// status returns are ignored for this job until notifyJobLaunched is called to enable
// normal monitoring.
func (m *launcherMonitor) notifyJobLaunched(
ctx *actor.Context,
dispatchID string,
) {
m.mu.Lock()
defer m.mu.Unlock()

if job, ok := m.monitoredJobs[dispatchID]; ok {
job.launchInProgress = false
return
}

ctx.Log().Errorf("Could not find dispatchID %s to mark as launched", dispatchID)
}

// removeJob removes the specified job from the collection of jobs whose status is monitored.
func (m *launcherMonitor) removeJob(dispatchID string) {
m.removeLauncherJob <- launcherJob{
Expand Down Expand Up @@ -244,6 +267,9 @@ func filterOutSuperfluousMessages(allMessages []string) []string {
return messagesMatchingPattern
}

// Receive notification that one of the ranks of the job has started
// execution. It generates experiment log messages as the ranks
// start, and updates job info so we know when all have started.
func (m *launcherMonitor) notifyContainerRunning(
ctx *actor.Context,
dispatchID string,
@@ -261,6 +287,7 @@ func (m *launcherMonitor) notifyContainerRunning(
return
}

job.launchInProgress = false
job.totalContainers = int(numPeers)

switch existingEntry, ok := job.runningContainers[int(rank)]; {
@@ -645,7 +672,7 @@ func (m *launcherMonitor) updateJobStatus(ctx *actor.Context, job *launcherJob)

ctx.Log().WithField("dispatch-id", dispatchID).Debug("Checking status of launcher job")

resp, ok := m.getDispatchStatus(ctx, owner, dispatchID)
resp, ok := m.getDispatchStatus(ctx, owner, dispatchID, job.launchInProgress)

// Dispatch was not found.
if !ok {
@@ -761,7 +788,7 @@ dispatch exists, or false if it no longer exists (i.e. 404).
*/
func (m *launcherMonitor) getDispatchStatus(
ctx *actor.Context, owner string,
dispatchID string,
dispatchID string, ignoreNotFound bool,
) (dispatchInfo launcher.DispatchInfo, dispatchFound bool) {
resp, r, err := m.apiClient.MonitoringApi.
GetEnvironmentStatus(m.apiClient.withAuth(context.TODO()), owner, dispatchID).
@@ -777,8 +804,8 @@ func (m *launcherMonitor) getDispatchStatus(
ctx.Log().WithField("dispatch-id", dispatchID).
Infof("The job status could not be obtained because the launcher returned HTTP code 404")

// No details, but we know dispatch does not exist
return launcher.DispatchInfo{}, false
// No details, but we know dispatch does not exist (false unless ignoreNotFound)
return launcher.DispatchInfo{}, ignoreNotFound
}

if r != nil && (r.StatusCode == http.StatusUnauthorized ||
@@ -933,13 +960,19 @@ func (m *launcherMonitor) getTaskLogsFromDispatcher(

/*
Return true if the specified dispatch is in a non-terminal (still running) state.
Or launch is still in progress.
*/
func (m *launcherMonitor) isDispatchInProgress(
ctx *actor.Context,
owner string,
dispatchID string,
) bool {
resp, ok := m.getDispatchStatus(ctx, owner, dispatchID)
ignoreNotFound := false
if job, ok := m.getJobByDispatchID(dispatchID); ok {
ignoreNotFound = job.launchInProgress
}

resp, ok := m.getDispatchStatus(ctx, owner, dispatchID, ignoreNotFound)

// Dispatch was not found.
if !ok {
2 changes: 1 addition & 1 deletion master/internal/rm/dispatcherrm/dispatcher_monitor_test.go
@@ -239,7 +239,7 @@ func TestMonitorJobOperations(t *testing.T) {
job := getJob()

// Add the job to the monitored jobs.
jobWatcher.monitorJob(job.user, job.dispatcherID, job.payloadName)
jobWatcher.monitorJob(job.user, job.dispatcherID, job.payloadName, false)
// Wait for the job to be added to the monitored jobs with a timeout of 30 seconds.
timeout := time.Now().Add(30 * time.Second)
for !(jobWatcher.isJobBeingMonitored(job.dispatcherID)) {

@@ -1020,7 +1020,7 @@ func (m *dispatcherResourceManager) startLauncherJob(
// Pre-register dispatchID (which is now the AllocationID) with the job
// monitor such that notifyContainerRunning calls that might be delivered prior
// to the synchronous launch returning will be handled properly.
m.jobWatcher.monitorJob(impersonatedUser, dispatchID, payloadName)
m.jobWatcher.monitorJob(impersonatedUser, dispatchID, payloadName, true)

tempDispatchID, err := m.sendManifestToDispatcher(
ctx, manifest, impersonatedUser, string(msg.AllocationID))
@@ -1040,6 +1040,9 @@

sendResourceStateChangedErrorResponse(ctx, err, msg,
"unable to create the launcher job")
} else {
// Successful launch, clear launchInProgress status
m.jobWatcher.notifyJobLaunched(ctx, dispatchID)
}

if tempDispatchID != dispatchID {
@@ -1674,7 +1677,7 @@ func (m *dispatcherResourceManager) assignResources(
ctx.Log().Infof("Reconnecting ResourceID %s, DispatchID %s, ImpersontatedUser: %s",
rID, dispatchID, impersonatedUser)

m.jobWatcher.monitorJob(impersonatedUser, dispatchID, "")
m.jobWatcher.monitorJob(impersonatedUser, dispatchID, "", false)
}
} else {
ctx.Log().
