Skip to content

Commit

Permalink
fixup: Fix race issues in leader relection
Browse files Browse the repository at this point in the history
Signed-off-by: rjanakiraman <[email protected]>
  • Loading branch information
janakiramanmesh7 committed Jun 26, 2024
1 parent e907f8c commit 02c2045
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 194 deletions.
14 changes: 10 additions & 4 deletions pkg/permissionclaim/permissionclaim_labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ func NewLabeler(
apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer,
apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer,
) *Labeler {
indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

return &Labeler{
listAPIBindingsAcceptingClaimedGroupResource: func(clusterName logicalcluster.Name, groupResource schema.GroupResource) ([]*apisv1alpha1.APIBinding, error) {
indexKey := indexers.ClusterAndGroupResourceValue(clusterName, groupResource)
Expand Down Expand Up @@ -148,3 +144,13 @@ func (l *Labeler) LabelsFor(ctx context.Context, cluster logicalcluster.Name, gr

return labels, nil
}

/*
installIndexers method is added, as we are re-electing the controllers
as the informers are always running, we cannot call this in multiple places.
*/
func InstallIndexers(apiExportInformer apisv1alpha1informers.APIExportClusterInformer) {
indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})
}
40 changes: 25 additions & 15 deletions pkg/reconciler/apis/apibinding/apibinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,6 @@ func NewController(

logger := logging.WithReconciler(klog.Background(), ControllerName)

// APIBinding indexers
indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{
indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport,
})

// APIExport indexers
indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc,
})
indexers.AddIfNotPresentOrDie(globalAPIExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc,
})

// APIBinding handlers
_, _ = apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueAPIBinding(objOrTombstone[*apisv1alpha1.APIBinding](obj), logger, "") },
Expand Down Expand Up @@ -484,3 +469,28 @@ func (c *controller) process(ctx context.Context, key string) (bool, error) {

return requeue, utilerrors.NewAggregate(errs)
}

/*
installIndexers method is added, as we are re-electing the controllers
as the informers are always running, we cannot call this in multiple places.
*/
func InstallIndexers(
apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer,
apiExportInformer apisv1alpha1informers.APIExportClusterInformer,
globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer,
) {
// APIBinding indexers
indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{
indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport,
})

// APIExport indexers
indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc,
})
indexers.AddIfNotPresentOrDie(globalAPIExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc,
})
}
22 changes: 14 additions & 8 deletions pkg/reconciler/apis/apiexport/apiexport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,6 @@ func NewController(
commit: committer.NewCommitter[*APIExport, Patcher, *APIExportSpec, *APIExportStatus](kcpClusterClient.ApisV1alpha1().APIExports()),
}

indexers.AddIfNotPresentOrDie(
apiExportInformer.Informer().GetIndexer(),
cache.Indexers{
indexers.APIExportByIdentity: indexers.IndexAPIExportByIdentity,
indexers.APIExportBySecret: indexers.IndexAPIExportBySecret,
},
)

_, _ = apiExportInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueAPIExport(obj.(*apisv1alpha1.APIExport))
Expand Down Expand Up @@ -338,3 +330,17 @@ func (c *controller) process(ctx context.Context, key string) error {

return utilerrors.NewAggregate(errs)
}

/*
installIndexers method is added, as we are re-electing the controllers
as the informers are always running, we cannot call this in multiple places.
*/
func InstallIndexers(apiExportInformer apisv1alpha1informers.APIExportClusterInformer) {
indexers.AddIfNotPresentOrDie(
apiExportInformer.Informer().GetIndexer(),
cache.Indexers{
indexers.APIExportByIdentity: indexers.IndexAPIExportByIdentity,
indexers.APIExportBySecret: indexers.IndexAPIExportBySecret,
},
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,6 @@ func NewController(
commit: committer.NewCommitter[*APIExportEndpointSlice, Patcher, *APIExportEndpointSliceSpec, *APIExportEndpointSliceStatus](kcpClusterClient.ApisV1alpha1().APIExportEndpointSlices()),
}

indexers.AddIfNotPresentOrDie(globalAPIExportClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexAPIExportEndpointSliceByAPIExport: indexAPIExportEndpointSliceByAPIExportFunc,
})

_, _ = apiExportEndpointSliceClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueAPIExportEndpointSlice(obj)
Expand Down Expand Up @@ -158,10 +150,6 @@ func NewController(
},
)

indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexAPIExportEndpointSlicesByPartition: indexAPIExportEndpointSlicesByPartitionFunc,
})

return c, nil
}

Expand Down Expand Up @@ -389,3 +377,21 @@ func filterShardEvent(oldObj, newObj interface{}) bool {
}
return false
}

/*
installIndexers method is added, as we are re-electing the controllers
as the informers are always running, we cannot call this in multiple places.
*/
func InstallIndexers(globalAPIExportClusterInformer apisinformers.APIExportClusterInformer, apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer) {
indexers.AddIfNotPresentOrDie(globalAPIExportClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexAPIExportEndpointSliceByAPIExport: indexAPIExportEndpointSliceByAPIExportFunc,
})

indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexAPIExportEndpointSlicesByPartition: indexAPIExportEndpointSlicesByPartitionFunc,
})
}
20 changes: 13 additions & 7 deletions pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ func NewController(
},
}

indexers.AddIfNotPresentOrDie(
apiBindingInformer.Informer().GetIndexer(),
cache.Indexers{
indexers.APIBindingByBoundResourceUID: indexers.IndexAPIBindingByBoundResourceUID,
},
)

_, _ = crdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
crd := obj.(*apiextensionsv1.CustomResourceDefinition)
Expand Down Expand Up @@ -252,3 +245,16 @@ func (c *controller) process(ctx context.Context, key string) error {

return nil
}

/*
installIndexers method is added, as we are re-electing the controllers
as the informers are always running, we cannot call this in multiple places.
*/
func InstallIndexers(apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer) {
indexers.AddIfNotPresentOrDie(
apiBindingInformer.Informer().GetIndexer(),
cache.Indexers{
indexers.APIBindingByBoundResourceUID: indexers.IndexAPIBindingByBoundResourceUID,
},
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,6 @@ func NewController(

logger := logging.WithReconciler(klog.Background(), ControllerName)

indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{
indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport,
})

_, _ = apiExportInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueAPIExport(obj, logger) },
UpdateFunc: func(_, obj interface{}) { c.enqueueAPIExport(obj, logger) },
Expand Down Expand Up @@ -303,3 +295,17 @@ func syncExtraAnnotationPatch(a1, a2 map[string]string) ([]byte, error) {

return json.Marshal(patch)
}

/*
installIndexers method is added, as we are re-electing the controllers
as the informers are always running, we cannot call this in multiple places.
*/
func InstallIndexers(apiExportInformer apisinformers.APIExportClusterInformer, apiBindingInformer apisinformers.APIBindingClusterInformer) {
indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{
indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport,
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ func NewController(
commit: committer.NewCommitter[*APIBinding, Patcher, *APIBindingSpec, *APIBindingStatus](kcpClusterClient.ApisV1alpha1().APIBindings()),
}

indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

_, _ = apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueAPIBinding(obj, logger) },
UpdateFunc: func(_, newObj interface{}) {
Expand Down Expand Up @@ -215,3 +211,21 @@ func (c *controller) process(ctx context.Context, key string) error {

return utilerrors.NewAggregate(errs)
}

/*
installIndexers method is added, as we are re-electing the controllers
as the informers are always running, we cannot call this in multiple places.
*/
func InstallIndexers(apiExportInformer apisv1alpha1informers.APIExportClusterInformer, apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer) {
indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

if err := apiBindingInformer.Informer().GetIndexer().AddIndexers(
cache.Indexers{
indexers.APIBindingByClusterAndAcceptedClaimedGroupResources: indexers.IndexAPIBindingByClusterAndAcceptedClaimedGroupResources,
},
); err != nil {
panic(err)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/permissionclaim"
Expand All @@ -55,14 +53,6 @@ func NewResourceController(
apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer,
apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer,
) (*resourceController, error) {
if err := apiBindingInformer.Informer().GetIndexer().AddIndexers(
cache.Indexers{
indexers.APIBindingByClusterAndAcceptedClaimedGroupResources: indexers.IndexAPIBindingByClusterAndAcceptedClaimedGroupResources,
},
); err != nil {
return nil, err
}

c := &resourceController{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ResourceControllerName),
kcpClusterClient: kcpClusterClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,6 @@ func NewController(
commit: committer.NewStatuslessCommitter[*rbacv1.ClusterRoleBinding, rbacclientv1.ClusterRoleBindingInterface](kubeClusterClient.RbacV1().ClusterRoleBindings(), committer.ShallowCopy[rbacv1.ClusterRoleBinding]),
}

indexers.AddIfNotPresentOrDie(clusterRoleBindingInformer.Informer().GetIndexer(), cache.Indexers{
labelclusterroles.ClusterRoleBindingByClusterRoleName: labelclusterroles.IndexClusterRoleBindingByClusterRoleName,
})

_, _ = clusterRoleBindingInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: replication.IsNoSystemClusterName,
Handler: cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -278,3 +274,13 @@ func (c *controller) process(ctx context.Context, key string) (bool, error) {

return requeue, utilerrors.NewAggregate(errs)
}

/*
installIndexers method is added, as we are re-electing the controllers
as the informers are always running, we cannot call this in multiple places.
*/
func InstallIndexers(clusterRoleBindingInformer kcprbacinformers.ClusterRoleBindingClusterInformer) {
indexers.AddIfNotPresentOrDie(clusterRoleBindingInformer.Informer().GetIndexer(), cache.Indexers{
labelclusterroles.ClusterRoleBindingByClusterRoleName: labelclusterroles.IndexClusterRoleBindingByClusterRoleName,
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ func NewController(
commit: committer.NewStatuslessCommitter[*rbacv1.ClusterRole, rbacclientv1.ClusterRoleInterface](kubeClusterClient.RbacV1().ClusterRoles(), committer.ShallowCopy[rbacv1.ClusterRole]),
}

indexers.AddIfNotPresentOrDie(clusterRoleBindingInformer.Informer().GetIndexer(), cache.Indexers{
ClusterRoleBindingByClusterRoleName: IndexClusterRoleBindingByClusterRoleName,
})

_, _ = clusterRoleInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: replication.IsNoSystemClusterName,
Handler: cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -277,3 +273,13 @@ func (c *controller) process(ctx context.Context, key string) (bool, error) {

return requeue, utilerrors.NewAggregate(errs)
}

/*
installIndexers method is added, as we are re-electing the controllers
as the informers are always running, we cannot call this in multiple places.
*/
func InstallIndexers(clusterRoleBindingInformer kcprbacinformers.ClusterRoleBindingClusterInformer) {
indexers.AddIfNotPresentOrDie(clusterRoleBindingInformer.Informer().GetIndexer(), cache.Indexers{
ClusterRoleBindingByClusterRoleName: IndexClusterRoleBindingByClusterRoleName,
})
}
Loading

0 comments on commit 02c2045

Please sign in to comment.