diff --git a/task/manager.go b/task/manager.go index 172fc0f8b..1644b245a 100644 --- a/task/manager.go +++ b/task/manager.go @@ -4,6 +4,11 @@ import ( "context" "errors" "fmt" + "path" + "strconv" + "strings" + "time" + "github.com/golang-jwt/jwt/v4" liberr "github.com/jortel/go-utils/error" "github.com/jortel/go-utils/logr" @@ -16,13 +21,9 @@ import ( core "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "path" k8s "sigs.k8s.io/controller-runtime/pkg/client" - "strconv" - "time" ) -// // States const ( Created = "Created" @@ -66,6 +67,36 @@ func (e *AddonNotFound) Is(err error) (matched bool) { return } +// QuotaExceeded report quota exceeded. +type QuotaExceeded struct { + Reason string +} + +// Match returns true when the error is Forbidden due to quota exceeded. +func (e *QuotaExceeded) Match(err error) (matched bool) { + if k8serr.IsForbidden(err) { + matched = true + e.Reason = err.Error() + for _, s := range []string{"quota", "exceeded"} { + matched = strings.Contains(e.Reason, s) + if !matched { + break + } + } + } + return +} + +func (e *QuotaExceeded) Error() (s string) { + return e.Reason +} + +func (e *QuotaExceeded) Is(err error) (matched bool) { + var inst *QuotaExceeded + matched = errors.As(err, &inst) + return +} + // // Manager provides task management. type Manager struct { @@ -148,28 +179,28 @@ func (m *Manager) startReady() { if m.postpone(ready, list) { ready.State = Postponed Log.Info("Task postponed.", "id", ready.ID) - sErr := m.DB.Save(ready).Error - Log.Error(sErr, "") + err := m.DB.Save(ready).Error + Log.Error(err, "") continue } - if ready.Retries == 0 { - metrics.TasksInitiated.Inc() - } rt := Task{ready} - err := rt.Run(m.Client) + started, err := rt.Run(m.Client) if err != nil { - if errors.Is(err, &AddonNotFound{}) { - ready.Error("Error", err.Error()) - ready.State = Failed - sErr := m.DB.Save(ready).Error - Log.Error(sErr, "") - } + Log.Error(err, "") + ready.Error("Error", err.Error()) + ready.State = Failed + err = m.DB.Save(ready).Error Log.Error(err, "") continue } - Log.Info("Task started.", "id", ready.ID) err = m.DB.Save(ready).Error Log.Error(err, "") + if started { + Log.Info("Task started.", "id", ready.ID) + if ready.Retries == 0 { + metrics.TasksInitiated.Inc() + } + } default: // Ignored. // Other states included to support @@ -267,13 +298,23 @@ type Task struct { // // Run the specified task. -func (r *Task) Run(client k8s.Client) (err error) { +func (r *Task) Run(client k8s.Client) (started bool, err error) { mark := time.Now() defer func() { - if err != nil { + if err == nil { + return + } + if errors.Is(err, &QuotaExceeded{}) { + Log.V(1).Info(err.Error()) + err = nil + return + } + if errors.Is(err, &AddonNotFound{}) { r.Error("Error", err.Error()) r.Terminated = &mark r.State = Failed + err = nil + return } }() addon, err := r.findAddon(client, r.Addon) @@ -299,6 +340,10 @@ func (r *Task) Run(client k8s.Client) (err error) { pod := r.pod(addon, owner, &secret) err = client.Create(context.TODO(), &pod) if err != nil { + qe := &QuotaExceeded{err.Error()} + if qe.Match(err) { + err = qe + } err = liberr.Wrap(err) return } @@ -320,6 +365,7 @@ func (r *Task) Run(client k8s.Client) (err error) { err = liberr.Wrap(err) return } + started = true r.Started = &mark r.State = Pending r.Pod = path.Join( @@ -341,7 +387,9 @@ func (r *Task) Reflect(client k8s.Client) (err error) { pod) if err != nil { if k8serr.IsNotFound(err) { - err = r.Run(client) + r.Pod = "" + r.State = Ready + err = nil } else { err = liberr.Wrap(err) }