From 254f2df00ef2370cda3a3ec048eb88e59f66a17f Mon Sep 17 00:00:00 2001 From: Bradley Laney Date: Wed, 29 Mar 2023 12:13:41 -0400 Subject: [PATCH] chore: split some more dispatcher API calls out (#763) --- .../rm/dispatcherrm/dispatcher_api_client.go | 109 ++++++++++++++++++ .../dispatcher_resource_manager.go | 105 +++-------------- .../dispatcher_resource_manager_test.go | 3 - 3 files changed, 127 insertions(+), 90 deletions(-) diff --git a/master/internal/rm/dispatcherrm/dispatcher_api_client.go b/master/internal/rm/dispatcherrm/dispatcher_api_client.go index 9ddfe2da4b00..ffe73dfb7c22 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_api_client.go +++ b/master/internal/rm/dispatcherrm/dispatcher_api_client.go @@ -16,6 +16,13 @@ import ( "github.com/determined-ai/determined/master/internal/config" ) +const blankImpersonatedUser = "" + +// One time activity to create a manifest using SlurmResources carrier. +// This manifest is used on demand to retrieve details regarding HPC resources +// e.g., nodes, GPUs etc. +var hpcResourcesManifest = createSlurmResourcesManifest() + type launcherAPIClient struct { *launcher.APIClient @@ -105,6 +112,84 @@ func (c *launcherAPIClient) getVersion(ctx context.Context) (v *semver.Version, return version, nil } +func (c *launcherAPIClient) launchHPCResourcesJob() ( + info launcher.DispatchInfo, + resp *http.Response, + err error, +) { + defer recordAPITiming("launch_hpc_resources_job")() + defer recordAPIErr("launch_hpc_resources_job")(err) + + // Launch the HPC Resources manifest. Launch() method will ensure + // the manifest is in the RUNNING state on successful completion. + return c.LaunchApi. + Launch(c.withAuth(context.TODO())). + Manifest(hpcResourcesManifest). + Impersonate(blankImpersonatedUser). + Execute() //nolint:bodyclose +} + +func (c *launcherAPIClient) terminateDispatch( + owner string, + id string, +) ( + info launcher.DispatchInfo, + resp *http.Response, + err error, +) { + defer recordAPITiming("terminate")() + defer recordAPIErr("terminate")(err) + + info, resp, err = c.RunningApi. + TerminateRunning(c.withAuth(context.TODO()), owner, id). + Force(true).Execute() //nolint:bodyclose + switch { + case err != nil && resp != nil && resp.StatusCode == 404: + c.log.Debugf("call to terminate missing dispatch %s: %s", id, err) + case err != nil: + return launcher.DispatchInfo{}, nil, fmt.Errorf("terminating dispatch %s: %w", id, err) + default: + c.log.Debugf("terminated dispatch %s", id) + } + return info, resp, nil +} + +func (c *launcherAPIClient) deleteDispatch(owner, id string) (resp *http.Response, err error) { + defer recordAPITiming("delete_env")() + defer recordAPIErr("delete_env")(err) + c.log.Debugf("deleting environment with DispatchID %s", id) + + resp, err = c.MonitoringApi. + DeleteEnvironment(c.withAuth(context.TODO()), owner, id). + Execute() //nolint:bodyclose + switch { + case err != nil && resp != nil && resp.StatusCode == 404: + c.log.Debugf("try to delete environment with DispatchID %s but it is gone", id) + case err != nil: + return nil, fmt.Errorf("removing environment for Dispatch ID %s: %w", id, err) + default: + c.log.Debugf("deleted environment with DispatchID %s", id) + } + return resp, nil +} + +func (c *launcherAPIClient) loadEnvironmentLog(owner, id, logFileName string) ( + log *os.File, + resp *http.Response, + err error, +) { + defer recordAPITiming("load_log")() + defer recordAPIErr("load_log")(err) + + log, resp, err = c.MonitoringApi. + LoadEnvironmentLog(c.withAuth(context.TODO()), owner, id, logFileName). + Execute() //nolint:bodyclose + if err != nil { + return nil, nil, fmt.Errorf("failed to retrieve HPC Resource details: %w", err) + } + return log, resp, nil +} + // handleServiceQueryError provides common error handling for REST API calls // to the launcher in support of RM operations. func (c *launcherAPIClient) handleServiceQueryError(r *http.Response, err error) { @@ -125,3 +210,27 @@ func (c *launcherAPIClient) handleServiceQueryError(r *http.Response, err error) "{%v}. Verify that the launcher service is up and reachable.", err) } } + +// CreateSlurmResourcesManifest creates a Manifest for SlurmResources Carrier. +// This Manifest is used to retrieve information about resources available on the HPC system. +func createSlurmResourcesManifest() launcher.Manifest { + payload := launcher.NewPayloadWithDefaults() + payload.SetName("DAI-HPC-Resources") + payload.SetId("com.cray.analytics.capsules.hpc.resources") + payload.SetVersion("latest") + payload.SetCarriers([]string{slurmResourcesCarrier, pbsResourcesCarrier}) + + // Create payload launch parameters + launchParameters := launcher.NewLaunchParameters() + launchParameters.SetMode("interactive") + payload.SetLaunchParameters(*launchParameters) + + clientMetadata := launcher.NewClientMetadataWithDefaults() + clientMetadata.SetName("DAI-HPC-Resources") + + // Create & populate the manifest + manifest := *launcher.NewManifest("v1", *clientMetadata) + manifest.SetPayloads([]launcher.Payload{*payload}) + + return manifest +} diff --git a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go index d17420a4b2c6..e1d57b6a6fcb 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go +++ b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "fmt" "io" - "net/http" "strings" "sync" "time" @@ -264,7 +263,6 @@ type dispatcherResourceManager struct { rmConfig *config.DispatcherResourceManagerConfig poolConfig []config.ResourcePoolConfig apiClient *launcherAPIClient - hpcResourcesManifest *launcher.Manifest reqList *tasklist.TaskList groups map[*actor.Ref]*tasklist.Group dispatchIDToAllocationID map[string]model.AllocationID @@ -293,11 +291,6 @@ func newDispatcherResourceManager( panic(fmt.Errorf("building dispatcherrm: %w", err)) } - // One time activity to create a manifest using SlurmResources carrier. - // This manifest is used on demand to retrieve details regarding HPC resources - // e.g., nodes, GPUs etc - hpcResourcesManifest := createSlurmResourcesManifest() - watcher := newDispatchWatcher(apiClient) dbState, err := getDispatcherState(context.TODO()) if err != nil { @@ -310,7 +303,6 @@ func newDispatcherResourceManager( rmConfig: rmConfig, apiClient: apiClient, - hpcResourcesManifest: hpcResourcesManifest, reqList: tasklist.New(), groups: make(map[*actor.Ref]*tasklist.Group), dispatchIDToAllocationID: make(map[string]model.AllocationID), @@ -1538,9 +1530,6 @@ func (m *dispatcherResourceManager) resolveSlotType( // This function also queries launcher version and warns user if minimum required // launcher version is not met. func (m *dispatcherResourceManager) fetchHpcResourceDetails(ctx *actor.Context) { - impersonatedUser := "" - newSample := hpcResources{} - // Below code will ensure isUpdating flag of the cache is always set to false, // while exiting the function. defer func() { @@ -1549,26 +1538,15 @@ func (m *dispatcherResourceManager) fetchHpcResourceDetails(ctx *actor.Context) m.resourceDetails.mu.Unlock() }() - // Launch the HPC Resources manifest. Launch() method will ensure - // the manifest is in the RUNNING state on successful completion. - start := time.Now() - dispatchInfo, r, err := m.apiClient.LaunchApi. - Launch(m.apiClient.withAuth(context.TODO())). - Manifest(*m.hpcResourcesManifest). - Impersonate(impersonatedUser). - Execute() //nolint:bodyclose - dispatcherHistogram.WithLabelValues("launch").Observe(time.Since(start).Seconds()) + dispatchInfo, resp, err := m.apiClient.launchHPCResourcesJob() //nolint:bodyclose if err != nil { - dispatcherErrors.WithLabelValues("launch").Inc() - m.apiClient.handleServiceQueryError(r, err) + m.apiClient.handleServiceQueryError(resp, err) return } - ctx.Log().Debugf("Launched Manifest with DispatchID %s", dispatchInfo.GetDispatchId()) - dispatchID := dispatchInfo.GetDispatchId() + ctx.Log().Debugf("Launched Manifest with DispatchID %s", dispatchID) owner := "launcher" - defer m.ResourceQueryPostActions(ctx, dispatchID, owner) logFileName := "slurm-resources-info" @@ -1588,28 +1566,26 @@ func (m *dispatcherResourceManager) fetchHpcResourceDetails(ctx *actor.Context) // to get the partition info and does not create a job, so no job ID is ever // generated. Eventually it will timeout waiting and return, but that's too // long of a delay for us to deal with. - start = time.Now() - resp, _, err := m.apiClient.MonitoringApi. - LoadEnvironmentLog(m.apiClient.withAuth(context.TODO()), owner, dispatchID, logFileName). - Execute() //nolint:bodyclose - dispatcherHistogram.WithLabelValues("load_log").Observe(time.Since(start).Seconds()) + log, _, err := m.apiClient.loadEnvironmentLog(owner, dispatchID, logFileName) //nolint:bodyclose if err != nil { - dispatcherErrors.WithLabelValues("load_log").Inc() - ctx.Log().WithError(err).Errorf("failed to retrieve HPC Resource details. response: {%v}", resp) + ctx.Log().Error(err) return } // Parse the HPC resources file and extract the details into a // HpcResourceDetails object using YAML package. - resourcesBytes, err := io.ReadAll(resp) + resourcesBytes, err := io.ReadAll(log) if err != nil { - ctx.Log().WithError(err).Errorf("failed to read response") + ctx.Log().WithError(err).Errorf("failed to read HPC resources environment log file") return } + + var newSample hpcResources if err = yaml.Unmarshal(resourcesBytes, &newSample); err != nil { ctx.Log().WithError(err).Errorf("failed to parse HPC Resource details") return } + m.hpcResourcesToDebugLog(ctx, newSample) m.resourceDetails.mu.Lock() @@ -1693,21 +1669,10 @@ func (m *dispatcherResourceManager) terminateDispatcherJob(ctx *actor.Context, return false } - var err error - var response *http.Response - start := time.Now() - _, response, err = m.apiClient.RunningApi. - TerminateRunning(m.apiClient.withAuth(context.TODO()), owner, dispatchID). - Force(true).Execute() //nolint:bodyclose - dispatcherHistogram.WithLabelValues("terminate").Observe(time.Since(start).Seconds()) + _, _, err := m.apiClient.terminateDispatch(owner, dispatchID) //nolint:bodyclose if err != nil { - if response == nil || response.StatusCode != 404 { - ctx.Log().WithError(err).Errorf("Failed to terminate job with Dispatch ID %s, response: {%v}", - dispatchID, response) - // We failed to delete, and not 404/notfound so leave in DB. - dispatcherErrors.WithLabelValues("terminate").Inc() - return false - } + ctx.Log().Error(err) + return false } if slurmResourcesPolling { @@ -1736,26 +1701,16 @@ func (m *dispatcherResourceManager) terminateDispatcherJob(ctx *actor.Context, func (m *dispatcherResourceManager) removeDispatchEnvironment( ctx *actor.Context, owner string, dispatchID string, ) { - ctx.Log().Debugf("Deleting environment with DispatchID %s", dispatchID) - start := time.Now() - response, err := m.apiClient.MonitoringApi. - DeleteEnvironment(m.apiClient.withAuth(context.TODO()), owner, dispatchID). - Execute() //nolint:bodyclose - dispatcherHistogram.WithLabelValues("delete_env").Observe(time.Since(start).Seconds()) + _, err := m.apiClient.deleteDispatch(owner, dispatchID) //nolint:bodyclose if err != nil { - if response == nil || response.StatusCode != 404 { - ctx.Log().WithError(err).Errorf("Failed to remove environment for Dispatch ID %s, response:{%v}", - dispatchID, response) - dispatcherErrors.WithLabelValues("delete_env").Inc() - // We failed to delete, and not 404/notfound so leave in DB for later retry - return - } - } else { - ctx.Log().Debugf("Deleted environment with DispatchID %s", dispatchID) + ctx.Log().Error(err) + return } + count, err := db.DeleteDispatch(context.TODO(), dispatchID) if err != nil { ctx.Log().WithError(err).Errorf("Failed to delete DispatchID %s from DB", dispatchID) + return } // On Slurm resource query there may be no Dispatch in the DB, so only log as trace. ctx.Log().Tracef("Deleted DispatchID %s from DB, count %d", dispatchID, count) @@ -2120,30 +2075,6 @@ func (r DispatcherResources) Kill(ctx *actor.Context, _ logger.Context) { }) } -// CreateSlurmResourcesManifest creates a Manifest for SlurmResources Carrier. -// This Manifest is used to retrieve information about resources available on the HPC system. -func createSlurmResourcesManifest() *launcher.Manifest { - payload := launcher.NewPayloadWithDefaults() - payload.SetName("DAI-HPC-Resources") - payload.SetId("com.cray.analytics.capsules.hpc.resources") - payload.SetVersion("latest") - payload.SetCarriers([]string{slurmResourcesCarrier, pbsResourcesCarrier}) - - // Create payload launch parameters - launchParameters := launcher.NewLaunchParameters() - launchParameters.SetMode("interactive") - payload.SetLaunchParameters(*launchParameters) - - clientMetadata := launcher.NewClientMetadataWithDefaults() - clientMetadata.SetName("DAI-HPC-Resources") - - // Create & populate the manifest - manifest := *launcher.NewManifest("v1", *clientMetadata) - manifest.SetPayloads([]launcher.Payload{*payload}) - - return &manifest -} - // schedulingStateFromDispatchState returns SchedulingState from DispatchState representation. func schedulingStateFromDispatchState(state launcher.DispatchState) sproto.SchedulingState { switch state { diff --git a/master/internal/rm/dispatcherrm/dispatcher_resource_manager_test.go b/master/internal/rm/dispatcherrm/dispatcher_resource_manager_test.go index abd809591965..fa992ce43132 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_resource_manager_test.go +++ b/master/internal/rm/dispatcherrm/dispatcher_resource_manager_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - launcher "github.hpe.com/hpe/hpc-ard-launcher-go/launcher" "gotest.tools/assert" "github.com/determined-ai/determined/master/internal/config" @@ -453,7 +452,6 @@ func Test_dispatcherResourceManager_selectDefaultPools(t *testing.T) { type fields struct { config *config.DispatcherResourceManagerConfig apiClient *launcherAPIClient - hpcResourcesManifest *launcher.Manifest reqList *tasklist.TaskList groups map[*actor.Ref]*tasklist.Group dispatchIDToAllocationID map[string]model.AllocationID @@ -561,7 +559,6 @@ func Test_dispatcherResourceManager_selectDefaultPools(t *testing.T) { m := &dispatcherResourceManager{ rmConfig: tt.fields.config, apiClient: tt.fields.apiClient, - hpcResourcesManifest: tt.fields.hpcResourcesManifest, reqList: tt.fields.reqList, groups: tt.fields.groups, dispatchIDToAllocationID: tt.fields.dispatchIDToAllocationID,