From 79a5e16a837630a635fe38137c68c1ebbd9ad608 Mon Sep 17 00:00:00 2001 From: "Jerry J. Harrow" <84593277+jerryharrow@users.noreply.github.com> Date: Thu, 6 Apr 2023 08:44:07 -0400 Subject: [PATCH] chore: Move queuesFromCluster launch into dispatcher_api_client [DET-9278] (#794) - Drop hard-coded 'launcher' and use the user in the dispatch response - Move the the launch of HPC queues into shared dispatch_api_client --- .../rm/dispatcherrm/dispatcher_api_client.go | 48 +++++++++++++++++++ .../rm/dispatcherrm/dispatcher_monitor.go | 47 +++++++----------- .../hpc_resource_details_cache.go | 4 +- 3 files changed, 67 insertions(+), 32 deletions(-) diff --git a/master/internal/rm/dispatcherrm/dispatcher_api_client.go b/master/internal/rm/dispatcherrm/dispatcher_api_client.go index ffe73dfb7c22..cd36cfed05be 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_api_client.go +++ b/master/internal/rm/dispatcherrm/dispatcher_api_client.go @@ -16,6 +16,7 @@ import ( "github.com/determined-ai/determined/master/internal/config" ) +// Blank user runs as launcher-configured user. const blankImpersonatedUser = "" // One time activity to create a manifest using SlurmResources carrier. @@ -23,6 +24,11 @@ const blankImpersonatedUser = "" // e.g., nodes, GPUs etc. var hpcResourcesManifest = createSlurmResourcesManifest() +// One time activity to create a manifest using Slurm/PBSQueue carrier. +// This manifest is used on demand to retrieve details regarding +// pending/running HPC jobs. +var hpcQueueManifest = createHpcQueueManifest() + type launcherAPIClient struct { *launcher.APIClient @@ -129,6 +135,23 @@ func (c *launcherAPIClient) launchHPCResourcesJob() ( Execute() //nolint:bodyclose } +func (c *launcherAPIClient) launchHPCQueueJob() ( + info launcher.DispatchInfo, + resp *http.Response, + err error, +) { + defer recordAPITiming("launch_hpc_queue_job")() + defer recordAPIErr("launch_hpc_queue_job")(err) + + // Launch the HPC Resources manifest. Launch() method will ensure + // the manifest is in the RUNNING state on successful completion. + return c.LaunchApi. + Launch(c.withAuth(context.TODO())). + Manifest(hpcQueueManifest). + Impersonate(blankImpersonatedUser). + Execute() //nolint:bodyclose +} + func (c *launcherAPIClient) terminateDispatch( owner string, id string, @@ -234,3 +257,28 @@ func createSlurmResourcesManifest() launcher.Manifest { return manifest } + +// CreateHpcQueueManifest creates a Manifest for Slurm/PBSQueue Carrier. +// This Manifest is used to retrieve information about pending/running jobs. +func createHpcQueueManifest() launcher.Manifest { + payload := launcher.NewPayloadWithDefaults() + payload.SetName("DAI-HPC-Queues") + payload.SetId("com.cray.analytics.capsules.hpc.queue") + payload.SetVersion("latest") + payload.SetCarriers([]string{ + "com.cray.analytics.capsules.carriers.hpc.slurm.SlurmQueue", + "com.cray.analytics.capsules.carriers.hpc.pbs.PbsQueue", + }) + + launchParameters := launcher.NewLaunchParameters() + launchParameters.SetMode("batch") + payload.SetLaunchParameters(*launchParameters) + + clientMetadata := launcher.NewClientMetadataWithDefaults() + clientMetadata.SetName("DAI-HPC-Queues") + + manifest := *launcher.NewManifest("v1", *clientMetadata) + manifest.SetPayloads([]launcher.Payload{*payload}) + + return manifest +} diff --git a/master/internal/rm/dispatcherrm/dispatcher_monitor.go b/master/internal/rm/dispatcherrm/dispatcher_monitor.go index c4f33965afea..ebe6f4377181 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_monitor.go +++ b/master/internal/rm/dispatcherrm/dispatcher_monitor.go @@ -31,7 +31,6 @@ const ( ignoredReporter = "com.cray.analytics.capsules.dispatcher.shasta.ShastaDispatcher" errorLinesToRetrieve = 200 errorLinesToDisplay = 15 - ownerLauncher = "launcher" ) // A list of WARNING/ERROR level messages that we're interested in, because they contain @@ -432,42 +431,30 @@ func (m *launcherMonitor) queuesFromCluster(ctx *actor.Context) map[string]map[s return result // Nothing to get of interest in this case } ctx.Log().Debugf("Fetching HPC queue state") - payload := launcher.NewPayloadWithDefaults() - payload.SetName("DAI-HPC-Queues") - payload.SetId("com.cray.analytics.capsules.hpc.queue") - payload.SetVersion("latest") - payload.SetCarriers([]string{ - "com.cray.analytics.capsules.carriers.hpc.slurm.SlurmQueue", - "com.cray.analytics.capsules.carriers.hpc.pbs.PbsQueue", - }) - - launchParameters := launcher.NewLaunchParameters() - launchParameters.SetMode("batch") - payload.SetLaunchParameters(*launchParameters) - - clientMetadata := launcher.NewClientMetadataWithDefaults() - clientMetadata.SetName("DAI-HPC-Queues") - manifest := *launcher.NewManifest("v1", *clientMetadata) - manifest.SetPayloads([]launcher.Payload{*payload}) - - dispatchInfo, r, err := m.apiClient.LaunchApi.Launch(m.apiClient.withAuth(context.TODO())). - Manifest(manifest). - Impersonate(""). - Execute() //nolint:bodyclose + dispatchInfo, r, err := m.apiClient.launchHPCQueueJob() //nolint:bodyclose if err != nil { m.apiClient.handleServiceQueryError(r, err) return result } dispatchID := dispatchInfo.GetDispatchId() - defer m.rm.ResourceQueryPostActions(ctx, dispatchID, ownerLauncher) + owner := dispatchInfo.GetLaunchingUser() + defer func() { + _, _, err := m.apiClient.terminateDispatch(owner, dispatchID) //nolint:bodyclose + if err != nil { + ctx.Log().WithError(err).Errorf("failed to terminate dispatchID {%s}", dispatchID) + return + } - resp, _, err := m.apiClient.MonitoringApi.LoadEnvironmentLog( - m.apiClient.withAuth(context.TODO()), - ownerLauncher, - dispatchID, - "slurm-queue-info", - ).Execute() //nolint:bodyclose + _, err = m.apiClient.deleteDispatch(owner, dispatchID) //nolint:bodyclose + if err != nil { + ctx.Log().WithError(err).Errorf("failed to delete dispatchID {%s}", dispatchID) + return + } + }() + + resp, _, err := m.apiClient.loadEnvironmentLog( //nolint:bodyclose + owner, dispatchID, "slurm-queue-info") if err != nil { ctx.Log().WithError(err).Errorf("failed to retrieve HPC job queue details. response: {%v}", resp) return result diff --git a/master/internal/rm/dispatcherrm/hpc_resource_details_cache.go b/master/internal/rm/dispatcherrm/hpc_resource_details_cache.go index 70029dfe1ad1..af68458102b6 100644 --- a/master/internal/rm/dispatcherrm/hpc_resource_details_cache.go +++ b/master/internal/rm/dispatcherrm/hpc_resource_details_cache.go @@ -143,8 +143,8 @@ func (c *hpcResourceDetailsCache) fetchHpcResourceDetails() ( return nil, false } dispatchID := dispatchInfo.GetDispatchId() - owner := "launcher" - c.log.Debugf("Launched Manifest with DispatchID %s", dispatchID) + owner := dispatchInfo.GetLaunchingUser() + c.log.Debugf("Launched Manifest user %s with DispatchID %s", owner, dispatchID) defer func() { _, _, err := c.cl.terminateDispatch(owner, dispatchID) //nolint:bodyclose if err != nil {