Skip to content

Commit

Permalink
fix: Launcher DeleteEnvironment causes DispatchRM lockup [DET-9157] (#…
Browse files Browse the repository at this point in the history
…752)

Deletion of the dispatch is done synchronously in the DispatchExited event handler, need to make it async to avoid blocking the event handler.

Moved content of DispatchExited to a go routine (dispatchExited) except for accesses to m.reqList which should remain to avoid the need for additional synchronization.
Identified one other synchronous call to m.removeDispatchEnvironment that needed to be made async.
Added comments to other call sites indicating they are already invoked from an existing go routine so are non-blocking.

Extracted the m.reqList use from startLauncherJob go routine back into the event handler to avoid the need for additional synchronization.
  • Loading branch information
jerryharrow authored and eecsliu committed Jul 22, 2023
1 parent 053c172 commit 566c224
Showing 1 changed file with 101 additions and 72 deletions.
173 changes: 101 additions & 72 deletions master/internal/rm/dispatcherrm/dispatcher_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,14 +680,23 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
m.addTask(ctx, msg)

case StartDispatcherResources:
// Perform any necessary actions on m.reqList before going async
req, ok := m.reqList.TaskByHandler(msg.TaskActor)
if !ok {
sendResourceStateChangedErrorResponse(ctx, errors.New("no such task"), msg,
"task not found in the task list")
// no request to process, so bail
return nil
}

// Start each launcher job in a goroutine to prevent incoming messages
// from backing up, due to the main thread being busy handling one
// message at a time. Adaptive searches may create many launcher jobs
// for a single experiment, so we must allow the main thread to continue
// handling incoming messages while the previous messages are still
// being processed. The UI will become unresponsive if the messages
// start backing up.
go m.startLauncherJob(ctx, msg)
go m.startLauncherJob(ctx, msg, req)

case sproto.PendingPreemption:
ctx.Log().Infof("PendingPreemption of %s. Terminating.", msg.AllocationID)
Expand Down Expand Up @@ -769,7 +778,10 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
})

case DispatchExited:
// Perform any necessary accesses to the m.reqList directly in
// the handler to avoid any synchronization issues.
log := ctx.Log().WithField("dispatch-id", msg.DispatchID)

allocationID, ok := m.getAllocationIDFromDispatchID(msg.DispatchID)
if !ok {
log.Warnf("Received DispatchExited but cannot map the dispatch ID to an allocation ID")
Expand All @@ -787,72 +799,10 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
log.Warnf("allocation has malformed resources: %v", alloc)
return nil
}
r := maps.Values(alloc.Resources)[0]
rID := r.Summary().ResourcesID

if strings.TrimSpace(msg.Message) != "" {
ctx.Tell(task.AllocationRef, sproto.ContainerLog{
AuxMessage: &msg.Message,
Level: ptrs.Ptr("ERROR"),
})
}

stopped := sproto.ResourcesStopped{}
if msg.ExitCode > 0 {
stopped.Failure = sproto.NewResourcesFailure(
sproto.ResourcesFailed,
"",
ptrs.Ptr(sproto.ExitCode(msg.ExitCode)),
)
}

// Turn off printing the last line (exit code 1) from resources.go
if msg.ExitCode == -1 {
stopped.Failure = sproto.NewResourcesFailure(
sproto.ResourcesFailed,
"",
nil,
)
}
// Now preform the actual work asych to avoid blocking
go m.dispatchExited(ctx, msg, allocationID, task, alloc)

log.Infof("Dispatch exited with exit code %d", msg.ExitCode)

ctx.Tell(task.AllocationRef, sproto.ResourcesStateChanged{
ResourcesID: rID,
ResourcesState: sproto.Terminated,
ResourcesStopped: &stopped,
})

// Find the Dispatch IDs associated with the allocation ID. We'll need the
// Dispatch ID to clean up the dispatcher environments for the job.
dispatches, err := db.ListDispatchesByAllocationID(context.TODO(), allocationID)
if err != nil {
ctx.Log().WithError(err).Errorf(
"Failed to retrieve the DispatchIDs associated with AllocationID %s",
allocationID)
return nil
}
ctx.Log().Debugf("Found %d jobs associated with AllocationID %s",
len(dispatches), allocationID)

// Cleanup all the dispatcher environments associated with current allocation
for _, dispatch := range dispatches {
dispatchID := dispatch.DispatchID
impersonatedUser := dispatch.ImpersonatedUser

if ctx.Log().Logger.Level < logrus.DebugLevel {
ctx.Log().WithField("allocation-id", allocationID).Infof(
"Deleting dispatcher environment for job with DispatchID %s initiated by %s",
dispatchID, impersonatedUser)

// Cleanup the dispatcher environment
m.removeDispatchEnvironment(ctx, impersonatedUser, dispatchID)
}
}

// Remove the dispatch from mapping tables.
m.removeDispatchIDFromAllocationIDMap(msg.DispatchID)
m.removeDispatchIDFromHpcJobIDMap(msg.DispatchID)
case sproto.SetGroupMaxSlots:
m.getOrCreateGroup(ctx, msg.Handler).MaxSlots = msg.MaxSlots

Expand All @@ -872,7 +822,85 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
return nil
}

// Called only from DispatchExited event and always run via go routine.
func (m *dispatcherResourceManager) dispatchExited(
ctx *actor.Context,
msg DispatchExited,
allocationID model.AllocationID,
task *sproto.AllocateRequest,
alloc *sproto.ResourcesAllocated,
) {
log := ctx.Log().WithField("dispatch-id", msg.DispatchID)
r := maps.Values(alloc.Resources)[0]
rID := r.Summary().ResourcesID

if strings.TrimSpace(msg.Message) != "" {
ctx.Tell(task.AllocationRef, sproto.ContainerLog{
AuxMessage: &msg.Message,
Level: ptrs.Ptr("ERROR"),
})
}

stopped := sproto.ResourcesStopped{}
if msg.ExitCode > 0 {
stopped.Failure = sproto.NewResourcesFailure(
sproto.ResourcesFailed,
"",
ptrs.Ptr(sproto.ExitCode(msg.ExitCode)),
)
}

// Turn off printing the last line (exit code 1) from resources.go
if msg.ExitCode == -1 {
stopped.Failure = sproto.NewResourcesFailure(
sproto.ResourcesFailed,
"",
nil,
)
}

log.Infof("Dispatch exited with exit code %d", msg.ExitCode)

ctx.Tell(task.AllocationRef, sproto.ResourcesStateChanged{
ResourcesID: rID,
ResourcesState: sproto.Terminated,
ResourcesStopped: &stopped,
})

// Find the Dispatch IDs associated with the allocation ID. We'll need the
// Dispatch ID to clean up the dispatcher environments for the job.
dispatches, err := db.ListDispatchesByAllocationID(context.TODO(), allocationID)
if err != nil {
ctx.Log().WithError(err).Errorf(
"Failed to retrieve the DispatchIDs associated with AllocationID %s",
allocationID)
return
}
ctx.Log().Debugf("Found %d jobs associated with AllocationID %s",
len(dispatches), allocationID)

// Cleanup all the dispatcher environments associated with current allocation
for _, dispatch := range dispatches {
dispatchID := dispatch.DispatchID
impersonatedUser := dispatch.ImpersonatedUser

if ctx.Log().Logger.Level < logrus.DebugLevel {
ctx.Log().WithField("allocation-id", allocationID).Infof(
"Deleting dispatcher environment for job with DispatchID %s initiated by %s",
dispatchID, impersonatedUser)

// Cleanup the dispatcher environment
m.removeDispatchEnvironment(ctx, impersonatedUser, dispatchID)
}
}

// Remove the dispatch from mapping tables.
m.removeDispatchIDFromAllocationIDMap(msg.DispatchID)
m.removeDispatchIDFromHpcJobIDMap(msg.DispatchID)
}

// Common method for sending a terminate request, and appropriately clean up a dispatch.
// Called only from killAllInactiveDispatches which is always run via go routine.
func (m *dispatcherResourceManager) terminateAndDeleteDispatch(
ctx *actor.Context, dispatchID string, impersonatedUser string,
) {
Expand Down Expand Up @@ -925,6 +953,7 @@ func (m *dispatcherResourceManager) waitForDispatchTerminalState(ctx *actor.Cont
func (m *dispatcherResourceManager) startLauncherJob(
ctx *actor.Context,
msg StartDispatcherResources,
req *sproto.AllocateRequest,
) {
var err error

Expand All @@ -938,12 +967,6 @@ func (m *dispatcherResourceManager) startLauncherJob(
WithField("description", msg.Spec.Description).
Info("Received request to launch job")

req, ok := m.reqList.TaskByHandler(msg.TaskActor)
if !ok {
sendResourceStateChangedErrorResponse(ctx, errors.New("no such task"), msg,
"task not found in the task list")
}

slotType := device.CPU

// Only resolve the slot type if the number of slots requested is non-zero.
Expand Down Expand Up @@ -1026,6 +1049,7 @@ func (m *dispatcherResourceManager) startLauncherJob(
m.jobWatcher.monitorJob(impersonatedUser, dispatchID, payloadName)
}

// Used only via KillDispatcherResources and called via go routine.
func (m *dispatcherResourceManager) stopLauncherJob(ctx *actor.Context,
msg KillDispatcherResources,
) {
Expand Down Expand Up @@ -1255,6 +1279,9 @@ func (m *dispatcherResourceManager) receiveJobQueueMsg(ctx *actor.Context) error
return nil

case sproto.DeleteJob:
// Under normal conditions dispatches are removed on termination of the job
// This path allows the cleanup of dispatches associated with a job under
// exceptional conditions (debug mode, crashes, etc).
ctx.Log().Infof("Delete job %s", string(msg.JobID))

dispatches, err := db.ListDispatchesByJobID(context.TODO(), string(msg.JobID))
Expand All @@ -1267,7 +1294,7 @@ func (m *dispatcherResourceManager) receiveJobQueueMsg(ctx *actor.Context) error
}
for _, dispatch := range dispatches {
ctx.Log().Debugf("Found dispatch %s associated with job %s", dispatch.DispatchID, msg.JobID)
m.removeDispatchEnvironment(ctx, dispatch.ImpersonatedUser, dispatch.DispatchID)
go m.removeDispatchEnvironment(ctx, dispatch.ImpersonatedUser, dispatch.DispatchID)
}
ctx.Log().Debugf("Delete job successful %s", msg.JobID)
ctx.Respond(sproto.EmptyDeleteJobResponse())
Expand Down Expand Up @@ -1743,6 +1770,8 @@ func (m *dispatcherResourceManager) hpcResourcesToDebugLog(
// On any REST failure where we cannot confirm the dispatch has been removed
// by the launcher, we skip any attempt to delete the Dispatch from the DB.
// The Dispatch is left in the DB, for a future cleanup attempt on startup.
// Called only from fetchHpcResourceDetails and always run via go routine
// except the one time during startup to retrieve initial cluster cache.
func (m *dispatcherResourceManager) ResourceQueryPostActions(ctx *actor.Context,
dispatchID string, owner string,
) {
Expand Down

0 comments on commit 566c224

Please sign in to comment.