Skip to content

Commit

Permalink
chore: Pass in the allocation ID to releasedResources() (determined-a…
Browse files Browse the repository at this point in the history
  • Loading branch information
CanmingCobble authored and determined-ci committed Jun 21, 2023
1 parent 8f4abc8 commit 0572c44
Showing 1 changed file with 20 additions and 33 deletions.
53 changes: 20 additions & 33 deletions master/internal/rm/dispatcherrm/dispatcher_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,11 +792,7 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
// the handler to avoid any synchronization issues.
log := ctx.Log().WithField("dispatch-id", msg.DispatchID)

allocationID, ok := m.getAllocationIDFromDispatchID(msg.DispatchID)
if !ok {
log.Warnf("Received DispatchExited but cannot map the dispatch ID to an allocation ID")
return nil
}
allocationID := model.AllocationID(msg.DispatchID)

// Job completed while it was sitting in the cancelation queue, so
// remove it so that we don't send a request to the launcher to
Expand All @@ -818,7 +814,7 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
}

// Now preform the actual work asych to avoid blocking
go m.dispatchExited(ctx, msg, allocationID, task, alloc)
go m.dispatchExited(ctx, msg, task, alloc)

case sproto.SetGroupMaxSlots:
m.getOrCreateGroup(ctx, msg.Handler).MaxSlots = msg.MaxSlots
Expand All @@ -830,7 +826,7 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
m.receiveSetTaskName(ctx, msg)

case sproto.ResourcesReleased:
m.resourcesReleased(ctx, msg.AllocationRef)
m.resourcesReleased(ctx, msg.AllocationID, msg.AllocationRef)

default:
ctx.Log().Errorf("receiveRequestMsg: unexpected message %T", msg)
Expand Down Expand Up @@ -858,7 +854,6 @@ func (m *dispatcherResourceManager) getAssociatedTask(
func (m *dispatcherResourceManager) dispatchExited(
ctx *actor.Context,
msg DispatchExited,
allocationID model.AllocationID,
task *sproto.AllocateRequest,
alloc *sproto.ResourcesAllocated,
) {
Expand Down Expand Up @@ -899,6 +894,8 @@ func (m *dispatcherResourceManager) dispatchExited(
ResourcesStopped: &stopped,
})

allocationID := model.AllocationID(msg.DispatchID)

// Find the Dispatch IDs associated with the allocation ID. We'll need the
// Dispatch ID to clean up the dispatcher environments for the job.
dispatches, err := db.ListDispatchesByAllocationID(context.TODO(), allocationID)
Expand Down Expand Up @@ -1230,13 +1227,6 @@ func (m *dispatcherResourceManager) stopLauncherJob(ctx *actor.Context) {
}
}

// Gets the allocation ID for the specified dispatch ID.
func (m *dispatcherResourceManager) getAllocationIDFromDispatchID(
dispatchID string,
) (model.AllocationID, bool) {
return model.AllocationID(dispatchID), true
}

// Log the failure, and send a ResourcesStateChanged describing the failure.
func sendResourceStateChangedErrorResponse(
ctx *actor.Context, err error,
Expand Down Expand Up @@ -1629,6 +1619,7 @@ func (m *dispatcherResourceManager) sendManifestToDispatcher(

func (m *dispatcherResourceManager) addTask(ctx *actor.Context, msg sproto.AllocateRequest) {
actors.NotifyOnStop(ctx, msg.AllocationRef, sproto.ResourcesReleased{
AllocationID: msg.AllocationID,
AllocationRef: msg.AllocationRef,
})

Expand Down Expand Up @@ -1741,26 +1732,22 @@ func (m *dispatcherResourceManager) assignResources(

func (m *dispatcherResourceManager) resourcesReleased(
ctx *actor.Context,
allocationID model.AllocationID,
handler *actor.Ref,
) {
var allocationID model.AllocationID
if allocation := m.reqList.Allocation(handler); allocation != nil {
allocationID = allocation.ID

// Remove any scheduled launch for the job associated with the
// allocation ID. Typically, this would be called at the end of
// "startLauncherJob()". However, if the experiment is canceled
// immediately, it is possible for "resourcesReleased()" to be called
// without ever getting a "StartDispatcherResources" message.
// Therefore, we make sure we remove the allocation ID from the
// "scheduledLaunches" map here, since the size of the map determines
// if "schedulePendingTasks()" will assign resources to another job
// (i.e., send another StartDispatcherResources message). It should
// also be noted that "resourcesReleased()" may get called multiple
// times, but there's no harm in calling "deleteScheduledLaunch()"
// more than once.
m.scheduledLaunches.Delete(allocationID)
}
// Remove any scheduled launch for the job associated with the
// allocation ID. Typically, this would be called at the end of
// "startLauncherJob()". However, if the experiment is canceled
// immediately, it is possible for "resourcesReleased()" to be called
// without ever getting a "StartDispatcherResources" message.
// Therefore, we make sure we remove the allocation ID from the
// "scheduledLaunches" map here, since the size of the map determines
// if "schedulePendingTasks()" will assign resources to another job
// (i.e., send another StartDispatcherResources message). It should
// also be noted that "resourcesReleased()" may get called multiple
// times, but there's no harm in calling "deleteScheduledLaunch()"
// more than once.
m.scheduledLaunches.Delete(allocationID)

ctx.Log().WithField("allocation-id", allocationID).
WithField("address", handler.Address()).
Expand Down

0 comments on commit 0572c44

Please sign in to comment.