Skip to content
This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

Infinite reconciliation loop (rawstatuscollection) #1380

Merged
merged 1 commit into from Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) ut
}

collectedStatus, collectedResourceStatus := dispatcher.CollectedStatus()
klog.V(4).Infof("Setting the federated status '%v'", collectedResourceStatus)
klog.V(4).Infof("Setting the federated status '%v' for %s %q", collectedResourceStatus, kind, key)
return s.setFederatedStatus(fedResource, status.AggregateSuccess, &collectedStatus, &collectedResourceStatus, enableRawResourceStatusCollection)
}

Expand Down
49 changes: 42 additions & 7 deletions pkg/controller/sync/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,25 @@ type CollectedResourceStatus struct {
// whether status should be written to the API.
func SetFederatedStatus(fedObject *unstructured.Unstructured, reason AggregateReason, collectedStatus CollectedPropagationStatus, collectedResourceStatus CollectedResourceStatus, resourceStatusCollection bool) (bool, error) {
resource := &GenericFederatedResource{}

err := util.UnstructuredToInterface(fedObject, resource)
if err != nil {
return false, errors.Wrapf(err, "Failed to unmarshall to generic resource")
}

// we apply to collectedResourceStatus the same marshalling applied to GenericFederatedResource
// so the resources can be actually comparable later on
normalizedCollectedResourceStatus, err := normalizeStatus(collectedResourceStatus)
if err != nil {
return false, errors.Wrap(err, "Failed to normalize status")
}

if resource.Status == nil {
resource.Status = &GenericFederatedStatus{}
}

changed := resource.Status.update(fedObject.GetGeneration(), reason, collectedStatus, collectedResourceStatus, resourceStatusCollection)
changed := resource.Status.update(fedObject.GetGeneration(), reason, collectedStatus, *normalizedCollectedResourceStatus, resourceStatusCollection)

if !changed {
return false, nil
}
Expand Down Expand Up @@ -177,7 +187,7 @@ func (s *GenericFederatedStatus) update(generation int64, reason AggregateReason
}
}

clustersChanged := s.setClusters(collectedStatus.StatusMap, collectedResourceStatus.StatusMap)
clustersChanged := s.setClusters(collectedStatus.StatusMap, collectedResourceStatus.StatusMap, resourceStatusCollection)

// Indicate that changes were propagated if either status.clusters
// was changed or if existing resources were updated (which could
Expand All @@ -196,8 +206,8 @@ func (s *GenericFederatedStatus) update(generation int64, reason AggregateReason
// setClusters sets the status.clusters slice from propagation and resource status
// maps. Returns a boolean indication of whether the status.clusters was
// modified.
func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}) bool {
if !s.clustersDiffer(statusMap, resourceStatusMap) {
func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}, resourceStatusCollection bool) bool {
if !s.clustersDiffer(statusMap, resourceStatusMap, resourceStatusCollection) {
return false
}
s.Clusters = []GenericClusterStatus{}
Expand All @@ -214,9 +224,9 @@ func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap, res

// clustersDiffer checks whether `status.clusters` differs from the
// given status map.
func (s *GenericFederatedStatus) clustersDiffer(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}) bool {
if len(s.Clusters) != len(statusMap) || len(s.Clusters) != len(resourceStatusMap) {
klog.V(4).Info("Clusters differs from the size")
func (s *GenericFederatedStatus) clustersDiffer(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}, resourceStatusCollection bool) bool {
if len(s.Clusters) != len(statusMap) || resourceStatusCollection && len(s.Clusters) != len(resourceStatusMap) {
klog.V(4).Infof("Clusters differs from the size: clusters = %v, statusMap = %v, resourceStatusMap = %v", s.Clusters, statusMap, resourceStatusMap)
return true
}
for _, status := range s.Clusters {
Expand Down Expand Up @@ -278,3 +288,28 @@ func (s *GenericFederatedStatus) setPropagationCondition(reason AggregateReason,

return updateRequired
}

func normalizeStatus(collectedResourceStatus CollectedResourceStatus) (*CollectedResourceStatus, error) {
if len(collectedResourceStatus.StatusMap) == 0 {
return &collectedResourceStatus, nil
}
cleanedStatus := CollectedResourceStatus{
StatusMap: map[string]interface{}{},
ResourcesUpdated: collectedResourceStatus.ResourcesUpdated,
}

for key, value := range collectedResourceStatus.StatusMap {
content, err := json.Marshal(value)
if err != nil {
return nil, errors.Wrapf(err, "Failed to marshall collected resource status for cluster %s", key)
}
var status interface{}
err = json.Unmarshal(content, &status)
if err != nil {
return nil, errors.Wrapf(err, "Failed to unmarshall collected resource status as interface for cluster %s", key)
}
cleanedStatus.StatusMap[key] = status
}

return &cleanedStatus, nil
}
234 changes: 206 additions & 28 deletions pkg/controller/sync/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,67 +17,187 @@ limitations under the License.
package status

import (
"reflect"
"testing"

apiv1 "k8s.io/api/core/v1"
)

func TestGenericPropagationStatusUpdateChanged(t *testing.T) {
testCases := map[string]struct {
generation int64
reason AggregateReason
statusMap PropagationStatusMap
resourceStatusMap map[string]interface{}
resourcesUpdated bool
expectedChanged bool
generation int64
reason AggregateReason
statusMap PropagationStatusMap
resourceStatusMap map[string]interface{}
remoteStatus interface{}
resourcesUpdated bool
expectedChanged bool
resourceStatusCollection bool
}{
"No change in clusters indicates unchanged": {
"Cluster not propagated indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterNotReady,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"Cluster not propagated indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterNotReady,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: true,
},
"Cluster status not retrieved indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"Cluster status not retrieved indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: true,
},
"No collected remote status indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"No collected remote status indicates unchanged with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: false,
},
"No change in clusters indicates unchanged with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"ready": false,
"stage": "absent",
"cluster1": map[string]interface{}{},
},
resourcesUpdated: false,
expectedChanged: true,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"No change in clusters with update indicates changed": {
"No change in clusters indicates unchanged with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"ready": false,
"stage": "absent",
"cluster1": map[string]interface{}{},
},
resourcesUpdated: true,
expectedChanged: true,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: true,
},
"Change in clusters indicates changed": {
"No change in clusters with update indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"ready": true,
"stage": "deployed",
"cluster1": map[string]interface{}{},
},
expectedChanged: true,
resourcesUpdated: true,
resourceStatusCollection: true,
expectedChanged: true,
},
"Transition indicates changed": {
reason: NamespaceNotFederated,
expectedChanged: true,
"No change in clusters with update indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
resourcesUpdated: true,
resourceStatusCollection: false,
expectedChanged: true,
},
"Changed generation indicates changed": {
generation: 1,
expectedChanged: true,
"No change in remote status indicates unchanged with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{
"status": map[string]interface{}{
"status": "remoteStatus",
},
},
},
remoteStatus: map[string]interface{}{
"status": map[string]interface{}{
"status": "remoteStatus",
},
},
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: false,
},
"Change in clusters indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
"cluster2": ClusterPropagationOK,
},
resourceStatusCollection: true,
expectedChanged: true,
},
"Change in clusters indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
"cluster2": ClusterPropagationOK,
},
resourceStatusCollection: false,
expectedChanged: true,
},
"Transition indicates changed with remote status collection enabled": {
reason: NamespaceNotFederated,
resourceStatusCollection: true,
expectedChanged: true,
},
"Transition indicates changed with remote status collection disabled": {
reason: NamespaceNotFederated,
resourceStatusCollection: false,
expectedChanged: true,
},
"Changed generation indicates changed with remote status collection enabled": {
generation: 1,
resourceStatusCollection: true,
expectedChanged: true,
},
"Changed generation indicates changed with remote status collection disabled": {
generation: 1,
resourceStatusCollection: false,
expectedChanged: true,
},
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
fedStatus := &GenericFederatedStatus{
Clusters: []GenericClusterStatus{
{
Name: "cluster1",
Name: "cluster1",
RemoteStatus: tc.remoteStatus,
},
},
Conditions: []*GenericCondition{
Expand All @@ -95,10 +215,68 @@ func TestGenericPropagationStatusUpdateChanged(t *testing.T) {
StatusMap: tc.resourceStatusMap,
ResourcesUpdated: tc.resourcesUpdated,
}
changed := fedStatus.update(tc.generation, tc.reason, collectedStatus, collectedResourceStatus, true)
changed := fedStatus.update(tc.generation, tc.reason, collectedStatus, collectedResourceStatus, tc.resourceStatusCollection)
if tc.expectedChanged != changed {
t.Fatalf("Expected changed to be %v, got %v", tc.expectedChanged, changed)
}
})
}
}

func TestNormalizeStatus(t *testing.T) {
testCases := []struct {
name string
input CollectedResourceStatus
expectedResult *CollectedResourceStatus
expectedError error
}{
{
name: "CollectedResourceStatus is not modified if StatusMap is nil",
input: CollectedResourceStatus{},
expectedResult: &CollectedResourceStatus{},
},
{
name: "CollectedResourceStatus is not modified if StatusMap is empty",
input: CollectedResourceStatus{
StatusMap: map[string]interface{}{},
},
expectedResult: &CollectedResourceStatus{
StatusMap: map[string]interface{}{},
},
},
{
name: "CollectedResourceStatus StatusMap is correctly normalized and numbers are converted to float64",
input: CollectedResourceStatus{
StatusMap: map[string]interface{}{
"number": 1,
"string": "value",
"complexobj": map[string]int{
"one": 1,
},
},
},
expectedResult: &CollectedResourceStatus{
StatusMap: map[string]interface{}{
"number": float64(1),
"string": "value",
"complexobj": map[string]interface{}{
"one": float64(1),
},
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual, err := normalizeStatus(tc.input)
if err != tc.expectedError {
t.Fatalf("Expected error to be %v, got %v", tc.expectedError, err)
}

if !reflect.DeepEqual(tc.expectedResult, actual) {
t.Fatalf("Expected result to be %#v, got %#v", tc.expectedResult, actual)
}
})
}
}