Skip to content

Commit

Permalink
aggregator: split availability controller into local and remote part
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <[email protected]>

Kubernetes-commit: 834cd7ca4a1c08b5d32d5e2da377310764f2c11c
  • Loading branch information
sttts authored and k8s-publishing-bot committed Jul 17, 2024
1 parent 006e6b9 commit 87f8e9e
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 35 deletions.
54 changes: 34 additions & 20 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ import (
openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3"
openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
localavailability "k8s.io/kube-aggregator/pkg/controllers/status/local"
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status/remote"
remoteavailability "k8s.io/kube-aggregator/pkg/controllers/status/remote"
apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest"
openapicommon "k8s.io/kube-openapi/pkg/common"
)
Expand Down Expand Up @@ -102,12 +103,11 @@ type ExtraConfig struct {

RejectForwardingRedirects bool

// DisableAvailableConditionController disables the controller that updates the Available conditions for
// APIServices, Endpoints and Services. This controller runs in kube-aggregator and can interfere with
// Generic Control Plane components when certain apis are not available.
// TODO: We should find a better way to handle this. For now it will be for Generic Control Plane authors to
// disable this controller if they see issues.
DisableAvailableConditionController bool
// DisableRemoteAvailableConditionController disables the controller that updates the Available conditions for
// remote APIServices via querying endpoints of the referenced services. In generic controlplane use-cases,
// the concept of services and endpoints might differ, and might require another implementation of this
// controller. Local APIService are reconciled nevertheless.
DisableRemoteAvailableConditionController bool
}

// Config represents the configuration needed to create an APIAggregator.
Expand Down Expand Up @@ -320,6 +320,12 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
})
}

s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.Done())
c.GenericConfig.SharedInformerFactory.Start(context.Done())
return nil
})

// create shared (remote and local) availability metrics
// TODO: decouple from legacyregistry
metrics := availabilitymetrics.New()
Expand All @@ -328,10 +334,25 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil, err
}

// If the AvailableConditionController is disabled, we don't need to start the informers
// and the controller.
if !c.ExtraConfig.DisableAvailableConditionController {
availableController, err := statuscontrollers.NewAvailableConditionController(
// always run local availability controller
local, err := localavailability.New(
informerFactory.Apiregistration().V1().APIServices(),
apiregistrationClient.ApiregistrationV1(),
metrics,
)
if err != nil {
return nil, err
}
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-local-available-controller", func(context genericapiserver.PostStartHookContext) error {
// if we end up blocking for long periods of time, we may need to increase workers.
go local.Run(5, context.Done())
return nil
})

// conditionally run remote availability controller. This could be replaced in certain
// generic controlplane use-cases where there is another concept of services and/or endpoints.
if !c.ExtraConfig.DisableRemoteAvailableConditionController {
remote, err := remoteavailability.New(
informerFactory.Apiregistration().V1().APIServices(),
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
Expand All @@ -344,16 +365,9 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
if err != nil {
return nil, err
}

s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.Done())
c.GenericConfig.SharedInformerFactory.Start(context.Done())
return nil
})

s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-remote-available-controller", func(context genericapiserver.PostStartHookContext) error {
// if we end up blocking for long periods of time, we may need to increase workers.
go availableController.Run(5, context.Done())
go remote.Run(5, context.Done())
return nil
})
}
Expand Down
227 changes: 227 additions & 0 deletions pkg/controllers/status/local/local_available_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package external

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers"
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
)

// AvailableConditionController handles checking the availability of registered local API services.
type AvailableConditionController struct {
apiServiceClient apiregistrationclient.APIServicesGetter

apiServiceLister listers.APIServiceLister
apiServiceSynced cache.InformerSynced

// To allow injection for testing.
syncFn func(key string) error

queue workqueue.TypedRateLimitingInterface[string]

// metrics registered into legacy registry
metrics *availabilitymetrics.Metrics
}

// New returns a new local availability AvailableConditionController.
func New(
apiServiceInformer informers.APIServiceInformer,
apiServiceClient apiregistrationclient.APIServicesGetter,
metrics *availabilitymetrics.Metrics,
) (*AvailableConditionController, error) {
c := &AvailableConditionController{
apiServiceClient: apiServiceClient,
apiServiceLister: apiServiceInformer.Lister(),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
// the maximum disruption time to a minimum, but it does prevent hot loops.
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "LocalAvailabilityController"},
),
metrics: metrics,
}

// resync on this one because it is low cardinality and rechecking the actual discovery
// allows us to detect health in a more timely fashion when network connectivity to
// nodes is snipped, but the network still attempts to route there. See
// https://github.com/openshift/origin/issues/17159#issuecomment-341798063
apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addAPIService,
UpdateFunc: c.updateAPIService,
DeleteFunc: c.deleteAPIService,
},
30*time.Second)
c.apiServiceSynced = apiServiceHandler.HasSynced

c.syncFn = c.sync

return c, nil
}

func (c *AvailableConditionController) sync(key string) error {
originalAPIService, err := c.apiServiceLister.Get(key)
if apierrors.IsNotFound(err) {
c.metrics.ForgetAPIService(key)
return nil
}
if err != nil {
return err
}

if originalAPIService.Spec.Service != nil {
// this controller only handles local APIServices
return nil
}

// local API services are always considered available
apiService := originalAPIService.DeepCopy()
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
_, err = c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}

// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead
// apiservices. Doing that means we don't want to quickly issue no-op updates.
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
// update this metric on every sync operation to reflect the actual state
c.metrics.SetUnavailableGauge(newAPIService)

if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
return newAPIService, nil
}

orig := apiregistrationv1apihelper.GetAPIServiceConditionByType(originalAPIService, apiregistrationv1.Available)
now := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available)
unknown := apiregistrationv1.APIServiceCondition{
Type: apiregistrationv1.Available,
Status: apiregistrationv1.ConditionUnknown,
}
if orig == nil {
orig = &unknown
}
if now == nil {
now = &unknown
}
if *orig != *now {
klog.V(2).InfoS("changing APIService availability", "name", newAPIService.Name, "oldStatus", orig.Status, "newStatus", now.Status, "message", now.Message, "reason", now.Reason)
}

newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
if err != nil {
return nil, err
}

c.metrics.SetUnavailableCounter(originalAPIService, newAPIService)
return newAPIService, nil
}

// Run starts the AvailableConditionController loop which manages the availability condition of API services.
func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Info("Starting LocalAvailability controller")
defer klog.Info("Shutting down LocalAvailability controller")

// This waits not just for the informers to sync, but for our handlers
// to be called; since the handlers are three different ways of
// enqueueing the same thing, waiting for this permits the queue to
// maximally de-duplicate the entries.
if !controllers.WaitForCacheSync("LocalAvailability", stopCh, c.apiServiceSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

<-stopCh
}

func (c *AvailableConditionController) runWorker() {
for c.processNextWorkItem() {
}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *AvailableConditionController) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

err := c.syncFn(key)
if err == nil {
c.queue.Forget(key)
return true
}

utilruntime.HandleError(fmt.Errorf("%v failed with: %w", key, err))
c.queue.AddRateLimited(key)

return true
}

func (c *AvailableConditionController) addAPIService(obj interface{}) {
castObj := obj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Adding %s", castObj.Name)
c.queue.Add(castObj.Name)
}

func (c *AvailableConditionController) updateAPIService(oldObj, _ interface{}) {
oldCastObj := oldObj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Updating %s", oldCastObj.Name)
c.queue.Add(oldCastObj.Name)
}

func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
castObj, ok := obj.(*apiregistrationv1.APIService)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
castObj, ok = tombstone.Obj.(*apiregistrationv1.APIService)
if !ok {
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
return
}
}
klog.V(4).Infof("Deleting %q", castObj.Name)
c.queue.Add(castObj.Name)
}
Loading

0 comments on commit 87f8e9e

Please sign in to comment.