From c7fb98e79969cf23849e2758989e9c4f700bece2 Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Fri, 11 Mar 2022 00:09:53 +0000 Subject: [PATCH] Implement CloudEvents for Runs Emit CloudEvents for Runs. This is achieved by: - add a new read-only controller for Runs - emit CloudEvents only (no k8s events) on every reconcile of a Run - use an ephemeral cache to store sent events across reconcile runs. This is required because since the Runs controller only observes Runs, it does not have the context to know what was changed in the Run and though if a new event is required. The ephemeral cache logic is largely taken from the same functionality implemented in tektoncd/experimental/cloudevents Fixes #3862 Signed-off-by: Andrea Frittoli --- cmd/controller/main.go | 2 + config/config-feature-flags.yaml | 3 + docs/events.md | 6 +- docs/install.md | 25 +- pkg/apis/config/feature_flags.go | 7 + pkg/apis/config/feature_flags_test.go | 1 + .../testdata/feature-flags-all-flags-set.yaml | 1 + pkg/reconciler/events/cache/cache.go | 78 ++++ pkg/reconciler/events/cache/cache_test.go | 156 ++++++++ pkg/reconciler/events/cache/cacheclient.go | 63 ++++ .../events/cache/cachefakeclient.go | 45 +++ .../cloudevent/cloud_event_controller.go | 22 ++ .../events/cloudevent/cloudeventclient.go | 10 +- .../cloudevent/cloudeventsfakeclient.go | 2 +- pkg/reconciler/events/event.go | 17 + pkg/reconciler/run/controller.go | 59 +++ pkg/reconciler/run/run.go | 73 ++++ pkg/reconciler/run/run_test.go | 355 ++++++++++++++++++ pkg/reconciler/testing/logger.go | 4 + 19 files changed, 918 insertions(+), 11 deletions(-) create mode 100644 pkg/reconciler/events/cache/cache.go create mode 100644 pkg/reconciler/events/cache/cache_test.go create mode 100644 pkg/reconciler/events/cache/cacheclient.go create mode 100644 pkg/reconciler/events/cache/cachefakeclient.go create mode 100644 pkg/reconciler/run/controller.go create mode 100644 pkg/reconciler/run/run.go create mode 100644 pkg/reconciler/run/run_test.go diff --git a/cmd/controller/main.go b/cmd/controller/main.go index eb333fe293b..137f4721c39 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -25,6 +25,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun" + "github.com/tektoncd/pipeline/pkg/reconciler/run" "github.com/tektoncd/pipeline/pkg/reconciler/taskrun" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/clock" @@ -104,6 +105,7 @@ func main() { sharedmain.MainWithConfig(ctx, ControllerLogKey, cfg, taskrun.NewController(opts, clock.RealClock{}), pipelinerun.NewController(opts, clock.RealClock{}), + run.NewController(), ) } diff --git a/config/config-feature-flags.yaml b/config/config-feature-flags.yaml index e4732f53ec6..da4bff2d9a7 100644 --- a/config/config-feature-flags.yaml +++ b/config/config-feature-flags.yaml @@ -74,3 +74,6 @@ data: # its dependent Tasks. This flag defaults to "true"; when expressions guard # the Task only. See TEP-0059 and Pipeline documentation for more details. scope-when-expressions-to-task: "true" + # Setting this flag to "true" enables CloudEvents for Runs, as long as a + # CloudEvents sink is configured in the config-defaults config map + send-cloudevents-for-runs: "false" diff --git a/docs/events.md b/docs/events.md index adc16c9e83d..6b5a36fc211 100644 --- a/docs/events.md +++ b/docs/events.md @@ -74,7 +74,11 @@ Resource |Event |Event Type `Run` | `Succeed` | `dev.tekton.event.run.successful.v1` `Run` | `Failed` | `dev.tekton.event.run.failed.v1` -`CloudEvents` for `Runs` are defined but not sent yet. +`CloudEvents` for `Runs` are only sent when enabled in the [configuration](./install.md#configuring-cloudevents-notifications). + +**Note**: `CloudEvents` for `Runs` rely on an ephemeral cache to avoid duplicate +events. In case of controller restart, the cache is reset and duplicate events +may be sent. ## Format of `CloudEvents` diff --git a/docs/install.md b/docs/install.md index 01d2fdac93c..ad15c42a823 100644 --- a/docs/install.md +++ b/docs/install.md @@ -268,11 +268,11 @@ data: ## Configuring CloudEvents notifications -When configured so, Tekton can generate `CloudEvents` for `TaskRun` and `PipelineRun` lifecycle -events. The only configuration parameter is the URL of the sink. When not set, no notification is -generated. +When configured so, Tekton can generate `CloudEvents` for `TaskRun`, +`PipelineRun` and `Run`lifecycle events. The main configuration parameter is the +URL of the sink. When not set, no notification is generated. -``` +```yaml apiVersion: v1 kind: ConfigMap metadata: @@ -285,6 +285,23 @@ data: default-cloud-events-sink: https://my-sink-url ``` +Additionally, CloudEvents for `Runs` require an extra configuration to be +enabled. This setting exists to avoid collisions with CloudEvents that might +be sent by custom task controllers: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: feature-flags + namespace: tekton-pipelines + labels: + app.kubernetes.io/instance: default + app.kubernetes.io/part-of: tekton-pipelines +data: + send-cloudevents-for-runs: true +``` + ## Configuring self-signed cert for private registry The `SSL_CERT_DIR` is set to `/etc/ssl/certs` as the default cert directory. If you are using a self-signed cert for private registry and the cert file is not under the default cert directory, configure your registry cert in the `config-registry-cert` `ConfigMap` with the key `cert`. diff --git a/pkg/apis/config/feature_flags.go b/pkg/apis/config/feature_flags.go index f196a1873af..dab18dad45f 100644 --- a/pkg/apis/config/feature_flags.go +++ b/pkg/apis/config/feature_flags.go @@ -46,6 +46,8 @@ const ( DefaultScopeWhenExpressionsToTask = true // DefaultEnableAPIFields is the default value for "enable-api-fields". DefaultEnableAPIFields = StableAPIFields + // DefaultSendCloudEventsForRuns is the default value for "send-cloudevents-for-runs". + DefaultSendCloudEventsForRuns = false disableAffinityAssistantKey = "disable-affinity-assistant" disableCredsInitKey = "disable-creds-init" @@ -55,6 +57,7 @@ const ( enableCustomTasks = "enable-custom-tasks" enableAPIFields = "enable-api-fields" scopeWhenExpressionsToTask = "scope-when-expressions-to-task" + sendCloudEventsForRuns = "send-cloudevents-for-runs" ) // FeatureFlags holds the features configurations @@ -68,6 +71,7 @@ type FeatureFlags struct { EnableCustomTasks bool ScopeWhenExpressionsToTask bool EnableAPIFields string + SendCloudEventsForRuns bool } // GetFeatureFlagsConfigName returns the name of the configmap containing all @@ -113,6 +117,9 @@ func NewFeatureFlagsFromMap(cfgMap map[string]string) (*FeatureFlags, error) { if err := setEnabledAPIFields(cfgMap, DefaultEnableAPIFields, &tc.EnableAPIFields); err != nil { return nil, err } + if err := setFeature(sendCloudEventsForRuns, DefaultSendCloudEventsForRuns, &tc.SendCloudEventsForRuns); err != nil { + return nil, err + } // Given that they are alpha features, Tekton Bundles and Custom Tasks should be switched on if // enable-api-fields is "alpha". If enable-api-fields is not "alpha" then fall back to the value of diff --git a/pkg/apis/config/feature_flags_test.go b/pkg/apis/config/feature_flags_test.go index e7737cc8650..792f3b08194 100644 --- a/pkg/apis/config/feature_flags_test.go +++ b/pkg/apis/config/feature_flags_test.go @@ -50,6 +50,7 @@ func TestNewFeatureFlagsFromConfigMap(t *testing.T) { EnableCustomTasks: true, ScopeWhenExpressionsToTask: true, EnableAPIFields: "alpha", + SendCloudEventsForRuns: true, }, fileName: "feature-flags-all-flags-set", }, diff --git a/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml b/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml index 817c9899a0f..84d5a6f6bd4 100644 --- a/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml +++ b/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml @@ -25,3 +25,4 @@ data: enable-custom-tasks: "true" scope-when-expressions-to-task: "true" enable-api-fields: "alpha" + send-cloudevents-for-runs: "true" diff --git a/pkg/reconciler/events/cache/cache.go b/pkg/reconciler/events/cache/cache.go new file mode 100644 index 00000000000..d04441dee17 --- /dev/null +++ b/pkg/reconciler/events/cache/cache.go @@ -0,0 +1,78 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "encoding/json" + "errors" + "fmt" + + cloudevents "github.com/cloudevents/sdk-go/v2" + lru "github.com/hashicorp/golang-lru" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" +) + +// Struct to unmarshal the event data +type eventData struct { + Run *v1alpha1.Run `json:"run,omitempty"` +} + +// AddEventSentToCache adds the particular object to cache marking it as sent +func AddEventSentToCache(cacheClient *lru.Cache, event *cloudevents.Event) error { + if cacheClient == nil { + return errors.New("cache client is nil") + } + eventKey, err := EventKey(event) + if err != nil { + return err + } + cacheClient.Add(eventKey, nil) + return nil +} + +// IsCloudEventSent checks if the event exists in the cache +func IsCloudEventSent(cacheClient *lru.Cache, event *cloudevents.Event) (bool, error) { + if cacheClient == nil { + return false, errors.New("cache client is nil") + } + eventKey, err := EventKey(event) + if err != nil { + return false, err + } + return cacheClient.Contains(eventKey), nil +} + +// EventKey defines whether an event is considered different from another +// in future we might want to let specific event types override this +func EventKey(event *cloudevents.Event) (string, error) { + var ( + data eventData + resourceName string + resourceNamespace string + ) + err := json.Unmarshal(event.Data(), &data) + if err != nil { + return "", err + } + if data.Run == nil { + return "", fmt.Errorf("Invalid Run data in %v", event) + } + resourceName = data.Run.Name + resourceNamespace = data.Run.Namespace + eventType := event.Type() + return fmt.Sprintf("%s/run/%s/%s", eventType, resourceNamespace, resourceName), nil +} diff --git a/pkg/reconciler/events/cache/cache_test.go b/pkg/reconciler/events/cache/cache_test.go new file mode 100644 index 00000000000..0e016d0687f --- /dev/null +++ b/pkg/reconciler/events/cache/cache_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "net/url" + "testing" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + lru "github.com/hashicorp/golang-lru" + + "github.com/cloudevents/sdk-go/v2/event" + cetypes "github.com/cloudevents/sdk-go/v2/types" + "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + "github.com/tektoncd/pipeline/test/diff" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func strptr(s string) *string { return &s } + +func getEventData(run interface{}) map[string]interface{} { + cloudEventData := map[string]interface{}{} + if v, ok := run.(*v1alpha1.Run); ok { + cloudEventData["run"] = v + } + return cloudEventData +} + +func getEventToTest(eventtype string, run interface{}) *event.Event { + e := event.Event{ + Context: event.EventContextV1{ + Type: eventtype, + Source: cetypes.URIRef{URL: url.URL{Path: "/foo/bar/source"}}, + ID: "test-event", + Time: &cetypes.Timestamp{Time: time.Now()}, + Subject: strptr("topic"), + }.AsV1(), + } + if err := e.SetData(cloudevents.ApplicationJSON, getEventData(run)); err != nil { + panic(err) + } + return &e +} + +func getRunByMeta(name string, namespace string) *v1alpha1.Run { + return &v1alpha1.Run{ + TypeMeta: metav1.TypeMeta{ + Kind: "Run", + APIVersion: "v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: v1alpha1.RunSpec{}, + Status: v1alpha1.RunStatus{}, + } +} + +// TestEventsKey verifies that keys are extracted correctly from events +func TestEventsKey(t *testing.T) { + testcases := []struct { + name string + eventtype string + run interface{} + wantKey string + wantErr bool + }{{ + name: "run event", + eventtype: "my.test.run.event", + run: getRunByMeta("myrun", "mynamespace"), + wantKey: "my.test.run.event/run/mynamespace/myrun", + wantErr: false, + }, { + name: "run event missing data", + eventtype: "my.test.run.event", + run: nil, + wantKey: "", + wantErr: true, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + gotEvent := getEventToTest(tc.eventtype, tc.run) + gotKey, err := EventKey(gotEvent) + if err != nil { + if !tc.wantErr { + t.Fatalf("Expecting an error, got none") + } + } + if d := cmp.Diff(tc.wantKey, gotKey); d != "" { + t.Errorf("Wrong Event key %s", diff.PrintWantGot(d)) + } + }) + } +} + +func TestAddCheckEvent(t *testing.T) { + run := getRunByMeta("arun", "anamespace") + runb := getRunByMeta("arun", "bnamespace") + baseEvent := getEventToTest("some.event.type", run) + + testcases := []struct { + name string + firstEvent *event.Event + secondEvent *event.Event + wantFound bool + }{{ + name: "identical events", + firstEvent: baseEvent, + secondEvent: baseEvent, + wantFound: true, + }, { + name: "new timestamp event", + firstEvent: baseEvent, + secondEvent: getEventToTest("some.event.type", run), + wantFound: true, + }, { + name: "different namespace", + firstEvent: baseEvent, + secondEvent: getEventToTest("some.event.type", runb), + wantFound: false, + }, { + name: "different event type", + firstEvent: baseEvent, + secondEvent: getEventToTest("some.other.event.type", run), + wantFound: false, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + testCache, _ := lru.New(10) + AddEventSentToCache(testCache, tc.firstEvent) + found, _ := IsCloudEventSent(testCache, tc.secondEvent) + if d := cmp.Diff(tc.wantFound, found); d != "" { + t.Errorf("Cache check failure %s", diff.PrintWantGot(d)) + } + }) + } +} diff --git a/pkg/reconciler/events/cache/cacheclient.go b/pkg/reconciler/events/cache/cacheclient.go new file mode 100644 index 00000000000..edbea78039a --- /dev/null +++ b/pkg/reconciler/events/cache/cacheclient.go @@ -0,0 +1,63 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + "k8s.io/client-go/rest" + "knative.dev/pkg/injection" + "knative.dev/pkg/logging" +) + +// With 4 events per Run, we can store events for 1024 concurrent Runs +const bufferSize = 4096 + +func init() { + injection.Default.RegisterClient(withCacheClient) +} + +// cacheKey is a way to associate the Cache from inside the context.Context +type cacheKey struct{} + +func withCacheClient(ctx context.Context, cfg *rest.Config) context.Context { + logger := logging.FromContext(ctx) + + cacheClient, err := lru.New(bufferSize) + logger.Infof("CACHE CLIENT %+v", cacheClient) + if err != nil { + logger.Error("unable to create cacheClient :" + err.Error()) + } + + return ToContext(ctx, cacheClient) +} + +// Get extracts the cloudEventClient client from the context. +func Get(ctx context.Context) *lru.Cache { + untyped := ctx.Value(cacheKey{}) + if untyped == nil { + logging.FromContext(ctx).Errorf("Unable to fetch client from context.") + return nil + } + return untyped.(*lru.Cache) +} + +// ToContext adds the cloud events client to the context +func ToContext(ctx context.Context, c *lru.Cache) context.Context { + return context.WithValue(ctx, cacheKey{}, c) +} diff --git a/pkg/reconciler/events/cache/cachefakeclient.go b/pkg/reconciler/events/cache/cachefakeclient.go new file mode 100644 index 00000000000..f1245f374cb --- /dev/null +++ b/pkg/reconciler/events/cache/cachefakeclient.go @@ -0,0 +1,45 @@ +/* +Copyright 2021 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + "k8s.io/client-go/rest" + "knative.dev/pkg/injection" + "knative.dev/pkg/logging" +) + +const fakeBufferSize = 128 + +func init() { + injection.Fake.RegisterClient(withFakeCacheClient) +} + +func withFakeCacheClient(ctx context.Context, cfg *rest.Config) context.Context { + logger := logging.FromContext(ctx) + + // With 4 events per Run, we can store events for 32 concurrent Runs + cacheClient, err := lru.New(fakeBufferSize) + logger.Infof("CACHE CLIENT %+v", cacheClient) + if err != nil { + logger.Error("unable to create cacheClient :" + err.Error()) + } + + return ToContext(ctx, cacheClient) +} diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller.go b/pkg/reconciler/events/cloudevent/cloud_event_controller.go index 5a353cec370..5072708477b 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller.go @@ -23,9 +23,11 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/hashicorp/go-multierror" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/cloudevent" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cache" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -137,11 +139,25 @@ func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error if err != nil { return err } + // Events for Runs require a cache of events that have been sent + cacheClient := cache.Get(ctx) + _, isRun := object.(*v1alpha1.Run) wasIn := make(chan error) go func() { wasIn <- nil logger.Debugf("Sending cloudevent of type %q", event.Type()) + // In case of Run event, check cache if cloudevent is already sent + if isRun { + cloudEventSent, err := cache.IsCloudEventSent(cacheClient, event) + if err != nil { + logger.Errorf("error while checking cache: %s", err) + } + if cloudEventSent { + logger.Infof("cloudevent %v already sent", event) + return + } + } if result := ceClient.Send(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), *event); !cloudevents.IsACK(result) { logger.Warnf("Failed to send cloudevent: %s", result.Error()) recorder := controller.GetEventRecorder(ctx) @@ -150,6 +166,12 @@ func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error } recorder.Event(object, corev1.EventTypeWarning, "Cloud Event Failure", result.Error()) } + // In case of Run event, add to the cache to avoid duplicate events + if isRun { + if err := cache.AddEventSentToCache(cacheClient, event); err != nil { + logger.Errorf("error while adding sent event to cache: %s", err) + } + } }() return <-wasIn diff --git a/pkg/reconciler/events/cloudevent/cloudeventclient.go b/pkg/reconciler/events/cloudevent/cloudeventclient.go index 8305bb34fb6..be89c5f5fc9 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventclient.go @@ -33,8 +33,8 @@ func init() { injection.Dynamic.RegisterDynamicClient(withCloudEventClient) } -// CECKey is used to associate the CloudEventClient inside the context.Context -type CECKey struct{} +// ceKey is used to associate the CloudEventClient inside the context.Context +type ceKey struct{} func withCloudEventClient(ctx context.Context) context.Context { logger := logging.FromContext(ctx) @@ -59,12 +59,12 @@ func withCloudEventClient(ctx context.Context) context.Context { logger.Panicf("Error creating the cloudevents client: %s", err) } - return context.WithValue(ctx, CECKey{}, cloudEventClient) + return context.WithValue(ctx, ceKey{}, cloudEventClient) } // Get extracts the cloudEventClient client from the context. func Get(ctx context.Context) CEClient { - untyped := ctx.Value(CECKey{}) + untyped := ctx.Value(ceKey{}) if untyped == nil { logging.FromContext(ctx).Errorf( "Unable to fetch client from context.") @@ -75,5 +75,5 @@ func Get(ctx context.Context) CEClient { // ToContext adds the cloud events client to the context func ToContext(ctx context.Context, cec CEClient) context.Context { - return context.WithValue(ctx, CECKey{}, cec) + return context.WithValue(ctx, ceKey{}, cec) } diff --git a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go index b9ef53f7163..b727ac537ec 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go @@ -74,5 +74,5 @@ func (c FakeClient) StartReceiver(ctx context.Context, fn interface{}) error { // WithClient adds to the context a fake client with the desired behaviour func WithClient(ctx context.Context, behaviour *FakeClientBehaviour) context.Context { - return context.WithValue(ctx, CECKey{}, newFakeClient(behaviour)) + return context.WithValue(ctx, ceKey{}, newFakeClient(behaviour)) } diff --git a/pkg/reconciler/events/event.go b/pkg/reconciler/events/event.go index 1aca5578227..e2bb638b1a3 100644 --- a/pkg/reconciler/events/event.go +++ b/pkg/reconciler/events/event.go @@ -69,6 +69,23 @@ func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition * } } +// EmitCloudEvents emits CloudEvents (only) for object +func EmitCloudEvents(ctx context.Context, object runtime.Object) { + logger := logging.FromContext(ctx) + configs := config.FromContextOrDefaults(ctx) + sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "") + if sendCloudEvents { + ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink) + } + + if sendCloudEvents { + err := cloudevent.SendCloudEventWithRetries(ctx, object) + if err != nil { + logger.Warnf("Failed to emit cloud events %v", err.Error()) + } + } +} + func sendKubernetesEvents(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { // Events that are going to be sent // diff --git a/pkg/reconciler/run/controller.go b/pkg/reconciler/run/controller.go new file mode 100644 index 00000000000..74368b86d16 --- /dev/null +++ b/pkg/reconciler/run/controller.go @@ -0,0 +1,59 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package run + +import ( + "context" + + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline" + runinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/run" + runreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1alpha1/run" + cacheclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cache" + cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" +) + +// NewController instantiates a new controller.Impl from knative.dev/pkg/controller +// This is a read-only controller, hence the SkipStatusUpdates set to true +func NewController() func(context.Context, configmap.Watcher) *controller.Impl { + return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { + logger := logging.FromContext(ctx) + runInformer := runinformer.Get(ctx) + + configStore := config.NewStore(logger.Named("config-store")) + configStore.WatchConfigs(cmw) + + c := &Reconciler{ + cloudEventClient: cloudeventclient.Get(ctx), + cacheClient: cacheclient.Get(ctx), + } + impl := runreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { + return controller.Options{ + AgentName: pipeline.RunControllerName, + ConfigStore: configStore, + SkipStatusUpdates: true, + } + }) + + runInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + return impl + } +} diff --git a/pkg/reconciler/run/run.go b/pkg/reconciler/run/run.go new file mode 100644 index 00000000000..9a4f0f0cf16 --- /dev/null +++ b/pkg/reconciler/run/run.go @@ -0,0 +1,73 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package run + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + runreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1alpha1/run" + "github.com/tektoncd/pipeline/pkg/reconciler/events" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cache" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + _ "github.com/tektoncd/pipeline/pkg/taskrunmetrics/fake" // Make sure the taskrunmetrics are setup + "knative.dev/pkg/apis" + "knative.dev/pkg/logging" + pkgreconciler "knative.dev/pkg/reconciler" +) + +// Reconciler implements controller.Reconciler for Configuration resources. +type Reconciler struct { + cloudEventClient cloudevent.CEClient + cacheClient *lru.Cache +} + +// Check that our Reconciler implements runreconciler.Interface +var ( + _ runreconciler.Interface = (*Reconciler)(nil) +) + +// ReconcileKind compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Task Run +// resource with the current status of the resource. +func (c *Reconciler) ReconcileKind(ctx context.Context, run *v1alpha1.Run) pkgreconciler.Event { + logger := logging.FromContext(ctx) + configs := config.FromContextOrDefaults(ctx) + ctx = cloudevent.ToContext(ctx, c.cloudEventClient) + ctx = cache.ToContext(ctx, c.cacheClient) + // ctx = cache.ToContext(ctx, c.cacheClient) + logger.Infof("Reconciling %s", run.Name) + + // Create a copy of the run object, just in case, to avoid sync'ing changes + runEvents := *run.DeepCopy() + + if configs.FeatureFlags.SendCloudEventsForRuns { + // Custom task controllers may be sending events for "Runs" associated + // to the custom tasks they control. To avoid sending duplicate events, + // CloudEvents for "Runs" are only sent when enabled + + // Read and log the condition + condition := runEvents.Status.GetCondition(apis.ConditionSucceeded) + logger.Debugf("Emitting cloudevent for %s, condition: %s", runEvents.Name, condition) + + events.EmitCloudEvents(ctx, &runEvents) + } + + return nil +} diff --git a/pkg/reconciler/run/run_test.go b/pkg/reconciler/run/run_test.go new file mode 100644 index 00000000000..df27d931af9 --- /dev/null +++ b/pkg/reconciler/run/run_test.go @@ -0,0 +1,355 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package run + +import ( + "context" + "strings" + "testing" + + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + + "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing" + "github.com/tektoncd/pipeline/test" + eventstest "github.com/tektoncd/pipeline/test/events" + "github.com/tektoncd/pipeline/test/names" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "knative.dev/pkg/apis" + cminformer "knative.dev/pkg/configmap/informer" + pkgreconciler "knative.dev/pkg/reconciler" + + "knative.dev/pkg/system" + + _ "knative.dev/pkg/system/testing" // Setup system.Namespace() +) + +func ensureConfigurationConfigMapsExist(d *test.Data) { + var defaultsExists, featureFlagsExists, artifactBucketExists, artifactPVCExists, metricsExists bool + for _, cm := range d.ConfigMaps { + if cm.Name == config.GetDefaultsConfigName() { + defaultsExists = true + } + if cm.Name == config.GetFeatureFlagsConfigName() { + featureFlagsExists = true + } + if cm.Name == config.GetArtifactBucketConfigName() { + artifactBucketExists = true + } + if cm.Name == config.GetArtifactPVCConfigName() { + artifactPVCExists = true + } + if cm.Name == config.GetMetricsConfigName() { + metricsExists = true + } + } + if !defaultsExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } + if !featureFlagsExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } + if !artifactBucketExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetArtifactBucketConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } + if !artifactPVCExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetArtifactPVCConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } + if !metricsExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetMetricsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } +} + +func initializeRunControllerAssets(t *testing.T, d test.Data) (test.Assets, func()) { + ctx, _ := ttesting.SetupFakeContext(t) + ctx, cancel := context.WithCancel(ctx) + ensureConfigurationConfigMapsExist(&d) + c, informers := test.SeedTestData(t, ctx, d) + configMapWatcher := cminformer.NewInformedWatcher(c.Kube, system.Namespace()) + ctl := NewController()(ctx, configMapWatcher) + if err := configMapWatcher.Start(ctx.Done()); err != nil { + t.Fatalf("error starting configmap watcher: %v", err) + } + + if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok { + la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {}) + } + + return test.Assets{ + Logger: logging.FromContext(ctx), + Controller: ctl, + Clients: c, + Informers: informers, + Recorder: controller.GetEventRecorder(ctx).(*record.FakeRecorder), + Ctx: ctx, + }, cancel +} + +func getRunName(run v1alpha1.Run) string { + return strings.Join([]string{run.Namespace, run.Name}, "/") +} + +// getRunController returns an instance of the TaskRun controller/reconciler that has been seeded with +// d, where d represents the state of the system (existing resources) needed for the test. +func getRunController(t *testing.T, d test.Data) (test.Assets, func()) { + t.Helper() + names.TestingSeed() + return initializeRunControllerAssets(t, d) +} + +// TestReconcile_CloudEvents runs reconcile with a cloud event sink configured +// to ensure that events are sent in different cases +func TestReconcile_CloudEvents(t *testing.T) { + names.TestingSeed() + + cms := []*corev1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "default-cloud-events-sink": "http://synk:8080", + }, + }, { + ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "send-cloudevents-for-runs": "true", + }, + }, + } + testcases := []struct { + name string + condition *apis.Condition + wantCloudEvents []string + }{{ + name: "Run with no condition", + condition: nil, + wantCloudEvents: []string{`(?s)dev.tekton.event.run.started.v1.*test-run`}, + }, { + name: "Run with unknown condition", + condition: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: v1beta1.TaskRunReasonRunning.String(), + }, + wantCloudEvents: []string{`(?s)dev.tekton.event.run.running.v1.*test-run`}, + }, { + name: "Run with finished true condition", + condition: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + Reason: v1beta1.PipelineRunReasonSuccessful.String(), + }, + wantCloudEvents: []string{`(?s)dev.tekton.event.run.successful.v1.*test-run`}, + }, { + name: "Run with finished false condition", + condition: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: v1beta1.PipelineRunReasonCancelled.String(), + }, + wantCloudEvents: []string{`(?s)dev.tekton.event.run.failed.v1.*test-run`}, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + + objectStatus := duckv1.Status{ + Conditions: []apis.Condition{}, + } + runStatusFields := v1alpha1.RunStatusFields{} + if tc.condition != nil { + objectStatus.Conditions = append(objectStatus.Conditions, *tc.condition) + } + run := v1alpha1.Run{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-run", + Namespace: "foo", + }, + Spec: v1alpha1.RunSpec{}, + Status: v1alpha1.RunStatus{ + Status: objectStatus, + RunStatusFields: runStatusFields, + }, + } + runs := []*v1alpha1.Run{&run} + + d := test.Data{ + Runs: runs, + ConfigMaps: cms, + } + testAssets, cancel := getRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(run)); err != nil { + t.Fatal("Didn't expect an error, but got one.") + } + if len(clients.Kube.Actions()) == 0 { + t.Errorf("Expected actions to be logged in the kubeclient, got none") + } + + urun, err := clients.Pipeline.TektonV1alpha1().Runs(run.Namespace).Get(testAssets.Ctx, run.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated run: %v", err) + } + + if d := cmp.Diff(run.Status, urun.Status); d != "" { + t.Fatalf("run should not have changed, go %v instead", d) + } + + ceClient := clients.CloudEvents.(cloudevent.FakeClient) + err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, tc.wantCloudEvents) + if err != nil { + t.Errorf(err.Error()) + } + + // Try and reconcile again - expect no event + if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(run)); err != nil { + t.Fatal("Didn't expect an error, but got one.") + } + err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, []string{}) + if err != nil { + t.Errorf(err.Error()) + } + }) + } +} + +func TestReconcile_CloudEvents_Disabled(t *testing.T) { + names.TestingSeed() + + cmSinkOn := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "default-cloud-events-sink": "http://synk:8080", + }, + } + cmSinkOff := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "default-cloud-events-sink": "", + }, + } + cmRunsOn := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "send-cloudevents-for-runs": "true", + }, + } + cmRunsOff := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "send-cloudevents-for-runs": "false", + }, + } + testcases := []struct { + name string + cms []*corev1.ConfigMap + }{{ + name: "Both disabled", + cms: []*corev1.ConfigMap{cmSinkOff, cmRunsOff}, + }, { + name: "Sink Disabled", + cms: []*corev1.ConfigMap{cmSinkOff, cmRunsOn}, + }, { + name: "Runs Disabled", + cms: []*corev1.ConfigMap{cmSinkOn, cmRunsOff}, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + + objectStatus := duckv1.Status{ + Conditions: []apis.Condition{ + { + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: v1beta1.PipelineRunReasonCancelled.String(), + }, + }, + } + runStatusFields := v1alpha1.RunStatusFields{} + run := v1alpha1.Run{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-run", + Namespace: "foo", + }, + Spec: v1alpha1.RunSpec{}, + Status: v1alpha1.RunStatus{ + Status: objectStatus, + RunStatusFields: runStatusFields, + }, + } + runs := []*v1alpha1.Run{&run} + + d := test.Data{ + Runs: runs, + ConfigMaps: tc.cms, + } + testAssets, cancel := getRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(run)); err != nil { + t.Fatal("Didn't expect an error, but got one.") + } + if len(clients.Kube.Actions()) == 0 { + t.Errorf("Expected actions to be logged in the kubeclient, got none") + } + + urun, err := clients.Pipeline.TektonV1alpha1().Runs(run.Namespace).Get(testAssets.Ctx, run.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated run: %v", err) + } + + if d := cmp.Diff(run.Status, urun.Status); d != "" { + t.Fatalf("run should not have changed, go %v instead", d) + } + + ceClient := clients.CloudEvents.(cloudevent.FakeClient) + err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, []string{}) + if err != nil { + t.Errorf(err.Error()) + } + }) + } +} diff --git a/pkg/reconciler/testing/logger.go b/pkg/reconciler/testing/logger.go index 386e10837a7..d836d58dde6 100644 --- a/pkg/reconciler/testing/logger.go +++ b/pkg/reconciler/testing/logger.go @@ -4,6 +4,7 @@ import ( "context" "testing" + lru "github.com/hashicorp/golang-lru" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -11,6 +12,7 @@ import ( "knative.dev/pkg/injection" logtesting "knative.dev/pkg/logging/testing" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cache" "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -28,6 +30,8 @@ func SetupFakeContext(t *testing.T) (context.Context, []controller.Informer) { SendSuccessfully: true, } ctx = cloudevent.WithClient(ctx, &cloudEventClientBehaviour) + cacheClient, _ := lru.New(128) + ctx = cache.ToContext(ctx, cacheClient) return WithLogger(ctx, t), informer }