Skip to content

Commit

Permalink
chore: Move queuesFromCluster launch into dispatcher_api_client [DET-…
Browse files Browse the repository at this point in the history
…9278] (determined-ai#794)

- Drop hard-coded 'launcher' and use the user in the dispatch response
- Move the launch of HPC queues into the shared dispatcher_api_client
  • Loading branch information
jerryharrow authored and determined-ci committed Jun 21, 2023
1 parent bea4ca7 commit a1f64ef
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 32 deletions.
48 changes: 48 additions & 0 deletions master/internal/rm/dispatcherrm/dispatcher_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ import (
"github.com/determined-ai/determined/master/internal/config"
)

// blankImpersonatedUser is the value passed as the impersonated user on
// launch requests. A blank user runs as the launcher-configured user.
const blankImpersonatedUser = ""

// hpcResourcesManifest is created once, when the package-level variables are
// initialized, using the SlurmResources carrier.
// This manifest is used on demand to retrieve details regarding HPC resources
// e.g., nodes, GPUs etc.
var hpcResourcesManifest = createSlurmResourcesManifest()

// hpcQueueManifest is created once, when the package-level variables are
// initialized, using the Slurm/PBSQueue carrier.
// This manifest is used on demand to retrieve details regarding
// pending/running HPC jobs.
var hpcQueueManifest = createHpcQueueManifest()

type launcherAPIClient struct {
*launcher.APIClient

Expand Down Expand Up @@ -129,6 +135,23 @@ func (c *launcherAPIClient) launchHPCResourcesJob() (
Execute() //nolint:bodyclose
}

// launchHPCQueueJob launches the shared hpcQueueManifest using a blank
// impersonated user (i.e. as the launcher-configured user).
//
// It returns the dispatch info reported by the launcher, the raw HTTP
// response, and any error from the launch request. The call duration and any
// resulting error are reported under the "launch_hpc_queue_job" label.
func (c *launcherAPIClient) launchHPCQueueJob() (
	info launcher.DispatchInfo,
	resp *http.Response,
	err error,
) {
	defer recordAPITiming("launch_hpc_queue_job")()
	// Arguments to a deferred call are evaluated when the defer statement
	// executes, so passing err directly would always hand over the initial
	// nil value. Defer a closure instead so that the final value of the named
	// result is what gets recorded.
	defer func() { recordAPIErr("launch_hpc_queue_job")(err) }()

	// Launch the HPC queue manifest. Launch() method will ensure
	// the manifest is in the RUNNING state on successful completion.
	return c.LaunchApi.
		Launch(c.withAuth(context.TODO())).
		Manifest(hpcQueueManifest).
		Impersonate(blankImpersonatedUser).
		Execute() //nolint:bodyclose
}

func (c *launcherAPIClient) terminateDispatch(
owner string,
id string,
Expand Down Expand Up @@ -234,3 +257,28 @@ func createSlurmResourcesManifest() launcher.Manifest {

return manifest
}

// createHpcQueueManifest builds the launcher Manifest that names the Slurm and
// PBS queue carriers. The resulting Manifest is used to retrieve information
// about pending/running jobs.
func createHpcQueueManifest() launcher.Manifest {
	// The payload and the client metadata share the same name.
	const queueJobName = "DAI-HPC-Queues"

	meta := launcher.NewClientMetadataWithDefaults()
	meta.SetName(queueJobName)

	params := launcher.NewLaunchParameters()
	params.SetMode("batch")

	carriers := []string{
		"com.cray.analytics.capsules.carriers.hpc.slurm.SlurmQueue",
		"com.cray.analytics.capsules.carriers.hpc.pbs.PbsQueue",
	}

	queuePayload := launcher.NewPayloadWithDefaults()
	queuePayload.SetName(queueJobName)
	queuePayload.SetId("com.cray.analytics.capsules.hpc.queue")
	queuePayload.SetVersion("latest")
	queuePayload.SetCarriers(carriers)
	queuePayload.SetLaunchParameters(*params)

	result := launcher.NewManifest("v1", *meta)
	result.SetPayloads([]launcher.Payload{*queuePayload})

	return *result
}
47 changes: 17 additions & 30 deletions master/internal/rm/dispatcherrm/dispatcher_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const (
ignoredReporter = "com.cray.analytics.capsules.dispatcher.shasta.ShastaDispatcher"
errorLinesToRetrieve = 200
errorLinesToDisplay = 15
ownerLauncher = "launcher"
)

// A list of WARNING/ERROR level messages that we're interested in, because they contain
Expand Down Expand Up @@ -432,42 +431,30 @@ func (m *launcherMonitor) queuesFromCluster(ctx *actor.Context) map[string]map[s
return result // Nothing to get of interest in this case
}
ctx.Log().Debugf("Fetching HPC queue state")
payload := launcher.NewPayloadWithDefaults()
payload.SetName("DAI-HPC-Queues")
payload.SetId("com.cray.analytics.capsules.hpc.queue")
payload.SetVersion("latest")
payload.SetCarriers([]string{
"com.cray.analytics.capsules.carriers.hpc.slurm.SlurmQueue",
"com.cray.analytics.capsules.carriers.hpc.pbs.PbsQueue",
})

launchParameters := launcher.NewLaunchParameters()
launchParameters.SetMode("batch")
payload.SetLaunchParameters(*launchParameters)

clientMetadata := launcher.NewClientMetadataWithDefaults()
clientMetadata.SetName("DAI-HPC-Queues")

manifest := *launcher.NewManifest("v1", *clientMetadata)
manifest.SetPayloads([]launcher.Payload{*payload})

dispatchInfo, r, err := m.apiClient.LaunchApi.Launch(m.apiClient.withAuth(context.TODO())).
Manifest(manifest).
Impersonate("").
Execute() //nolint:bodyclose
dispatchInfo, r, err := m.apiClient.launchHPCQueueJob() //nolint:bodyclose
if err != nil {
m.apiClient.handleServiceQueryError(r, err)
return result
}
dispatchID := dispatchInfo.GetDispatchId()
defer m.rm.ResourceQueryPostActions(ctx, dispatchID, ownerLauncher)
owner := dispatchInfo.GetLaunchingUser()
defer func() {
_, _, err := m.apiClient.terminateDispatch(owner, dispatchID) //nolint:bodyclose
if err != nil {
ctx.Log().WithError(err).Errorf("failed to terminate dispatchID {%s}", dispatchID)
return
}

resp, _, err := m.apiClient.MonitoringApi.LoadEnvironmentLog(
m.apiClient.withAuth(context.TODO()),
ownerLauncher,
dispatchID,
"slurm-queue-info",
).Execute() //nolint:bodyclose
_, err = m.apiClient.deleteDispatch(owner, dispatchID) //nolint:bodyclose
if err != nil {
ctx.Log().WithError(err).Errorf("failed to delete dispatchID {%s}", dispatchID)
return
}
}()

resp, _, err := m.apiClient.loadEnvironmentLog( //nolint:bodyclose
owner, dispatchID, "slurm-queue-info")
if err != nil {
ctx.Log().WithError(err).Errorf("failed to retrieve HPC job queue details. response: {%v}", resp)
return result
Expand Down
4 changes: 2 additions & 2 deletions master/internal/rm/dispatcherrm/hpc_resource_details_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (c *hpcResourceDetailsCache) fetchHpcResourceDetails() (
return nil, false
}
dispatchID := dispatchInfo.GetDispatchId()
owner := "launcher"
c.log.Debugf("Launched Manifest with DispatchID %s", dispatchID)
owner := dispatchInfo.GetLaunchingUser()
c.log.Debugf("Launched Manifest user %s with DispatchID %s", owner, dispatchID)
defer func() {
_, _, err := c.cl.terminateDispatch(owner, dispatchID) //nolint:bodyclose
if err != nil {
Expand Down

0 comments on commit a1f64ef

Please sign in to comment.