Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]: Started using tracker to ensure typemeta is set #7093

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/reconciler/inmemorychannel/dispatcher/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kelseyhightower/envconfig"
"knative.dev/pkg/kmeta"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/channel/multichannelfanout"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/kncloudevents"
Expand Down Expand Up @@ -127,6 +128,8 @@ func NewController(
return controller.Options{SkipStatusUpdates: true, FinalizerName: finalizerName}
})

r.tracker = impl.Tracker

// Watch for inmemory channels.
inmemorychannelInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
Expand All @@ -137,6 +140,13 @@ func NewController(
DeleteFunc: r.deleteFunc,
}})

inmemorychannelInformer.Informer().AddEventHandler(controller.HandleAll(
controller.EnsureTypeMeta(
r.tracker.OnChanged,
messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel"),
),
))
Comment on lines +143 to +148
Copy link
Member

@pierDipi pierDipi Jul 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the vague comment, it's not the tracker the solution but EnsureTypeMeta therefore where we set the handler (just above, it eventually calls ReconcileKind) we need to call EnsureTypeMeta

	gvk := messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel")

	// ...

				AddFunc:    controller.EnsureTypeMeta(impl.Enqueue, gvk)
				UpdateFunc: controller.PassNew(controller.EnsureTypeMeta(impl.Enqueue, gvk)),
				DeleteFunc: controller.EnsureTypeMeta(r.deleteFunc, gvk),

that ensures that whatever object is passed to callback has the Typemeta based on gvk

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay thanks that makes sense!


featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
impl.GlobalResync(inmemorychannelInformer.Informer())
})
Expand Down
13 changes: 13 additions & 0 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ import (
messagingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/messaging/v1"
reconcilerv1 "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/tracker"
)

// Reconciler reconciles InMemory Channels.
type Reconciler struct {
multiChannelMessageHandler multichannelfanout.MultiChannelMessageHandler
reporter channel.StatsReporter
messagingClientSet messagingv1.MessagingV1Interface
tracker tracker.Interface
}

// Check the interfaces Reconciler should implement
Expand All @@ -74,6 +76,17 @@ func (r *Reconciler) ObserveKind(ctx context.Context, imc *v1.InMemoryChannel) r

func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) reconciler.Event {
logging.FromContext(ctx).Infow("Reconciling", zap.Any("InMemoryChannel", imc))
ref := tracker.Reference{
APIVersion: "messaging.knative.dev/v1",
Kind: "InMemoryChannel",
Namespace: imc.Namespace,
Name: imc.Name,
}
logging.FromContext(ctx).Infof("ref: %+v", ref)
if err := r.tracker.TrackReference(ref, imc); err != nil {
logging.FromContext(ctx).Infow("TrackReference for InMemoryChannel failed", zap.Any("InMemoryChannel", imc), zap.Error(err))
}
logging.FromContext(ctx).Infof("imc: %+v", imc)

if !imc.IsReady() {
logging.FromContext(ctx).Debug("IMC is not ready, skipping")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ func TestAllCases(t *testing.T) {
r := &Reconciler{
multiChannelMessageHandler: newFakeMultiChannelHandler(),
messagingClientSet: fakeeventingclient.Get(ctx).MessagingV1(),
tracker: &FakeTracker{},
}
return inmemorychannel.NewReconciler(ctx, logger,
fakeeventingclient.Get(ctx), listers.GetInMemoryChannelLister(),
Expand Down Expand Up @@ -508,6 +509,7 @@ func TestReconciler_ReconcileKind(t *testing.T) {
r := &Reconciler{
multiChannelMessageHandler: handler,
messagingClientSet: fakeEventingClient.MessagingV1(),
tracker: &FakeTracker{},
}
e := r.ReconcileKind(ctx, tc.imc)
if e != tc.wantResult {
Expand Down Expand Up @@ -550,6 +552,7 @@ func TestReconciler_InvalidInputs(t *testing.T) {
}
r := &Reconciler{
multiChannelMessageHandler: handler,
tracker: &FakeTracker{},
}
r.deleteFunc(tc.imc)
})
Expand Down Expand Up @@ -580,6 +583,7 @@ func TestReconciler_Deletion(t *testing.T) {
}
r := &Reconciler{
multiChannelMessageHandler: handler,
tracker: &FakeTracker{},
}
r.deleteFunc(tc.imc)
if handler.GetChannelHandler(channelServiceAddress.URL.Host) != nil {
Expand Down
Loading