Skip to content

Commit

Permalink
chore: improve some dispatcher rm logging (#686)
Browse files Browse the repository at this point in the history
  • Loading branch information
stoksc authored and eecsliu committed Jul 22, 2023
1 parent 8b551d9 commit 4419783
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions master/internal/rm/dispatcherrm/dispatcher_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,27 +693,28 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
return nil
}

ctx.Log().Info(fmt.Sprintf("DispatchID is %s", dispatchID))
ctx.Log().WithField("allocation-id", msg.AllocationID).Infof("DispatchID is %s", dispatchID)
m.dispatchIDToAllocationID[dispatchID] = req.AllocationID
if err := db.InsertDispatch(context.TODO(), &db.Dispatch{
DispatchID: dispatchID,
ResourceID: msg.ResourcesID,
AllocationID: req.AllocationID,
ImpersonatedUser: impersonatedUser,
}); err != nil {
ctx.Log().WithError(err).Errorf("failed to persist dispatch: %v", dispatchID)
ctx.Log().WithError(err).WithField("allocation-id", msg.AllocationID).
Errorf("failed to persist dispatch: %v", dispatchID)
}
m.jobWatcher.monitorJob(impersonatedUser, dispatchID, payloadName)
return nil

case sproto.PendingPreemption:
ctx.Log().Info(fmt.Sprintf("PendingPreemption of %s. Terminating.", msg.AllocationID))
ctx.Log().Infof("PendingPreemption of %s. Terminating.", msg.AllocationID)
allocReq, ok := m.reqList.TaskByID(msg.AllocationID)
if ok {
ctx.Tell(allocReq.AllocationRef, sproto.ReleaseResources{ForcePreemption: true})
} else {
ctx.Log().Error(fmt.Sprintf("unable to find Allocation actor for AllocationID %s",
msg.AllocationID))
ctx.Log().Errorf("unable to find Allocation actor for AllocationID %s",
msg.AllocationID)
}

case sproto.NotifyContainerRunning:
Expand All @@ -734,8 +735,8 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error

case KillDispatcherResources:

ctx.Log().Debug(fmt.Sprintf("Received request to terminate jobs associated with AllocationID %s",
msg.AllocationID))
ctx.Log().Debugf("Received request to terminate jobs associated with AllocationID %s",
msg.AllocationID)

// Find the Dispatch IDs associated with the allocation ID. We'll need the
// Dispatch ID to cancel the job on the launcher side.
Expand All @@ -747,15 +748,15 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
return nil
}

ctx.Log().Debug(fmt.Sprintf("Found %d jobs associated with AllocationID %s",
len(dispatches), msg.AllocationID))
ctx.Log().Debugf("Found %d jobs associated with AllocationID %s",
len(dispatches), msg.AllocationID)

for _, dispatch := range dispatches {
dispatchID := dispatch.DispatchID
impersonatedUser := dispatch.ImpersonatedUser

ctx.Log().Info(fmt.Sprintf("Terminating job with DispatchID %s initiated by %s",
dispatchID, impersonatedUser))
ctx.Log().WithField("allocation-id", msg.AllocationID).Infof(
"Terminating job with DispatchID %s initiated by %s", dispatchID, impersonatedUser)

// Terminate and cleanup, on failure leave Dispatch in DB for later retry
if m.terminateDispatcherJob(ctx, dispatchID, impersonatedUser) {
Expand All @@ -770,10 +771,9 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
// gets a terminal state from the launcher, it will take care
// of removing the dispatch environment at that time.
if m.jobWatcher.isJobBeingMonitored(dispatchID) {
ctx.Log().Debug(
fmt.Sprintf(
"Not removing dispatch environment for dispatchID '%s' because job is being monitored",
dispatchID))
ctx.Log().WithField("allocation-id", msg.AllocationID).Debugf(
"Not removing dispatch environment for dispatchID '%s' because job is being monitored",
dispatchID)
} else {
// If we are here, then we are likely being called from
// startup, as opposed to a user explicitly canceling
Expand Down Expand Up @@ -888,18 +888,18 @@ func (m *dispatcherResourceManager) receiveRequestMsg(ctx *actor.Context) error
allocationID)
return nil
}
ctx.Log().Debug(fmt.Sprintf("Found %d jobs associated with AllocationID %s",
len(dispatches), allocationID))
ctx.Log().Debugf("Found %d jobs associated with AllocationID %s",
len(dispatches), allocationID)

// Cleanup all the dispatcher environments associated with current allocation
for _, dispatch := range dispatches {
dispatchID := dispatch.DispatchID
impersonatedUser := dispatch.ImpersonatedUser

if ctx.Log().Logger.Level < logrus.DebugLevel {
ctx.Log().Info(fmt.Sprintf(
ctx.Log().WithField("allocation-id", allocationID).Infof(
"Deleting dispatcher environment for job with DispatchID %s initiated by %s",
dispatchID, impersonatedUser))
dispatchID, impersonatedUser)

// Cleanup the dispatcher environment
m.removeDispatchEnvironment(ctx, impersonatedUser, dispatchID)
Expand Down Expand Up @@ -1269,15 +1269,15 @@ func (m *dispatcherResourceManager) getAndCheckLauncherVersion(ctx *actor.Contex
if checkMinimumLauncherVersion(resp) {
if !m.launcherVersionIsOK {
m.launcherVersionIsOK = true
logrus.Info(fmt.Sprintf("Determined HPC launcher %s at %s:%d",
resp, m.rmConfig.LauncherHost, m.rmConfig.LauncherPort))
ctx.Log().Infof("Determined HPC launcher %s at %s:%d",
resp, m.rmConfig.LauncherHost, m.rmConfig.LauncherPort)
}
}
}
if !m.launcherVersionIsOK {
ctx.Log().Error(fmt.Sprintf("Launcher version %s does not meet the required minimum. "+
ctx.Log().Errorf("Launcher version %s does not meet the required minimum. "+
"Upgrade to hpe-hpc-launcher version %s",
resp, launcherMinimumVersion))
resp, launcherMinimumVersion)
}
}

Expand Down Expand Up @@ -1337,7 +1337,7 @@ func (m *dispatcherResourceManager) fetchHpcResourceDetails(ctx *actor.Context)
}
return
}
ctx.Log().Debug(fmt.Sprintf("Launched Manifest with DispatchID %s", dispatchInfo.GetDispatchId()))
ctx.Log().Debugf("Launched Manifest with DispatchID %s", dispatchInfo.GetDispatchId())

dispatchID := dispatchInfo.GetDispatchId()

Expand Down Expand Up @@ -1497,7 +1497,7 @@ func (m *dispatcherResourceManager) terminateDispatcherJob(ctx *actor.Context,
return false
}
}
ctx.Log().Debug(fmt.Sprintf("Terminated manifest with DispatchID %s", dispatchID))
ctx.Log().Debugf("Terminated manifest with DispatchID %s", dispatchID)
return true
}

Expand All @@ -1522,7 +1522,7 @@ func (m *dispatcherResourceManager) removeDispatchEnvironment(
return
}
} else {
ctx.Log().Debug(fmt.Sprintf("Deleted environment with DispatchID %s", dispatchID))
ctx.Log().Debugf("Deleted environment with DispatchID %s", dispatchID)
}
count, err := db.DeleteDispatch(context.TODO(), dispatchID)
if err != nil {
Expand Down Expand Up @@ -1628,8 +1628,8 @@ func (m *dispatcherResourceManager) assignResources(
return
}

ctx.Log().Debug(fmt.Sprintf("Restore: Found %d jobs associated with AllocationID %s",
len(dispatches), req.AllocationID))
ctx.Log().Debugf("Restore: Found %d jobs associated with AllocationID %s",
len(dispatches), req.AllocationID)

for _, dispatch := range dispatches {
dispatchID = dispatch.DispatchID
Expand Down Expand Up @@ -1722,10 +1722,10 @@ func (m *dispatcherResourceManager) debugKillAllInactiveDispatches(
ctx.Log().WithError(err).Errorf("Failed to retrieve all Dispatches")
return
}
ctx.Log().Debug(fmt.Sprintf("Found %d Dispatches to release", len(dispatches)))
ctx.Log().Debugf("Found %d Dispatches to release", len(dispatches))
for _, dispatch := range dispatches {
ctx.Log().Debug(fmt.Sprintf("Queuing cleanup of AllocationID %s, DispatchID %s",
dispatch.AllocationID, dispatch.DispatchID))
ctx.Log().Debugf("Queuing cleanup of AllocationID %s, DispatchID %s",
dispatch.AllocationID, dispatch.DispatchID)
ctx.Tell(handler, KillDispatcherResources{
ResourcesID: dispatch.ResourceID,
AllocationID: dispatch.AllocationID,
Expand Down

0 comments on commit 4419783

Please sign in to comment.