Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 add task manager cluster mutex. #703

Merged
merged 3 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 88 additions & 21 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/golang-jwt/jwt/v4"
Expand Down Expand Up @@ -253,7 +254,7 @@ func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) {
return
default:
}
pod, found := m.cluster.pods[path.Base(task.Pod)]
pod, found := m.cluster.Pod(path.Base(task.Pod))
if found {
snErr := m.podSnapshot(task, pod)
Log.Error(
Expand Down Expand Up @@ -400,14 +401,14 @@ func (m *Manager) findRefs(task *Task) (err error) {
return
}
if task.Addon != "" {
_, found := m.cluster.addons[task.Addon]
_, found := m.cluster.Addon(task.Addon)
if !found {
err = &AddonNotFound{Name: task.Addon}
return
}
}
for _, name := range task.Extensions {
_, found := m.cluster.extensions[name]
_, found := m.cluster.Extension(name)
if !found {
err = &ExtensionNotFound{Name: name}
return
Expand All @@ -416,7 +417,7 @@ func (m *Manager) findRefs(task *Task) (err error) {
if task.Kind == "" {
return
}
kind, found := m.cluster.tasks[task.Kind]
kind, found := m.cluster.Task(task.Kind)
if !found {
err = &KindNotFound{Name: task.Kind}
return
Expand Down Expand Up @@ -469,21 +470,21 @@ func (m *Manager) selectAddons(list []*Task) (kept []*Task, err error) {
func (m *Manager) selectAddon(task *Task) (addon *crd.Addon, err error) {
if task.Addon != "" {
found := false
addon, found = m.cluster.addons[task.Addon]
addon, found = m.cluster.Addon(task.Addon)
if !found {
err = &AddonNotFound{task.Addon}
}
return
}
kind, found := m.cluster.tasks[task.Kind]
kind, found := m.cluster.Task(task.Kind)
if !found {
err = &KindNotFound{task.Kind}
return
}
matched := false
var selected *crd.Addon
selector := NewSelector(m.DB, task)
for _, addon = range m.cluster.addons {
for _, addon = range m.cluster.Addons() {
if addon.Spec.Task != kind.Name {
continue
}
Expand Down Expand Up @@ -512,7 +513,7 @@ func (m *Manager) selectExtensions(task *Task, addon *crd.Addon) (err error) {
}
matched := false
selector := NewSelector(m.DB, task)
for name, extension := range m.cluster.extensions {
for _, extension := range m.cluster.Extensions() {
if extension.Spec.Addon != addon.Name {
continue
}
Expand All @@ -521,8 +522,8 @@ func (m *Manager) selectExtensions(task *Task, addon *crd.Addon) (err error) {
return
}
if matched {
task.Extensions = append(task.Extensions, name)
task.Event(ExtSelected, name)
task.Extensions = append(task.Extensions, extension.Name)
task.Event(ExtSelected, extension.Name)
}
}
return
Expand Down Expand Up @@ -554,7 +555,7 @@ func (m *Manager) postpone(list []*Task) (err error) {
matched: make(map[uint]uint),
},
&RuleDeps{
cluster: m.cluster,
cluster: &m.cluster,
},
}
for _, task := range list {
Expand Down Expand Up @@ -624,7 +625,7 @@ func (m *Manager) adjustPriority(list []*Task) (err error) {
if len(list) == 0 {
return
}
pE := Priority{cluster: m.cluster}
pE := Priority{cluster: &m.cluster}
escalated := pE.Escalate(list)
for _, task := range escalated {
if task.State != Pending {
Expand Down Expand Up @@ -662,7 +663,7 @@ func (m *Manager) createPod(list []*Task) (err error) {
}
ready := task
started := false
started, err = ready.Run(m.cluster)
started, err = ready.Run(&m.cluster)
if err != nil {
Log.Error(err, "")
return
Expand Down Expand Up @@ -829,7 +830,7 @@ func (m *Manager) updateRunning() {
continue
}
running := task
pod, found := running.Reflect(m.cluster)
pod, found := running.Reflect(&m.cluster)
if found {
if task.StateIn(Succeeded, Failed) {
err = m.podSnapshot(running, pod)
Expand Down Expand Up @@ -1071,7 +1072,7 @@ func (r *Task) LastEvent(kind string) (event *model.TaskEvent, found bool) {
}

// Run the specified task.
func (r *Task) Run(cluster Cluster) (started bool, err error) {
func (r *Task) Run(cluster *Cluster) (started bool, err error) {
mark := time.Now()
client := cluster.Client
defer func() {
Expand All @@ -1088,7 +1089,7 @@ func (r *Task) Run(cluster Cluster) (started bool, err error) {
err = nil
}
}()
addon, found := cluster.addons[r.Addon]
addon, found := cluster.Addon(r.Addon)
if !found {
err = &AddonNotFound{Name: r.Addon}
return
Expand Down Expand Up @@ -1120,7 +1121,7 @@ func (r *Task) Run(cluster Cluster) (started bool, err error) {
pod := r.pod(
addon,
extensions,
cluster.tackle,
cluster.Tackle(),
&secret)
err = client.Create(context.TODO(), &pod)
if err != nil {
Expand Down Expand Up @@ -1168,8 +1169,8 @@ func (r *Task) Run(cluster Cluster) (started bool, err error) {
}

// Reflect finds the associated pod and updates the task state.
func (r *Task) Reflect(cluster Cluster) (pod *core.Pod, found bool) {
pod, found = cluster.pods[path.Base(r.Pod)]
func (r *Task) Reflect(cluster *Cluster) (pod *core.Pod, found bool) {
pod, found = cluster.Pod(path.Base(r.Pod))
if !found {
r.State = Ready
r.Event(PodNotFound, r.Pod)
Expand Down Expand Up @@ -1559,7 +1560,7 @@ type Event struct {

// Priority escalator.
type Priority struct {
cluster Cluster
cluster *Cluster
}

// Escalate task dependencies as needed.
Expand Down Expand Up @@ -1601,7 +1602,7 @@ func (p *Priority) Escalate(ready []*Task) (escalated []*Task) {

// graph builds a dependency graph.
func (p *Priority) graph(task *Task, ready []*Task) (deps []*Task) {
kind, found := p.cluster.tasks[task.Kind]
kind, found := p.cluster.Task(task.Kind)
if !found {
return
}
Expand Down Expand Up @@ -1638,16 +1639,22 @@ func (p *Priority) unique(in []*Task) (out []*Task) {
return
}

// Cluster provides cached cluster resources.
// Maps must NOT be accessed directly.
type Cluster struct {
k8s.Client
mutex sync.RWMutex
tackle *crd.Tackle
addons map[string]*crd.Addon
extensions map[string]*crd.Extension
tasks map[string]*crd.Task
pods map[string]*core.Pod
}

// Refresh the cache.
func (k *Cluster) Refresh() (err error) {
k.mutex.Lock()
defer k.mutex.Unlock()
if Settings.Hub.Disconnected {
k.tackle = &crd.Tackle{}
k.addons = make(map[string]*crd.Addon)
Expand Down Expand Up @@ -1679,6 +1686,66 @@ func (k *Cluster) Refresh() (err error) {
return
}

// Tackle returns the tackle resource.
func (k *Cluster) Tackle() (r *crd.Tackle) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r = k.tackle
return
}

// Addon returns an addon my name.
func (k *Cluster) Addon(name string) (r *crd.Addon, found bool) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r, found = k.addons[name]
return
}

// Addons returns an addon my name.
func (k *Cluster) Addons() (list []*crd.Addon) {
k.mutex.RLock()
defer k.mutex.RUnlock()
for _, r := range k.addons {
list = append(list, r)
}
return
}

// Extension returns an extension by name.
func (k *Cluster) Extension(name string) (r *crd.Extension, found bool) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r, found = k.extensions[name]
return
}

// Extensions returns an addon my name.
func (k *Cluster) Extensions() (list []*crd.Extension) {
k.mutex.RLock()
defer k.mutex.RUnlock()
for _, r := range k.extensions {
list = append(list, r)
}
return
}

// Task returns a task by name.
func (k *Cluster) Task(name string) (r *crd.Task, found bool) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r, found = k.tasks[name]
return
}

// Pod returns a pod by name.
func (k *Cluster) Pod(name string) (r *core.Pod, found bool) {
k.mutex.RLock()
defer k.mutex.RUnlock()
r, found = k.pods[name]
return
}

// getTackle
func (k *Cluster) getTackle() (err error) {
options := &k8s.ListOptions{Namespace: Settings.Namespace}
Expand Down
4 changes: 2 additions & 2 deletions task/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (r *RuleUnique) Match(ready, other *Task) (matched bool, reason string) {

// RuleDeps - Task kind dependencies.
type RuleDeps struct {
cluster Cluster
cluster *Cluster
}

// Match determines the match.
Expand All @@ -54,7 +54,7 @@ func (r *RuleDeps) Match(ready, other *Task) (matched bool, reason string) {
if *ready.ApplicationID != *other.ApplicationID {
return
}
def, found := r.cluster.tasks[ready.Kind]
def, found := r.cluster.Task(ready.Kind)
if !found {
return
}
Expand Down
4 changes: 2 additions & 2 deletions task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestPriorityEscalate(t *testing.T) {
ready = append(ready, task)

pE := Priority{
cluster: Cluster{
cluster: &Cluster{
tasks: kinds,
}}

Expand Down Expand Up @@ -129,7 +129,7 @@ func TestPriorityGraph(t *testing.T) {
ready = append(ready, task)

pE := Priority{
cluster: Cluster{
cluster: &Cluster{
tasks: kinds,
}}
deps := pE.graph(ready[0], ready)
Expand Down
Loading