diff --git a/master/internal/rm/dispatcherrm/dispatcher_monitor.go b/master/internal/rm/dispatcherrm/dispatcher_monitor.go index d3a4d763fc40..ad59f9d2c3b5 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_monitor.go +++ b/master/internal/rm/dispatcherrm/dispatcher_monitor.go @@ -67,6 +67,7 @@ type launcherJob struct { totalContainers int runningContainers map[int]containerInfo jobWasTerminated bool + launchInProgress bool // Launch proceeding concurrent with monitoring } // launcherMonitor describes the monitoring of jobs created by the launcher. @@ -98,7 +99,10 @@ func newDispatchWatcher(apiClient *launcherAPIClient) *launcherMonitor { // monitorJob adds the specified job to the collection of jobs whose status is monitored. // payload name may be empty (when reconnecting), monitor will retrieve if necessary. -func (m *launcherMonitor) monitorJob(user string, dispatchID string, payloadName string) { +// When launchPending is true, a later call to notifyJobLaunched is required once the +// launcher has initiated the launch and the dispatchID will be valid for status checks. +func (m *launcherMonitor) monitorJob( + user string, dispatchID string, payloadName string, launchPending bool) { m.newLauncherJob <- launcherJob{ user: user, dispatcherID: dispatchID, @@ -107,9 +111,28 @@ func (m *launcherMonitor) monitorJob(user string, dispatchID string, payloadName totalContainers: 0, runningContainers: make(map[int]containerInfo), jobWasTerminated: false, + launchInProgress: launchPending, } } +// When monitoring commences with parameter isLaunching:true specified, 404/NOT_FOUND +// status returns are ignored for this job until notifyJobLaunched is called to enable +// normal monitoring. +func (m *launcherMonitor) notifyJobLaunched( + ctx *actor.Context, + dispatchID string, +) { + m.mu.Lock() + defer m.mu.Unlock() + + if job, ok := m.monitoredJobs[dispatchID]; ok { + job.launchInProgress = false + return + } + + ctx.Log().Errorf("Could not find dispatchID %s to mark as launched", dispatchID) +} + // removeJob removes the specified job from the collection of jobs whose status is monitored. func (m *launcherMonitor) removeJob(dispatchID string) { m.removeLauncherJob <- launcherJob{ @@ -244,6 +267,9 @@ func filterOutSuperfluousMessages(allMessages []string) []string { return messagesMatchingPattern } +// Receive notification that one of the ranks of the job has started +// execution. It generates experiment log messages as the ranks +// start, and updates job info so we know when all have started. func (m *launcherMonitor) notifyContainerRunning( ctx *actor.Context, dispatchID string, @@ -261,6 +287,7 @@ func (m *launcherMonitor) notifyContainerRunning( return } + job.launchInProgress = false job.totalContainers = int(numPeers) switch existingEntry, ok := job.runningContainers[int(rank)]; { @@ -645,7 +672,7 @@ func (m *launcherMonitor) updateJobStatus(ctx *actor.Context, job *launcherJob) ctx.Log().WithField("dispatch-id", dispatchID).Debug("Checking status of launcher job") - resp, ok := m.getDispatchStatus(ctx, owner, dispatchID) + resp, ok := m.getDispatchStatus(ctx, owner, dispatchID, job.launchInProgress) // Dispatch was not found. if !ok { @@ -761,7 +788,7 @@ dispatch exists, or false if it no longer exists (i.e. 404). */ func (m *launcherMonitor) getDispatchStatus( ctx *actor.Context, owner string, - dispatchID string, + dispatchID string, ignoreNotFound bool, ) (dispatchInfo launcher.DispatchInfo, dispatchFound bool) { resp, r, err := m.apiClient.MonitoringApi. GetEnvironmentStatus(m.apiClient.withAuth(context.TODO()), owner, dispatchID). @@ -777,8 +804,8 @@ func (m *launcherMonitor) getDispatchStatus( ctx.Log().WithField("dispatch-id", dispatchID). Infof("The job status could not be obtained because the launcher returned HTTP code 404") - // No details, but we know dispatch does not exist - return launcher.DispatchInfo{}, false + // No details, but we know dispatch does not exist (false unless ignoreNotFound) + return launcher.DispatchInfo{}, ignoreNotFound } if r != nil && (r.StatusCode == http.StatusUnauthorized || @@ -933,13 +960,19 @@ func (m *launcherMonitor) getTaskLogsFromDispatcher( /* Return true if the specified dispatch is in a non-terminal (still running) state. +Or launch is still in progress. */ func (m *launcherMonitor) isDispatchInProgress( ctx *actor.Context, owner string, dispatchID string, ) bool { - resp, ok := m.getDispatchStatus(ctx, owner, dispatchID) + ignoreNotFound := false + if job, ok := m.getJobByDispatchID(dispatchID); ok { + ignoreNotFound = job.launchInProgress + } + + resp, ok := m.getDispatchStatus(ctx, owner, dispatchID, ignoreNotFound) // Dispatch was not found. if !ok { diff --git a/master/internal/rm/dispatcherrm/dispatcher_monitor_test.go b/master/internal/rm/dispatcherrm/dispatcher_monitor_test.go index e208378783cb..01ebe1e22002 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_monitor_test.go +++ b/master/internal/rm/dispatcherrm/dispatcher_monitor_test.go @@ -239,7 +239,7 @@ func TestMonitorJobOperations(t *testing.T) { job := getJob() // Add the job to the monitored jobs. - jobWatcher.monitorJob(job.user, job.dispatcherID, job.payloadName) + jobWatcher.monitorJob(job.user, job.dispatcherID, job.payloadName, false) // Wait for the job to be added to the monitored jobs with a timeout of 30 seconds. timeout := time.Now().Add(30 * time.Second) for !(jobWatcher.isJobBeingMonitored(job.dispatcherID)) { diff --git a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go index bf829b10bf04..cd7613079481 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go +++ b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go @@ -1020,7 +1020,7 @@ func (m *dispatcherResourceManager) startLauncherJob( // Pre-register dispatchID (which is now the AllocationID) with the job // monitor such that notifyContainerRunning calls that might be delivered prior // to the synchronous launch returning will be handled properly. - m.jobWatcher.monitorJob(impersonatedUser, dispatchID, payloadName) + m.jobWatcher.monitorJob(impersonatedUser, dispatchID, payloadName, true) tempDispatchID, err := m.sendManifestToDispatcher( ctx, manifest, impersonatedUser, string(msg.AllocationID)) @@ -1040,6 +1040,9 @@ func (m *dispatcherResourceManager) startLauncherJob( sendResourceStateChangedErrorResponse(ctx, err, msg, "unable to create the launcher job") + } else { + // Successful launch, clear launchInProgress status + m.jobWatcher.notifyJobLaunched(ctx, dispatchID) } if tempDispatchID != dispatchID { @@ -1674,7 +1677,7 @@ func (m *dispatcherResourceManager) assignResources( ctx.Log().Infof("Reconnecting ResourceID %s, DispatchID %s, ImpersontatedUser: %s", rID, dispatchID, impersonatedUser) - m.jobWatcher.monitorJob(impersonatedUser, dispatchID, "") + m.jobWatcher.monitorJob(impersonatedUser, dispatchID, "", false) } } else { ctx.Log().