Skip to content

Commit

Permalink
feat: fetch, store and provide external job details as part of the Dispatcher RM. (#855)
Browse files Browse the repository at this point in the history
  • Loading branch information
jagadeesh545 authored and djanicekpach committed Feb 29, 2024
1 parent 75428ab commit 7addd70
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 15 deletions.
14 changes: 14 additions & 0 deletions master/internal/job/authz_rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/determined-ai/determined/master/internal/authz"
"github.com/determined-ai/determined/master/internal/cluster"
"github.com/determined-ai/determined/master/internal/command"
"github.com/determined-ai/determined/master/internal/config"
"github.com/determined-ai/determined/master/internal/db"
Expand Down Expand Up @@ -87,6 +88,14 @@ func (a *JobAuthZRBAC) FilterJobs(
}
}

userCanViewExternalJobs := false
permErr, err := cluster.AuthZProvider.Get().CanViewExternalJobs(ctx, &curUser)
if err != nil {
log.Warnf("Failed to check VIEW_EXTERNAL_JOBS permission for user %s: %s",
curUser.Username, err.Error())
} else if permErr == nil {
userCanViewExternalJobs = true
}
viewableJobs = make([]*jobv1.Job, 0)
for _, job := range jobs {
switch job.Type {
Expand All @@ -101,6 +110,11 @@ func (a *JobAuthZRBAC) FilterJobs(
if userHasGlobalNTSCViewPerm || viewable {
viewableJobs = append(viewableJobs, job)
}
case jobv1.Type_TYPE_EXTERNAL:
if userCanViewExternalJobs {
viewableJobs = append(viewableJobs, job)
}
continue
// TODO: special case for tensorboard.
default:
log.Warnf("ignoring job type: %s", job.Type)
Expand Down
199 changes: 191 additions & 8 deletions master/internal/rm/dispatcherrm/dispatcher_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,43 @@ import (
"net/http"
"regexp"
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/ghodss/yaml"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/determined-ai/determined/master/pkg/mathx"
"github.com/determined-ai/determined/master/pkg/syncx/mapx"
"github.com/determined-ai/determined/proto/pkg/jobv1"

launcher "github.hpe.com/hpe/hpc-ard-launcher-go/launcher"
)

//nolint:lll
const (
	// pollLoopInterval is a fixed 10 second duration.
	// NOTE(review): presumably the delay between passes of the monitor's poll loop; the
	// loop itself is not visible in this section — confirm against its use.
	pollLoopInterval = time.Duration(10) * time.Second
	// ignoredReporter is a fully-qualified reporter class name.
	// NOTE(review): presumably messages from this reporter are skipped; confirm at the use site.
	ignoredReporter = "com.cray.analytics.capsules.dispatcher.shasta.ShastaDispatcher"
	// errorLinesToRetrieve and errorLinesToDisplay are line-count limits.
	// NOTE(review): presumably applied to job error logs; confirm at the use site.
	errorLinesToRetrieve = 500
	errorLinesToDisplay  = 15
	// slotsValueNotAvailable is the slot count reported when no valid slot value can be
	// read from an external job's details.
	slotsValueNotAvailable = 0
	// notAvailable is the placeholder returned for a string field that is missing or
	// empty in an external job's details.
	notAvailable = "Not Available"
	// keyMissingErrorMessage is a log format taking (key, external job id).
	keyMissingErrorMessage = "The '%s' key is missing from the job details for external job id %s"
	// invalidValueErrorMessage is a log format taking (key, value, external job id, error text).
	invalidValueErrorMessage = "Invalid '%s' value '%s' for external job id %s: %s"
)

// Keys for External Job Details Map.
//
// Each external job is described by a map[string]string; the constants below name the
// entries that the conversion helpers in this file read from that map.
const (
// jobState: workload-manager state string; "PENDING" and "RUNNING" are the values
// convertHpcStatus recognizes.
jobState      = "state"
// jobName: job name, read as a plain string.
jobName       = "name"
// jobPartition: partition name, reported as the job's resource pool.
jobPartition  = "partition"
// jobSubmitTime: submission time, parsed as an RFC 3339 timestamp.
jobSubmitTime = "submitTime"
// jobUserName: submitting user's name, read as a plain string.
jobUserName   = "userName"
// jobGPUSlots / jobCPUSlots: slot counts, parsed as base-10 32-bit integers. The GPU
// value is consulted before the CPU value.
jobGPUSlots   = "gpuSlots"
jobCPUSlots   = "cpuSlots"
)

// SlurmPrologReasonCode is the Slurm Prolog Reason Code.
Expand Down Expand Up @@ -99,6 +117,7 @@ type launcherMonitor struct {
checkLauncherJob chan *launcherJob
processingWatchedJobs atomic.Bool
dispatchIDToHPCJobID *mapx.Map[string, string]
externalJobs mapx.Map[string, map[string]string]
}

// dispatchLastJobStatusCheckTime is used to sort the dispatches by the time
Expand All @@ -120,6 +139,7 @@ func newDispatchWatcher(
monitoredJobs: mapx.New[string, *launcherJob](),
jobsToRemove: mapx.New[string, struct{}](),
apiClient: apiClient,
externalJobs: mapx.New[string, map[string]string](),
newLauncherJob: make(chan *launcherJob),
removeLauncherJob: make(chan *launcherJob),
checkLauncherJob: make(chan *launcherJob),
Expand Down Expand Up @@ -437,32 +457,46 @@ func (m *launcherMonitor) processWatchedJobs() {

// Loop through the jobs in the monitoredJobs map and update status accordingly
for _, dispatchID := range sortedDispatchIDs {
if m.isJobBeingRemoved(dispatchID) {
m.removeJobFromMonitoredList(dispatchID)
if job, ok = m.getJobByDispatchID(dispatchID); !ok {
m.syslog.Warnf("dispatcher_monitor did not find job for dispatchID %s", dispatchID)
continue
}

if job, ok = m.getJobByDispatchID(dispatchID); !ok {
m.syslog.Warnf("dispatcher_monitor did not find job for dispatchID %s", dispatchID)
// In each case below, remove the processed job details from the qStats map. This will
// leave only the external jobs in the qStats map after all DispatchIDs are processed.
// The qStats map with just the external jobs will be processed in the next for loop.

if m.isJobBeingRemoved(dispatchID) {
m.removeJobFromMonitoredList(dispatchID)
delete(qStats, job.hpcJobID)
continue
}

if m.obtainJobStateFromWlmQueueDetails(dispatchID, qStats, job) {
m.updateLastJobStatusCheckTime(dispatchID)
delete(qStats, job.hpcJobID)
continue // An optimization to avoid per-job use of updateJobStatus (below)
}

if removeJob := m.updateJobStatus(job); removeJob {
m.removeJobFromMonitoredList(dispatchID)
delete(qStats, job.hpcJobID)
continue
}

m.updateLastJobStatusCheckTime(dispatchID)
delete(qStats, job.hpcJobID)
}

// There are chances that jobsToRemove might still have some elements remaining.
// These values are stale and can be removed safely.
m.clearJobsToRemoveMap()

// Refresh the external jobs map.
m.externalJobs.Clear()
for qJobID, qJobDetails := range qStats {
m.externalJobs.Store(qJobID, qJobDetails)
}
}

// obtainJobStateFromWlmQueueDetails gets the state of the specified dispatch from
Expand Down Expand Up @@ -1056,3 +1090,152 @@ func (m *launcherMonitor) isDispatchInProgress(owner string, dispatchID string)
_, _, exited := calculateJobExitStatus(resp)
return !exited
}

// getRequestedSlots derives the requested slot count for an external job from its details
// map. The GPU slots entry is consulted first and the CPU slots entry second; the first
// entry that parses to a non-zero value wins. A missing/empty entry or one that does not
// parse as a base-10 32-bit integer is logged as a warning and skipped. When neither entry
// yields a non-zero value, slotsValueNotAvailable (zero) is returned.
func (m *launcherMonitor) getRequestedSlots(qJobID string, qJobDetails map[string]string) int32 {
	for _, slotKey := range [...]string{jobGPUSlots, jobCPUSlots} {
		raw := qJobDetails[slotKey]
		if raw == "" {
			m.syslog.Warnf(keyMissingErrorMessage, slotKey, qJobID)
			continue
		}

		parsed, parseErr := strconv.ParseInt(raw, 10, 32)
		if parseErr != nil {
			m.syslog.Warnf(invalidValueErrorMessage, slotKey, raw, qJobID, parseErr.Error())
			continue
		}

		// A parsed zero is indistinguishable from "not available", so keep looking.
		if parsed != slotsValueNotAvailable {
			return int32(parsed)
		}
	}
	return slotsValueNotAvailable
}

// getSubmitTime reads the job submission time from the provided job details map and parses
// it as an RFC 3339 timestamp. If the entry is missing/empty or cannot be parsed, a warning
// is logged and the zero time.Time is returned.
func (m *launcherMonitor) getSubmitTime(qJobID string, qJobDetails map[string]string) time.Time {
	raw := qJobDetails[jobSubmitTime]
	if raw == "" {
		m.syslog.Warnf(keyMissingErrorMessage, jobSubmitTime, qJobID)
		return time.Time{}
	}

	parsed, parseErr := time.Parse(time.RFC3339, raw)
	if parseErr != nil {
		m.syslog.Warnf(invalidValueErrorMessage, jobSubmitTime, raw,
			qJobID, parseErr.Error())
		return time.Time{}
	}
	return parsed
}

// convertHpcStatus maps an HPC WLM status string onto a DeterminedAI job state:
// "PENDING" becomes STATE_QUEUED, "RUNNING" becomes STATE_SCHEDULED, and every other
// value (matching is exact and case-sensitive) becomes STATE_UNSPECIFIED.
func (m *launcherMonitor) convertHpcStatus(slurmStatus string) jobv1.State {
	state := jobv1.State_STATE_UNSPECIFIED
	switch slurmStatus {
	case "PENDING":
		state = jobv1.State_STATE_QUEUED
	case "RUNNING":
		state = jobv1.State_STATE_SCHEDULED
	}
	return state
}

// getJobSummary builds a jobv1.JobSummary whose State reflects the job state entry of the
// provided job details map, converted via convertHpcStatus. When the entry is missing or
// empty, a warning is logged and the summary carries jobv1.State_STATE_UNSPECIFIED. The
// returned pointer is never nil.
func (m *launcherMonitor) getJobSummary(
	qJobID string, qJobDetails map[string]string,
) *jobv1.JobSummary {
	state := jobv1.State_STATE_UNSPECIFIED
	if rawState := qJobDetails[jobState]; rawState != "" {
		state = m.convertHpcStatus(rawState)
	} else {
		m.syslog.Warnf(keyMissingErrorMessage, jobState, qJobID)
	}
	return &jobv1.JobSummary{State: state}
}

// getValueFromJobDetails looks up key in the given job details map and returns its string
// value. When the entry is missing or empty, a warning is logged and the placeholder
// 'Not Available' is returned instead.
func (m *launcherMonitor) getValueFromJobDetails(
	qJobID string, qJobDetails map[string]string, key string,
) string {
	if found := qJobDetails[key]; found != "" {
		return found
	}
	m.syslog.Warnf(keyMissingErrorMessage, key, qJobID)
	return notAvailable
}

// convertExternalJobs builds a jobv1.Job of type TYPE_EXTERNAL from the details of a single
// external job. It returns nil (after logging a warning) when the job ID is empty or the
// details map is nil. Fields that cannot be read from the details fall back to the defaults
// of the individual getter helpers. AllocatedSlots mirrors RequestedSlots only while the job
// is in STATE_SCHEDULED; otherwise it is slotsValueNotAvailable.
func (m *launcherMonitor) convertExternalJobs(
	qJobID string, qJobDetails map[string]string,
) *jobv1.Job {
	switch {
	case qJobID == "":
		m.syslog.Warn("Cannot convert the external job. Job ID is missing.")
		return nil
	case qJobDetails == nil:
		m.syslog.Warn("Cannot convert the external job. Job Details are missing.")
		return nil
	}

	// The helpers log warnings for missing/invalid entries; keep this call order so the
	// warnings are emitted in a stable sequence.
	slots := m.getRequestedSlots(qJobID, qJobDetails)
	submittedAt := m.getSubmitTime(qJobID, qJobDetails)
	summary := m.getJobSummary(qJobID, qJobDetails)
	displayName := m.getValueFromJobDetails(qJobID, qJobDetails, jobName)
	owner := m.getValueFromJobDetails(qJobID, qJobDetails, jobUserName)
	pool := m.getValueFromJobDetails(qJobID, qJobDetails, jobPartition)

	converted := &jobv1.Job{
		JobId:          qJobID,
		Type:           jobv1.Type_TYPE_EXTERNAL,
		Summary:        summary,
		Name:           displayName,
		Username:       owner,
		SubmissionTime: timestamppb.New(submittedAt),
		AllocatedSlots: slotsValueNotAvailable,
		RequestedSlots: slots,
		ResourcePool:   pool,
	}
	// Only a scheduled job is reported as holding the slots it requested.
	if summary.State == jobv1.State_STATE_SCHEDULED {
		converted.AllocatedSlots = slots
	}
	return converted
}

// fetchExternalJobs converts the currently stored external jobs into jobv1.Job values.
// When resourcePool is non-empty, only jobs whose "partition" detail equals it are
// included; an empty resourcePool selects every job. Jobs that cannot be converted are
// omitted. The result is always a non-nil slice, and because it is produced by ranging
// over a map, its order is unspecified.
func (m *launcherMonitor) fetchExternalJobs(resourcePool string) []*jobv1.Job {
	result := []*jobv1.Job{}
	wantAllPools := resourcePool == ""

	// NOTE(review): WithLock presumably holds the map's lock for the duration of the
	// callback — confirm against mapx.Map.
	m.externalJobs.WithLock(func(entries map[string]map[string]string) {
		for id, details := range entries {
			if !wantAllPools && details["partition"] != resourcePool {
				continue
			}
			if converted := m.convertExternalJobs(id, details); converted != nil {
				result = append(result, converted)
			}
		}
	})
	return result
}
Loading

0 comments on commit 7addd70

Please sign in to comment.