Skip to content

Commit

Permalink
chore: take invariant_config priority into account with manage job wo…
Browse files Browse the repository at this point in the history
…rkflow (#10025)
  • Loading branch information
kkunapuli authored Oct 11, 2024
1 parent 2356f91 commit dadf75e
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 177 deletions.
2 changes: 1 addition & 1 deletion master/internal/command/command_job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *Command) SetJobPriority(priority int) error {

// Returns an error if RM does not implement priority.
if smallerHigher, err := c.rm.SmallerValueIsHigherPriority(); err == nil {
ok, err := configpolicy.PriorityAllowed(
ok, err := configpolicy.PriorityUpdateAllowed(
int(c.GenericCommandSpec.Metadata.WorkspaceID),
model.NTSCType,
priority,
Expand Down
38 changes: 0 additions & 38 deletions master/internal/configpolicy/postgres_task_config_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package configpolicy
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"

Expand Down Expand Up @@ -101,43 +100,6 @@ func GetTaskConfigPolicies(
return &tcp, nil
}

// GetPriorityLimit reads the priority limit for the given scope and workload type.
// It returns found=false if no limit exists.
func GetPriorityLimit(ctx context.Context, scope *int, workloadType string) (limit int, found bool, err error) {
if !ValidWorkloadType(workloadType) {
return 0, false, fmt.Errorf("invalid workload type: %s", workloadType)
}

wkspQuery := wkspIDQuery
if scope == nil {
wkspQuery = wkspIDGlobalQuery
}

var constraints model.Constraints
var constraintsStr string
err = db.Bun().NewSelect().
Table("task_config_policies").
Column("constraints").
Where(wkspQuery, scope).
Where("workload_type = ?", workloadType).
Scan(ctx, &constraintsStr)

if err == sql.ErrNoRows {
return 0, false, nil
} else if err != nil {
return 0, false, fmt.Errorf("error retrieving priority limit: %w", err)
}

if err = json.Unmarshal([]byte(constraintsStr), &constraints); err != nil {
return 0, false, err
}
if constraints.PriorityLimit != nil {
return *constraints.PriorityLimit, true, nil
}

return 0, false, nil
}

// DeleteConfigPolicies deletes the invariant experiment config and constraints for the
// given scope (global or workspace-level) and workload type.
func DeleteConfigPolicies(ctx context.Context,
Expand Down
119 changes: 0 additions & 119 deletions master/internal/configpolicy/postgres_task_config_policy_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package configpolicy
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -207,124 +206,6 @@ func TestSetTaskConfigPolicies(t *testing.T) {
require.ErrorContains(t, err, "violates foreign key constraint")
}

func TestWorkspaceGetPriorityLimit(t *testing.T) {
ctx := context.Background()
require.NoError(t, etc.SetRootPath(db.RootFromDB))
pgDB, cleanup := db.MustResolveNewPostgresDatabase(t)
defer cleanup()
db.MustMigrateTestPostgres(t, pgDB, db.MigrationsFromDB)
user := db.RequireMockUser(t, pgDB)

// Add a workspace to use.
w := model.Workspace{Name: uuid.NewString(), UserID: user.ID}
_, err := db.Bun().NewInsert().Model(&w).Exec(ctx)
require.NoError(t, err)
defer func() {
err := db.CleanupMockWorkspace([]int32{int32(w.ID)})
if err != nil {
log.Errorf("error when cleaning up mock workspaces")
}
}()

// No limit set.
_, found, err := GetPriorityLimit(ctx, nil, model.NTSCType)
require.NoError(t, err)
require.False(t, found)

// Add priority limit for workspace NTSC.
wkspLimit := 20
constraints := fmt.Sprintf(`{"priority_limit": %d}`, wkspLimit)
wkspInput := model.TaskConfigPolicies{
WorkloadType: model.NTSCType,
WorkspaceID: &w.ID,
Constraints: &constraints,
LastUpdatedBy: user.ID,
}

err = SetTaskConfigPolicies(ctx, &wkspInput)
require.NoError(t, err)

// Get priority limit; should match workspace limit.
res, found, err := GetPriorityLimit(ctx, &w.ID, model.NTSCType)
require.NoError(t, err)
require.True(t, found)
require.Equal(t, wkspLimit, res)

// Get limit for a workspace that does not exist.
wkspIDDoesNotExist := 404
_, found, err = GetPriorityLimit(ctx, &wkspIDDoesNotExist, model.NTSCType)
require.NoError(t, err)
require.False(t, found)

// Get global limit.
_, found, err = GetPriorityLimit(ctx, nil, model.NTSCType)
require.NoError(t, err)
require.False(t, found)

// Get limit for other workload type.
_, found, err = GetPriorityLimit(ctx, &w.ID, model.ExperimentType)
require.NoError(t, err)
require.False(t, found)

// Try an invalid workload type.
_, found, err = GetPriorityLimit(ctx, &w.ID, "bogus")
require.Error(t, err)
require.False(t, found)
}

func TestGlobalGetPriorityLimit(t *testing.T) {
ctx := context.Background()
require.NoError(t, etc.SetRootPath(db.RootFromDB))
pgDB, cleanup := db.MustResolveNewPostgresDatabase(t)
defer cleanup()
db.MustMigrateTestPostgres(t, pgDB, db.MigrationsFromDB)
user := db.RequireMockUser(t, pgDB)

// Add a workspace to use.
w := model.Workspace{Name: uuid.NewString(), UserID: user.ID}
_, err := db.Bun().NewInsert().Model(&w).Exec(ctx)
require.NoError(t, err)
defer func() {
err := db.CleanupMockWorkspace([]int32{int32(w.ID)})
if err != nil {
log.Errorf("error when cleaning up mock workspaces")
}
}()

// No limit set.
_, found, err := GetPriorityLimit(ctx, nil, model.NTSCType)
require.NoError(t, err)
require.False(t, found)

// Add priority limit for global NTSC.
globalLimit := 5
constraints := fmt.Sprintf(`{"priority_limit": %d}`, globalLimit)
globalInput := model.TaskConfigPolicies{
WorkloadType: model.NTSCType,
WorkspaceID: nil,
Constraints: &constraints,
LastUpdatedBy: user.ID,
}
err = SetTaskConfigPolicies(ctx, &globalInput)
require.NoError(t, err)

// Get priority limit, should be global limit.
res, found, err := GetPriorityLimit(ctx, nil, model.NTSCType)
require.NoError(t, err)
require.True(t, found)
require.Equal(t, globalLimit, res)

// Get limit for a different workload type.
_, found, err = GetPriorityLimit(ctx, nil, model.ExperimentType)
require.NoError(t, err)
require.False(t, found)

// Try an invalid workload type.
_, found, err = GetPriorityLimit(ctx, nil, "bogus")
require.Error(t, err)
require.False(t, found)
}

// Test the enforcement of the primary key on the task_config_polciies table.
func TestTaskConfigPoliciesUnique(t *testing.T) {
ctx := context.Background()
Expand Down
90 changes: 77 additions & 13 deletions master/internal/configpolicy/task_config_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type NTSCConfigPolicies struct {
var (
errPriorityConstraintFailure = errors.New("submitted workload failed priority constraint")
errResourceConstraintFailure = errors.New("submitted workload failed a resource constraint")
errPriorityImmutable = errors.New("priority cannot be modified")
)

// CheckNTSCConstraints returns an error if the NTSC config fails constraint checks.
Expand Down Expand Up @@ -87,7 +88,7 @@ func CheckExperimentConstraints(

if constraints.ResourceConstraints != nil && constraints.ResourceConstraints.MaxSlots != nil {
// users cannot specify number of slots for an experiment
slotsRequest := 0
slotsRequest := *constraints.ResourceConstraints.MaxSlots
if err = checkSlotsConstraint(*constraints.ResourceConstraints.MaxSlots, slotsRequest,
workloadConfig.Resources().MaxSlots()); err != nil {
return err
Expand Down Expand Up @@ -219,27 +220,90 @@ func MergeWithInvariantExperimentConfigs(ctx context.Context, workspaceID int,
return &config, nil
}

// PriorityAllowed returns true if the desired priority is within the task config policy limit.
func PriorityAllowed(wkspID int, workloadType string, priority int, smallerHigher bool) (bool, error) {
func findAllowedPriority(scope *int, workloadType string) (limit int, exists bool, err error) {
configPolicies, err := GetTaskConfigPolicies(context.TODO(), scope, workloadType)
if err != nil {
return 0, false, fmt.Errorf("unable to fetch task config policies: %w", err)
}

// Cannot update priority if priority set in invariant config.
if configPolicies.InvariantConfig != nil {
switch workloadType {
case model.NTSCType:
var configs model.CommandConfig
err = json.Unmarshal([]byte(*configPolicies.InvariantConfig), &configs)
if err != nil {
return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err)
}
if configs.Resources.Priority != nil {
adminPriority := *configs.Resources.Priority
return adminPriority, false,
fmt.Errorf("priority set by invariant config: %w", errPriorityImmutable)
}
case model.ExperimentType:
var configs expconf.ExperimentConfigV0
err = json.Unmarshal([]byte(*configPolicies.InvariantConfig), &configs)
if err != nil {
return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err)
}
if configs.Resources().Priority() != nil {
adminPriority := *configs.Resources().Priority()
return adminPriority, false,
fmt.Errorf("priority set by invariant config: %w", errPriorityImmutable)
}
default:
return 0, false, fmt.Errorf("workload type %s not supported", workloadType)
}
}

// Find priority constraint, if set.
var constraints model.Constraints
if configPolicies.Constraints != nil {
if err = json.Unmarshal([]byte(*configPolicies.Constraints), &constraints); err != nil {
return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err)
}
if constraints.PriorityLimit != nil {
return *constraints.PriorityLimit, true, nil
}
}

return 0, false, nil
}

// PriorityUpdateAllowed returns true if the desired priority is within the task config policy limit.
func PriorityUpdateAllowed(wkspID int, workloadType string, priority int, smallerHigher bool) (bool, error) {
// Check if a priority limit has been set with a constraint policy.
// Global policies have highest precedence.
limit, found, err := GetPriorityLimit(context.TODO(), nil, workloadType)
if err != nil {
return false, fmt.Errorf("unable to fetch task config policy priority limit")
globalLimit, globalExists, err := findAllowedPriority(nil, workloadType)

if errors.Is(err, errPriorityImmutable) && globalLimit == priority {
// If task config policies have updated since the workload was originally scheduled, allow users
// to update the priority to the new priority set by invariant config.
return true, nil
}
if found {
return priorityWithinLimit(priority, limit, smallerHigher), nil
if err != nil {
return false, err
}

// TODO use COALESCE instead once postgres updates are complete.
// Workspace policies have second precedence.
limit, found, err = GetPriorityLimit(context.TODO(), &wkspID, workloadType)
wkspLimit, wkspExists, err := findAllowedPriority(&wkspID, workloadType)

if errors.Is(err, errPriorityImmutable) && wkspLimit == priority {
// If task config policies have updated since the workload was originally scheduled, allow users
// to update the priority to the new priority set by invariant config.
return true, nil
}
if err != nil {
// TODO do we really want to block on this?
return false, fmt.Errorf("unable to fetch task config policy priority limit")
return false, err
}

// No invariant configs. Check for constraints.
if globalExists {
return priorityWithinLimit(priority, globalLimit, smallerHigher), nil
}
if found {
return priorityWithinLimit(priority, limit, smallerHigher), nil
if wkspExists {
return priorityWithinLimit(priority, wkspLimit, smallerHigher), nil
}

// No priority limit has been set.
Expand Down
Loading

0 comments on commit dadf75e

Please sign in to comment.