Skip to content

Commit

Permalink
chore: ft trial slot capacity check [DET-9897]
Browse files Browse the repository at this point in the history
Address comments
  • Loading branch information
jgongd committed Oct 31, 2023
1 parent f619ff6 commit b8a4395
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 35 deletions.
36 changes: 17 additions & 19 deletions master/internal/logpattern/logpattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (

const regexCacheSize = 256

var defaultSingleton *LogPatternPolicies
var (
defaultSingleton *LogPatternPolicies
expconfigCompiledRegex = regexp.MustCompile("(.*)(\\\"log_pattern_policies\\\":)(.*)")
)

// LogPatternPolicies performs log pattern checks.
type LogPatternPolicies struct {
Expand Down Expand Up @@ -53,27 +56,22 @@ func (l *LogPatternPolicies) getCompiledRegex(regex string) (*regexp.Regexp, err
func (l *LogPatternPolicies) monitor(ctx context.Context,
taskID model.TaskID, logs []*model.TaskLog, policies expconf.LogPoliciesConfig,
) error {
for _, log := range logs {
if log.AgentID == nil {
return fmt.Errorf("agentID must be non nil to monitor logs")
// TODO when we add rm specific log grabbing we will need to also monitor them.
for _, policy := range policies {
compiledRegex, err := l.getCompiledRegex(policy.Pattern())
if err != nil {
return err
}
// The first line of trial logs is printing expconf which has the regex pattern.
// We skip monitoring this line.
regex := "(.*)(\\\"log_pattern_policies\\\":)(.*)"
compiledRegex, err := l.getCompiledRegex(regex)

for _, policy := range policies {
if err != nil {
return err
}
if compiledRegex.MatchString(log.Log) {
continue

for _, log := range logs {
if log.AgentID == nil {
return fmt.Errorf("agentID must be non nil to monitor logs")
}

regex = fmt.Sprintf("(.*)%s(.*)", policy.Pattern())
compiledRegex, err = l.getCompiledRegex(regex)
if err != nil {
return err
// One of the trial logs prints expconf which has the regex pattern.
// We skip monitoring this line.
if expconfigCompiledRegex.MatchString(log.Log) {
continue
}

if compiledRegex.MatchString(log.Log) {
Expand Down
28 changes: 17 additions & 11 deletions master/internal/rm/agentrm/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,38 +476,44 @@ func (rp *resourcePool) Receive(ctx *actor.Context) error {
reschedule = false
var totalSlots int

disallowedNodes := set.New[string]()
blockedNodeSet := set.New[string]()
if msg.TaskID != nil {
disallowedNodes = *logpattern.DisallowedNodes(*msg.TaskID)
blockedNodes, err := logpattern.GetBlockedNodes(context.TODO(), *msg.TaskID)
if err != nil {
panic(err.Error())
}
blockedNodeSet = set.FromSlice(blockedNodes)
}
rp.agentStatesCache = rp.fetchAgentStates(ctx)
defer func() {
rp.agentStatesCache = nil
}()

switch {
case rp.config.Provider == nil:

defer func() {
rp.agentStatesCache = nil
}()

// check task speicifc slots here
for _, a := range rp.agentStatesCache {
if !disallowedNodes.Contains(a.Handler.Address().Local()) {
if !blockedNodeSet.Contains(a.Handler.Address().Local()) {
totalSlots += len(a.slotStates)
}
}
case rp.config.Provider.AWS != nil:
fmt.Printf("max instances: %v\n", rp.config.Provider.MaxInstances)
totalSlots = rp.config.Provider.MaxInstances * rp.config.Provider.AWS.SlotsPerInstance()

fmt.Printf("blockedNodeSet: %+v\n", blockedNodeSet)
for _, a := range rp.agentStatesCache {
if disallowedNodes.Contains(a.Handler.Address().Local()) {
if blockedNodeSet.Contains(a.Handler.Address().Local()) {
fmt.Printf("handler: %+v\n", a.Handler.Address().Local())
totalSlots -= len(a.slotStates)
}
}
fmt.Printf("totalSlots: %+v\n", totalSlots)
fmt.Println(" ")
case rp.config.Provider.GCP != nil:
totalSlots = rp.config.Provider.MaxInstances * rp.config.Provider.GCP.SlotsPerInstance()

for _, a := range rp.agentStatesCache {
if disallowedNodes.Contains(a.Handler.Address().Local()) {
if blockedNodeSet.Contains(a.Handler.Address().Local()) {
totalSlots -= len(a.slotStates)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ func (k ResourceManager) ValidateResourcePool(name string) error {
func (k ResourceManager) ValidateResourcePoolAvailability(
v *sproto.ValidateResourcePoolAvailabilityRequest,
) ([]command.LaunchWarning, error) {
if _, err := k.getResourcePoolRef(v.Name); err != nil {

if err := k.resourcePoolExists(v.Name); err != nil {
return nil, fmt.Errorf("%s is an invalid resource pool", v.Name)
}

Expand Down
7 changes: 3 additions & 4 deletions master/internal/trial.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,12 @@ func (t *trial) maybeAllocateTask() error {
TaskID: &t.taskID,
},
)
if len(launchWarnings) > 0 {
logrus.Warnf("task ID %v slots requested exceeds cluster capacity", t.taskID)
}
if err != nil {
return fmt.Errorf("checking resource availability: %v", err.Error())
}
if len(launchWarnings) > 0 {
logrus.Warnf("task ID %v slots requested exceeds cluster capacity", t.taskID)
}
}

if err != nil {
Expand Down Expand Up @@ -476,7 +476,6 @@ func (t *trial) maybeAllocateTask() error {
Debugf("starting new trial allocation")

prom.AssociateJobExperiment(t.jobID, strconv.Itoa(t.experimentID), t.config.Labels())

err = task.DefaultService.StartAllocation(
t.logCtx, ar, t.db, t.rm, specifier,
t.AllocationExitedCallback,
Expand Down

0 comments on commit b8a4395

Please sign in to comment.