diff --git a/tools/cache/controller.go b/tools/cache/controller.go index eaed229569..f437f28616 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -394,17 +394,6 @@ func NewIndexerInformer( return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil) } -// TransformFunc allows for transforming an object before it will be processed -// and put into the controller cache and before the corresponding handlers will -// be called on it. -// TransformFunc (similarly to ResourceEventHandler functions) should be able -// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown -// -// The most common usage pattern is to clean-up some parts of the object to -// reduce component memory usage if a given component doesn't care about them. -// given controller doesn't care for them -type TransformFunc func(interface{}) (interface{}, error) - // NewTransformingInformer returns a Store and a controller for populating // the store while also providing event notifications. You should only used // the returned Store for Get/List operations; Add/Modify/Deletes will cause @@ -452,20 +441,12 @@ func processDeltas( // Object which receives event notifications from the given deltas handler ResourceEventHandler, clientState Store, - transformer TransformFunc, deltas Deltas, isInInitialList bool, ) error { // from oldest to newest for _, d := range deltas { obj := d.Object - if transformer != nil { - var err error - obj, err = transformer(obj) - if err != nil { - return err - } - } switch d.Type { case Sync, Replaced, Added, Updated: @@ -517,6 +498,7 @@ func newInformer( fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: clientState, EmitDeltaTypeReplaced: true, + Transformer: transformer, }) cfg := &Config{ @@ -528,7 +510,7 @@ func newInformer( Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(h, clientState, transformer, deltas, isInInitialList) + return processDeltas(h, clientState, deltas, isInInitialList) } return errors.New("object given as Process argument is not Deltas") }, diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index e59a5c9665..8dff7679c9 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -32,7 +32,7 @@ import ( "k8s.io/apimachinery/pkg/watch" fcache "k8s.io/client-go/tools/cache/testing" - "github.com/google/gofuzz" + fuzz "github.com/google/gofuzz" ) func Example() { diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 1eb092f09a..7160bb1ee7 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -51,6 +51,10 @@ type DeltaFIFOOptions struct { // When true, `Replaced` events will be sent for items passed to a Replace() call. // When false, `Sync` events will be sent instead. EmitDeltaTypeReplaced bool + + // If set, will be called for objects before enqueueing them. Please + // see the comment on TransformFunc for details. + Transformer TransformFunc } // DeltaFIFO is like FIFO, but differs in two ways. One is that the @@ -129,8 +133,32 @@ type DeltaFIFO struct { // emitDeltaTypeReplaced is whether to emit the Replaced or Sync // DeltaType when Replace() is called (to preserve backwards compat). emitDeltaTypeReplaced bool + + // Called with every object if non-nil. + transformer TransformFunc } +// TransformFunc allows for transforming an object before it will be processed. +// TransformFunc (similarly to ResourceEventHandler functions) should be able +// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown. +// +// New in v1.27: In such cases, the contained object will already have gone +// through the transform object separately (when it was added / updated prior +// to the delete), so the TransformFunc can likely safely ignore such objects +// (i.e., just return the input object). +// +// The most common usage pattern is to clean-up some parts of the object to +// reduce component memory usage if a given component doesn't care about them. +// +// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc +// sees the object before any other actor, and it is now safe to mutate the +// object in place instead of making a copy. +// +// Note that TransformFunc is called while inserting objects into the +// notification queue and is therefore extremely performance sensitive; please +// do not do anything that will take a long time. +type TransformFunc func(interface{}) (interface{}, error) + // DeltaType is the type of a change (addition, deletion, etc) type DeltaType string @@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, + transformer: opts.Transformer, } f.cond.L = &f.lock return f @@ -415,6 +444,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err if err != nil { return KeyError{obj, err} } + + // Every object comes through this code path once, so this is a good + // place to call the transform func. If obj is a + // DeletedFinalStateUnknown tombstone, then the containted inner object + // will already have gone through the transformer, but we document that + // this can happen. In cases involving Replace(), such an object can + // come through multiple times. + if f.transformer != nil { + var err error + obj, err = f.transformer(obj) + if err != nil { + return err + } + } + oldDeltas := f.items[id] newDeltas := append(oldDeltas, Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 9d12416571..80994beb45 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -327,6 +327,88 @@ func TestDeltaFIFO_addUpdate(t *testing.T) { } } +type rvAndXfrm struct { + rv int + xfrm int +} + +func TestDeltaFIFO_transformer(t *testing.T) { + mk := func(name string, rv int) testFifoObject { + return mkFifoObj(name, &rvAndXfrm{rv, 0}) + } + xfrm := TransformFunc(func(obj interface{}) (interface{}, error) { + switch v := obj.(type) { + case testFifoObject: + v.val.(*rvAndXfrm).xfrm++ + case DeletedFinalStateUnknown: + if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 { + return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj) + } + default: + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + return obj, nil + }) + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + Transformer: xfrm, + }) + must(f.Add(mk("foo", 10))) + must(f.Add(mk("bar", 11))) + must(f.Update(mk("foo", 12))) + must(f.Delete(mk("foo", 15))) + must(f.Replace([]interface{}{}, "")) + must(f.Add(mk("bar", 16))) + must(f.Replace([]interface{}{}, "")) + + // Should be empty + if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) + } + + for i := 0; i < 2; i++ { + obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil }) + if err != nil { + t.Fatalf("got nothing on try %v?", i) + } + obj = obj.(Deltas).Newest().Object + switch v := obj.(type) { + case testFifoObject: + if v.name != "foo" { + t.Errorf("expected regular deletion of foo, got %q", v.name) + } + rx := v.val.(*rvAndXfrm) + if rx.rv != 15 { + t.Errorf("expected last message, got %#v", obj) + } + if rx.xfrm != 1 { + t.Errorf("obj %v transformed wrong number of times.", obj) + } + case DeletedFinalStateUnknown: + tf := v.Obj.(testFifoObject) + rx := tf.val.(*rvAndXfrm) + if tf.name != "bar" { + t.Errorf("expected tombstone deletion of bar, got %q", tf.name) + } + if rx.rv != 16 { + t.Errorf("expected last message, got %#v", obj) + } + if rx.xfrm != 1 { + t.Errorf("tombstoned obj %v transformed wrong number of times.", obj) + } + default: + t.Errorf("unknown item %#v", obj) + } + } +} + func TestDeltaFIFO_enqueueingNoLister(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("foo", 10)) diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 2717bde589..a889fdbc36 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -205,10 +205,7 @@ type SharedInformer interface { // // Must be set before starting the informer. // - // Note: Since the object given to the handler may be already shared with - // other goroutines, it is advisable to copy the object being - // transform before mutating it at all and returning the copy to prevent - // data races. + // Please see the comment on TransformFunc for more details. SetTransform(handler TransformFunc) error // IsStopped reports whether the informer has already been stopped. @@ -465,6 +462,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, + Transformer: s.transform, }) cfg := &Config{ @@ -637,7 +635,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { - return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList) + return processDeltas(s, s.indexer, deltas, isInInitialList) } return errors.New("object given as Process argument is not Deltas") } diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 0ae254b8e8..30776b2957 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -406,9 +406,8 @@ func TestSharedInformerTransformer(t *testing.T) { name := pod.GetName() if upper := strings.ToUpper(name); upper != name { - copied := pod.DeepCopyObject().(*v1.Pod) - copied.SetName(upper) - return copied, nil + pod.SetName(upper) + return pod, nil } } return obj, nil