Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

[Do Not Merge] Counting objects of kind #134

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ func executeRootCmd(cfg *config2.Config) {
logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err)
}

go func() {
err = mgr.Start(ctx.Done())
if err != nil {
logger.Fatalf(ctx, "Failed to start manager. Error: %v", err)
}
}()

c, err := controller.New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope)

if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tasks:
- container
- K8S-ARRAY
- qubole-hive-executor
- spark
# Sample plugins config
plugins:
# All k8s plugins default configuration
Expand Down Expand Up @@ -62,7 +63,7 @@ storage:
endpoint: http://localhost:30084
region: us-east-1
secret-key: miniostorage
type: minio
type: mem
container: "my-s3-bucket"
event:
type: admin
Expand Down
38 changes: 38 additions & 0 deletions pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package k8s
import (
"context"
"fmt"
"reflect"
"strings"
"time"

"k8s.io/client-go/tools/cache"

"github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff"
v1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -417,8 +420,31 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
logger.Debugf(context.Background(), "Create received for %s, ignoring.", evt.Meta.GetName())
i, err := iCtx.KubeClient().GetCache().GetInformer(entry.ResourceToWatch)
if err != nil {
panic(err)
}

si, casted := i.(cache.SharedIndexInformer)
if !casted {
panic(fmt.Errorf("wrong type. Actual: %v", reflect.TypeOf(i)))
}

logger.Infof(ctx, "Found items in store [%v]", len(si.GetStore().List()))
},
UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
i, err := iCtx.KubeClient().GetCache().GetInformer(entry.ResourceToWatch)
if err != nil {
panic(err)
}

si, casted := i.(cache.SharedIndexInformer)
if !casted {
panic(fmt.Errorf("wrong type. Actual: %v", reflect.TypeOf(i)))
}

logger.Infof(ctx, "Found items in store [%v] for kind [%v]", len(si.GetStore().List()), entry.ResourceToWatch)

if evt.MetaNew == nil {
logger.Warn(context.Background(), "Received an Update event with nil MetaNew.")
} else if evt.MetaOld == nil || evt.MetaOld.GetResourceVersion() != evt.MetaNew.GetResourceVersion() {
Expand Down Expand Up @@ -466,6 +492,18 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
return nil, err
}

i, err := iCtx.KubeClient().GetCache().GetInformer(entry.ResourceToWatch)
if err != nil {
panic(err)
}

si, casted := i.(cache.SharedIndexInformer)
if !casted {
panic(fmt.Errorf("wrong type. Actual: %v", reflect.TypeOf(i)))
}

logger.Infof(ctx, "Found items in store [%v]", len(si.GetStore().List()))

return &PluginManager{
id: entry.ID,
plugin: entry.Plugin,
Expand Down
23 changes: 11 additions & 12 deletions pkg/controller/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controller
import (
"context"
"fmt"
"runtime/pprof"
"time"

"github.com/lyft/flytestdlib/contextutils"
Expand Down Expand Up @@ -139,17 +138,17 @@ func (w *WorkerPool) Run(ctx context.Context, threadiness int, synced ...cache.I
}

logger.Infof(ctx, "Starting workers [%d]", threadiness)
// Launch workers to process FlyteWorkflow resources
for i := 0; i < threadiness; i++ {
w.metrics.FreeWorkers.Inc()
logger.Infof(ctx, "Starting worker [%d]", i)
workerLabel := fmt.Sprintf("worker-%v", i)
go func() {
workerCtx := contextutils.WithGoroutineLabel(ctx, workerLabel)
pprof.SetGoroutineLabels(workerCtx)
w.runWorker(workerCtx)
}()
}
//// Launch workers to process FlyteWorkflow resources
//for i := 0; i < threadiness; i++ {
// w.metrics.FreeWorkers.Inc()
// logger.Infof(ctx, "Starting worker [%d]", i)
// workerLabel := fmt.Sprintf("worker-%v", i)
// go func() {
// workerCtx := contextutils.WithGoroutineLabel(ctx, workerLabel)
// pprof.SetGoroutineLabels(workerCtx)
// w.runWorker(workerCtx)
// }()
//}

w.workQueue.Start(ctx)
logger.Info(ctx, "Started workers")
Expand Down