Skip to content

Commit

Permalink
perf: Add template validation caching
Browse files Browse the repository at this point in the history
During template validation k8s API is called for each templateRef.
For complex workflows with many refs it creates huge overhead.
Let's cache such templates

Throughout codebase each wrapper is used only in context of single call.
It is not guaranteed for future, so TTL mechanism was added with default TTL=1M
Additionally WithCacheTTL enables overwriting this parameter.

Signed-off-by: Jakub Buczak <[email protected]>
  • Loading branch information
jakkubu committed Sep 20, 2024
1 parent dc731d0 commit fc3060e
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 7 deletions.
94 changes: 87 additions & 7 deletions workflow/templateresolution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package templateresolution
import (
"context"
"fmt"
"sync"
"time"

log "github.com/sirupsen/logrus"
apierr "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -14,19 +16,60 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

var (
defaultCacheTTL = time.Minute
)

type workflowTemplateCache struct {
createdAt time.Time
template *wfv1.WorkflowTemplate
}

var _ WorkflowTemplateNamespacedGetter = (*workflowTemplateInterfaceWrapper)(nil)

// workflowTemplateInterfaceWrapper is an internal struct to wrap clientset.
type workflowTemplateInterfaceWrapper struct {
clientset typed.WorkflowTemplateInterface
cache map[string]workflowTemplateCache
cacheMu sync.Mutex
cacheTTL time.Duration
}

func WrapWorkflowTemplateInterface(clientset typed.WorkflowTemplateInterface) *workflowTemplateInterfaceWrapper {
return &workflowTemplateInterfaceWrapper{
clientset: clientset,
cache: map[string]workflowTemplateCache{},
cacheTTL: defaultCacheTTL,
}
}

func WrapWorkflowTemplateInterface(clientset typed.WorkflowTemplateInterface) WorkflowTemplateNamespacedGetter {
return &workflowTemplateInterfaceWrapper{clientset: clientset}
func (w *workflowTemplateInterfaceWrapper) WithCacheTTL(ttl time.Duration) *workflowTemplateInterfaceWrapper {
w.cacheTTL = ttl.Abs()
return w
}

// Get retrieves the WorkflowTemplate of a given name.
func (wrapper *workflowTemplateInterfaceWrapper) Get(name string) (*wfv1.WorkflowTemplate, error) {
ctx := context.TODO()
return wrapper.clientset.Get(ctx, name, metav1.GetOptions{})

wrapper.cacheMu.Lock()
defer wrapper.cacheMu.Unlock()

val, ok := wrapper.cache[name]
if ok && val.createdAt.Add(wrapper.cacheTTL).After(time.Now()) {
return val.template, nil
}

tmpl, err := wrapper.clientset.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return tmpl, err
}
wrapper.cache[name] = workflowTemplateCache{
createdAt: time.Now(),
template: tmpl,
}

return tmpl, nil
}

// WorkflowTemplateNamespacedGetter helps get WorkflowTemplates.
Expand All @@ -35,9 +78,19 @@ type WorkflowTemplateNamespacedGetter interface {
Get(name string) (*wfv1.WorkflowTemplate, error)
}

type clusterWorkflowTemplateCache struct {
createdAt time.Time
template *wfv1.ClusterWorkflowTemplate
}

var _ ClusterWorkflowTemplateGetter = (*clusterWorkflowTemplateInterfaceWrapper)(nil)

// clusterWorkflowTemplateInterfaceWrapper is an internal struct to wrap clientset.
type clusterWorkflowTemplateInterfaceWrapper struct {
clientset typed.ClusterWorkflowTemplateInterface
cache map[string]clusterWorkflowTemplateCache
cacheMu sync.Mutex
cacheTTL time.Duration
}

// ClusterWorkflowTemplateGetter helps get WorkflowTemplates.
Expand All @@ -46,8 +99,17 @@ type ClusterWorkflowTemplateGetter interface {
Get(name string) (*wfv1.ClusterWorkflowTemplate, error)
}

func WrapClusterWorkflowTemplateInterface(clusterClientset typed.ClusterWorkflowTemplateInterface) ClusterWorkflowTemplateGetter {
return &clusterWorkflowTemplateInterfaceWrapper{clientset: clusterClientset}
func WrapClusterWorkflowTemplateInterface(clusterClientset typed.ClusterWorkflowTemplateInterface) *clusterWorkflowTemplateInterfaceWrapper {
return &clusterWorkflowTemplateInterfaceWrapper{
clientset: clusterClientset,
cache: map[string]clusterWorkflowTemplateCache{},
cacheTTL: defaultCacheTTL,
}
}

func (w *clusterWorkflowTemplateInterfaceWrapper) WithCacheTTL(ttl time.Duration) *clusterWorkflowTemplateInterfaceWrapper {
w.cacheTTL = ttl.Abs()
return w
}

type NullClusterWorkflowTemplateGetter struct{}
Expand All @@ -60,7 +122,25 @@ func (n *NullClusterWorkflowTemplateGetter) Get(name string) (*wfv1.ClusterWorkf
// Get retrieves the WorkflowTemplate of a given name.
func (wrapper *clusterWorkflowTemplateInterfaceWrapper) Get(name string) (*wfv1.ClusterWorkflowTemplate, error) {
ctx := context.TODO()
return wrapper.clientset.Get(ctx, name, metav1.GetOptions{})

wrapper.cacheMu.Lock()
defer wrapper.cacheMu.Unlock()

val, ok := wrapper.cache[name]
if ok && val.createdAt.Add(wrapper.cacheTTL).After(time.Now()) {
return val.template, nil
}

tmpl, err := wrapper.clientset.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return tmpl, err
}
wrapper.cache[name] = clusterWorkflowTemplateCache{
createdAt: time.Now(),
template: tmpl,
}

return tmpl, nil
}

// Context is a context of template search.
Expand Down Expand Up @@ -166,7 +246,7 @@ func (ctx *Context) GetTemplateScope() string {
return string(ctx.tmplBase.GetResourceScope()) + "/" + ctx.tmplBase.GetName()
}

// ResolveTemplate digs into referenes and returns a merged template.
// ResolveTemplate digs into references and returns a merged template.
// This method is the public start point of template resolution.
func (ctx *Context) ResolveTemplate(tmplHolder wfv1.TemplateReferenceHolder) (*Context, *wfv1.Template, bool, error) {
return ctx.resolveTemplateImpl(tmplHolder)
Expand Down
8 changes: 8 additions & 0 deletions workflow/templateresolution/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,20 @@ func TestGetTemplateFromRef(t *testing.T) {
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContextFromClientSet(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)

actionLen := len(wfClientset.Actions())
// Get the template of existing template reference.
tmplRef := wfv1.TemplateRef{Name: "some-workflow-template", Template: "whalesay"}
tmpl, err := ctx.GetTemplateFromRef(&tmplRef)
require.NoError(t, err)
assert.Equal(t, "whalesay", tmpl.Name)
assert.NotNil(t, tmpl.Container)
assert.Len(t, len(wfClientset.Actions()), actionLen+1)

tmpl, err = ctx.GetTemplateFromRef(&tmplRef)
require.NoError(t, err)
assert.Equal(t, "whalesay", tmpl.Name)
assert.NotNil(t, tmpl.Container)
assert.Len(t, len(wfClientset.Actions()), actionLen+1, "template should be cached")

// Get the template of unexisting template reference.
tmplRef = wfv1.TemplateRef{Name: "unknown-workflow-template", Template: "whalesay"}
Expand Down

0 comments on commit fc3060e

Please sign in to comment.