diff --git a/cmd/controller/main.go b/cmd/controller/main.go index eb333fe293b..137f4721c39 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -25,6 +25,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun" + "github.com/tektoncd/pipeline/pkg/reconciler/run" "github.com/tektoncd/pipeline/pkg/reconciler/taskrun" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/clock" @@ -104,6 +105,7 @@ func main() { sharedmain.MainWithConfig(ctx, ControllerLogKey, cfg, taskrun.NewController(opts, clock.RealClock{}), pipelinerun.NewController(opts, clock.RealClock{}), + run.NewController(), ) } diff --git a/config/config-feature-flags.yaml b/config/config-feature-flags.yaml index e4732f53ec6..da4bff2d9a7 100644 --- a/config/config-feature-flags.yaml +++ b/config/config-feature-flags.yaml @@ -74,3 +74,6 @@ data: # its dependent Tasks. This flag defaults to "true"; when expressions guard # the Task only. See TEP-0059 and Pipeline documentation for more details. scope-when-expressions-to-task: "true" + # Setting this flag to "true" enables CloudEvents for Runs, as long as a + # CloudEvents sink is configured in the config-defaults config map + send-cloudevents-for-runs: "false" diff --git a/docs/events.md b/docs/events.md index adc16c9e83d..6b5a36fc211 100644 --- a/docs/events.md +++ b/docs/events.md @@ -74,7 +74,11 @@ Resource |Event |Event Type `Run` | `Succeed` | `dev.tekton.event.run.successful.v1` `Run` | `Failed` | `dev.tekton.event.run.failed.v1` -`CloudEvents` for `Runs` are defined but not sent yet. +`CloudEvents` for `Runs` are only sent when enabled in the [configuration](./install.md#configuring-cloudevents-notifications). + +**Note**: `CloudEvents` for `Runs` rely on an ephemeral cache to avoid duplicate +events. In case of controller restart, the cache is reset and duplicate events +may be sent. ## Format of `CloudEvents` diff --git a/docs/install.md b/docs/install.md index 01d2fdac93c..ad15c42a823 100644 --- a/docs/install.md +++ b/docs/install.md @@ -268,11 +268,11 @@ data: ## Configuring CloudEvents notifications -When configured so, Tekton can generate `CloudEvents` for `TaskRun` and `PipelineRun` lifecycle -events. The only configuration parameter is the URL of the sink. When not set, no notification is -generated. +When configured so, Tekton can generate `CloudEvents` for `TaskRun`, +`PipelineRun` and `Run`lifecycle events. The main configuration parameter is the +URL of the sink. When not set, no notification is generated. -``` +```yaml apiVersion: v1 kind: ConfigMap metadata: @@ -285,6 +285,23 @@ data: default-cloud-events-sink: https://my-sink-url ``` +Additionally, CloudEvents for `Runs` require an extra configuration to be +enabled. This setting exists to avoid collisions with CloudEvents that might +be sent by custom task controllers: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: feature-flags + namespace: tekton-pipelines + labels: + app.kubernetes.io/instance: default + app.kubernetes.io/part-of: tekton-pipelines +data: + send-cloudevents-for-runs: true +``` + ## Configuring self-signed cert for private registry The `SSL_CERT_DIR` is set to `/etc/ssl/certs` as the default cert directory. If you are using a self-signed cert for private registry and the cert file is not under the default cert directory, configure your registry cert in the `config-registry-cert` `ConfigMap` with the key `cert`. diff --git a/pkg/apis/config/feature_flags.go b/pkg/apis/config/feature_flags.go index f196a1873af..dab18dad45f 100644 --- a/pkg/apis/config/feature_flags.go +++ b/pkg/apis/config/feature_flags.go @@ -46,6 +46,8 @@ const ( DefaultScopeWhenExpressionsToTask = true // DefaultEnableAPIFields is the default value for "enable-api-fields". DefaultEnableAPIFields = StableAPIFields + // DefaultSendCloudEventsForRuns is the default value for "send-cloudevents-for-runs". + DefaultSendCloudEventsForRuns = false disableAffinityAssistantKey = "disable-affinity-assistant" disableCredsInitKey = "disable-creds-init" @@ -55,6 +57,7 @@ const ( enableCustomTasks = "enable-custom-tasks" enableAPIFields = "enable-api-fields" scopeWhenExpressionsToTask = "scope-when-expressions-to-task" + sendCloudEventsForRuns = "send-cloudevents-for-runs" ) // FeatureFlags holds the features configurations @@ -68,6 +71,7 @@ type FeatureFlags struct { EnableCustomTasks bool ScopeWhenExpressionsToTask bool EnableAPIFields string + SendCloudEventsForRuns bool } // GetFeatureFlagsConfigName returns the name of the configmap containing all @@ -113,6 +117,9 @@ func NewFeatureFlagsFromMap(cfgMap map[string]string) (*FeatureFlags, error) { if err := setEnabledAPIFields(cfgMap, DefaultEnableAPIFields, &tc.EnableAPIFields); err != nil { return nil, err } + if err := setFeature(sendCloudEventsForRuns, DefaultSendCloudEventsForRuns, &tc.SendCloudEventsForRuns); err != nil { + return nil, err + } // Given that they are alpha features, Tekton Bundles and Custom Tasks should be switched on if // enable-api-fields is "alpha". If enable-api-fields is not "alpha" then fall back to the value of diff --git a/pkg/apis/config/feature_flags_test.go b/pkg/apis/config/feature_flags_test.go index e7737cc8650..792f3b08194 100644 --- a/pkg/apis/config/feature_flags_test.go +++ b/pkg/apis/config/feature_flags_test.go @@ -50,6 +50,7 @@ func TestNewFeatureFlagsFromConfigMap(t *testing.T) { EnableCustomTasks: true, ScopeWhenExpressionsToTask: true, EnableAPIFields: "alpha", + SendCloudEventsForRuns: true, }, fileName: "feature-flags-all-flags-set", }, diff --git a/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml b/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml index 817c9899a0f..84d5a6f6bd4 100644 --- a/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml +++ b/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml @@ -25,3 +25,4 @@ data: enable-custom-tasks: "true" scope-when-expressions-to-task: "true" enable-api-fields: "alpha" + send-cloudevents-for-runs: "true" diff --git a/pkg/reconciler/events/cache/cache.go b/pkg/reconciler/events/cache/cache.go new file mode 100644 index 00000000000..d04441dee17 --- /dev/null +++ b/pkg/reconciler/events/cache/cache.go @@ -0,0 +1,78 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "encoding/json" + "errors" + "fmt" + + cloudevents "github.com/cloudevents/sdk-go/v2" + lru "github.com/hashicorp/golang-lru" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" +) + +// Struct to unmarshal the event data +type eventData struct { + Run *v1alpha1.Run `json:"run,omitempty"` +} + +// AddEventSentToCache adds the particular object to cache marking it as sent +func AddEventSentToCache(cacheClient *lru.Cache, event *cloudevents.Event) error { + if cacheClient == nil { + return errors.New("cache client is nil") + } + eventKey, err := EventKey(event) + if err != nil { + return err + } + cacheClient.Add(eventKey, nil) + return nil +} + +// IsCloudEventSent checks if the event exists in the cache +func IsCloudEventSent(cacheClient *lru.Cache, event *cloudevents.Event) (bool, error) { + if cacheClient == nil { + return false, errors.New("cache client is nil") + } + eventKey, err := EventKey(event) + if err != nil { + return false, err + } + return cacheClient.Contains(eventKey), nil +} + +// EventKey defines whether an event is considered different from another +// in future we might want to let specific event types override this +func EventKey(event *cloudevents.Event) (string, error) { + var ( + data eventData + resourceName string + resourceNamespace string + ) + err := json.Unmarshal(event.Data(), &data) + if err != nil { + return "", err + } + if data.Run == nil { + return "", fmt.Errorf("Invalid Run data in %v", event) + } + resourceName = data.Run.Name + resourceNamespace = data.Run.Namespace + eventType := event.Type() + return fmt.Sprintf("%s/run/%s/%s", eventType, resourceNamespace, resourceName), nil +} diff --git a/pkg/reconciler/events/cache/cache_test.go b/pkg/reconciler/events/cache/cache_test.go new file mode 100644 index 00000000000..0e016d0687f --- /dev/null +++ b/pkg/reconciler/events/cache/cache_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "net/url" + "testing" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + lru "github.com/hashicorp/golang-lru" + + "github.com/cloudevents/sdk-go/v2/event" + cetypes "github.com/cloudevents/sdk-go/v2/types" + "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + "github.com/tektoncd/pipeline/test/diff" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func strptr(s string) *string { return &s } + +func getEventData(run interface{}) map[string]interface{} { + cloudEventData := map[string]interface{}{} + if v, ok := run.(*v1alpha1.Run); ok { + cloudEventData["run"] = v + } + return cloudEventData +} + +func getEventToTest(eventtype string, run interface{}) *event.Event { + e := event.Event{ + Context: event.EventContextV1{ + Type: eventtype, + Source: cetypes.URIRef{URL: url.URL{Path: "/foo/bar/source"}}, + ID: "test-event", + Time: &cetypes.Timestamp{Time: time.Now()}, + Subject: strptr("topic"), + }.AsV1(), + } + if err := e.SetData(cloudevents.ApplicationJSON, getEventData(run)); err != nil { + panic(err) + } + return &e +} + +func getRunByMeta(name string, namespace string) *v1alpha1.Run { + return &v1alpha1.Run{ + TypeMeta: metav1.TypeMeta{ + Kind: "Run", + APIVersion: "v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: v1alpha1.RunSpec{}, + Status: v1alpha1.RunStatus{}, + } +} + +// TestEventsKey verifies that keys are extracted correctly from events +func TestEventsKey(t *testing.T) { + testcases := []struct { + name string + eventtype string + run interface{} + wantKey string + wantErr bool + }{{ + name: "run event", + eventtype: "my.test.run.event", + run: getRunByMeta("myrun", "mynamespace"), + wantKey: "my.test.run.event/run/mynamespace/myrun", + wantErr: false, + }, { + name: "run event missing data", + eventtype: "my.test.run.event", + run: nil, + wantKey: "", + wantErr: true, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + gotEvent := getEventToTest(tc.eventtype, tc.run) + gotKey, err := EventKey(gotEvent) + if err != nil { + if !tc.wantErr { + t.Fatalf("Expecting an error, got none") + } + } + if d := cmp.Diff(tc.wantKey, gotKey); d != "" { + t.Errorf("Wrong Event key %s", diff.PrintWantGot(d)) + } + }) + } +} + +func TestAddCheckEvent(t *testing.T) { + run := getRunByMeta("arun", "anamespace") + runb := getRunByMeta("arun", "bnamespace") + baseEvent := getEventToTest("some.event.type", run) + + testcases := []struct { + name string + firstEvent *event.Event + secondEvent *event.Event + wantFound bool + }{{ + name: "identical events", + firstEvent: baseEvent, + secondEvent: baseEvent, + wantFound: true, + }, { + name: "new timestamp event", + firstEvent: baseEvent, + secondEvent: getEventToTest("some.event.type", run), + wantFound: true, + }, { + name: "different namespace", + firstEvent: baseEvent, + secondEvent: getEventToTest("some.event.type", runb), + wantFound: false, + }, { + name: "different event type", + firstEvent: baseEvent, + secondEvent: getEventToTest("some.other.event.type", run), + wantFound: false, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + testCache, _ := lru.New(10) + AddEventSentToCache(testCache, tc.firstEvent) + found, _ := IsCloudEventSent(testCache, tc.secondEvent) + if d := cmp.Diff(tc.wantFound, found); d != "" { + t.Errorf("Cache check failure %s", diff.PrintWantGot(d)) + } + }) + } +} diff --git a/pkg/reconciler/events/cache/cacheclient.go b/pkg/reconciler/events/cache/cacheclient.go new file mode 100644 index 00000000000..edbea78039a --- /dev/null +++ b/pkg/reconciler/events/cache/cacheclient.go @@ -0,0 +1,63 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + "k8s.io/client-go/rest" + "knative.dev/pkg/injection" + "knative.dev/pkg/logging" +) + +// With 4 events per Run, we can store events for 1024 concurrent Runs +const bufferSize = 4096 + +func init() { + injection.Default.RegisterClient(withCacheClient) +} + +// cacheKey is a way to associate the Cache from inside the context.Context +type cacheKey struct{} + +func withCacheClient(ctx context.Context, cfg *rest.Config) context.Context { + logger := logging.FromContext(ctx) + + cacheClient, err := lru.New(bufferSize) + logger.Infof("CACHE CLIENT %+v", cacheClient) + if err != nil { + logger.Error("unable to create cacheClient :" + err.Error()) + } + + return ToContext(ctx, cacheClient) +} + +// Get extracts the cloudEventClient client from the context. +func Get(ctx context.Context) *lru.Cache { + untyped := ctx.Value(cacheKey{}) + if untyped == nil { + logging.FromContext(ctx).Errorf("Unable to fetch client from context.") + return nil + } + return untyped.(*lru.Cache) +} + +// ToContext adds the cloud events client to the context +func ToContext(ctx context.Context, c *lru.Cache) context.Context { + return context.WithValue(ctx, cacheKey{}, c) +} diff --git a/pkg/reconciler/events/cache/cachefakeclient.go b/pkg/reconciler/events/cache/cachefakeclient.go new file mode 100644 index 00000000000..f1245f374cb --- /dev/null +++ b/pkg/reconciler/events/cache/cachefakeclient.go @@ -0,0 +1,45 @@ +/* +Copyright 2021 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + "k8s.io/client-go/rest" + "knative.dev/pkg/injection" + "knative.dev/pkg/logging" +) + +const fakeBufferSize = 128 + +func init() { + injection.Fake.RegisterClient(withFakeCacheClient) +} + +func withFakeCacheClient(ctx context.Context, cfg *rest.Config) context.Context { + logger := logging.FromContext(ctx) + + // With 4 events per Run, we can store events for 32 concurrent Runs + cacheClient, err := lru.New(fakeBufferSize) + logger.Infof("CACHE CLIENT %+v", cacheClient) + if err != nil { + logger.Error("unable to create cacheClient :" + err.Error()) + } + + return ToContext(ctx, cacheClient) +} diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller.go b/pkg/reconciler/events/cloudevent/cloud_event_controller.go index 5a353cec370..5072708477b 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller.go @@ -23,9 +23,11 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/hashicorp/go-multierror" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/cloudevent" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cache" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -137,11 +139,25 @@ func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error if err != nil { return err } + // Events for Runs require a cache of events that have been sent + cacheClient := cache.Get(ctx) + _, isRun := object.(*v1alpha1.Run) wasIn := make(chan error) go func() { wasIn <- nil logger.Debugf("Sending cloudevent of type %q", event.Type()) + // In case of Run event, check cache if cloudevent is already sent + if isRun { + cloudEventSent, err := cache.IsCloudEventSent(cacheClient, event) + if err != nil { + logger.Errorf("error while checking cache: %s", err) + } + if cloudEventSent { + logger.Infof("cloudevent %v already sent", event) + return + } + } if result := ceClient.Send(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), *event); !cloudevents.IsACK(result) { logger.Warnf("Failed to send cloudevent: %s", result.Error()) recorder := controller.GetEventRecorder(ctx) @@ -150,6 +166,12 @@ func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error } recorder.Event(object, corev1.EventTypeWarning, "Cloud Event Failure", result.Error()) } + // In case of Run event, add to the cache to avoid duplicate events + if isRun { + if err := cache.AddEventSentToCache(cacheClient, event); err != nil { + logger.Errorf("error while adding sent event to cache: %s", err) + } + } }() return <-wasIn diff --git a/pkg/reconciler/events/cloudevent/cloudeventclient.go b/pkg/reconciler/events/cloudevent/cloudeventclient.go index 8305bb34fb6..be89c5f5fc9 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventclient.go @@ -33,8 +33,8 @@ func init() { injection.Dynamic.RegisterDynamicClient(withCloudEventClient) } -// CECKey is used to associate the CloudEventClient inside the context.Context -type CECKey struct{} +// ceKey is used to associate the CloudEventClient inside the context.Context +type ceKey struct{} func withCloudEventClient(ctx context.Context) context.Context { logger := logging.FromContext(ctx) @@ -59,12 +59,12 @@ func withCloudEventClient(ctx context.Context) context.Context { logger.Panicf("Error creating the cloudevents client: %s", err) } - return context.WithValue(ctx, CECKey{}, cloudEventClient) + return context.WithValue(ctx, ceKey{}, cloudEventClient) } // Get extracts the cloudEventClient client from the context. func Get(ctx context.Context) CEClient { - untyped := ctx.Value(CECKey{}) + untyped := ctx.Value(ceKey{}) if untyped == nil { logging.FromContext(ctx).Errorf( "Unable to fetch client from context.") @@ -75,5 +75,5 @@ func Get(ctx context.Context) CEClient { // ToContext adds the cloud events client to the context func ToContext(ctx context.Context, cec CEClient) context.Context { - return context.WithValue(ctx, CECKey{}, cec) + return context.WithValue(ctx, ceKey{}, cec) } diff --git a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go index b9ef53f7163..b727ac537ec 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go @@ -74,5 +74,5 @@ func (c FakeClient) StartReceiver(ctx context.Context, fn interface{}) error { // WithClient adds to the context a fake client with the desired behaviour func WithClient(ctx context.Context, behaviour *FakeClientBehaviour) context.Context { - return context.WithValue(ctx, CECKey{}, newFakeClient(behaviour)) + return context.WithValue(ctx, ceKey{}, newFakeClient(behaviour)) } diff --git a/pkg/reconciler/events/event.go b/pkg/reconciler/events/event.go index 1aca5578227..e2bb638b1a3 100644 --- a/pkg/reconciler/events/event.go +++ b/pkg/reconciler/events/event.go @@ -69,6 +69,23 @@ func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition * } } +// EmitCloudEvents emits CloudEvents (only) for object +func EmitCloudEvents(ctx context.Context, object runtime.Object) { + logger := logging.FromContext(ctx) + configs := config.FromContextOrDefaults(ctx) + sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "") + if sendCloudEvents { + ctx = cloudevents.ContextWithTarget(ctx, configs.Defaults.DefaultCloudEventsSink) + } + + if sendCloudEvents { + err := cloudevent.SendCloudEventWithRetries(ctx, object) + if err != nil { + logger.Warnf("Failed to emit cloud events %v", err.Error()) + } + } +} + func sendKubernetesEvents(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) { // Events that are going to be sent // diff --git a/pkg/reconciler/run/controller.go b/pkg/reconciler/run/controller.go new file mode 100644 index 00000000000..74368b86d16 --- /dev/null +++ b/pkg/reconciler/run/controller.go @@ -0,0 +1,59 @@ +/* +Copyright 2022 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package run + +import ( + "context" + + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline" + runinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/run" + runreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1alpha1/run" + cacheclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cache" + cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" +) + +// NewController instantiates a new controller.Impl from knative.dev/pkg/controller +// This is a read-only controller, hence the SkipStatusUpdates set to true +func NewController() func(context.Context, configmap.Watcher) *controller.Impl { + return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { + logger := logging.FromContext(ctx) + runInformer := runinformer.Get(ctx) + + configStore := config.NewStore(logger.Named("config-store")) + configStore.WatchConfigs(cmw) + + c := &Reconciler{ + cloudEventClient: cloudeventclient.Get(ctx), + cacheClient: cacheclient.Get(ctx), + } + impl := runreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { + return controller.Options{ + AgentName: pipeline.RunControllerName, + ConfigStore: configStore, + SkipStatusUpdates: true, + } + }) + + runInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + return impl + } +} diff --git a/pkg/reconciler/run/run.go b/pkg/reconciler/run/run.go new file mode 100644 index 00000000000..9a4f0f0cf16 --- /dev/null +++ b/pkg/reconciler/run/run.go @@ -0,0 +1,73 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package run + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + runreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1alpha1/run" + "github.com/tektoncd/pipeline/pkg/reconciler/events" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cache" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + _ "github.com/tektoncd/pipeline/pkg/taskrunmetrics/fake" // Make sure the taskrunmetrics are setup + "knative.dev/pkg/apis" + "knative.dev/pkg/logging" + pkgreconciler "knative.dev/pkg/reconciler" +) + +// Reconciler implements controller.Reconciler for Configuration resources. +type Reconciler struct { + cloudEventClient cloudevent.CEClient + cacheClient *lru.Cache +} + +// Check that our Reconciler implements runreconciler.Interface +var ( + _ runreconciler.Interface = (*Reconciler)(nil) +) + +// ReconcileKind compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Task Run +// resource with the current status of the resource. +func (c *Reconciler) ReconcileKind(ctx context.Context, run *v1alpha1.Run) pkgreconciler.Event { + logger := logging.FromContext(ctx) + configs := config.FromContextOrDefaults(ctx) + ctx = cloudevent.ToContext(ctx, c.cloudEventClient) + ctx = cache.ToContext(ctx, c.cacheClient) + // ctx = cache.ToContext(ctx, c.cacheClient) + logger.Infof("Reconciling %s", run.Name) + + // Create a copy of the run object, just in case, to avoid sync'ing changes + runEvents := *run.DeepCopy() + + if configs.FeatureFlags.SendCloudEventsForRuns { + // Custom task controllers may be sending events for "Runs" associated + // to the custom tasks they control. To avoid sending duplicate events, + // CloudEvents for "Runs" are only sent when enabled + + // Read and log the condition + condition := runEvents.Status.GetCondition(apis.ConditionSucceeded) + logger.Debugf("Emitting cloudevent for %s, condition: %s", runEvents.Name, condition) + + events.EmitCloudEvents(ctx, &runEvents) + } + + return nil +} diff --git a/pkg/reconciler/run/run_test.go b/pkg/reconciler/run/run_test.go new file mode 100644 index 00000000000..0b75a564ad3 --- /dev/null +++ b/pkg/reconciler/run/run_test.go @@ -0,0 +1,359 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package run + +import ( + "context" + "strings" + "testing" + + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + + "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" + ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing" + "github.com/tektoncd/pipeline/test" + eventstest "github.com/tektoncd/pipeline/test/events" + "github.com/tektoncd/pipeline/test/names" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "knative.dev/pkg/apis" + cminformer "knative.dev/pkg/configmap/informer" + pkgreconciler "knative.dev/pkg/reconciler" + + "knative.dev/pkg/system" + + _ "knative.dev/pkg/system/testing" // Setup system.Namespace() +) + +func ensureConfigurationConfigMapsExist(d *test.Data) { + var defaultsExists, featureFlagsExists, artifactBucketExists, artifactPVCExists, metricsExists bool + for _, cm := range d.ConfigMaps { + if cm.Name == config.GetDefaultsConfigName() { + defaultsExists = true + } + if cm.Name == config.GetFeatureFlagsConfigName() { + featureFlagsExists = true + } + if cm.Name == config.GetArtifactBucketConfigName() { + artifactBucketExists = true + } + if cm.Name == config.GetArtifactPVCConfigName() { + artifactPVCExists = true + } + if cm.Name == config.GetMetricsConfigName() { + metricsExists = true + } + } + if !defaultsExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } + if !featureFlagsExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } + if !artifactBucketExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetArtifactBucketConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } + if !artifactPVCExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetArtifactPVCConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } + if !metricsExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetMetricsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{}, + }) + } +} + +func initializeRunControllerAssets(t *testing.T, d test.Data) (test.Assets, func()) { + ctx, _ := ttesting.SetupFakeContext(t) + ctx, cancel := context.WithCancel(ctx) + ensureConfigurationConfigMapsExist(&d) + c, informers := test.SeedTestData(t, ctx, d) + configMapWatcher := cminformer.NewInformedWatcher(c.Kube, system.Namespace()) + ctl := NewController()(ctx, configMapWatcher) + if err := configMapWatcher.Start(ctx.Done()); err != nil { + t.Fatalf("error starting configmap watcher: %v", err) + } + + if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok { + la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {}) + } + + return test.Assets{ + Logger: logging.FromContext(ctx), + Controller: ctl, + Clients: c, + Informers: informers, + Recorder: controller.GetEventRecorder(ctx).(*record.FakeRecorder), + Ctx: ctx, + }, cancel +} + +func getRunName(run v1alpha1.Run) string { + return strings.Join([]string{run.Namespace, run.Name}, "/") +} + +// getRunController returns an instance of the TaskRun controller/reconciler that has been seeded with +// d, where d represents the state of the system (existing resources) needed for the test. +func getRunController(t *testing.T, d test.Data) (test.Assets, func()) { + t.Helper() + names.TestingSeed() + return initializeRunControllerAssets(t, d) +} + +// TestReconcile_CloudEvents runs reconcile with a cloud event sink configured +// to ensure that events are sent in different cases +func TestReconcile_CloudEvents(t *testing.T) { + names.TestingSeed() + + cms := []*corev1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "default-cloud-events-sink": "http://synk:8080", + }, + }, { + ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "send-cloudevents-for-runs": "true", + }, + }, + } + testcases := []struct { + name string + condition *apis.Condition + wantCloudEvents []string + }{{ + name: "Run with no condition", + condition: nil, + wantCloudEvents: []string{`(?s)dev.tekton.event.run.started.v1.*test-run`}, + }, { + name: "Run with unknown condition", + condition: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + Reason: v1beta1.TaskRunReasonRunning.String(), + }, + wantCloudEvents: []string{`(?s)dev.tekton.event.run.running.v1.*test-run`}, + }, { + name: "Run with finished true condition", + condition: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + Reason: v1beta1.PipelineRunReasonSuccessful.String(), + }, + wantCloudEvents: []string{`(?s)dev.tekton.event.run.successful.v1.*test-run`}, + }, { + name: "Run with finished false condition", + condition: &apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: v1beta1.PipelineRunReasonCancelled.String(), + }, + wantCloudEvents: []string{`(?s)dev.tekton.event.run.failed.v1.*test-run`}, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + + objectStatus := duckv1.Status{ + Conditions: []apis.Condition{}, + } + runStatusFields := v1alpha1.RunStatusFields{} + if tc.condition != nil { + objectStatus.Conditions = append(objectStatus.Conditions, *tc.condition) + } + run := v1alpha1.Run{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-run", + Namespace: "foo", + }, + Spec: v1alpha1.RunSpec{}, + Status: v1alpha1.RunStatus{ + Status: objectStatus, + RunStatusFields: runStatusFields, + }, + } + runs := []*v1alpha1.Run{&run} + + d := test.Data{ + Runs: runs, + ConfigMaps: cms, + } + testAssets, cancel := getRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(run)); err != nil { + t.Fatal("Didn't expect an error, but got one.") + } + + for _, a := range clients.Kube.Actions() { + aVerb := a.GetVerb() + if aVerb != "get" && aVerb != "list" && aVerb != "watch" { + t.Errorf("Expected only read actions to be logged in the kubeclient, got %s", aVerb) + } + } + + urun, err := clients.Pipeline.TektonV1alpha1().Runs(run.Namespace).Get(testAssets.Ctx, run.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated run: %v", err) + } + + if d := cmp.Diff(run.Status, urun.Status); d != "" { + t.Fatalf("run should not have changed, go %v instead", d) + } + + ceClient := clients.CloudEvents.(cloudevent.FakeClient) + err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, tc.wantCloudEvents) + if err != nil { + t.Errorf(err.Error()) + } + + // Try and reconcile again - expect no event + if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(run)); err != nil { + t.Fatal("Didn't expect an error, but got one.") + } + err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, []string{}) + if err != nil { + t.Errorf(err.Error()) + } + }) + } +} + +func TestReconcile_CloudEvents_Disabled(t *testing.T) { + names.TestingSeed() + + cmSinkOn := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "default-cloud-events-sink": "http://synk:8080", + }, + } + cmSinkOff := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "default-cloud-events-sink": "", + }, + } + cmRunsOn := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "send-cloudevents-for-runs": "true", + }, + } + cmRunsOff := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, + Data: map[string]string{ + "send-cloudevents-for-runs": "false", + }, + } + testcases := []struct { + name string + cms []*corev1.ConfigMap + }{{ + name: "Both disabled", + cms: []*corev1.ConfigMap{cmSinkOff, cmRunsOff}, + }, { + name: "Sink Disabled", + cms: []*corev1.ConfigMap{cmSinkOff, cmRunsOn}, + }, { + name: "Runs Disabled", + cms: []*corev1.ConfigMap{cmSinkOn, cmRunsOff}, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + + objectStatus := duckv1.Status{ + Conditions: []apis.Condition{ + { + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: v1beta1.PipelineRunReasonCancelled.String(), + }, + }, + } + runStatusFields := v1alpha1.RunStatusFields{} + run := v1alpha1.Run{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-run", + Namespace: "foo", + }, + Spec: v1alpha1.RunSpec{}, + Status: v1alpha1.RunStatus{ + Status: objectStatus, + RunStatusFields: runStatusFields, + }, + } + runs := []*v1alpha1.Run{&run} + + d := test.Data{ + Runs: runs, + ConfigMaps: tc.cms, + } + testAssets, cancel := getRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(run)); err != nil { + t.Fatal("Didn't expect an error, but got one.") + } + if len(clients.Kube.Actions()) == 0 { + t.Errorf("Expected actions to be logged in the kubeclient, got none") + } + + urun, err := clients.Pipeline.TektonV1alpha1().Runs(run.Namespace).Get(testAssets.Ctx, run.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated run: %v", err) + } + + if d := cmp.Diff(run.Status, urun.Status); d != "" { + t.Fatalf("run should not have changed, go %v instead", d) + } + + ceClient := clients.CloudEvents.(cloudevent.FakeClient) + err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, []string{}) + if err != nil { + t.Errorf(err.Error()) + } + }) + } +} diff --git a/pkg/reconciler/testing/logger.go b/pkg/reconciler/testing/logger.go index 386e10837a7..d836d58dde6 100644 --- a/pkg/reconciler/testing/logger.go +++ b/pkg/reconciler/testing/logger.go @@ -4,6 +4,7 @@ import ( "context" "testing" + lru "github.com/hashicorp/golang-lru" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -11,6 +12,7 @@ import ( "knative.dev/pkg/injection" logtesting "knative.dev/pkg/logging/testing" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cache" "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -28,6 +30,8 @@ func SetupFakeContext(t *testing.T) (context.Context, []controller.Informer) { SendSuccessfully: true, } ctx = cloudevent.WithClient(ctx, &cloudEventClientBehaviour) + cacheClient, _ := lru.New(128) + ctx = cache.ToContext(ctx, cacheClient) return WithLogger(ctx, t), informer }