Skip to content

Commit

Permalink
chore: Queue job cancelations [DET-9465] (#861)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcorujo authored and stoksc committed Jun 28, 2023
1 parent 287056e commit 60676c9
Showing 1 changed file with 100 additions and 13 deletions.
113 changes: 100 additions & 13 deletions master/internal/rm/dispatcherrm/dispatcher_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/ptrs"
"github.com/determined-ai/determined/master/pkg/syncx/mapx"
"github.com/determined-ai/determined/master/pkg/syncx/orderedmapx"
"github.com/determined-ai/determined/master/pkg/tasks"
"github.com/determined-ai/determined/proto/pkg/agentv1"
"github.com/determined-ai/determined/proto/pkg/apiv1"
Expand Down Expand Up @@ -57,6 +58,10 @@ const (
// memory.
const maxJobLaunchGoRoutines = 8

// Number of worker goroutines that monitor the job cancel queue for job
// cancelation requests.
const numJobCancelWorkers = 8

// Keeps track of how many times "schedulePendingTasks()" was called.
var numTimesScheduledPendingTasksCalled uint64 = 0

Expand Down Expand Up @@ -213,6 +218,16 @@ func New(
}

// dispatcherResourceProvider manages the lifecycle of dispatcher resources.
//
// "jobCancelQueue" is a FIFO queue where job cancelation requests are placed
// waiting for the "jobCancelQueueWorker()" to remove it from the queue and
// send it to "stopLauncherJob()" so that the job termination request can be
// sent to the launcher.
//
// "inflightCancelations" is a list of allocation IDs for job cancelations
// that are in progress. That is, "stopLauncherJob()" is running for that
// allocation ID. The "stopLauncherJob()" function will add the allocation ID
// to the list upon entry and remove it from the list upon exit.
type dispatcherResourceManager struct {
db *db.PgDB
wlmType wlmType
Expand All @@ -228,8 +243,10 @@ type dispatcherResourceManager struct {
poolProviderMap map[string][]string
dispatchIDToHPCJobID map[string]string
scheduledLaunches mapx.Map[model.AllocationID, struct{}]
inflightCancelations mapx.Map[model.AllocationID, struct{}]
dispatchIDToHPCJobIDMutex sync.RWMutex
dbState dispatcherState
jobCancelQueue *orderedmapx.Map[string, *actor.Context]
}

func newDispatcherResourceManager(
Expand All @@ -253,10 +270,9 @@ func newDispatcherResourceManager(
}

result := &dispatcherResourceManager{
db: db,
wlmType: wlmType,
rmConfig: rmConfig,

db: db,
wlmType: wlmType,
rmConfig: rmConfig,
apiClient: apiClient,
reqList: tasklist.New(),
groups: make(map[*actor.Ref]*tasklist.Group),
Expand All @@ -268,7 +284,9 @@ func newDispatcherResourceManager(
poolProviderMap: makeProvidedPoolsMap(poolConfig),
dispatchIDToHPCJobID: make(map[string]string),
scheduledLaunches: mapx.New[model.AllocationID, struct{}](),
inflightCancelations: mapx.New[model.AllocationID, struct{}](),
dbState: *dbState,
jobCancelQueue: orderedmapx.New[string, *actor.Context](),
}
watcher.rm = result

Expand Down Expand Up @@ -299,6 +317,42 @@ func (m *dispatcherResourceManager) getProvidingPartition(name string) string {
return name
}

// jobCancelQueueWorker waits to be notified that a job cancelation request is
// in the queue, then calls "stopLauncherJob()" to cancel the job.
func (m *dispatcherResourceManager) jobCancelQueueWorker(workerID int) {
var ctx *actor.Context
var ok bool

// Loop forever.
for {
// Remove the next job cancelation request from the queue and send
// it to the launcher. If the queue is empty, "GetAndDelete" will
// wait for an element to be placed in the queue.
if ctx, ok = m.jobCancelQueue.GetAndDelete(); ok {
ctx.Log().WithField("worker-id", workerID).
WithField("allocation-id", ctx.Message().(KillDispatcherResources).AllocationID).
WithField("queue-size", m.jobCancelQueue.Length()).
Debug("Job cancel queue worker found request")
m.stopLauncherJob(ctx)
continue
}

// Should never hit this case, but log a warning if we do.
ctx.Log().WithField("worker-id", workerID).
Warn("Job cancel queue worker did not find any requests")
}
}

// startJobCancelWorkers starts up "numWorkers" goroutines which wait to be
// notified that a job cancelation request has been queued. The first worker
// to receive the notification will call "stopLauncherJob()" to cancel the
// job.
func (m *dispatcherResourceManager) startJobCancelWorkers(numWorkers int) {
for i := 0; i < numWorkers; i++ {
go m.jobCancelQueueWorker(i)
}
}

func (m *dispatcherResourceManager) Receive(ctx *actor.Context) error {
switch msg := ctx.Message().(type) {
case actor.PreStart:
Expand All @@ -308,6 +362,8 @@ func (m *dispatcherResourceManager) Receive(ctx *actor.Context) error {
go gcOrphanedDispatches(context.TODO(), ctx.Log(), m.apiClient)
go m.jobWatcher.watch(ctx)

m.startJobCancelWorkers(numJobCancelWorkers)

m.hpcDetailsCache.wait()
actors.NotifyAfter(ctx, actionCoolDown, schedulerTick{})

Expand Down Expand Up @@ -667,7 +723,29 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
}

case KillDispatcherResources:
go m.stopLauncherJob(ctx, msg)
// Check if there is already a job cancelation inflight.
if _, ok := m.inflightCancelations.Load(msg.AllocationID); ok {
message := "Received request to cancel job, but job cancelation is already in progress"
ctx.Log().WithField("allocation-id", msg.AllocationID).Debug(message)
ctx.Tell(ctx.Self(), dispatchExpLogMessage{
DispatchID: string(msg.AllocationID),
Message: message,
})
return nil
}

// Put the job cancelation request in the queue. If there is already a
// request queued, do not queue up a second one. Simply log a message
// both in the master log and the experiment log.
if _, ok := m.jobCancelQueue.PutIfAbsent(string(msg.AllocationID), ctx); !ok {
message := "Received request to cancel job, but job cancelation request is already queued"
ctx.Log().WithField("allocation-id", msg.AllocationID).Debug(message)
ctx.Tell(ctx.Self(), dispatchExpLogMessage{
DispatchID: string(msg.AllocationID),
Message: message,
})
return nil
}

case DispatchStateChange:
log := ctx.Log().WithField("dispatch-id", msg.DispatchID)
Expand Down Expand Up @@ -722,6 +800,13 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
return nil
}

// Job completed while it was sitting in the cancelation queue, so
// remove it so that we don't send a request to the launcher to
// terminate a job that already completed.
if m.jobCancelQueue.Delete(string(allocationID)) {
log.Info("Job completed while still in cancelation queue. Removed job from cancelation queue.")
}

task, ok := m.reqList.TaskByID(allocationID)
if !ok {
log.Warnf("received DispatchExited for dispatch unknown to task list: %s", allocationID)
Expand Down Expand Up @@ -761,11 +846,7 @@ func (m *dispatcherResourceManager) getAssociatedTask(
ctx *actor.Context,
dispatchID string,
) *sproto.AllocateRequest {
allocationID, ok := m.getAllocationIDFromDispatchID(dispatchID)
if !ok {
log.Warnf("received message for unknown dispatch %s", dispatchID)
return nil
}
allocationID := model.AllocationID(dispatchID)

task, ok := m.reqList.TaskByID(allocationID)
if !ok {
Expand Down Expand Up @@ -1060,14 +1141,20 @@ func (m *dispatcherResourceManager) startLauncherJob(
}

// Used only via KillDispatcherResources and called via go routine.
func (m *dispatcherResourceManager) stopLauncherJob(ctx *actor.Context,
msg KillDispatcherResources,
) {
func (m *dispatcherResourceManager) stopLauncherJob(ctx *actor.Context) {
msg := ctx.Message().(KillDispatcherResources)

// Log at INFO level to let us know that the dispatcher resource manager
// actually received the request to delete the job.
ctx.Log().WithField("allocation-id", msg.AllocationID).
Info("Received request to terminate job")

// Make a note that there is a cancelation inflight for this job, so that
// if another cancelation request is received, we ignore it and don't queue
// it.
m.inflightCancelations.Store(msg.AllocationID, struct{}{})
defer m.inflightCancelations.Delete(msg.AllocationID)

// Find the Dispatch IDs associated with the allocation ID. We'll need the
// Dispatch ID to cancel the job on the launcher side.
dispatches, err := db.ListDispatchesByAllocationID(context.TODO(), msg.AllocationID)
Expand Down

0 comments on commit 60676c9

Please sign in to comment.