Skip to content

Commit

Permalink
chore: split some more dispatcher API calls out (#763)
Browse files Browse the repository at this point in the history
  • Loading branch information
stoksc authored and EmilyBonar committed Jul 20, 2023
1 parent 2ced51f commit 254f2df
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 90 deletions.
109 changes: 109 additions & 0 deletions master/internal/rm/dispatcherrm/dispatcher_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ import (
"github.com/determined-ai/determined/master/internal/config"
)

const blankImpersonatedUser = ""

// One time activity to create a manifest using SlurmResources carrier.
// This manifest is used on demand to retrieve details regarding HPC resources
// e.g., nodes, GPUs etc.
var hpcResourcesManifest = createSlurmResourcesManifest()

type launcherAPIClient struct {
*launcher.APIClient

Expand Down Expand Up @@ -105,6 +112,84 @@ func (c *launcherAPIClient) getVersion(ctx context.Context) (v *semver.Version,
return version, nil
}

func (c *launcherAPIClient) launchHPCResourcesJob() (
info launcher.DispatchInfo,
resp *http.Response,
err error,
) {
defer recordAPITiming("launch_hpc_resources_job")()
defer recordAPIErr("launch_hpc_resources_job")(err)

// Launch the HPC Resources manifest. Launch() method will ensure
// the manifest is in the RUNNING state on successful completion.
return c.LaunchApi.
Launch(c.withAuth(context.TODO())).
Manifest(hpcResourcesManifest).
Impersonate(blankImpersonatedUser).
Execute() //nolint:bodyclose
}

func (c *launcherAPIClient) terminateDispatch(
owner string,
id string,
) (
info launcher.DispatchInfo,
resp *http.Response,
err error,
) {
defer recordAPITiming("terminate")()
defer recordAPIErr("terminate")(err)

info, resp, err = c.RunningApi.
TerminateRunning(c.withAuth(context.TODO()), owner, id).
Force(true).Execute() //nolint:bodyclose
switch {
case err != nil && resp != nil && resp.StatusCode == 404:
c.log.Debugf("call to terminate missing dispatch %s: %s", id, err)
case err != nil:
return launcher.DispatchInfo{}, nil, fmt.Errorf("terminating dispatch %s: %w", id, err)
default:
c.log.Debugf("terminated dispatch %s", id)
}
return info, resp, nil
}

func (c *launcherAPIClient) deleteDispatch(owner, id string) (resp *http.Response, err error) {
defer recordAPITiming("delete_env")()
defer recordAPIErr("delete_env")(err)
c.log.Debugf("deleting environment with DispatchID %s", id)

resp, err = c.MonitoringApi.
DeleteEnvironment(c.withAuth(context.TODO()), owner, id).
Execute() //nolint:bodyclose
switch {
case err != nil && resp != nil && resp.StatusCode == 404:
c.log.Debugf("try to delete environment with DispatchID %s but it is gone", id)
case err != nil:
return nil, fmt.Errorf("removing environment for Dispatch ID %s: %w", id, err)
default:
c.log.Debugf("deleted environment with DispatchID %s", id)
}
return resp, nil
}

func (c *launcherAPIClient) loadEnvironmentLog(owner, id, logFileName string) (
log *os.File,
resp *http.Response,
err error,
) {
defer recordAPITiming("load_log")()
defer recordAPIErr("load_log")(err)

log, resp, err = c.MonitoringApi.
LoadEnvironmentLog(c.withAuth(context.TODO()), owner, id, logFileName).
Execute() //nolint:bodyclose
if err != nil {
return nil, nil, fmt.Errorf("failed to retrieve HPC Resource details: %w", err)
}
return log, resp, nil
}

// handleServiceQueryError provides common error handling for REST API calls
// to the launcher in support of RM operations.
func (c *launcherAPIClient) handleServiceQueryError(r *http.Response, err error) {
Expand All @@ -125,3 +210,27 @@ func (c *launcherAPIClient) handleServiceQueryError(r *http.Response, err error)
"{%v}. Verify that the launcher service is up and reachable.", err)
}
}

// CreateSlurmResourcesManifest creates a Manifest for SlurmResources Carrier.
// This Manifest is used to retrieve information about resources available on the HPC system.
func createSlurmResourcesManifest() launcher.Manifest {
payload := launcher.NewPayloadWithDefaults()
payload.SetName("DAI-HPC-Resources")
payload.SetId("com.cray.analytics.capsules.hpc.resources")
payload.SetVersion("latest")
payload.SetCarriers([]string{slurmResourcesCarrier, pbsResourcesCarrier})

// Create payload launch parameters
launchParameters := launcher.NewLaunchParameters()
launchParameters.SetMode("interactive")
payload.SetLaunchParameters(*launchParameters)

clientMetadata := launcher.NewClientMetadataWithDefaults()
clientMetadata.SetName("DAI-HPC-Resources")

// Create & populate the manifest
manifest := *launcher.NewManifest("v1", *clientMetadata)
manifest.SetPayloads([]launcher.Payload{*payload})

return manifest
}
105 changes: 18 additions & 87 deletions master/internal/rm/dispatcherrm/dispatcher_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -264,7 +263,6 @@ type dispatcherResourceManager struct {
rmConfig *config.DispatcherResourceManagerConfig
poolConfig []config.ResourcePoolConfig
apiClient *launcherAPIClient
hpcResourcesManifest *launcher.Manifest
reqList *tasklist.TaskList
groups map[*actor.Ref]*tasklist.Group
dispatchIDToAllocationID map[string]model.AllocationID
Expand Down Expand Up @@ -293,11 +291,6 @@ func newDispatcherResourceManager(
panic(fmt.Errorf("building dispatcherrm: %w", err))
}

// One time activity to create a manifest using SlurmResources carrier.
// This manifest is used on demand to retrieve details regarding HPC resources
// e.g., nodes, GPUs etc
hpcResourcesManifest := createSlurmResourcesManifest()

watcher := newDispatchWatcher(apiClient)
dbState, err := getDispatcherState(context.TODO())
if err != nil {
Expand All @@ -310,7 +303,6 @@ func newDispatcherResourceManager(
rmConfig: rmConfig,

apiClient: apiClient,
hpcResourcesManifest: hpcResourcesManifest,
reqList: tasklist.New(),
groups: make(map[*actor.Ref]*tasklist.Group),
dispatchIDToAllocationID: make(map[string]model.AllocationID),
Expand Down Expand Up @@ -1538,9 +1530,6 @@ func (m *dispatcherResourceManager) resolveSlotType(
// This function also queries launcher version and warns user if minimum required
// launcher version is not met.
func (m *dispatcherResourceManager) fetchHpcResourceDetails(ctx *actor.Context) {
impersonatedUser := ""
newSample := hpcResources{}

// Below code will ensure isUpdating flag of the cache is always set to false,
// while exiting the function.
defer func() {
Expand All @@ -1549,26 +1538,15 @@ func (m *dispatcherResourceManager) fetchHpcResourceDetails(ctx *actor.Context)
m.resourceDetails.mu.Unlock()
}()

// Launch the HPC Resources manifest. Launch() method will ensure
// the manifest is in the RUNNING state on successful completion.
start := time.Now()
dispatchInfo, r, err := m.apiClient.LaunchApi.
Launch(m.apiClient.withAuth(context.TODO())).
Manifest(*m.hpcResourcesManifest).
Impersonate(impersonatedUser).
Execute() //nolint:bodyclose
dispatcherHistogram.WithLabelValues("launch").Observe(time.Since(start).Seconds())
dispatchInfo, resp, err := m.apiClient.launchHPCResourcesJob() //nolint:bodyclose
if err != nil {
dispatcherErrors.WithLabelValues("launch").Inc()
m.apiClient.handleServiceQueryError(r, err)
m.apiClient.handleServiceQueryError(resp, err)
return
}
ctx.Log().Debugf("Launched Manifest with DispatchID %s", dispatchInfo.GetDispatchId())

dispatchID := dispatchInfo.GetDispatchId()
ctx.Log().Debugf("Launched Manifest with DispatchID %s", dispatchID)

owner := "launcher"

defer m.ResourceQueryPostActions(ctx, dispatchID, owner)

logFileName := "slurm-resources-info"
Expand All @@ -1588,28 +1566,26 @@ func (m *dispatcherResourceManager) fetchHpcResourceDetails(ctx *actor.Context)
// to get the partition info and does not create a job, so no job ID is ever
// generated. Eventually it will timeout waiting and return, but that's too
// long of a delay for us to deal with.
start = time.Now()
resp, _, err := m.apiClient.MonitoringApi.
LoadEnvironmentLog(m.apiClient.withAuth(context.TODO()), owner, dispatchID, logFileName).
Execute() //nolint:bodyclose
dispatcherHistogram.WithLabelValues("load_log").Observe(time.Since(start).Seconds())
log, _, err := m.apiClient.loadEnvironmentLog(owner, dispatchID, logFileName) //nolint:bodyclose
if err != nil {
dispatcherErrors.WithLabelValues("load_log").Inc()
ctx.Log().WithError(err).Errorf("failed to retrieve HPC Resource details. response: {%v}", resp)
ctx.Log().Error(err)
return
}

// Parse the HPC resources file and extract the details into a
// HpcResourceDetails object using YAML package.
resourcesBytes, err := io.ReadAll(resp)
resourcesBytes, err := io.ReadAll(log)
if err != nil {
ctx.Log().WithError(err).Errorf("failed to read response")
ctx.Log().WithError(err).Errorf("failed to read HPC resources environment log file")
return
}

var newSample hpcResources
if err = yaml.Unmarshal(resourcesBytes, &newSample); err != nil {
ctx.Log().WithError(err).Errorf("failed to parse HPC Resource details")
return
}

m.hpcResourcesToDebugLog(ctx, newSample)

m.resourceDetails.mu.Lock()
Expand Down Expand Up @@ -1693,21 +1669,10 @@ func (m *dispatcherResourceManager) terminateDispatcherJob(ctx *actor.Context,
return false
}

var err error
var response *http.Response
start := time.Now()
_, response, err = m.apiClient.RunningApi.
TerminateRunning(m.apiClient.withAuth(context.TODO()), owner, dispatchID).
Force(true).Execute() //nolint:bodyclose
dispatcherHistogram.WithLabelValues("terminate").Observe(time.Since(start).Seconds())
_, _, err := m.apiClient.terminateDispatch(owner, dispatchID) //nolint:bodyclose
if err != nil {
if response == nil || response.StatusCode != 404 {
ctx.Log().WithError(err).Errorf("Failed to terminate job with Dispatch ID %s, response: {%v}",
dispatchID, response)
// We failed to delete, and not 404/notfound so leave in DB.
dispatcherErrors.WithLabelValues("terminate").Inc()
return false
}
ctx.Log().Error(err)
return false
}

if slurmResourcesPolling {
Expand Down Expand Up @@ -1736,26 +1701,16 @@ func (m *dispatcherResourceManager) terminateDispatcherJob(ctx *actor.Context,
func (m *dispatcherResourceManager) removeDispatchEnvironment(
ctx *actor.Context, owner string, dispatchID string,
) {
ctx.Log().Debugf("Deleting environment with DispatchID %s", dispatchID)
start := time.Now()
response, err := m.apiClient.MonitoringApi.
DeleteEnvironment(m.apiClient.withAuth(context.TODO()), owner, dispatchID).
Execute() //nolint:bodyclose
dispatcherHistogram.WithLabelValues("delete_env").Observe(time.Since(start).Seconds())
_, err := m.apiClient.deleteDispatch(owner, dispatchID) //nolint:bodyclose
if err != nil {
if response == nil || response.StatusCode != 404 {
ctx.Log().WithError(err).Errorf("Failed to remove environment for Dispatch ID %s, response:{%v}",
dispatchID, response)
dispatcherErrors.WithLabelValues("delete_env").Inc()
// We failed to delete, and not 404/notfound so leave in DB for later retry
return
}
} else {
ctx.Log().Debugf("Deleted environment with DispatchID %s", dispatchID)
ctx.Log().Error(err)
return
}

count, err := db.DeleteDispatch(context.TODO(), dispatchID)
if err != nil {
ctx.Log().WithError(err).Errorf("Failed to delete DispatchID %s from DB", dispatchID)
return
}
// On Slurm resource query there may be no Dispatch in the DB, so only log as trace.
ctx.Log().Tracef("Deleted DispatchID %s from DB, count %d", dispatchID, count)
Expand Down Expand Up @@ -2120,30 +2075,6 @@ func (r DispatcherResources) Kill(ctx *actor.Context, _ logger.Context) {
})
}

// CreateSlurmResourcesManifest creates a Manifest for SlurmResources Carrier.
// This Manifest is used to retrieve information about resources available on the HPC system.
func createSlurmResourcesManifest() *launcher.Manifest {
payload := launcher.NewPayloadWithDefaults()
payload.SetName("DAI-HPC-Resources")
payload.SetId("com.cray.analytics.capsules.hpc.resources")
payload.SetVersion("latest")
payload.SetCarriers([]string{slurmResourcesCarrier, pbsResourcesCarrier})

// Create payload launch parameters
launchParameters := launcher.NewLaunchParameters()
launchParameters.SetMode("interactive")
payload.SetLaunchParameters(*launchParameters)

clientMetadata := launcher.NewClientMetadataWithDefaults()
clientMetadata.SetName("DAI-HPC-Resources")

// Create & populate the manifest
manifest := *launcher.NewManifest("v1", *clientMetadata)
manifest.SetPayloads([]launcher.Payload{*payload})

return &manifest
}

// schedulingStateFromDispatchState returns SchedulingState from DispatchState representation.
func schedulingStateFromDispatchState(state launcher.DispatchState) sproto.SchedulingState {
switch state {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"
"time"

launcher "github.hpe.com/hpe/hpc-ard-launcher-go/launcher"
"gotest.tools/assert"

"github.com/determined-ai/determined/master/internal/config"
Expand Down Expand Up @@ -453,7 +452,6 @@ func Test_dispatcherResourceManager_selectDefaultPools(t *testing.T) {
type fields struct {
config *config.DispatcherResourceManagerConfig
apiClient *launcherAPIClient
hpcResourcesManifest *launcher.Manifest
reqList *tasklist.TaskList
groups map[*actor.Ref]*tasklist.Group
dispatchIDToAllocationID map[string]model.AllocationID
Expand Down Expand Up @@ -561,7 +559,6 @@ func Test_dispatcherResourceManager_selectDefaultPools(t *testing.T) {
m := &dispatcherResourceManager{
rmConfig: tt.fields.config,
apiClient: tt.fields.apiClient,
hpcResourcesManifest: tt.fields.hpcResourcesManifest,
reqList: tt.fields.reqList,
groups: tt.fields.groups,
dispatchIDToAllocationID: tt.fields.dispatchIDToAllocationID,
Expand Down

0 comments on commit 254f2df

Please sign in to comment.