Skip to content

Commit

Permalink
chore: add job_project_source configuration for dispatcher RM (#602)
Browse files Browse the repository at this point in the history
  • Loading branch information
phillip-gaisford authored and eecsliu committed Jul 22, 2023
1 parent a4beeee commit 3067626
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 2 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,7 @@ jobs:
- make-package
- run: make -C master publish
- run: make -C agent publish

- run: mkdir /tmp/pkgs && cp -v */dist/*.{rpm,deb,tar.gz} /tmp/pkgs
- store_artifacts:
path: /tmp/pkgs
Expand Down
26 changes: 26 additions & 0 deletions master/internal/config/dispatcher_resource_manager_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"encoding/json"
"fmt"
"strings"

"github.com/determined-ai/determined/master/pkg/device"
"github.com/determined-ai/determined/master/pkg/model"
Expand All @@ -14,6 +15,14 @@ const (
enroot = "enroot"
)

// Job labeling modes: the values accepted for the job_project_source setting
// of the dispatcher resource manager, which selects where the label attached
// to a launched job is taken from.
const (
// Project takes the label from the project in the experiment configuration.
Project = "project"
// Workspace takes the label from the workspace in the experiment configuration.
Workspace = "workspace"
// Label builds the label from all of the labels in the experiment configuration.
Label = "label"
// LabelPrefix introduces the "label:<prefix>" form: only experiment labels
// starting with <prefix> are used, with that prefix removed from each.
LabelPrefix = "label:"
)

// DispatcherResourceManagerConfig is the object that stores the values of
// the "resource_manager" section of "tools/devcluster.yaml".
type DispatcherResourceManagerConfig struct {
Expand All @@ -39,6 +48,7 @@ type DispatcherResourceManagerConfig struct {
GresSupported bool `json:"gres_supported"`
DefaultAuxResourcePool *string `json:"default_aux_resource_pool"`
DefaultComputeResourcePool *string `json:"default_compute_resource_pool"`
JobProjectSource *string `json:"job_project_source"`

Security *DispatcherSecurityConfig `json:"security"`
PartitionOverrides map[string]DispatcherPartitionOverrideConfigs `json:"partition_overrides"`
Expand All @@ -57,6 +67,22 @@ func (c DispatcherResourceManagerConfig) Validate() []error {
c.LauncherContainerRunType == enroot) {
return []error{fmt.Errorf("invalid launch container run type: '%s'", c.LauncherContainerRunType)}
}
return c.validateJobProjectSource()
}

// validateJobProjectSource checks the optional job_project_source setting.
//
// An unset value is valid. Otherwise the value must be exactly Project,
// Workspace or Label, or start with LabelPrefix. Any other value yields a
// single-element error slice describing the accepted forms; valid values
// yield nil.
func (c DispatcherResourceManagerConfig) validateJobProjectSource() []error {
	if c.JobProjectSource == nil {
		return nil
	}

	source := *c.JobProjectSource
	switch source {
	case Project, Workspace, Label:
		return nil
	}
	if strings.HasPrefix(source, LabelPrefix) {
		return nil
	}

	return []error{fmt.Errorf(
		"invalid job_project_source value: '%s'. "+
			"Specify one of project, workspace or label[:value]",
		source)}
}

Expand Down
46 changes: 46 additions & 0 deletions master/internal/config/dispatcher_resource_manager_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"fmt"
"reflect"
"testing"

"github.com/determined-ai/determined/master/pkg/ptrs"
)

func TestDispatcherResourceManagerConfig_Validate(t *testing.T) {
type fields struct {
LauncherContainerRunType string
JobProjectSource *string
}
tests := []struct {
name string
Expand All @@ -35,11 +38,54 @@ func TestDispatcherResourceManagerConfig_Validate(t *testing.T) {
fields: fields{LauncherContainerRunType: "enroot"},
want: nil,
},
{
name: "workspace case",
fields: fields{
LauncherContainerRunType: "enroot",
JobProjectSource: ptrs.Ptr("workspace"),
},
want: nil,
},
{
name: "project case",
fields: fields{
LauncherContainerRunType: "enroot",
JobProjectSource: ptrs.Ptr("project"),
},
want: nil,
},
{
name: "label case",
fields: fields{
LauncherContainerRunType: "enroot",
JobProjectSource: ptrs.Ptr("label"),
},
want: nil,
},
{
name: "label: case",
fields: fields{
LauncherContainerRunType: "enroot",
JobProjectSource: ptrs.Ptr("label:something"),
},
want: nil,
},
{
name: "invalid project source",
fields: fields{
LauncherContainerRunType: "enroot",
JobProjectSource: ptrs.Ptr("something-bad"),
},
want: []error{fmt.Errorf(
"invalid job_project_source value: 'something-bad'. " +
"Specify one of project, workspace or label[:value]")},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := DispatcherResourceManagerConfig{
LauncherContainerRunType: tt.fields.LauncherContainerRunType,
JobProjectSource: tt.fields.JobProjectSource,
}
if got := c.Validate(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("DispatcherResourceManagerConfig.Validate() = %v, want %v", got, tt.want)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,9 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
manifest, impersonatedUser, payloadName, err := msg.Spec.ToDispatcherManifest(
m.rmConfig.MasterHost, m.rmConfig.MasterPort, m.masterTLSConfig.CertificateName,
req.SlotsNeeded, slotType, partition, tresSupported, gresSupported,
m.rmConfig.LauncherContainerRunType, m.wlmType == pbsSchedulerType)
m.rmConfig.LauncherContainerRunType, m.wlmType == pbsSchedulerType,
m.rmConfig.JobProjectSource,
)
if err != nil {
sendResourceStateChangedErrorResponse(ctx, err, msg,
"unable to create the launcher manifest")
Expand Down
69 changes: 69 additions & 0 deletions master/pkg/tasks/dispatcher_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@ package tasks
import (
"archive/tar"
"encoding/base64"
"encoding/json"
"fmt"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"

"github.com/docker/docker/api/types/mount"
"github.com/sirupsen/logrus"
launcher "github.hpe.com/hpe/hpc-ard-launcher-go/launcher"

"github.com/determined-ai/determined/master/internal/config"
"github.com/determined-ai/determined/master/pkg/archive"
"github.com/determined-ai/determined/master/pkg/cproto"
"github.com/determined-ai/determined/master/pkg/device"
"github.com/determined-ai/determined/master/pkg/etc"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/schemas/expconf"
)

const (
Expand Down Expand Up @@ -76,6 +80,7 @@ func (t *TaskSpec) ToDispatcherManifest(
gresSupported bool,
containerRunType string,
isPbsLauncher bool,
labelMode *string,
) (*launcher.Manifest, string, string, error) {
/*
* The user that the "launcher" is going to run the Determined task
Expand Down Expand Up @@ -188,6 +193,7 @@ func (t *TaskSpec) ToDispatcherManifest(
if err != nil {
return nil, "", "", err
}
pbsProj, slurmProj := t.jobAndProjectLabels(labelMode)

resources, resourceOpts := t.computeResources(tresSupported, numSlots,
slotType, gresSupported, isPbsLauncher)
Expand All @@ -202,6 +208,7 @@ func (t *TaskSpec) ToDispatcherManifest(
logrus.WithError(errList[0]).Error("Forbidden slurm option specified")
return nil, "", "", errList[0]
}
slurmArgs = append(slurmArgs, slurmProj...)
customParams["slurmArgs"] = slurmArgs

var pbsArgs []string
Expand All @@ -213,6 +220,7 @@ func (t *TaskSpec) ToDispatcherManifest(
logrus.WithError(errList[0]).Error("Forbidden PBS option specified")
return nil, "", "", errList[0]
}
pbsArgs = append(pbsArgs, pbsProj...)
customParams["pbsArgs"] = pbsArgs

if containerRunType == podman {
Expand Down Expand Up @@ -265,6 +273,67 @@ func (t *TaskSpec) ToDispatcherManifest(
return &manifest, impersonatedUser, payloadName, err
}

// jobAndProjectLabels produces the PBS and Slurm command options that label
// the job according to mode.
//
// The experiment configuration is decoded from the JSON held in the
// DET_EXPERIMENT_CONFIG entry of t.ExtraEnvVars. If it cannot be decoded the
// failure is logged and no options are returned. A nil mode is handled the
// same way as config.Project. An unrecognized mode returns no options.
func (t *TaskSpec) jobAndProjectLabels(mode *string) (pbsResult, slurmResult []string) {
	var expConf expconf.ExperimentConfig
	rawConfig := []byte(t.ExtraEnvVars["DET_EXPERIMENT_CONFIG"])
	if err := json.Unmarshal(rawConfig, &expConf); err != nil {
		logrus.Infof("Unable to Unmarshal exp config: %s", err)
		return nil, nil
	}

	// No explicit mode: fall back to labeling by project.
	if mode == nil {
		return computeJobProjectResult(expConf.RawProject)
	}

	source := *mode
	switch {
	case source == config.Project:
		return computeJobProjectResult(expConf.RawProject)
	case source == config.Workspace:
		return computeJobProjectResult(expConf.RawWorkspace)
	case source == config.Label:
		return computeJobProjectResultForLabels(expConf.Labels(), "")
	case strings.HasPrefix(source, config.LabelPrefix):
		// Everything after "label:" is the prefix that selects the labels.
		return computeJobProjectResultForLabels(
			expConf.Labels(), strings.TrimPrefix(source, config.LabelPrefix))
	default:
		return nil, nil
	}
}

func computeJobProjectResult(labelValue *string) (pbsResult, slurmResult []string) {
if labelValue == nil || *labelValue == "" {
return slurmResult, pbsResult
}
slurmResult = append(slurmResult, formatSlurmLabelResult(*labelValue))
pbsResult = append(pbsResult, formatPbsLabelResult(*labelValue))
return pbsResult, slurmResult
}

// computeJobProjectResultForLabels builds the options that label a job from
// the experiment's labels.
//
// Only labels whose name starts with prefix are considered (an empty prefix
// matches every label), and the prefix is removed from each selected name.
// Names that are empty after the prefix is removed are skipped, so that a
// label equal to the prefix cannot produce an option with a missing or
// malformed value. The remaining names are sorted, then joined with "," for
// Slurm and with "_" for PBS. Both results are nil when no name remains.
func computeJobProjectResultForLabels(
	labels expconf.Labels, prefix string,
) (pbsResult, slurmResult []string) {
	var labelNames []string
	for labelName := range labels {
		if !strings.HasPrefix(labelName, prefix) {
			continue
		}
		trimmed := strings.TrimPrefix(labelName, prefix)
		if trimmed == "" {
			// Nothing left to use as a label value.
			continue
		}
		labelNames = append(labelNames, trimmed)
	}
	if len(labelNames) == 0 {
		return nil, nil
	}

	// Map iteration order is random; sort so the output is deterministic.
	sort.Strings(labelNames)
	pbsResult = []string{formatPbsLabelResult(strings.Join(labelNames, "_"))}
	slurmResult = []string{formatSlurmLabelResult(strings.Join(labelNames, ","))}
	return pbsResult, slurmResult
}

// formatPbsLabelResult renders label as the PBS option string "-P <label>".
func formatPbsLabelResult(label string) string {
	const projectFlag = "-P"
	return fmt.Sprintf("%s %s", projectFlag, label)
}

// formatSlurmLabelResult renders label as the Slurm option string
// "--wckey=<label>".
func formatSlurmLabelResult(label string) string {
	const wckeyFlag = "--wckey"
	return fmt.Sprintf("%s=%s", wckeyFlag, label)
}

// computeResources calculates the job resource requirements. It also returns any
// additional qualifiers required for the desired scheduling behavior (required
// for Slurm only at the time of writing).
Expand Down
Loading

0 comments on commit 3067626

Please sign in to comment.