Skip to content

Commit

Permalink
chore: pull version check out (determined-ai#759)
Browse files Browse the repository at this point in the history
  • Loading branch information
stoksc authored and dzhu committed Apr 25, 2023
1 parent 441d85e commit 6db44d9
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 61 deletions.
20 changes: 20 additions & 0 deletions master/internal/rm/dispatcherrm/dispatcher_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"net/http"
"os"
"strings"
"sync"

semver "github.com/Masterminds/semver/v3"
"github.com/sirupsen/logrus"
"github.hpe.com/hpe/hpc-ard-launcher-go/launcher"

Expand Down Expand Up @@ -85,6 +87,24 @@ func (c *launcherAPIClient) reloadAuthToken() {
}
}

func (c *launcherAPIClient) getVersion(ctx context.Context) (v *semver.Version, err error) {
defer recordAPITiming("get_version")()
defer recordAPIErr("get_version")(err)

resp, _, err := c.InfoApi.
GetServerVersion(c.withAuth(ctx)).
Execute() //nolint:bodyclose
if err != nil {
return nil, fmt.Errorf("getting launcher version: %w", err)
}

version, err := semver.NewVersion(strings.TrimSuffix(resp, "-SNAPSHOT"))
if err != nil {
return nil, fmt.Errorf("parsing semver version %s: %w", resp, err)
}
return version, nil
}

// handleServiceQueryError provides common error handling for REST API calls
// to the launcher in support of RM operations.
func (c *launcherAPIClient) handleServiceQueryError(r *http.Response, err error) {
Expand Down
28 changes: 28 additions & 0 deletions master/internal/rm/dispatcherrm/dispatcher_prom.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package dispatcherrm

import (
"time"

prom "github.com/prometheus/client_golang/prometheus"

"github.com/determined-ai/determined/master/internal/config"
)

const (
Expand Down Expand Up @@ -30,3 +34,27 @@ func init() {
prom.MustRegister(dispatcherHistogram)
prom.MustRegister(dispatcherErrors)
}

func recordAPITiming(labels ...string) (end func()) {
if !config.GetMasterConfig().Observability.EnablePrometheus {
return func() {}
}

start := time.Now()
return func() {
dispatcherHistogram.WithLabelValues(labels...).Observe(time.Since(start).Seconds())
}
}

func recordAPIErr(labels ...string) func(error) {
if !config.GetMasterConfig().Observability.EnablePrometheus {
return func(error) {}
}

return func(err error) {
if err == nil {
return
}
dispatcherErrors.WithLabelValues(labels...).Inc()
}
}
54 changes: 7 additions & 47 deletions master/internal/rm/dispatcherrm/dispatcher_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync"
"time"

semvar "github.com/Masterminds/semver/v3"
"github.com/ghodss/yaml"
"github.com/google/uuid"
echoV4 "github.com/labstack/echo/v4"
Expand Down Expand Up @@ -43,12 +42,11 @@ import (

const maxResourceDetailsSampleAgeSeconds = 60
const (
slurmSchedulerType = "slurm"
pbsSchedulerType = "pbs"
slurmResourcesCarrier = "com.cray.analytics.capsules.carriers.hpc.slurm.SlurmResources"
pbsResourcesCarrier = "com.cray.analytics.capsules.carriers.hpc.pbs.PbsResources"
launcherMinimumVersion = "3.2.4"
root = "root"
slurmSchedulerType = "slurm"
pbsSchedulerType = "pbs"
slurmResourcesCarrier = "com.cray.analytics.capsules.carriers.hpc.slurm.SlurmResources"
pbsResourcesCarrier = "com.cray.analytics.capsules.carriers.hpc.pbs.PbsResources"
root = "root"
)

// schedulerTick periodically triggers the scheduler to act.
Expand Down Expand Up @@ -162,6 +160,7 @@ func (d *DispatcherResourceManager) ResolveResourcePool(
}
name = resp.PoolName
}

providingPartition, err := d.validateResourcePool(ctx, name)
if err != nil {
return "", fmt.Errorf("validating resource pool: %w", err)
Expand Down Expand Up @@ -269,7 +268,6 @@ type dispatcherResourceManager struct {
jobWatcher *launcherMonitor
resourceDetails hpcResourceDetailsCache
wlmType string
launcherVersionIsOK bool
poolProviderMap map[string][]string
dispatchIDToHPCJobID map[string]string
dispatchIDToAllocationIDMutex sync.RWMutex
Expand Down Expand Up @@ -313,7 +311,6 @@ func newDispatcherResourceManager(
masterTLSConfig: masterTLSConfig,
loggingConfig: loggingConfig,
jobWatcher: watcher,
launcherVersionIsOK: false,
poolConfig: poolConfig,
poolProviderMap: makeProvidedPoolsMap(poolConfig),
dispatchIDToHPCJobID: make(map[string]string),
Expand Down Expand Up @@ -344,6 +341,7 @@ func (m *dispatcherResourceManager) Receive(ctx *actor.Context) error {
case actor.PreStart:
ctx.Log().Info("Starting dispatcher resource manager")
go m.killAllInactiveDispatches(ctx, ctx.Self())
go periodicallyCheckLauncherVersion(context.TODO(), ctx.Log(), m.apiClient)
go m.jobWatcher.watch(ctx)

// SLURM Resource Manager always fulfills requests for resource pool details using the
Expand Down Expand Up @@ -1522,40 +1520,6 @@ func (m *dispatcherResourceManager) resolveSlotType(
return device.CUDA, nil
}

// retrieves the launcher version and log error if not meeting minimum required version.
func (m *dispatcherResourceManager) getAndCheckLauncherVersion(ctx *actor.Context) {
start := time.Now()
resp, _, err := m.apiClient.InfoApi.GetServerVersion(m.apiClient.withAuth(context.TODO())).
Execute() //nolint:bodyclose
dispatcherHistogram.WithLabelValues("get_version").Observe(time.Since(start).Seconds())
if err == nil {
if checkMinimumLauncherVersion(resp) {
if !m.launcherVersionIsOK {
m.launcherVersionIsOK = true
ctx.Log().Infof("Determined HPC launcher %s at %s:%d",
resp, m.rmConfig.LauncherHost, m.rmConfig.LauncherPort)
}
}
} else {
dispatcherErrors.WithLabelValues("get_version").Inc()
}

if !m.launcherVersionIsOK {
ctx.Log().Errorf("Launcher version %s does not meet the required minimum. "+
"Upgrade to hpe-hpc-launcher version %s",
resp, launcherMinimumVersion)
}
}

func checkMinimumLauncherVersion(version string) bool {
c, _ := semvar.NewConstraint(">=" + launcherMinimumVersion)
launcherVersion, err := semvar.NewVersion(strings.TrimSuffix(version, "-SNAPSHOT"))
if err != nil {
return false
}
return c.Check(launcherVersion)
}

// fetchHpcResourceDetails retrieves the details about HPC Resources.
// This function uses HPC Resources manifest to retrieve the required details.
// This function performs the following steps:
Expand Down Expand Up @@ -1654,10 +1618,6 @@ func (m *dispatcherResourceManager) fetchHpcResourceDetails(ctx *actor.Context)
ctx.Log().Debugf("default resource pools are '%s', '%s'",
m.resourceDetails.lastSample.DefaultComputePoolPartition,
m.resourceDetails.lastSample.DefaultAuxPoolPartition)

if !m.launcherVersionIsOK {
m.getAndCheckLauncherVersion(ctx)
}
}

// determineWlmType determines the WLM type of the cluster from the dispatchInfo response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,20 +662,6 @@ func Test_dispatcherResourceManager_determineWlmType(t *testing.T) {
}
}

func Test_dispatcherResourceManager_checkLauncherVersion(t *testing.T) {
assert.Equal(t, checkMinimumLauncherVersion("4.1.0"), true)
assert.Equal(t, checkMinimumLauncherVersion("4.1.3-SNAPSHOT"), true)
assert.Equal(t, checkMinimumLauncherVersion("3.2.4"), true)
assert.Equal(t, checkMinimumLauncherVersion("3.2.3"), false)
assert.Equal(t, checkMinimumLauncherVersion("3.2.0"), false)
assert.Equal(t, checkMinimumLauncherVersion("3.1.3"), false)
assert.Equal(t, checkMinimumLauncherVersion("3.1.0"), false)
assert.Equal(t, checkMinimumLauncherVersion("2.3.3"), false)
assert.Equal(t, checkMinimumLauncherVersion("3.0.3"), false)
assert.Equal(t, checkMinimumLauncherVersion("x.y.z"), false)
assert.Equal(t, checkMinimumLauncherVersion("abc"), false)
}

func Test_dispatcherResourceManager_getPartitionValidationResponse(t *testing.T) {
type fields struct {
poolConfig []config.ResourcePoolConfig
Expand Down
41 changes: 41 additions & 0 deletions master/internal/rm/dispatcherrm/dispatcher_version_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package dispatcherrm

import (
"context"
"time"

semvar "github.com/Masterminds/semver/v3"
"github.com/sirupsen/logrus"
)

const versionCheckPeriod = 60

var launcherMinimumVersion = semvar.MustParse("3.2.4")

// periodicallyCheckLauncherVersion checks the launcher version every 60s, logging warnings while
// it is out of date and exiting if it finds it is ok.
func periodicallyCheckLauncherVersion(
ctx context.Context,
log *logrus.Entry,
cl *launcherAPIClient,
) {
for range time.NewTicker(versionCheckPeriod).C {
v, err := cl.getVersion(ctx)
if err != nil {
log.WithError(err).Error("could not get launcher API version")
continue
}

if checkLauncherVersion(v) {
return
}

log.Errorf("Launcher version %s does not meet the required minimum. "+
"Upgrade to hpe-hpc-launcher version %s",
v, launcherMinimumVersion)
}
}

func checkLauncherVersion(v *semvar.Version) bool {
return v.Equal(launcherMinimumVersion) || v.GreaterThan(launcherMinimumVersion)
}
20 changes: 20 additions & 0 deletions master/internal/rm/dispatcherrm/dispatcher_version_checker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package dispatcherrm

import (
"testing"

"github.com/Masterminds/semver/v3"
"gotest.tools/assert"
)

func TestCheckLauncherVersion(t *testing.T) {
assert.Equal(t, checkLauncherVersion(semver.MustParse("4.1.0")), true)
assert.Equal(t, checkLauncherVersion(semver.MustParse("4.1.3-SNAPSHOT")), true)
assert.Equal(t, checkLauncherVersion(semver.MustParse("3.2.4")), true)
assert.Equal(t, checkLauncherVersion(semver.MustParse("3.2.3")), false)
assert.Equal(t, checkLauncherVersion(semver.MustParse("3.2.0")), false)
assert.Equal(t, checkLauncherVersion(semver.MustParse("3.1.3")), false)
assert.Equal(t, checkLauncherVersion(semver.MustParse("3.1.0")), false)
assert.Equal(t, checkLauncherVersion(semver.MustParse("2.3.3")), false)
assert.Equal(t, checkLauncherVersion(semver.MustParse("3.0.3")), false)
}

0 comments on commit 6db44d9

Please sign in to comment.