Skip to content
This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1460 from zqzten/cache_sync_timeout
Browse files Browse the repository at this point in the history
feat: introduce informer cache sync timeout
  • Loading branch information
k8s-ci-robot authored Oct 21, 2021
2 parents 5b1f868 + bfd7751 commit 224fe95
Show file tree
Hide file tree
Showing 20 changed files with 84 additions and 30 deletions.
1 change: 1 addition & 0 deletions charts/kubefed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ chart and their default values.
| controllermanager.featureGates.SchedulerPreferences | Scheduler preferences feature. | true |
| controllermanager.clusterAvailableDelay | Time to wait before reconciling on a healthy cluster. | 20s |
| controllermanager.clusterUnavailableDelay | Time to wait before giving up on an unhealthy cluster. | 60s |
| controllermanager.cacheSyncTimeout | Time to wait for all caches to sync before exit. | 5m |
| controllermanager.leaderElectLeaseDuration | The maximum duration that a leader can be stopped before it is replaced by another candidate. | 15s |
| controllermanager.leaderElectRenewDeadline | The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to `controllermanager.LeaderElectLeaseDuration. | 10s |
| controllermanager.leaderElectRetryPeriod | The duration the clients should wait between attempting acquisition and renewal of a leadership. | 5s |
Expand Down
3 changes: 3 additions & 0 deletions charts/kubefed/charts/controllermanager/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,9 @@ spec:
availableDelay:
description: Time to wait before reconciling on a healthy cluster.
type: string
cacheSyncTimeout:
description: Time to wait for all caches to sync before exit.
type: string
unavailableDelay:
description: Time to wait before giving up on an unhealthy cluster.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ spec:
controllerDuration:
availableDelay: {{ .Values.clusterAvailableDelay | default "20s" | quote }}
unavailableDelay: {{ .Values.clusterUnavailableDelay | default "60s" | quote }}
cacheSyncTimeout: {{ .Values.cacheSyncTimeout | default "5m" | quote }}
leaderElect:
leaseDuration: {{ .Values.leaderElectLeaseDuration | default "15s" | quote }}
renewDeadline: {{ .Values.leaderElectRenewDeadline | default "10s" | quote }}
Expand Down
1 change: 1 addition & 0 deletions charts/kubefed/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ controllermanager:
enabled: true
clusterAvailableDelay:
clusterUnavailableDelay:
cacheSyncTimeout:
leaderElectLeaseDuration:
leaderElectRenewDeadline:
leaderElectRetryPeriod:
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/controller-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ func setOptionsByKubeFedConfig(opts *options.Options) {

opts.Config.ClusterAvailableDelay = spec.ControllerDuration.AvailableDelay.Duration
opts.Config.ClusterUnavailableDelay = spec.ControllerDuration.UnavailableDelay.Duration
opts.Config.CacheSyncTimeout = spec.ControllerDuration.CacheSyncTimeout.Duration

opts.LeaderElection.ResourceLock = *spec.LeaderElect.ResourceLock
opts.LeaderElection.RetryPeriod = spec.LeaderElect.RetryPeriod.Duration
Expand Down
1 change: 1 addition & 0 deletions config/kubefedconfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ spec:
controllerDuration:
availableDelay: 20s
unavailableDelay: 60s
cacheSyncTimeout: 5m
leaderElect:
leaseDuration: 1500ms
renewDeadline: 1000ms
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/core/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/apis/core/v1beta1/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
const (
DefaultClusterAvailableDelay = 20 * time.Second
DefaultClusterUnavailableDelay = 60 * time.Second
DefaultCacheSyncTimeout = 5 * time.Minute

DefaultLeaderElectionLeaseDuration = 15 * time.Second
DefaultLeaderElectionRenewDeadline = 10 * time.Second
Expand All @@ -54,6 +55,7 @@ func SetDefaultKubeFedConfig(fedConfig *v1beta1.KubeFedConfig) {
duration := spec.ControllerDuration
setDuration(&duration.AvailableDelay, DefaultClusterAvailableDelay)
setDuration(&duration.UnavailableDelay, DefaultClusterUnavailableDelay)
setDuration(&duration.CacheSyncTimeout, DefaultCacheSyncTimeout)

if spec.LeaderElect == nil {
spec.LeaderElect = &v1beta1.LeaderElectConfig{}
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/core/v1beta1/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func TestSetDefaultKubeFedConfig(t *testing.T) {
SetDefaultKubeFedConfig(modifiedUnavailableDelayKFC)
successCases["spec.controllerDuration.unavailableDelay is preserved"] = KubeFedConfigComparison{unavailableDelayKFC, modifiedUnavailableDelayKFC}

cacheSyncTimeoutKFC := defaultKubeFedConfig()
cacheSyncTimeoutKFC.Spec.ControllerDuration.CacheSyncTimeout.Duration = DefaultCacheSyncTimeout + 31*time.Second
modifiedCacheSyncTimeoutKFC := cacheSyncTimeoutKFC.DeepCopyObject().(*v1beta1.KubeFedConfig)
SetDefaultKubeFedConfig(modifiedCacheSyncTimeoutKFC)
successCases["spec.controllerDuration.cacheSyncTimeout is preserved"] = KubeFedConfigComparison{cacheSyncTimeoutKFC, modifiedCacheSyncTimeoutKFC}

// LeaderElect
leaseDurationKFC := defaultKubeFedConfig()
leaseDurationKFC.Spec.LeaderElect.LeaseDuration.Duration = DefaultLeaderElectionLeaseDuration + 11*time.Second
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/core/v1beta1/kubefedconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type DurationConfig struct {
// Time to wait before giving up on an unhealthy cluster.
// +optional
UnavailableDelay *metav1.Duration `json:"unavailableDelay,omitempty"`
// Time to wait for all caches to sync before exit.
// +optional
CacheSyncTimeout *metav1.Duration `json:"cacheSyncTimeout,omitempty"`
}
type LeaderElectConfig struct {
// The duration that non-leader candidates will wait after observing a leadership
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/core/v1beta1/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func ValidateKubeFedConfig(kubeFedConfig, oldKubeFedConfig *v1beta1.KubeFedConfi
} else {
allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("availableDelay"), duration.AvailableDelay)...)
allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("unavailableDelay"), duration.UnavailableDelay)...)
allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("cacheSyncTimeout"), duration.CacheSyncTimeout)...)
}

elect := spec.LeaderElect
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/core/v1beta1/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,14 @@ func TestValidateKubeFedConfig(t *testing.T) {
invalidUnavailableDelayGreaterThan0.Spec.ControllerDuration.UnavailableDelay.Duration = 0
errorCases["spec.controllerDuration.unavailableDelay: Invalid value"] = invalidUnavailableDelayGreaterThan0

invalidCacheSyncTimeoutNil := testcommon.ValidKubeFedConfig()
invalidCacheSyncTimeoutNil.Spec.ControllerDuration.CacheSyncTimeout = nil
errorCases["spec.controllerDuration.cacheSyncTimeout: Required value"] = invalidCacheSyncTimeoutNil

invalidCacheSyncTimeoutGreaterThan0 := testcommon.ValidKubeFedConfig()
invalidCacheSyncTimeoutGreaterThan0.Spec.ControllerDuration.CacheSyncTimeout.Duration = 0
errorCases["spec.controllerDuration.cacheSyncTimeout: Invalid value"] = invalidCacheSyncTimeoutGreaterThan0

invalidLeaderElectNil := testcommon.ValidKubeFedConfig()
invalidLeaderElectNil.Spec.LeaderElect = nil
errorCases["spec.leaderElect: Required value"] = invalidLeaderElectNil
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/core/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions pkg/controller/status/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -70,6 +72,8 @@ type KubeFedStatusController struct {
clusterUnavailableDelay time.Duration
smallDelay time.Duration

cacheSyncTimeout time.Duration

typeConfig typeconfig.Interface

client genericclient.Client
Expand Down Expand Up @@ -116,6 +120,7 @@ func newKubeFedStatusController(controllerConfig *util.ControllerConfig, typeCon
clusterAvailableDelay: controllerConfig.ClusterAvailableDelay,
clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay,
smallDelay: time.Second * 3,
cacheSyncTimeout: controllerConfig.CacheSyncTimeout,
typeConfig: typeConfig,
client: client,
statusClient: statusClient,
Expand Down Expand Up @@ -195,6 +200,13 @@ func (s *KubeFedStatusController) Run(stopChan <-chan struct{}) {
}()
}

// Wait until all data stores are in sync for a definitive timeout, and returns if there is an error or a timeout.
func (s *KubeFedStatusController) waitForSync() error {
return wait.PollImmediate(util.SyncedPollPeriod, s.cacheSyncTimeout, func() (bool, error) {
return s.isSynced(), nil
})
}

// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (s *KubeFedStatusController) isSynced() bool {
Expand All @@ -217,6 +229,7 @@ func (s *KubeFedStatusController) isSynced() bool {
return false
}
if !s.informer.GetTargetStore().ClustersSynced(clusters) {
klog.V(2).Info("Target clusters' informers not synced")
return false
}
return true
Expand All @@ -234,10 +247,8 @@ func (s *KubeFedStatusController) reconcileOnClusterChange() {
}

func (s *KubeFedStatusController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
defer metrics.UpdateControllerReconcileDurationFromStart("statuscontroller", time.Now())

if !s.isSynced() {
return util.StatusNotSynced
if err := s.waitForSync(); err != nil {
klog.Fatalf("failed to wait for all data stores to sync: %v", err)
}

federatedKind := s.typeConfig.GetFederatedType().Kind
Expand All @@ -248,6 +259,7 @@ func (s *KubeFedStatusController) reconcile(qualifiedName util.QualifiedName) ut
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished reconciling %v %v (duration: %v)", statusKind, key, time.Since(startTime))
metrics.UpdateControllerReconcileDurationFromStart("statuscontroller", startTime)
}()

fedObject, err := s.objFromCache(s.federatedStore, federatedKind, key)
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type KubeFedSyncController struct {
clusterUnavailableDelay time.Duration
smallDelay time.Duration

cacheSyncTimeout time.Duration

typeConfig typeconfig.Interface

fedAccessor FederatedResourceAccessor
Expand Down Expand Up @@ -123,6 +125,7 @@ func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfi
clusterAvailableDelay: controllerConfig.ClusterAvailableDelay,
clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay,
smallDelay: time.Second * 3,
cacheSyncTimeout: controllerConfig.CacheSyncTimeout,
eventRecorder: recorder,
typeConfig: typeConfig,
hostClusterClient: client,
Expand Down Expand Up @@ -203,6 +206,13 @@ func (s *KubeFedSyncController) Run(stopChan <-chan struct{}) {
}()
}

// Wait until all data stores are in sync for a definitive timeout, and returns if there is an error or a timeout.
func (s *KubeFedSyncController) waitForSync() error {
return wait.PollImmediate(util.SyncedPollPeriod, s.cacheSyncTimeout, func() (bool, error) {
return s.isSynced(), nil
})
}

// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (s *KubeFedSyncController) isSynced() bool {
Expand All @@ -223,6 +233,7 @@ func (s *KubeFedSyncController) isSynced() bool {
return false
}
if !s.informer.GetTargetStore().ClustersSynced(clusters) {
klog.V(2).Info("Target clusters' informers not synced")
return false
}
return true
Expand All @@ -240,8 +251,8 @@ func (s *KubeFedSyncController) reconcileOnClusterChange() {
}

func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
if !s.isSynced() {
return util.StatusNotSynced
if err := s.waitForSync(); err != nil {
klog.Fatalf("failed to wait for all data stores to sync: %v", err)
}

kind := s.typeConfig.GetFederatedType().Kind
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"time"
)

// Providing 0 duration to an informer indicates that resync should be delayed as long as possible
const (
NoResyncPeriod time.Duration = 0 * time.Second
// Providing 0 duration to an informer indicates that resync should be delayed as long as possible
NoResyncPeriod = 0 * time.Second

SyncedPollPeriod = 10 * time.Second

NamespaceName = "namespaces"
NamespaceKind = "Namespace"
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/util/controllerconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type ControllerConfig struct {
ClusterAvailableDelay time.Duration
ClusterUnavailableDelay time.Duration
MinimizeLatency bool
CacheSyncTimeout time.Duration
MaxConcurrentSyncReconciles int64
MaxConcurrentStatusReconciles int64
SkipAdoptingResources bool
Expand Down
35 changes: 13 additions & 22 deletions pkg/controller/util/federated_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,33 +534,24 @@ func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string {
return key
}

// Checks whether stores for all clusters form the lists (and only these) are there and
// ClustersSynced checks whether stores for all clusters form the lists (and only these) are there and
// are synced.
func (fs *federatedStoreImpl) ClustersSynced(clusters []*fedv1b1.KubeFedCluster) bool {
// Get the list of informers to check under a lock and check it outside.
okSoFar, informersToCheck := func() (bool, []informer) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()

if len(fs.federatedInformer.targetInformers) != len(clusters) {
return false, []informer{}
}
informersToCheck := make([]informer, 0, len(clusters))
for _, cluster := range clusters {
if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
informersToCheck = append(informersToCheck, targetInformer)
} else {
return false, []informer{}
}
}
return true, informersToCheck
}()
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()

if !okSoFar {
if len(fs.federatedInformer.targetInformers) != len(clusters) {
klog.V(4).Infof("The number of target informers mismatch with given clusters")
return false
}
for _, informerToCheck := range informersToCheck {
if !informerToCheck.controller.HasSynced() {
for _, cluster := range clusters {
if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
if !targetInformer.controller.HasSynced() {
klog.V(4).Infof("Informer of cluster %q not synced", cluster.Name)
return false
}
} else {
klog.V(4).Infof("Informer of cluster %q not found", cluster.Name)
return false
}
}
Expand Down
1 change: 1 addition & 0 deletions test/common/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 224fe95

Please sign in to comment.