Skip to content
This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

feat: introduce informer cache sync timeout #1460

Merged
merged 4 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/kubefed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ chart and their default values.
| controllermanager.featureGates.SchedulerPreferences | Scheduler preferences feature. | true |
| controllermanager.clusterAvailableDelay | Time to wait before reconciling on a healthy cluster. | 20s |
| controllermanager.clusterUnavailableDelay | Time to wait before giving up on an unhealthy cluster. | 60s |
| controllermanager.cacheSyncTimeout | Time to wait for all caches to sync before exit. | 5m |
| controllermanager.leaderElectLeaseDuration | The maximum duration that a leader can be stopped before it is replaced by another candidate. | 15s |
| controllermanager.leaderElectRenewDeadline | The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to `controllermanager.leaderElectLeaseDuration`. | 10s |
| controllermanager.leaderElectRetryPeriod | The duration the clients should wait between attempting acquisition and renewal of a leadership. | 5s |
Expand Down
3 changes: 3 additions & 0 deletions charts/kubefed/charts/controllermanager/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,9 @@ spec:
availableDelay:
description: Time to wait before reconciling on a healthy cluster.
type: string
cacheSyncTimeout:
description: Time to wait for all caches to sync before exit.
type: string
unavailableDelay:
description: Time to wait before giving up on an unhealthy cluster.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ spec:
controllerDuration:
availableDelay: {{ .Values.clusterAvailableDelay | default "20s" | quote }}
unavailableDelay: {{ .Values.clusterUnavailableDelay | default "60s" | quote }}
cacheSyncTimeout: {{ .Values.cacheSyncTimeout | default "5m" | quote }}
leaderElect:
leaseDuration: {{ .Values.leaderElectLeaseDuration | default "15s" | quote }}
renewDeadline: {{ .Values.leaderElectRenewDeadline | default "10s" | quote }}
Expand Down
1 change: 1 addition & 0 deletions charts/kubefed/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ controllermanager:
enabled: true
clusterAvailableDelay:
clusterUnavailableDelay:
cacheSyncTimeout:
leaderElectLeaseDuration:
leaderElectRenewDeadline:
leaderElectRetryPeriod:
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/controller-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ func setOptionsByKubeFedConfig(opts *options.Options) {

opts.Config.ClusterAvailableDelay = spec.ControllerDuration.AvailableDelay.Duration
opts.Config.ClusterUnavailableDelay = spec.ControllerDuration.UnavailableDelay.Duration
opts.Config.CacheSyncTimeout = spec.ControllerDuration.CacheSyncTimeout.Duration

opts.LeaderElection.ResourceLock = *spec.LeaderElect.ResourceLock
opts.LeaderElection.RetryPeriod = spec.LeaderElect.RetryPeriod.Duration
Expand Down
1 change: 1 addition & 0 deletions config/kubefedconfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ spec:
controllerDuration:
availableDelay: 20s
unavailableDelay: 60s
cacheSyncTimeout: 5m
zqzten marked this conversation as resolved.
Show resolved Hide resolved
leaderElect:
leaseDuration: 1500ms
renewDeadline: 1000ms
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/core/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/apis/core/v1beta1/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
const (
DefaultClusterAvailableDelay = 20 * time.Second
DefaultClusterUnavailableDelay = 60 * time.Second
DefaultCacheSyncTimeout = 5 * time.Minute

DefaultLeaderElectionLeaseDuration = 15 * time.Second
DefaultLeaderElectionRenewDeadline = 10 * time.Second
Expand All @@ -54,6 +55,7 @@ func SetDefaultKubeFedConfig(fedConfig *v1beta1.KubeFedConfig) {
duration := spec.ControllerDuration
setDuration(&duration.AvailableDelay, DefaultClusterAvailableDelay)
setDuration(&duration.UnavailableDelay, DefaultClusterUnavailableDelay)
setDuration(&duration.CacheSyncTimeout, DefaultCacheSyncTimeout)

if spec.LeaderElect == nil {
spec.LeaderElect = &v1beta1.LeaderElectConfig{}
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/core/v1beta1/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func TestSetDefaultKubeFedConfig(t *testing.T) {
SetDefaultKubeFedConfig(modifiedUnavailableDelayKFC)
successCases["spec.controllerDuration.unavailableDelay is preserved"] = KubeFedConfigComparison{unavailableDelayKFC, modifiedUnavailableDelayKFC}

cacheSyncTimeoutKFC := defaultKubeFedConfig()
cacheSyncTimeoutKFC.Spec.ControllerDuration.CacheSyncTimeout.Duration = DefaultCacheSyncTimeout + 31*time.Second
modifiedCacheSyncTimeoutKFC := cacheSyncTimeoutKFC.DeepCopyObject().(*v1beta1.KubeFedConfig)
SetDefaultKubeFedConfig(modifiedCacheSyncTimeoutKFC)
successCases["spec.controllerDuration.cacheSyncTimeout is preserved"] = KubeFedConfigComparison{cacheSyncTimeoutKFC, modifiedCacheSyncTimeoutKFC}

// LeaderElect
leaseDurationKFC := defaultKubeFedConfig()
leaseDurationKFC.Spec.LeaderElect.LeaseDuration.Duration = DefaultLeaderElectionLeaseDuration + 11*time.Second
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/core/v1beta1/kubefedconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type DurationConfig struct {
// Time to wait before giving up on an unhealthy cluster.
// +optional
UnavailableDelay *metav1.Duration `json:"unavailableDelay,omitempty"`
// Time to wait for all caches to sync before exit.
// +optional
CacheSyncTimeout *metav1.Duration `json:"cacheSyncTimeout,omitempty"`
}
type LeaderElectConfig struct {
// The duration that non-leader candidates will wait after observing a leadership
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/core/v1beta1/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func ValidateKubeFedConfig(kubeFedConfig, oldKubeFedConfig *v1beta1.KubeFedConfi
} else {
allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("availableDelay"), duration.AvailableDelay)...)
allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("unavailableDelay"), duration.UnavailableDelay)...)
allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("cacheSyncTimeout"), duration.CacheSyncTimeout)...)
}

elect := spec.LeaderElect
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/core/v1beta1/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,14 @@ func TestValidateKubeFedConfig(t *testing.T) {
invalidUnavailableDelayGreaterThan0.Spec.ControllerDuration.UnavailableDelay.Duration = 0
errorCases["spec.controllerDuration.unavailableDelay: Invalid value"] = invalidUnavailableDelayGreaterThan0

invalidCacheSyncTimeoutNil := testcommon.ValidKubeFedConfig()
invalidCacheSyncTimeoutNil.Spec.ControllerDuration.CacheSyncTimeout = nil
errorCases["spec.controllerDuration.cacheSyncTimeout: Required value"] = invalidCacheSyncTimeoutNil

invalidCacheSyncTimeoutGreaterThan0 := testcommon.ValidKubeFedConfig()
invalidCacheSyncTimeoutGreaterThan0.Spec.ControllerDuration.CacheSyncTimeout.Duration = 0
errorCases["spec.controllerDuration.cacheSyncTimeout: Invalid value"] = invalidCacheSyncTimeoutGreaterThan0

invalidLeaderElectNil := testcommon.ValidKubeFedConfig()
invalidLeaderElectNil.Spec.LeaderElect = nil
errorCases["spec.leaderElect: Required value"] = invalidLeaderElectNil
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/core/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions pkg/controller/status/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -70,6 +72,8 @@ type KubeFedStatusController struct {
clusterUnavailableDelay time.Duration
smallDelay time.Duration

cacheSyncTimeout time.Duration

typeConfig typeconfig.Interface

client genericclient.Client
Expand Down Expand Up @@ -116,6 +120,7 @@ func newKubeFedStatusController(controllerConfig *util.ControllerConfig, typeCon
clusterAvailableDelay: controllerConfig.ClusterAvailableDelay,
clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay,
smallDelay: time.Second * 3,
cacheSyncTimeout: controllerConfig.CacheSyncTimeout,
typeConfig: typeConfig,
client: client,
statusClient: statusClient,
Expand Down Expand Up @@ -195,6 +200,13 @@ func (s *KubeFedStatusController) Run(stopChan <-chan struct{}) {
}()
}

// waitForSync blocks until isSynced reports that all data stores are in sync,
// checking once immediately and then once per util.SyncedPollPeriod. It returns
// an error if cacheSyncTimeout elapses before a check succeeds.
func (s *KubeFedStatusController) waitForSync() error {
	synced := func() (bool, error) {
		// Not being synced yet is not an error; it only means keep polling.
		return s.isSynced(), nil
	}
	return wait.PollImmediate(util.SyncedPollPeriod, s.cacheSyncTimeout, synced)
}

// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (s *KubeFedStatusController) isSynced() bool {
Expand All @@ -217,6 +229,7 @@ func (s *KubeFedStatusController) isSynced() bool {
return false
}
if !s.informer.GetTargetStore().ClustersSynced(clusters) {
klog.V(2).Info("Target clusters' informers not synced")
return false
}
return true
Expand All @@ -234,10 +247,8 @@ func (s *KubeFedStatusController) reconcileOnClusterChange() {
}

func (s *KubeFedStatusController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
defer metrics.UpdateControllerReconcileDurationFromStart("statuscontroller", time.Now())

if !s.isSynced() {
return util.StatusNotSynced
if err := s.waitForSync(); err != nil {
klog.Fatalf("failed to wait for all data stores to sync: %v", err)
}

federatedKind := s.typeConfig.GetFederatedType().Kind
Expand All @@ -248,6 +259,7 @@ func (s *KubeFedStatusController) reconcile(qualifiedName util.QualifiedName) ut
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished reconciling %v %v (duration: %v)", statusKind, key, time.Since(startTime))
metrics.UpdateControllerReconcileDurationFromStart("statuscontroller", startTime)
}()

fedObject, err := s.objFromCache(s.federatedStore, federatedKind, key)
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type KubeFedSyncController struct {
clusterUnavailableDelay time.Duration
smallDelay time.Duration

cacheSyncTimeout time.Duration

typeConfig typeconfig.Interface

fedAccessor FederatedResourceAccessor
Expand Down Expand Up @@ -123,6 +125,7 @@ func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfi
clusterAvailableDelay: controllerConfig.ClusterAvailableDelay,
clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay,
smallDelay: time.Second * 3,
cacheSyncTimeout: controllerConfig.CacheSyncTimeout,
eventRecorder: recorder,
typeConfig: typeConfig,
hostClusterClient: client,
Expand Down Expand Up @@ -203,6 +206,13 @@ func (s *KubeFedSyncController) Run(stopChan <-chan struct{}) {
}()
}

// waitForSync blocks until isSynced reports that all data stores are in sync,
// checking once immediately and then once per util.SyncedPollPeriod. It returns
// an error if cacheSyncTimeout elapses before a check succeeds.
func (s *KubeFedSyncController) waitForSync() error {
	synced := func() (bool, error) {
		// Not being synced yet is not an error; it only means keep polling.
		return s.isSynced(), nil
	}
	return wait.PollImmediate(util.SyncedPollPeriod, s.cacheSyncTimeout, synced)
}

// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (s *KubeFedSyncController) isSynced() bool {
Expand All @@ -223,6 +233,7 @@ func (s *KubeFedSyncController) isSynced() bool {
return false
}
if !s.informer.GetTargetStore().ClustersSynced(clusters) {
klog.V(2).Info("Target clusters' informers not synced")
return false
}
return true
Expand All @@ -240,8 +251,8 @@ func (s *KubeFedSyncController) reconcileOnClusterChange() {
}

func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
if !s.isSynced() {
return util.StatusNotSynced
if err := s.waitForSync(); err != nil {
klog.Fatalf("failed to wait for all data stores to sync: %v", err)
zqzten marked this conversation as resolved.
Show resolved Hide resolved
}

kind := s.typeConfig.GetFederatedType().Kind
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"time"
)

// Providing 0 duration to an informer indicates that resync should be delayed as long as possible
const (
NoResyncPeriod time.Duration = 0 * time.Second
// Providing 0 duration to an informer indicates that resync should be delayed as long as possible
NoResyncPeriod = 0 * time.Second

SyncedPollPeriod = 10 * time.Second

NamespaceName = "namespaces"
NamespaceKind = "Namespace"
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/util/controllerconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type ControllerConfig struct {
ClusterAvailableDelay time.Duration
ClusterUnavailableDelay time.Duration
MinimizeLatency bool
CacheSyncTimeout time.Duration
MaxConcurrentSyncReconciles int64
MaxConcurrentStatusReconciles int64
SkipAdoptingResources bool
Expand Down
35 changes: 13 additions & 22 deletions pkg/controller/util/federated_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,33 +534,24 @@ func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string {
return key
}

// Checks whether stores for all clusters form the lists (and only these) are there and
// ClustersSynced checks whether stores for all clusters from the list (and only these) are there and
// are synced.
func (fs *federatedStoreImpl) ClustersSynced(clusters []*fedv1b1.KubeFedCluster) bool {
// Get the list of informers to check under a lock and check it outside.
okSoFar, informersToCheck := func() (bool, []informer) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()

if len(fs.federatedInformer.targetInformers) != len(clusters) {
return false, []informer{}
}
informersToCheck := make([]informer, 0, len(clusters))
for _, cluster := range clusters {
if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
informersToCheck = append(informersToCheck, targetInformer)
} else {
return false, []informer{}
}
}
return true, informersToCheck
}()
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()

if !okSoFar {
if len(fs.federatedInformer.targetInformers) != len(clusters) {
klog.V(4).Infof("The number of target informers mismatch with given clusters")
return false
}
for _, informerToCheck := range informersToCheck {
if !informerToCheck.controller.HasSynced() {
for _, cluster := range clusters {
if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
if !targetInformer.controller.HasSynced() {
klog.V(4).Infof("Informer of cluster %q not synced", cluster.Name)
return false
}
} else {
klog.V(4).Infof("Informer of cluster %q not found", cluster.Name)
return false
}
}
Expand Down
1 change: 1 addition & 0 deletions test/common/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.