From 27a60c48c4ffa9f5424a5a4e836e1663756d7a95 Mon Sep 17 00:00:00 2001 From: phillip-gaisford <98362331+phillip-gaisford@users.noreply.github.com> Date: Wed, 28 Dec 2022 16:15:44 -0500 Subject: [PATCH] chore: add job_project_source configuration for dispatcher RM (#602) --- .circleci/config.yml | 1 + .../dispatcher_resource_manager_config.go | 26 +++ ...dispatcher_resource_manager_config_test.go | 46 +++++ .../dispatcher_resource_manager.go | 4 +- master/pkg/tasks/dispatcher_task.go | 69 +++++++ master/pkg/tasks/dispatcher_task_test.go | 169 +++++++++++++++++- tools/devcluster-slurm.yaml | 1 + 7 files changed, 314 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index d8cf0a71f94f..78a7e116afc4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1307,6 +1307,7 @@ jobs: - make-package - run: make -C master publish - run: make -C agent publish + - run: mkdir /tmp/pkgs && cp -v */dist/*.{rpm,deb,tar.gz} /tmp/pkgs - store_artifacts: path: /tmp/pkgs diff --git a/master/internal/config/dispatcher_resource_manager_config.go b/master/internal/config/dispatcher_resource_manager_config.go index f5c1aa236b9d..8d2bfec99b37 100644 --- a/master/internal/config/dispatcher_resource_manager_config.go +++ b/master/internal/config/dispatcher_resource_manager_config.go @@ -3,6 +3,7 @@ package config import ( "encoding/json" "fmt" + "strings" "github.com/determined-ai/determined/master/pkg/device" "github.com/determined-ai/determined/master/pkg/model" @@ -14,6 +15,14 @@ const ( enroot = "enroot" ) +// job labeling modes. +const ( + Project = "project" + Workspace = "workspace" + Label = "label" + LabelPrefix = "label:" +) + // DispatcherResourceManagerConfig is the object that stores the values of // the "resource_manager" section of "tools/devcluster.yaml". type DispatcherResourceManagerConfig struct { @@ -39,6 +48,7 @@ type DispatcherResourceManagerConfig struct { GresSupported bool `json:"gres_supported"` DefaultAuxResourcePool *string `json:"default_aux_resource_pool"` DefaultComputeResourcePool *string `json:"default_compute_resource_pool"` + JobProjectSource *string `json:"job_project_source"` Security *DispatcherSecurityConfig `json:"security"` PartitionOverrides map[string]DispatcherPartitionOverrideConfigs `json:"partition_overrides"` @@ -57,6 +67,22 @@ func (c DispatcherResourceManagerConfig) Validate() []error { c.LauncherContainerRunType == enroot) { return []error{fmt.Errorf("invalid launch container run type: '%s'", c.LauncherContainerRunType)} } + return c.validateJobProjectSource() +} + +func (c DispatcherResourceManagerConfig) validateJobProjectSource() []error { + switch { + case c.JobProjectSource == nil: + case *c.JobProjectSource == Project: + case *c.JobProjectSource == Workspace: + case *c.JobProjectSource == Label: + case strings.HasPrefix(*c.JobProjectSource, LabelPrefix): + default: + return []error{fmt.Errorf( + "invalid job_project_source value: '%s'. "+ + "Specify one of project, workspace or label[:value]", + *c.JobProjectSource)} + } return nil } diff --git a/master/internal/config/dispatcher_resource_manager_config_test.go b/master/internal/config/dispatcher_resource_manager_config_test.go index 261c31b354ab..3cfcf4bfc442 100644 --- a/master/internal/config/dispatcher_resource_manager_config_test.go +++ b/master/internal/config/dispatcher_resource_manager_config_test.go @@ -4,11 +4,14 @@ import ( "fmt" "reflect" "testing" + + "github.com/determined-ai/determined/master/pkg/ptrs" ) func TestDispatcherResourceManagerConfig_Validate(t *testing.T) { type fields struct { LauncherContainerRunType string + JobProjectSource *string } tests := []struct { name string @@ -35,11 +38,54 @@ func TestDispatcherResourceManagerConfig_Validate(t *testing.T) { fields: fields{LauncherContainerRunType: "enroot"}, want: nil, }, + { + name: "workspace case", + fields: fields{ + LauncherContainerRunType: "enroot", + JobProjectSource: ptrs.Ptr("workspace"), + }, + want: nil, + }, + { + name: "project case", + fields: fields{ + LauncherContainerRunType: "enroot", + JobProjectSource: ptrs.Ptr("project"), + }, + want: nil, + }, + { + name: "label case", + fields: fields{ + LauncherContainerRunType: "enroot", + JobProjectSource: ptrs.Ptr("label"), + }, + want: nil, + }, + { + name: "label: case", + fields: fields{ + LauncherContainerRunType: "enroot", + JobProjectSource: ptrs.Ptr("label:something"), + }, + want: nil, + }, + { + name: "invalid project source", + fields: fields{ + LauncherContainerRunType: "enroot", + JobProjectSource: ptrs.Ptr("something-bad"), + }, + want: []error{fmt.Errorf( + "invalid job_project_source value: 'something-bad'. " + + "Specify one of project, workspace or label[:value]")}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := DispatcherResourceManagerConfig{ LauncherContainerRunType: tt.fields.LauncherContainerRunType, + JobProjectSource: tt.fields.JobProjectSource, } if got := c.Validate(); !reflect.DeepEqual(got, tt.want) { t.Errorf("DispatcherResourceManagerConfig.Validate() = %v, want %v", got, tt.want) diff --git a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go index b1d7b707dea1..93cdfc338a11 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go +++ b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go @@ -660,7 +660,9 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error manifest, impersonatedUser, payloadName, err := msg.Spec.ToDispatcherManifest( m.rmConfig.MasterHost, m.rmConfig.MasterPort, m.masterTLSConfig.CertificateName, req.SlotsNeeded, slotType, partition, tresSupported, gresSupported, - m.rmConfig.LauncherContainerRunType, m.wlmType == pbsSchedulerType) + m.rmConfig.LauncherContainerRunType, m.wlmType == pbsSchedulerType, + m.rmConfig.JobProjectSource, + ) if err != nil { sendResourceStateChangedErrorResponse(ctx, err, msg, "unable to create the launcher manifest") diff --git a/master/pkg/tasks/dispatcher_task.go b/master/pkg/tasks/dispatcher_task.go index abe599f44191..856399d2f791 100644 --- a/master/pkg/tasks/dispatcher_task.go +++ b/master/pkg/tasks/dispatcher_task.go @@ -3,9 +3,11 @@ package tasks import ( "archive/tar" "encoding/base64" + "encoding/json" "fmt" "path/filepath" "regexp" + "sort" "strconv" "strings" @@ -13,11 +15,13 @@ import ( "github.com/sirupsen/logrus" launcher "github.hpe.com/hpe/hpc-ard-launcher-go/launcher" + "github.com/determined-ai/determined/master/internal/config" "github.com/determined-ai/determined/master/pkg/archive" "github.com/determined-ai/determined/master/pkg/cproto" "github.com/determined-ai/determined/master/pkg/device" "github.com/determined-ai/determined/master/pkg/etc" "github.com/determined-ai/determined/master/pkg/model" + "github.com/determined-ai/determined/master/pkg/schemas/expconf" ) const ( @@ -76,6 +80,7 @@ func (t *TaskSpec) ToDispatcherManifest( gresSupported bool, containerRunType string, isPbsLauncher bool, + labelMode *string, ) (*launcher.Manifest, string, string, error) { /* * The user that the "launcher" is going to run the Determined task @@ -188,6 +193,7 @@ func (t *TaskSpec) ToDispatcherManifest( if err != nil { return nil, "", "", err } + pbsProj, slurmProj := t.jobAndProjectLabels(labelMode) resources, resourceOpts := t.computeResources(tresSupported, numSlots, slotType, gresSupported, isPbsLauncher) @@ -202,6 +208,7 @@ func (t *TaskSpec) ToDispatcherManifest( logrus.WithError(errList[0]).Error("Forbidden slurm option specified") return nil, "", "", errList[0] } + slurmArgs = append(slurmArgs, slurmProj...) customParams["slurmArgs"] = slurmArgs var pbsArgs []string @@ -213,6 +220,7 @@ func (t *TaskSpec) ToDispatcherManifest( logrus.WithError(errList[0]).Error("Forbidden PBS option specified") return nil, "", "", errList[0] } + pbsArgs = append(pbsArgs, pbsProj...) customParams["pbsArgs"] = pbsArgs if containerRunType == podman { @@ -265,6 +273,67 @@ func (t *TaskSpec) ToDispatcherManifest( return &manifest, impersonatedUser, payloadName, err } +// jobAndProjectLabels returns as command options the strings necessary to label +// the job in the specified mode. +func (t *TaskSpec) jobAndProjectLabels(mode *string) (pbsResult, slurmResult []string) { + // Recover the configuration from the JSON in the env. var. + configJSON := t.ExtraEnvVars["DET_EXPERIMENT_CONFIG"] + var expConf expconf.ExperimentConfig + if err := json.Unmarshal([]byte(configJSON), &expConf); err != nil { + logrus.Infof("Unable to Unmarshal exp config: %s", err) + return pbsResult, slurmResult + } + switch { + case (mode == nil || *mode == config.Project): + return computeJobProjectResult(expConf.RawProject) + case *mode == config.Workspace: + return computeJobProjectResult(expConf.RawWorkspace) + case *mode == config.Label: + return computeJobProjectResultForLabels(expConf.Labels(), "") + case strings.HasPrefix(*mode, config.LabelPrefix): + prefix := strings.TrimPrefix(*mode, config.LabelPrefix) + return computeJobProjectResultForLabels(expConf.Labels(), prefix) + } + return pbsResult, slurmResult +} + +func computeJobProjectResult(labelValue *string) (pbsResult, slurmResult []string) { + if labelValue == nil || *labelValue == "" { + return slurmResult, pbsResult + } + slurmResult = append(slurmResult, formatSlurmLabelResult(*labelValue)) + pbsResult = append(pbsResult, formatPbsLabelResult(*labelValue)) + return pbsResult, slurmResult +} + +func computeJobProjectResultForLabels( + labels expconf.Labels, prefix string, +) (pbsResult, slurmResult []string) { + var labelNames []string + for labelName := range labels { + if prefix != "" && !strings.HasPrefix(labelName, prefix) { + continue + } + labelName = strings.TrimPrefix(labelName, prefix) + labelNames = append(labelNames, labelName) + } + if len(labelNames) == 0 { + return pbsResult, slurmResult + } + sort.Strings(labelNames) // to make the tests more reliable + slurmResult = append(slurmResult, formatSlurmLabelResult(strings.Join(labelNames, ","))) + pbsResult = append(pbsResult, formatPbsLabelResult(strings.Join(labelNames, "_"))) + return pbsResult, slurmResult +} + +func formatPbsLabelResult(label string) string { + return fmt.Sprintf("-P %s", label) +} + +func formatSlurmLabelResult(label string) string { + return fmt.Sprintf("--wckey=%s", label) +} + // computeResources calculates the job resource requirements. It also returns any // additional qualifiers required for the desired scheduling behavior (required // for Slurm only at the time of writing). diff --git a/master/pkg/tasks/dispatcher_task_test.go b/master/pkg/tasks/dispatcher_task_test.go index 4cc74af0b793..8ed223c28384 100644 --- a/master/pkg/tasks/dispatcher_task_test.go +++ b/master/pkg/tasks/dispatcher_task_test.go @@ -17,6 +17,7 @@ import ( "github.com/determined-ai/determined/master/pkg/device" "github.com/determined-ai/determined/master/pkg/etc" "github.com/determined-ai/determined/master/pkg/model" + "github.com/determined-ai/determined/master/pkg/ptrs" "github.com/determined-ai/determined/master/pkg/schemas/expconf" ) @@ -645,7 +646,7 @@ func Test_ToDispatcherManifest(t *testing.T) { manifest, userName, payloadName, err := ts.ToDispatcherManifest( "masterHost", 8888, "certName", 16, tt.slotType, "slurm_partition1", tt.tresSupported, tt.gresSupported, tt.containerRunType, - tt.isPbsScheduler) + tt.isPbsScheduler, nil) if tt.wantErr { assert.ErrorContains(t, err, tt.errorContains) @@ -1301,3 +1302,169 @@ func TestTaskSpec_computeResources(t *testing.T) { }) } } + +func TestTaskSpec_jobAndProjectSource(t *testing.T) { + type fields struct { + // Description string + // LoggingFields map[string]string + // ClusterID string + // HarnessPath string + // MasterCert *tls.Certificate + // SSHRsaSize int + // SegmentEnabled bool + // SegmentAPIKey string + // TaskContainerDefaults model.TaskContainerDefaultsConfig + // Environment expconf.EnvironmentConfig + // ResourcesConfig expconf.ResourcesConfig + // WorkDir string + // Owner *model.User + // AgentUserGroup *model.AgentUserGroup + // ExtraArchives []cproto.RunArchive + ExtraEnvVars map[string]string + // Entrypoint []string + // Mounts []mount.Mount + // UseHostMode bool + // ShmSize int64 + // TaskID string + // AllocationID string + // AllocationSessionToken string + // ResourcesID string + // ContainerID string + // Devices []device.Device + // UserSessionToken string + // TaskType model.TaskType + // SlurmConfig expconf.SlurmConfig + // PbsConfig expconf.PbsConfig + } + type args struct { + mode *string + } + tests := []struct { + name string + fields fields + args args + wantPbsResult []string + wantSlurmResult []string + }{ + { + name: "Default mode; workspace & project not specified", + fields: fields{ + ExtraEnvVars: map[string]string{ + "DET_EXPERIMENT_CONFIG": `{"project":"","workspace":""}`, + }, + }, + args: args{ + mode: nil, + }, + }, + { + name: "Default mode; workspace & project values present", + fields: fields{ + ExtraEnvVars: map[string]string{ + "DET_EXPERIMENT_CONFIG": `{"project":"project1","workspace":"workspace1"}`, + }, + }, + args: args{ + mode: nil, + }, + wantPbsResult: []string{"-P project1"}, + wantSlurmResult: []string{"--wckey=project1"}, + }, + { + name: "Project mode", + fields: fields{ + ExtraEnvVars: map[string]string{ + "DET_EXPERIMENT_CONFIG": `{"project":"project1","workspace":"workspace1"}`, + }, + }, + args: args{ + mode: ptrs.Ptr("project"), + }, + wantPbsResult: []string{"-P project1"}, + wantSlurmResult: []string{"--wckey=project1"}, + }, + { + name: "workspace mode", + fields: fields{ + ExtraEnvVars: map[string]string{ + "DET_EXPERIMENT_CONFIG": `{"project":"project1","workspace":"workspace1"}`, + }, + }, + args: args{ + mode: ptrs.Ptr("workspace"), + }, + wantPbsResult: []string{"-P workspace1"}, + wantSlurmResult: []string{"--wckey=workspace1"}, + }, + { + name: "label mode", + fields: fields{ + ExtraEnvVars: map[string]string{ + "DET_EXPERIMENT_CONFIG": `{"project":"project1","workspace":"workspace1",` + + `"labels":["label1","label2"]}`, + }, + }, + args: args{ + mode: ptrs.Ptr("label"), + }, + wantPbsResult: []string{"-P label1_label2"}, + wantSlurmResult: []string{"--wckey=label1,label2"}, + }, + { + name: "label mode, but no labels specified", + fields: fields{ + ExtraEnvVars: map[string]string{ + "DET_EXPERIMENT_CONFIG": `{"project":"project1","workspace":"workspace1"}`, + }, + }, + args: args{ + mode: ptrs.Ptr("label"), + }, + // wantPbsResult: []string{"-P label1_label2"}, + // wantSlurmResult: []string{"--wckey=label1,label2"}, + }, + { + name: "label mode, empty prefix", + fields: fields{ + ExtraEnvVars: map[string]string{ + "DET_EXPERIMENT_CONFIG": `{"project":"project1","workspace":"workspace1",` + + `"labels":["label1","label2"]}`, + }, + }, + args: args{ + mode: ptrs.Ptr("label:"), + }, + wantPbsResult: []string{"-P label1_label2"}, + wantSlurmResult: []string{"--wckey=label1,label2"}, + }, + { + name: "label:prefix mode", + fields: fields{ + ExtraEnvVars: map[string]string{ + "DET_EXPERIMENT_CONFIG": `{"labels":["label1","label2","no-match"]}`, + }, + }, + args: args{ + mode: ptrs.Ptr("label:lab"), + }, + wantPbsResult: []string{"-P el1_el2"}, + wantSlurmResult: []string{"--wckey=el1,el2"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tr := &TaskSpec{ + ExtraEnvVars: tt.fields.ExtraEnvVars, + } + gotPbsResult, gotSlurmResult := tr.jobAndProjectLabels(tt.args.mode) + if !reflect.DeepEqual(gotPbsResult, tt.wantPbsResult) { + t.Errorf("TaskSpec.jobAndProjectSource() gotPbsResult = %v, want %v", + gotPbsResult, tt.wantPbsResult) + } + if !reflect.DeepEqual(gotSlurmResult, tt.wantSlurmResult) { + t.Errorf("TaskSpec.jobAndProjectSource() gotSlurmResult = %v, want %v", + gotSlurmResult, tt.wantSlurmResult) + } + }) + } +} diff --git a/tools/devcluster-slurm.yaml b/tools/devcluster-slurm.yaml index 3d1545f90c1f..a12a449bb9d4 100644 --- a/tools/devcluster-slurm.yaml +++ b/tools/devcluster-slurm.yaml @@ -73,6 +73,7 @@ $OPT_TASKCONTAINERDEFAULTS tls: skip_verify: true type: slurm + # job_project_source: project # File containing the authorization token for communication with the launcher -- if blank then none. # This would typically be a full path where the determined master is running generated by # the `dev-keytool token` command.