diff --git a/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go b/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go index a61486a0cd5..2b1da457833 100644 --- a/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go @@ -20,6 +20,9 @@ package eventtype import ( context "context" + fmt "fmt" + reflect "reflect" + strings "strings" corev1 "k8s.io/api/core/v1" watch "k8s.io/apimachinery/pkg/watch" @@ -27,9 +30,9 @@ import ( v1 "k8s.io/client-go/kubernetes/typed/core/v1" record "k8s.io/client-go/tools/record" versionedscheme "knative.dev/eventing/pkg/client/clientset/versioned/scheme" - injectionclient "knative.dev/eventing/pkg/client/injection/client" + client "knative.dev/eventing/pkg/client/injection/client" eventtype "knative.dev/eventing/pkg/client/injection/informers/eventing/v1beta1/eventtype" - client "knative.dev/pkg/client/injection/kube/client" + kubeclient "knative.dev/pkg/client/injection/kube/client" controller "knative.dev/pkg/controller" logging "knative.dev/pkg/logging" ) @@ -37,7 +40,6 @@ import ( const ( defaultControllerAgentName = "eventtype-controller" defaultFinalizerName = "eventtypes.eventing.knative.dev" - defaultQueueName = "eventtypes" ) // NewImpl returns a controller.Impl that handles queuing and feeding work from @@ -54,6 +56,41 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF eventtypeInformer := eventtype.Get(ctx) + rec := &reconcilerImpl{ + Client: client.Get(ctx), + Lister: eventtypeInformer.Lister(), + reconciler: r, + finalizerName: defaultFinalizerName, + } + + t := reflect.TypeOf(r).Elem() + queueName := fmt.Sprintf("%s.%s", strings.ReplaceAll(t.PkgPath(), "/", "-"), t.Name()) + + impl := controller.NewImpl(rec, logger, queueName) + agentName := defaultControllerAgentName + + // Pass impl to the options. Save any optional results. + for _, fn := range optionsFns { + opts := fn(impl) + if opts.ConfigStore != nil { + rec.configStore = opts.ConfigStore + } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } + if opts.AgentName != "" { + agentName = opts.AgentName + } + } + + rec.Recorder = createRecorder(ctx, agentName) + + return impl +} + +func createRecorder(ctx context.Context, agentName string) record.EventRecorder { + logger := logging.FromContext(ctx) + recorder := controller.GetEventRecorder(ctx) if recorder == nil { // Create event broadcaster @@ -62,9 +99,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF watches := []watch.Interface{ eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), eventBroadcaster.StartRecordingToSink( - &v1.EventSinkImpl{Interface: client.Get(ctx).CoreV1().Events("")}), + &v1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events("")}), } - recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: defaultControllerAgentName}) + recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName}) go func() { <-ctx.Done() for _, w := range watches { @@ -73,23 +110,7 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF }() } - rec := &reconcilerImpl{ - Client: injectionclient.Get(ctx), - Lister: eventtypeInformer.Lister(), - Recorder: recorder, - reconciler: r, - } - impl := controller.NewImpl(rec, logger, defaultQueueName) - - // Pass impl to the options. Save any optional results. - for _, fn := range optionsFns { - opts := fn(impl) - if opts.ConfigStore != nil { - rec.configStore = opts.ConfigStore - } - } - - return impl + return recorder } func init() { diff --git a/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/reconciler.go b/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/reconciler.go index 3fd10e551b3..c2c68a3062d 100644 --- a/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/reconciler.go +++ b/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/reconciler.go @@ -20,15 +20,15 @@ package eventtype import ( context "context" - "encoding/json" - "reflect" + json "encoding/json" + reflect "reflect" zap "go.uber.org/zap" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" + equality "k8s.io/apimachinery/pkg/api/equality" errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + types "k8s.io/apimachinery/pkg/types" sets "k8s.io/apimachinery/pkg/util/sets" cache "k8s.io/client-go/tools/cache" record "k8s.io/client-go/tools/record" @@ -81,6 +81,9 @@ type reconcilerImpl struct { // reconciler is the implementation of the business logic of the resource. reconciler Interface + + // finalizerName is the name of the finalizer to reconcile. + finalizerName string } // Check that our Reconciler implements controller.Reconciler @@ -93,16 +96,20 @@ func NewReconciler(ctx context.Context, logger *zap.SugaredLogger, client versio } rec := &reconcilerImpl{ - Client: client, - Lister: lister, - Recorder: recorder, - reconciler: r, + Client: client, + Lister: lister, + Recorder: recorder, + reconciler: r, + finalizerName: defaultFinalizerName, } for _, opts := range options { if opts.ConfigStore != nil { rec.configStore = opts.ConfigStore } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } } return rec @@ -137,7 +144,7 @@ func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { if errors.IsNotFound(err) { // The resource may no longer exist, in which case we stop processing. - logger.Errorf("resource %q no longer exists", key) + logger.Debugf("resource %q no longer exists", key) return nil } else if err != nil { return err @@ -160,6 +167,7 @@ func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { // Reconcile this copy of the resource and then write back any status // updates regardless of whether the reconciliation errored out. reconcileEvent = r.reconciler.ReconcileKind(ctx, resource) + } else if fin, ok := r.reconciler.(Finalizer); ok { // Append the target method to the logger. logger = logger.With(zap.String("targetMethod", "FinalizeKind")) @@ -189,15 +197,21 @@ func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { if reconcileEvent != nil { var event *reconciler.ReconcilerEvent if reconciler.EventAs(reconcileEvent, &event) { - logger.Infow("returned an event", zap.Any("event", reconcileEvent)) + logger.Infow("Returned an event", zap.Any("event", reconcileEvent)) r.Recorder.Eventf(resource, event.EventType, event.Reason, event.Format, event.Args...) + + // the event was wrapped inside an error, consider the reconciliation as failed + if _, isEvent := reconcileEvent.(*reconciler.ReconcilerEvent); !isEvent { + return reconcileEvent + } return nil - } else { - logger.Errorw("returned an error", zap.Error(reconcileEvent)) - r.Recorder.Event(resource, v1.EventTypeWarning, "InternalError", reconcileEvent.Error()) - return reconcileEvent } + + logger.Errorw("Returned an error", zap.Error(reconcileEvent)) + r.Recorder.Event(resource, v1.EventTypeWarning, "InternalError", reconcileEvent.Error()) + return reconcileEvent } + return nil } @@ -231,9 +245,8 @@ func (r *reconcilerImpl) updateStatus(existing *v1beta1.EventType, desired *v1be // updateFinalizersFiltered will update the Finalizers of the resource. // TODO: this method could be generic and sync all finalizers. For now it only -// updates defaultFinalizerName. +// updates defaultFinalizerName or its override. func (r *reconcilerImpl) updateFinalizersFiltered(ctx context.Context, resource *v1beta1.EventType) (*v1beta1.EventType, error) { - finalizerName := defaultFinalizerName getter := r.Lister.EventTypes(resource.Namespace) @@ -251,20 +264,20 @@ func (r *reconcilerImpl) updateFinalizersFiltered(ctx context.Context, resource existingFinalizers := sets.NewString(existing.Finalizers...) desiredFinalizers := sets.NewString(resource.Finalizers...) - if desiredFinalizers.Has(finalizerName) { - if existingFinalizers.Has(finalizerName) { + if desiredFinalizers.Has(r.finalizerName) { + if existingFinalizers.Has(r.finalizerName) { // Nothing to do. return resource, nil } // Add the finalizer. - finalizers = append(existing.Finalizers, finalizerName) + finalizers = append(existing.Finalizers, r.finalizerName) } else { - if !existingFinalizers.Has(finalizerName) { + if !existingFinalizers.Has(r.finalizerName) { // Nothing to do. return resource, nil } // Remove the finalizer. - existingFinalizers.Delete(finalizerName) + existingFinalizers.Delete(r.finalizerName) finalizers = existingFinalizers.List() } @@ -302,12 +315,12 @@ func (r *reconcilerImpl) setFinalizerIfFinalizer(ctx context.Context, resource * // If this resource is not being deleted, mark the finalizer. if resource.GetDeletionTimestamp().IsZero() { - finalizers.Insert(defaultFinalizerName) + finalizers.Insert(r.finalizerName) } resource.Finalizers = finalizers.List() - // Synchronize the finalizers filtered by defaultFinalizerName. + // Synchronize the finalizers filtered by r.finalizerName. return r.updateFinalizersFiltered(ctx, resource) } @@ -325,15 +338,15 @@ func (r *reconcilerImpl) clearFinalizer(ctx context.Context, resource *v1beta1.E var event *reconciler.ReconcilerEvent if reconciler.EventAs(reconcileEvent, &event) { if event.EventType == v1.EventTypeNormal { - finalizers.Delete(defaultFinalizerName) + finalizers.Delete(r.finalizerName) } } } else { - finalizers.Delete(defaultFinalizerName) + finalizers.Delete(r.finalizerName) } resource.Finalizers = finalizers.List() - // Synchronize the finalizers filtered by defaultFinalizerName. + // Synchronize the finalizers filtered by r.finalizerName. return r.updateFinalizersFiltered(ctx, resource) }