diff --git a/charts/kubefed/README.md b/charts/kubefed/README.md index dad535d16b..2c6f4d497c 100644 --- a/charts/kubefed/README.md +++ b/charts/kubefed/README.md @@ -115,6 +115,7 @@ chart and their default values. | controllermanager.featureGates.SchedulerPreferences | Scheduler preferences feature. | true | | controllermanager.clusterAvailableDelay | Time to wait before reconciling on a healthy cluster. | 20s | | controllermanager.clusterUnavailableDelay | Time to wait before giving up on an unhealthy cluster. | 60s | +| controllermanager.cacheSyncTimeout | Time to wait for all caches to sync before exit. | 5m | | controllermanager.leaderElectLeaseDuration | The maximum duration that a leader can be stopped before it is replaced by another candidate. | 15s | | controllermanager.leaderElectRenewDeadline | The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to `controllermanager.LeaderElectLeaseDuration. | 10s | | controllermanager.leaderElectRetryPeriod | The duration the clients should wait between attempting acquisition and renewal of a leadership. | 5s | diff --git a/charts/kubefed/charts/controllermanager/crds/crds.yaml b/charts/kubefed/charts/controllermanager/crds/crds.yaml index 5cb414c61f..67d3327f2c 100644 --- a/charts/kubefed/charts/controllermanager/crds/crds.yaml +++ b/charts/kubefed/charts/controllermanager/crds/crds.yaml @@ -645,6 +645,9 @@ spec: availableDelay: description: Time to wait before reconciling on a healthy cluster. type: string + cacheSyncTimeout: + description: Time to wait for all caches to sync before exit. + type: string unavailableDelay: description: Time to wait before giving up on an unhealthy cluster. type: string diff --git a/charts/kubefed/charts/controllermanager/templates/kubefedconfig.yaml b/charts/kubefed/charts/controllermanager/templates/kubefedconfig.yaml index f0c3071613..4696da7a88 100644 --- a/charts/kubefed/charts/controllermanager/templates/kubefedconfig.yaml +++ b/charts/kubefed/charts/controllermanager/templates/kubefedconfig.yaml @@ -8,6 +8,7 @@ spec: controllerDuration: availableDelay: {{ .Values.clusterAvailableDelay | default "20s" | quote }} unavailableDelay: {{ .Values.clusterUnavailableDelay | default "60s" | quote }} + cacheSyncTimeout: {{ .Values.cacheSyncTimeout | default "5m" | quote }} leaderElect: leaseDuration: {{ .Values.leaderElectLeaseDuration | default "15s" | quote }} renewDeadline: {{ .Values.leaderElectRenewDeadline | default "10s" | quote }} diff --git a/charts/kubefed/values.yaml b/charts/kubefed/values.yaml index d2c50c1b75..c7362882b3 100644 --- a/charts/kubefed/values.yaml +++ b/charts/kubefed/values.yaml @@ -8,6 +8,7 @@ controllermanager: enabled: true clusterAvailableDelay: clusterUnavailableDelay: + cacheSyncTimeout: leaderElectLeaseDuration: leaderElectRenewDeadline: leaderElectRetryPeriod: diff --git a/cmd/controller-manager/app/controller-manager.go b/cmd/controller-manager/app/controller-manager.go index ab028989a2..f39ce76158 100644 --- a/cmd/controller-manager/app/controller-manager.go +++ b/cmd/controller-manager/app/controller-manager.go @@ -364,6 +364,7 @@ func setOptionsByKubeFedConfig(opts *options.Options) { opts.Config.ClusterAvailableDelay = spec.ControllerDuration.AvailableDelay.Duration opts.Config.ClusterUnavailableDelay = spec.ControllerDuration.UnavailableDelay.Duration + opts.Config.CacheSyncTimeout = spec.ControllerDuration.CacheSyncTimeout.Duration opts.LeaderElection.ResourceLock = *spec.LeaderElect.ResourceLock opts.LeaderElection.RetryPeriod = spec.LeaderElect.RetryPeriod.Duration diff --git a/config/kubefedconfig.yaml b/config/kubefedconfig.yaml index 777022dc2b..9da9eaad64 100644 --- a/config/kubefedconfig.yaml +++ b/config/kubefedconfig.yaml @@ -8,6 +8,7 @@ spec: controllerDuration: availableDelay: 20s unavailableDelay: 60s + cacheSyncTimeout: 5m leaderElect: leaseDuration: 1500ms renewDeadline: 1000ms diff --git a/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go index 36cb36cd28..42636c7f8d 100644 --- a/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/pkg/apis/core/v1beta1/defaults/defaults.go b/pkg/apis/core/v1beta1/defaults/defaults.go index 1abb5fc11f..c5e905a65f 100644 --- a/pkg/apis/core/v1beta1/defaults/defaults.go +++ b/pkg/apis/core/v1beta1/defaults/defaults.go @@ -29,6 +29,7 @@ import ( const ( DefaultClusterAvailableDelay = 20 * time.Second DefaultClusterUnavailableDelay = 60 * time.Second + DefaultCacheSyncTimeout = 5 * time.Minute DefaultLeaderElectionLeaseDuration = 15 * time.Second DefaultLeaderElectionRenewDeadline = 10 * time.Second @@ -54,6 +55,7 @@ func SetDefaultKubeFedConfig(fedConfig *v1beta1.KubeFedConfig) { duration := spec.ControllerDuration setDuration(&duration.AvailableDelay, DefaultClusterAvailableDelay) setDuration(&duration.UnavailableDelay, DefaultClusterUnavailableDelay) + setDuration(&duration.CacheSyncTimeout, DefaultCacheSyncTimeout) if spec.LeaderElect == nil { spec.LeaderElect = &v1beta1.LeaderElectConfig{} diff --git a/pkg/apis/core/v1beta1/defaults/defaults_test.go b/pkg/apis/core/v1beta1/defaults/defaults_test.go index e093be7bdf..10bd3b8fad 100644 --- a/pkg/apis/core/v1beta1/defaults/defaults_test.go +++ b/pkg/apis/core/v1beta1/defaults/defaults_test.go @@ -50,6 +50,12 @@ func TestSetDefaultKubeFedConfig(t *testing.T) { SetDefaultKubeFedConfig(modifiedUnavailableDelayKFC) successCases["spec.controllerDuration.unavailableDelay is preserved"] = KubeFedConfigComparison{unavailableDelayKFC, modifiedUnavailableDelayKFC} + cacheSyncTimeoutKFC := defaultKubeFedConfig() + cacheSyncTimeoutKFC.Spec.ControllerDuration.CacheSyncTimeout.Duration = DefaultCacheSyncTimeout + 31*time.Second + modifiedCacheSyncTimeoutKFC := cacheSyncTimeoutKFC.DeepCopyObject().(*v1beta1.KubeFedConfig) + SetDefaultKubeFedConfig(modifiedCacheSyncTimeoutKFC) + successCases["spec.controllerDuration.cacheSyncTimeout is preserved"] = KubeFedConfigComparison{cacheSyncTimeoutKFC, modifiedCacheSyncTimeoutKFC} + // LeaderElect leaseDurationKFC := defaultKubeFedConfig() leaseDurationKFC.Spec.LeaderElect.LeaseDuration.Duration = DefaultLeaderElectionLeaseDuration + 11*time.Second diff --git a/pkg/apis/core/v1beta1/kubefedconfig_types.go b/pkg/apis/core/v1beta1/kubefedconfig_types.go index 73a201b986..981e68e704 100644 --- a/pkg/apis/core/v1beta1/kubefedconfig_types.go +++ b/pkg/apis/core/v1beta1/kubefedconfig_types.go @@ -48,6 +48,9 @@ type DurationConfig struct { // Time to wait before giving up on an unhealthy cluster. // +optional UnavailableDelay *metav1.Duration `json:"unavailableDelay,omitempty"` + // Time to wait for all caches to sync before exit. + // +optional + CacheSyncTimeout *metav1.Duration `json:"cacheSyncTimeout,omitempty"` } type LeaderElectConfig struct { // The duration that non-leader candidates will wait after observing a leadership diff --git a/pkg/apis/core/v1beta1/validation/validation.go b/pkg/apis/core/v1beta1/validation/validation.go index b63145f9ec..283f0189a9 100644 --- a/pkg/apis/core/v1beta1/validation/validation.go +++ b/pkg/apis/core/v1beta1/validation/validation.go @@ -285,6 +285,7 @@ func ValidateKubeFedConfig(kubeFedConfig, oldKubeFedConfig *v1beta1.KubeFedConfi } else { allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("availableDelay"), duration.AvailableDelay)...) allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("unavailableDelay"), duration.UnavailableDelay)...) + allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("cacheSyncTimeout"), duration.CacheSyncTimeout)...) } elect := spec.LeaderElect diff --git a/pkg/apis/core/v1beta1/validation/validation_test.go b/pkg/apis/core/v1beta1/validation/validation_test.go index 23fa73eb16..26867413d0 100644 --- a/pkg/apis/core/v1beta1/validation/validation_test.go +++ b/pkg/apis/core/v1beta1/validation/validation_test.go @@ -732,6 +732,14 @@ func TestValidateKubeFedConfig(t *testing.T) { invalidUnavailableDelayGreaterThan0.Spec.ControllerDuration.UnavailableDelay.Duration = 0 errorCases["spec.controllerDuration.unavailableDelay: Invalid value"] = invalidUnavailableDelayGreaterThan0 + invalidCacheSyncTimeoutNil := testcommon.ValidKubeFedConfig() + invalidCacheSyncTimeoutNil.Spec.ControllerDuration.CacheSyncTimeout = nil + errorCases["spec.controllerDuration.cacheSyncTimeout: Required value"] = invalidCacheSyncTimeoutNil + + invalidCacheSyncTimeoutGreaterThan0 := testcommon.ValidKubeFedConfig() + invalidCacheSyncTimeoutGreaterThan0.Spec.ControllerDuration.CacheSyncTimeout.Duration = 0 + errorCases["spec.controllerDuration.cacheSyncTimeout: Invalid value"] = invalidCacheSyncTimeoutGreaterThan0 + invalidLeaderElectNil := testcommon.ValidKubeFedConfig() invalidLeaderElectNil.Spec.LeaderElect = nil errorCases["spec.leaderElect: Required value"] = invalidLeaderElectNil diff --git a/pkg/apis/core/v1beta1/zz_generated.deepcopy.go b/pkg/apis/core/v1beta1/zz_generated.deepcopy.go index 013f3a6c77..c1e586fec1 100644 --- a/pkg/apis/core/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/core/v1beta1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated /* @@ -118,6 +119,11 @@ func (in *DurationConfig) DeepCopyInto(out *DurationConfig) { *out = new(v1.Duration) **out = **in } + if in.CacheSyncTimeout != nil { + in, out := &in.CacheSyncTimeout, &out.CacheSyncTimeout + *out = new(v1.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DurationConfig. diff --git a/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go index bbb678e28e..f7d39351a5 100644 --- a/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/pkg/controller/status/controller.go b/pkg/controller/status/controller.go index 8de57a2467..bf898849e5 100644 --- a/pkg/controller/status/controller.go +++ b/pkg/controller/status/controller.go @@ -24,6 +24,8 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/util/wait" + "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -70,6 +72,8 @@ type KubeFedStatusController struct { clusterUnavailableDelay time.Duration smallDelay time.Duration + cacheSyncTimeout time.Duration + typeConfig typeconfig.Interface client genericclient.Client @@ -116,6 +120,7 @@ func newKubeFedStatusController(controllerConfig *util.ControllerConfig, typeCon clusterAvailableDelay: controllerConfig.ClusterAvailableDelay, clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay, smallDelay: time.Second * 3, + cacheSyncTimeout: controllerConfig.CacheSyncTimeout, typeConfig: typeConfig, client: client, statusClient: statusClient, @@ -195,6 +200,13 @@ func (s *KubeFedStatusController) Run(stopChan <-chan struct{}) { }() } +// Wait until all data stores are in sync for a definitive timeout, and returns if there is an error or a timeout. +func (s *KubeFedStatusController) waitForSync() error { + return wait.PollImmediate(util.SyncedPollPeriod, s.cacheSyncTimeout, func() (bool, error) { + return s.isSynced(), nil + }) +} + // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet // synced with the corresponding api server. func (s *KubeFedStatusController) isSynced() bool { @@ -217,6 +229,7 @@ func (s *KubeFedStatusController) isSynced() bool { return false } if !s.informer.GetTargetStore().ClustersSynced(clusters) { + klog.V(2).Info("Target clusters' informers not synced") return false } return true @@ -234,10 +247,8 @@ func (s *KubeFedStatusController) reconcileOnClusterChange() { } func (s *KubeFedStatusController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus { - defer metrics.UpdateControllerReconcileDurationFromStart("statuscontroller", time.Now()) - - if !s.isSynced() { - return util.StatusNotSynced + if err := s.waitForSync(); err != nil { + klog.Fatalf("failed to wait for all data stores to sync: %v", err) } federatedKind := s.typeConfig.GetFederatedType().Kind @@ -248,6 +259,7 @@ func (s *KubeFedStatusController) reconcile(qualifiedName util.QualifiedName) ut startTime := time.Now() defer func() { klog.V(4).Infof("Finished reconciling %v %v (duration: %v)", statusKind, key, time.Since(startTime)) + metrics.UpdateControllerReconcileDurationFromStart("statuscontroller", startTime) }() fedObject, err := s.objFromCache(s.federatedStore, federatedKind, key) diff --git a/pkg/controller/sync/controller.go b/pkg/controller/sync/controller.go index 4c3cc941f3..8611f7696f 100644 --- a/pkg/controller/sync/controller.go +++ b/pkg/controller/sync/controller.go @@ -79,6 +79,8 @@ type KubeFedSyncController struct { clusterUnavailableDelay time.Duration smallDelay time.Duration + cacheSyncTimeout time.Duration + typeConfig typeconfig.Interface fedAccessor FederatedResourceAccessor @@ -123,6 +125,7 @@ func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfi clusterAvailableDelay: controllerConfig.ClusterAvailableDelay, clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay, smallDelay: time.Second * 3, + cacheSyncTimeout: controllerConfig.CacheSyncTimeout, eventRecorder: recorder, typeConfig: typeConfig, hostClusterClient: client, @@ -203,6 +206,13 @@ func (s *KubeFedSyncController) Run(stopChan <-chan struct{}) { }() } +// Wait until all data stores are in sync for a definitive timeout, and returns if there is an error or a timeout. +func (s *KubeFedSyncController) waitForSync() error { + return wait.PollImmediate(util.SyncedPollPeriod, s.cacheSyncTimeout, func() (bool, error) { + return s.isSynced(), nil + }) +} + // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet // synced with the corresponding api server. func (s *KubeFedSyncController) isSynced() bool { @@ -223,6 +233,7 @@ func (s *KubeFedSyncController) isSynced() bool { return false } if !s.informer.GetTargetStore().ClustersSynced(clusters) { + klog.V(2).Info("Target clusters' informers not synced") return false } return true @@ -240,8 +251,8 @@ func (s *KubeFedSyncController) reconcileOnClusterChange() { } func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus { - if !s.isSynced() { - return util.StatusNotSynced + if err := s.waitForSync(); err != nil { + klog.Fatalf("failed to wait for all data stores to sync: %v", err) } kind := s.typeConfig.GetFederatedType().Kind diff --git a/pkg/controller/util/constants.go b/pkg/controller/util/constants.go index 4c2b7f780d..afffd13582 100644 --- a/pkg/controller/util/constants.go +++ b/pkg/controller/util/constants.go @@ -20,9 +20,11 @@ import ( "time" ) -// Providing 0 duration to an informer indicates that resync should be delayed as long as possible const ( - NoResyncPeriod time.Duration = 0 * time.Second + // Providing 0 duration to an informer indicates that resync should be delayed as long as possible + NoResyncPeriod = 0 * time.Second + + SyncedPollPeriod = 10 * time.Second NamespaceName = "namespaces" NamespaceKind = "Namespace" diff --git a/pkg/controller/util/controllerconfig.go b/pkg/controller/util/controllerconfig.go index 1731711960..4c51cae525 100644 --- a/pkg/controller/util/controllerconfig.go +++ b/pkg/controller/util/controllerconfig.go @@ -72,6 +72,7 @@ type ControllerConfig struct { ClusterAvailableDelay time.Duration ClusterUnavailableDelay time.Duration MinimizeLatency bool + CacheSyncTimeout time.Duration MaxConcurrentSyncReconciles int64 MaxConcurrentStatusReconciles int64 SkipAdoptingResources bool diff --git a/pkg/controller/util/federated_informer.go b/pkg/controller/util/federated_informer.go index b7f3887d81..ff1d25251a 100644 --- a/pkg/controller/util/federated_informer.go +++ b/pkg/controller/util/federated_informer.go @@ -534,33 +534,24 @@ func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string { return key } -// Checks whether stores for all clusters form the lists (and only these) are there and +// ClustersSynced checks whether stores for all clusters form the lists (and only these) are there and // are synced. func (fs *federatedStoreImpl) ClustersSynced(clusters []*fedv1b1.KubeFedCluster) bool { - // Get the list of informers to check under a lock and check it outside. - okSoFar, informersToCheck := func() (bool, []informer) { - fs.federatedInformer.Lock() - defer fs.federatedInformer.Unlock() - - if len(fs.federatedInformer.targetInformers) != len(clusters) { - return false, []informer{} - } - informersToCheck := make([]informer, 0, len(clusters)) - for _, cluster := range clusters { - if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found { - informersToCheck = append(informersToCheck, targetInformer) - } else { - return false, []informer{} - } - } - return true, informersToCheck - }() + fs.federatedInformer.Lock() + defer fs.federatedInformer.Unlock() - if !okSoFar { + if len(fs.federatedInformer.targetInformers) != len(clusters) { + klog.V(4).Infof("The number of target informers mismatch with given clusters") return false } - for _, informerToCheck := range informersToCheck { - if !informerToCheck.controller.HasSynced() { + for _, cluster := range clusters { + if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found { + if !targetInformer.controller.HasSynced() { + klog.V(4).Infof("Informer of cluster %q not synced", cluster.Name) + return false + } + } else { + klog.V(4).Infof("Informer of cluster %q not found", cluster.Name) return false } } diff --git a/test/common/bindata.go b/test/common/bindata.go index ef4385ee64..6c4defded0 100644 --- a/test/common/bindata.go +++ b/test/common/bindata.go @@ -382,6 +382,7 @@ spec: controllerDuration: availableDelay: 20s unavailableDelay: 60s + cacheSyncTimeout: 5m leaderElect: leaseDuration: 1500ms renewDeadline: 1000ms