Skip to content

Commit

Permalink
🐛 Fix Fail when Resource Quota exceeded. (konveyor#627)
Browse files Browse the repository at this point in the history
Detect when the resource quota is exceeded and leave the task in State=Ready.

closes konveyor#625

---------

Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel authored and mansam committed Apr 30, 2024
1 parent 2695d77 commit fff6e9f
Showing 1 changed file with 68 additions and 20 deletions.
88 changes: 68 additions & 20 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"context"
"errors"
"fmt"
"path"
"strconv"
"strings"
"time"

"github.com/golang-jwt/jwt/v4"
liberr "github.com/jortel/go-utils/error"
"github.com/jortel/go-utils/logr"
Expand All @@ -16,13 +21,9 @@ import (
core "k8s.io/api/core/v1"
k8serr "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"path"
k8s "sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"time"
)

//
// States
const (
Created = "Created"
Expand Down Expand Up @@ -66,6 +67,36 @@ func (e *AddonNotFound) Is(err error) (matched bool) {
return
}

// QuotaExceeded reports that a resource quota was exceeded.
// It is used as an error value; Reason carries the message returned by Error().
type QuotaExceeded struct {
// Reason is the text of the underlying error.
Reason string
}

// Match reports whether err is a Forbidden error whose message contains
// both "quota" and "exceeded" (case-sensitive substring checks).
//
// Side effect: whenever err is Forbidden, Reason is overwritten with
// err.Error(), even when the message does not match.
func (e *QuotaExceeded) Match(err error) (matched bool) {
	if !k8serr.IsForbidden(err) {
		return
	}
	e.Reason = err.Error()
	matched = strings.Contains(e.Reason, "quota") &&
		strings.Contains(e.Reason, "exceeded")
	return
}

// Error implements the error interface by returning Reason.
func (e *QuotaExceeded) Error() (s string) {
	s = e.Reason
	return
}

// Is reports whether err, or any error in its chain, is a *QuotaExceeded.
// Only the type is considered; the receiver's Reason is not compared.
func (e *QuotaExceeded) Is(err error) (matched bool) {
	target := new(*QuotaExceeded)
	return errors.As(err, target)
}

//
// Manager provides task management.
type Manager struct {
Expand Down Expand Up @@ -148,28 +179,28 @@ func (m *Manager) startReady() {
if m.postpone(ready, list) {
ready.State = Postponed
Log.Info("Task postponed.", "id", ready.ID)
sErr := m.DB.Save(ready).Error
Log.Error(sErr, "")
err := m.DB.Save(ready).Error
Log.Error(err, "")
continue
}
if ready.Retries == 0 {
metrics.TasksInitiated.Inc()
}
rt := Task{ready}
err := rt.Run(m.Client)
started, err := rt.Run(m.Client)
if err != nil {
if errors.Is(err, &AddonNotFound{}) {
ready.Error("Error", err.Error())
ready.State = Failed
sErr := m.DB.Save(ready).Error
Log.Error(sErr, "")
}
Log.Error(err, "")
ready.Error("Error", err.Error())
ready.State = Failed
err = m.DB.Save(ready).Error
Log.Error(err, "")
continue
}
Log.Info("Task started.", "id", ready.ID)
err = m.DB.Save(ready).Error
Log.Error(err, "")
if started {
Log.Info("Task started.", "id", ready.ID)
if ready.Retries == 0 {
metrics.TasksInitiated.Inc()
}
}
default:
// Ignored.
// Other states included to support
Expand Down Expand Up @@ -267,13 +298,23 @@ type Task struct {

//
// Run the specified task.
func (r *Task) Run(client k8s.Client) (err error) {
func (r *Task) Run(client k8s.Client) (started bool, err error) {
mark := time.Now()
defer func() {
if err != nil {
if err == nil {
return
}
if errors.Is(err, &QuotaExceeded{}) {
Log.V(1).Info(err.Error())
err = nil
return
}
if errors.Is(err, &AddonNotFound{}) {
r.Error("Error", err.Error())
r.Terminated = &mark
r.State = Failed
err = nil
return
}
}()
addon, err := r.findAddon(client, r.Addon)
Expand All @@ -299,6 +340,10 @@ func (r *Task) Run(client k8s.Client) (err error) {
pod := r.pod(addon, owner, &secret)
err = client.Create(context.TODO(), &pod)
if err != nil {
qe := &QuotaExceeded{err.Error()}
if qe.Match(err) {
err = qe
}
err = liberr.Wrap(err)
return
}
Expand All @@ -320,6 +365,7 @@ func (r *Task) Run(client k8s.Client) (err error) {
err = liberr.Wrap(err)
return
}
started = true
r.Started = &mark
r.State = Pending
r.Pod = path.Join(
Expand All @@ -341,7 +387,9 @@ func (r *Task) Reflect(client k8s.Client) (err error) {
pod)
if err != nil {
if k8serr.IsNotFound(err) {
err = r.Run(client)
r.Pod = ""
r.State = Ready
err = nil
} else {
err = liberr.Wrap(err)
}
Expand Down

0 comments on commit fff6e9f

Please sign in to comment.