Skip to content

Commit

Permalink
chore(dispatcherRM): use mapx.Map for scheduledLaunches tracker (dete…
Browse files Browse the repository at this point in the history
…rmined-ai#835) [DET-9328]
  • Loading branch information
stoksc authored and eecsliu committed Apr 16, 2024
1 parent 22fa160 commit fa8c037
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 60 deletions.
13 changes: 7 additions & 6 deletions .circleci/real_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2317,6 +2317,13 @@ jobs:
if rpm -q $DET_PKG_NAME; then
sudo rpm -e $DET_PKG_NAME
fi
- setup-python-venv:
determined: True
extra-requirements-file: "e2e_tests/tests/requirements.txt"
install-python: false
executor: <<parameters.runner_class>>

- run:
name: Recreate Fresh Database
command: |
Expand Down Expand Up @@ -2380,12 +2387,6 @@ jobs:
}
EOF
- setup-python-venv:
determined: True
extra-requirements-file: "e2e_tests/tests/requirements.txt"
install-python: false
executor: <<parameters.runner_class>>

- run-e2e-tests:
mark: <<parameters.mark>>
master-host: localhost
Expand Down
71 changes: 17 additions & 54 deletions master/internal/rm/dispatcherrm/dispatcher_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/determined-ai/determined/master/pkg/logger"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/ptrs"
"github.com/determined-ai/determined/master/pkg/syncx/mapx"
"github.com/determined-ai/determined/master/pkg/tasks"
"github.com/determined-ai/determined/proto/pkg/agentv1"
"github.com/determined-ai/determined/proto/pkg/apiv1"
Expand Down Expand Up @@ -231,10 +232,9 @@ type dispatcherResourceManager struct {
hpcDetailsCache *hpcResourceDetailsCache
poolProviderMap map[string][]string
dispatchIDToHPCJobID map[string]string
scheduledLaunches map[string]bool
scheduledLaunches mapx.Map[model.AllocationID, struct{}]
dispatchIDToAllocationIDMutex sync.RWMutex
dispatchIDToHPCJobIDMutex sync.RWMutex
scheduledLaunchesMutex sync.RWMutex
dbState dispatcherState
}

Expand Down Expand Up @@ -274,7 +274,7 @@ func newDispatcherResourceManager(
poolConfig: poolConfig,
poolProviderMap: makeProvidedPoolsMap(poolConfig),
dispatchIDToHPCJobID: make(map[string]string),
scheduledLaunches: make(map[string]bool),
scheduledLaunches: mapx.New[model.AllocationID, struct{}](),
dbState: *dbState,
}
watcher.rm = result
Expand Down Expand Up @@ -926,13 +926,11 @@ func (m *dispatcherResourceManager) waitForLauncherAsyncThreadToComplete(
msg StartDispatcherResources,
dispatchID *string,
) {
allocationID := msg.AllocationID.String()

// No longer a scheduled launch, since we've now actually launched the job.
defer m.deleteScheduledLaunch(allocationID)
defer m.scheduledLaunches.Delete(msg.AllocationID)

if *dispatchID == "" {
ctx.Log().WithField("allocation-id", allocationID).
ctx.Log().WithField("allocation-id", msg.AllocationID).
Warn("Cannot check for HPC job ID because dispatch ID is empty")
return
}
Expand All @@ -950,7 +948,7 @@ func (m *dispatcherResourceManager) waitForLauncherAsyncThreadToComplete(
// If the allocation ID is no longer in the "scheduledLaunches" map,
// then that means that "resourcesReleased()" was called, and the
// job must have job completed or was terminated.
if !m.isScheduledLaunch(allocationID) {
if _, ok := m.scheduledLaunches.Load(msg.AllocationID); !ok {
jobExited = true
break
}
Expand All @@ -959,21 +957,21 @@ func (m *dispatcherResourceManager) waitForLauncherAsyncThreadToComplete(
}

if jobExited {
ctx.Log().WithField("allocation-id", allocationID).
ctx.Log().WithField("allocation-id", msg.AllocationID).
WithField("dispatch-id", *dispatchID).
Warnf("startLauncherJob goroutine completed but job has already exited")
return
}

if len(hpcJobID) == 0 {
ctx.Log().WithField("allocation-id", allocationID).
ctx.Log().WithField("allocation-id", msg.AllocationID).
WithField("dispatch-id", *dispatchID).
WithField("hpc-job-id", hpcJobID).
Warn("startLauncherJob goroutine completed without HPC job ID")
return
}

ctx.Log().WithField("allocation-id", allocationID).
ctx.Log().WithField("allocation-id", msg.AllocationID).
WithField("dispatch-id", *dispatchID).
WithField("hpc-job-id", hpcJobID).
Info("startLauncherJob goroutine completed with HPC job ID")
Expand All @@ -996,7 +994,7 @@ func (m *dispatcherResourceManager) startLauncherJob(
// help us troubleshoot customer issues.
ctx.Log().WithField("allocation-id", msg.AllocationID).
WithField("description", msg.Spec.Description).
WithField("scheduled-launches", m.getNumScheduledLaunches()).
WithField("scheduled-launches", m.scheduledLaunches.Len()).
Info("Received request to launch job")

hpcDetails, err := m.hpcDetailsCache.load()
Expand Down Expand Up @@ -1077,7 +1075,6 @@ func (m *dispatcherResourceManager) startLauncherJob(

tempDispatchID, err := m.sendManifestToDispatcher(
ctx, manifest, impersonatedUser, string(msg.AllocationID))

if err != nil {
sendResourceStateChangedErrorResponse(ctx, err, msg,
"unable to create the launcher job")
Expand Down Expand Up @@ -1767,12 +1764,9 @@ func (m *dispatcherResourceManager) resourcesReleased(
ctx *actor.Context,
handler *actor.Ref,
) {
allocationID := ""

allocation := m.reqList.Allocation(handler)

if allocation != nil {
allocationID = allocation.ID.String()
var allocationID model.AllocationID
if allocation := m.reqList.Allocation(handler); allocation != nil {
allocationID = allocation.ID

// Remove any scheduled launch for the job associated with the
// allocation ID. Typically, this would be called at the end of
Expand All @@ -1786,12 +1780,12 @@ func (m *dispatcherResourceManager) resourcesReleased(
// also be noted that "resourcesReleased()" may get called multiple
// times, but there's no harm in calling "deleteScheduledLaunch()"
// more than once.
m.deleteScheduledLaunch(allocationID)
m.scheduledLaunches.Delete(allocationID)
}

ctx.Log().WithField("allocation-id", allocationID).
WithField("address", handler.Address()).
WithField("scheduled-launches", m.getNumScheduledLaunches()).
WithField("scheduled-launches", m.scheduledLaunches.Len()).
Info("resources are released")

m.reqList.RemoveTaskByHandler(handler)
Expand Down Expand Up @@ -1874,8 +1868,7 @@ func (m *dispatcherResourceManager) schedulePendingTasks(ctx *actor.Context) {
// requests we send to the launcher, so that we don't overwhelm
// the launcher with too many concurrent requests.
if !req.Restore {
count := m.getNumScheduledLaunches()

count := m.scheduledLaunches.Len()
if count >= maxJobLaunchGoRoutines {
// To help us troubleshoot problems, log a message every 10
// seconds when we've reached our goroutine limit. The
Expand All @@ -1895,44 +1888,14 @@ func (m *dispatcherResourceManager) schedulePendingTasks(ctx *actor.Context) {
// "resourcesReleased()" function will also remove the
// allocation ID from the map, since jobs that are canceled
// too quickly may never call "startLauncherJob()".
m.addScheduledLaunch(req.AllocationID.String())
m.scheduledLaunches.Store(req.AllocationID, struct{}{})
}

m.assignResources(ctx, req)
}
}
}

func (m *dispatcherResourceManager) addScheduledLaunch(allocationID string) {
m.scheduledLaunchesMutex.Lock()
defer m.scheduledLaunchesMutex.Unlock()

m.scheduledLaunches[allocationID] = true
}

func (m *dispatcherResourceManager) deleteScheduledLaunch(allocationID string) {
m.scheduledLaunchesMutex.Lock()
defer m.scheduledLaunchesMutex.Unlock()

delete(m.scheduledLaunches, allocationID)
}

func (m *dispatcherResourceManager) isScheduledLaunch(allocationID string) bool {
m.scheduledLaunchesMutex.RLock()
defer m.scheduledLaunchesMutex.RUnlock()

_, ok := m.scheduledLaunches[allocationID]

return ok
}

func (m *dispatcherResourceManager) getNumScheduledLaunches() int {
m.scheduledLaunchesMutex.RLock()
defer m.scheduledLaunchesMutex.RUnlock()

return len(m.scheduledLaunches)
}

func (m *dispatcherResourceManager) disableAgent(
agentID string,
) (*apiv1.DisableAgentResponse, error) {
Expand Down

0 comments on commit fa8c037

Please sign in to comment.