Skip to content

Commit

Permalink
[YUNIKORN-2319] cache.Task: reference to old pod object is kept after…
Browse files Browse the repository at this point in the history
… update (#864)

Closes: #864

Signed-off-by: Peter Bacsko <[email protected]>
  • Loading branch information
pbacsko committed Aug 1, 2024
1 parent a95c356 commit 26443bc
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
24 changes: 14 additions & 10 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
if applicationID == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
ctx.updateYuniKornPod(applicationID, pod)
}
}

Expand Down Expand Up @@ -296,22 +296,26 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) {
log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
return
}
if utils.GetApplicationIDFromPod(pod) == "" {
applicationID := utils.GetApplicationIDFromPod(pod)
if applicationID == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
ctx.updateYuniKornPod(applicationID, pod)
}
}

func (ctx *Context) updateYuniKornPod(pod *v1.Pod) {
// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
if taskMeta, ok := getTaskMetadata(pod); ok {
if app := ctx.getApplication(taskMeta.ApplicationID); app != nil {
ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
}
func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
var app *Application
taskID := string(pod.UID)
if app = ctx.getApplication(appID); app != nil {
if task, err := app.GetTask(taskID); task != nil && err == nil {

Check failure on line 311 in pkg/cache/context.go

View workflow job for this annotation

GitHub Actions / unit-tests

assignment mismatch: 2 variables but app.GetTask returns 1 value
task.setTaskPod(pod)
}
}

// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
ctx.notifyTaskComplete(appID, taskID)
log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
return
Expand Down
5 changes: 5 additions & 0 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,11 @@ func TestUpdatePod(t *testing.T) {
context.UpdatePod(pod1, pod3)
pod = context.schedulerCache.GetPod(uid1)
assert.Check(t, pod == nil, "pod still found after termination")
app := context.getApplication("yunikorn-test-00001")
// ensure that an updated pod is updated inside the Task
task, err := app.GetTask("UID-00001")
assert.NilError(t, err)
assert.Assert(t, task.GetTaskPod() == pod3, "task pod has not been updated")

// ensure a non-terminated pod is updated
context.UpdatePod(pod1, pod2)
Expand Down
10 changes: 9 additions & 1 deletion pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (task *Task) getNodeName() string {
}

func (task *Task) DeleteTaskPod() error {
return task.context.apiProvider.GetAPIs().KubeClient.Delete(task.pod)
return task.context.apiProvider.GetAPIs().KubeClient.Delete(task.GetTaskPod())
}

func (task *Task) UpdateTaskPodStatus(pod *v1.Pod) (*v1.Pod, error) {
Expand Down Expand Up @@ -544,9 +544,11 @@ func (task *Task) releaseAllocation() {
// this reduces the scheduling overhead by blocking such
// request away from the core scheduler.
func (task *Task) sanityCheckBeforeScheduling() error {
task.lock.RLock()
// Check PVCs used by the pod
namespace := task.pod.Namespace
manifest := &(task.pod.Spec)
task.lock.RUnlock()
for i := range manifest.Volumes {
volume := &manifest.Volumes[i]
if volume.PersistentVolumeClaim == nil {
Expand Down Expand Up @@ -599,3 +601,9 @@ func (task *Task) failWithEvent(errorMessage, actionReason string) {
events.GetRecorder().Eventf(task.pod.DeepCopy(),
nil, v1.EventTypeWarning, actionReason, actionReason, errorMessage)
}

func (task *Task) setTaskPod(pod *v1.Pod) {
task.lock.Lock()
defer task.lock.Unlock()
task.pod = pod
}

0 comments on commit 26443bc

Please sign in to comment.