From 24c11614d9949109ffb397391b412a02296a0eaf Mon Sep 17 00:00:00 2001
From: Quan Tian
Date: Thu, 15 Apr 2021 15:14:38 +0800
Subject: [PATCH] Fix Antrea-native Policy with multiple AppliedTo (#2084)

A Policy rule may have multiple AppliedToGroups, and not every group
selects workloads on every Node that the Policy applies to. It's by
design that an AppliedToGroup won't be sent to a Node if it doesn't
select any workload on it, so agents shouldn't require all
AppliedToGroups to be received before they can realize a rule. What's
more, it may happen that none of a rule's AppliedToGroups is sent to a
Node even though the rule itself is, because the rule is sent to the
Node whenever other rules of its parent Policy apply to it.

This patch fixes the logic by making the controller install a rule when
any of its AppliedToGroups and all of its AddressGroups can be
populated, and uninstall it when none of its AppliedToGroups can be
populated.
---
 pkg/agent/controller/networkpolicy/cache.go   | 75 ++++++++++++-------
 .../controller/networkpolicy/cache_test.go    | 51 ++++++++++---
 .../networkpolicy/networkpolicy_controller.go | 22 +++---
 .../networkpolicy_controller_test.go          | 34 ++++-----
 .../networkpolicy/status_controller.go        |  2 +-
 .../networkpolicy/status_controller_test.go   | 10 ++-
 test/e2e/antreapolicy_test.go                 | 60 +++++++++++++++
 7 files changed, 186 insertions(+), 68 deletions(-)

diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go
index 4fbd12e506a..e46a86693e0 100644
--- a/pkg/agent/controller/networkpolicy/cache.go
+++ b/pkg/agent/controller/networkpolicy/cache.go
@@ -220,22 +220,32 @@ func (c *ruleCache) getAppliedNetworkPolicies(pod, namespace string, npFilter *q
 	return policies
 }
 
-func (c *ruleCache) getRule(ruleID string) (*rule, bool) {
-	obj, exists, _ := c.rules.GetByKey(ruleID)
-	if !exists {
-		return nil, false
-	}
-	return obj.(*rule), true
-}
-
-func (c *ruleCache) getRulesByNetworkPolicy(uid string) []*rule {
+func (c *ruleCache) getEffectiveRulesByNetworkPolicy(uid string) []*rule {
 	objs, _ := c.rules.ByIndex(policyIndex, uid)
 	if len(objs) == 0 {
 		return nil
 	}
-	rules := make([]*rule, len(objs))
-	for i, obj := range objs {
-		rules[i] = obj.(*rule)
+	rules := make([]*rule, 0, len(objs))
+
+	// A rule is considered effective when any of its AppliedToGroups can be populated.
+	isEffective := func(r *rule) bool {
+		for _, g := range r.AppliedToGroups {
+			_, exists := c.memberSetByGroup[g]
+			if exists {
+				return true
+			}
+		}
+		return false
+	}
+
+	c.podSetLock.RLock()
+	defer c.podSetLock.RUnlock()
+
+	for _, obj := range objs {
+		rule := obj.(*rule)
+		if isEffective(rule) {
+			rules = append(rules, rule)
+		}
 	}
 	return rules
 }
@@ -518,13 +528,13 @@ func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error
 }
 
 // DeleteAppliedToGroup deletes a cached *v1beta.AppliedToGroup.
-// It should only happen when a group is no longer referenced by any rule, so
-// no need to mark dirty rules.
+// It may be called when a rule becomes ineffective, so it needs to mark dirty rules.
 func (c *ruleCache) DeleteAppliedToGroup(group *v1beta.AppliedToGroup) error {
 	c.podSetLock.Lock()
 	defer c.podSetLock.Unlock()
 
 	delete(c.memberSetByGroup, group.Name)
+	c.onAppliedToGroupUpdate(group.Name)
 	return nil
 }
 
@@ -689,16 +699,33 @@ func (c *ruleCache) deleteNetworkPolicyLocked(uid string) error {
 }
 
 // GetCompletedRule constructs a *CompletedRule for the provided ruleID.
-// If the rule is not found or not completed due to missing group data,
-// the return value will indicate it.
-func (c *ruleCache) GetCompletedRule(ruleID string) (completedRule *CompletedRule, exists bool, completed bool) {
+// If the rule is not effective or not realizable due to missing group data, the return value will indicate it.
+// A rule is considered effective when any of its AppliedToGroups can be populated.
+// A rule is considered realizable when it's effective and all of its AddressGroups can be populated.
+// When a rule is not effective, it should be removed from the datapath.
+// When a rule is effective but not realizable, the caller should wait for it to become realizable before doing anything.
+// When a rule is effective and realizable, the caller should realize it.
+// This is because some AppliedToGroups in a rule might never be sent to this Node if one of the following is true:
+// - The original policy has multiple AppliedToGroups and some of their spans do not include this Node.
+// - The original policy is appliedTo-per-rule, and the spans of some of the rule's AppliedToGroups do not include this Node.
+// - The original policy is appliedTo-per-rule, no AppliedToGroup of this rule spans this Node, but AppliedToGroups of other rules in the same policy do.
+// In these cases, it is not guaranteed that all AppliedToGroups in the rule will eventually be present in the cache.
+// Only the AppliedToGroups whose span includes this Node will eventually be received.
+func (c *ruleCache) GetCompletedRule(ruleID string) (completedRule *CompletedRule, effective bool, realizable bool) {
 	obj, exists, _ := c.rules.GetByKey(ruleID)
 	if !exists {
 		return nil, false, false
 	}
 	r := obj.(*rule)
+
+	groupMembers, anyExists := c.unionAppliedToGroups(r.AppliedToGroups)
+	if !anyExists {
+		return nil, false, false
+	}
+
 	var fromAddresses, toAddresses v1beta.GroupMemberSet
+	var completed bool
 	if r.Direction == v1beta.DirectionIn {
 		fromAddresses, completed = c.unionAddressGroups(r.From.AddressGroups)
 	} else {
@@ -708,11 +735,6 @@ func (c *ruleCache) GetCompletedRul
 		return nil, true, false
 	}
 
-	groupMembers, completed := c.unionAppliedToGroups(r.AppliedToGroups)
-	if !completed {
-		return nil, true, false
-	}
-
 	completedRule = &CompletedRule{
 		rule:          r,
 		FromAddresses: fromAddresses,
@@ -760,20 +782,21 @@
 }
 
 // unionAppliedToGroups gets the union of pods of the provided appliedTo groups.
-// If any group is not found, nil and false will be returned to indicate the
-// set is not complete yet.
+// If any group is found, the union and true will be returned. Otherwise an empty set and false will be returned.
 func (c *ruleCache) unionAppliedToGroups(groupNames []string) (v1beta.GroupMemberSet, bool) {
 	c.podSetLock.RLock()
 	defer c.podSetLock.RUnlock()
 
+	anyExists := false
 	set := v1beta.NewGroupMemberSet()
 	for _, groupName := range groupNames {
 		curSet, exists := c.memberSetByGroup[groupName]
 		if !exists {
 			klog.V(2).Infof("AppliedToGroup %v was not found", groupName)
-			return nil, false
+			continue
 		}
+		anyExists = true
 		set = set.Union(curSet)
 	}
-	return set, true
+	return set, anyExists
 }
diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go
index 48432576b43..0e6fc0ef752 100644
--- a/pkg/agent/controller/networkpolicy/cache_test.go
+++ b/pkg/agent/controller/networkpolicy/cache_test.go
@@ -745,12 +745,24 @@ func TestRuleCacheGetCompletedRule(t *testing.T) {
 		From:            v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1", "addressGroup2", "addressGroup3"}},
 		AppliedToGroups: []string{"appliedToGroup1", "appliedToGroup2"},
 	}
+	rule4 := &rule{
+		ID:              "rule4",
+		Direction:       v1beta2.DirectionIn,
+		From:            v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1", "addressGroup2"}},
+		AppliedToGroups: []string{"appliedToGroup1", "appliedToGroup2", "appliedToGroup3"},
+	}
+	rule5 := &rule{
+		ID:              "rule5",
+		Direction:       v1beta2.DirectionIn,
+		From:            v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1", "addressGroup2"}},
+		AppliedToGroups: []string{"appliedToGroup3", "appliedToGroup4"},
+	}
 	tests := []struct {
 		name              string
 		args              string
 		wantCompletedRule *CompletedRule
-		wantExists        bool
-		wantCompleted     bool
+		wantEffective     bool
+		wantRealizable    bool
 	}{
 		{
 			"one-group-rule",
@@ -777,15 +789,34 @@ func TestRuleCacheGetCompletedRule(t *testing.T) {
 			true,
 		},
 		{
-			"incompleted-rule",
+			"missing-one-addressgroup-rule",
 			rule3.ID,
 			nil,
 			true,
 			false,
 		},
+		{
+			"missing-one-appliedtogroup-rule",
+			rule4.ID,
+			&CompletedRule{
+				rule:          rule4,
+				FromAddresses: addressGroup1.Union(addressGroup2),
+				ToAddresses:   nil,
+				TargetMembers: appliedToGroup1.Union(appliedToGroup2),
+			},
+			true,
+			true,
+		},
+		{
+			"missing-all-appliedtogroups-rule",
+			rule5.ID,
+			nil,
+			false,
+			false,
+		},
 		{
 			"non-existing-rule",
-			"rule4",
+			"rule6",
 			nil,
 			false,
 			false,
@@ -801,16 +832,18 @@ func TestRuleCacheGetCompletedRule(t *testing.T) {
 			c.rules.Add(rule1)
 			c.rules.Add(rule2)
 			c.rules.Add(rule3)
+			c.rules.Add(rule4)
+			c.rules.Add(rule5)
 
-			gotCompletedRule, gotExists, gotCompleted := c.GetCompletedRule(tt.args)
+			gotCompletedRule, gotEffective, gotRealizable := c.GetCompletedRule(tt.args)
 			if !reflect.DeepEqual(gotCompletedRule, tt.wantCompletedRule) {
 				t.Errorf("GetCompletedRule() gotCompletedRule = %v, want %v", gotCompletedRule, tt.wantCompletedRule)
 			}
-			if gotExists != tt.wantExists {
-				t.Errorf("GetCompletedRule() gotExists = %v, want %v", gotExists, tt.wantExists)
+			if gotEffective != tt.wantEffective {
+				t.Errorf("GetCompletedRule() gotEffective = %v, want %v", gotEffective, tt.wantEffective)
 			}
-			if gotCompleted != tt.wantCompleted {
-				t.Errorf("GetCompletedRule() gotCompleted = %v, want %v", gotCompleted, tt.wantCompleted)
+			if gotRealizable != tt.wantRealizable {
+				t.Errorf("GetCompletedRule() gotRealizable = %v, want %v", gotRealizable, tt.wantRealizable)
 			}
 		})
 	}
diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go
index 9e34d97008e..39dfd2b7706 100644
--- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go
+++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go
@@ -454,9 +454,9 @@ func (c *Controller) syncRule(key string) error {
 		klog.V(4).Infof("Finished syncing rule %q. (%v)", key, time.Since(startTime))
 	}()
 
-	rule, exists, completed := c.ruleCache.GetCompletedRule(key)
-	if !exists {
-		klog.V(2).Infof("Rule %v had been deleted, removing its flows", key)
+	rule, effective, realizable := c.ruleCache.GetCompletedRule(key)
+	if !effective {
+		klog.V(2).Infof("Rule %v was not effective, removing its flows", key)
 		if err := c.reconciler.Forget(key); err != nil {
 			return err
 		}
@@ -467,10 +467,10 @@ func (c *Controller) syncRule(key string) error {
 		}
 		return nil
 	}
-	// If the rule is not complete, we can simply skip it as it will be marked as dirty
+	// If the rule is not realizable, we can simply skip it as it will be marked as dirty
 	// and queued again when the group it missed is received.
-	if !completed {
-		klog.V(2).Infof("Rule %v was not complete, skipping", key)
+	if !realizable {
+		klog.V(2).Infof("Rule %v was not realizable, skipping", key)
 		return nil
 	}
 	if err := c.reconciler.Reconcile(rule); err != nil {
@@ -493,9 +493,13 @@ func (c *Controller) syncRules(keys []string) error {
 
 	var allRules []*CompletedRule
 	for _, key := range keys {
-		rule, exists, completed := c.ruleCache.GetCompletedRule(key)
-		if !exists || !completed {
-			klog.Errorf("Rule %s is not complete or does not exist in cache", key)
+		rule, effective, realizable := c.ruleCache.GetCompletedRule(key)
+		// It's normal that a rule is not effective on this Node, but abnormal that it is not realizable after the
+		// watchers complete a full sync.
+		if !effective {
+			klog.Infof("Rule %s is not effective on this Node", key)
+		} else if !realizable {
+			klog.Errorf("Rule %s is effective but not realizable", key)
 		} else {
 			allRules = append(allRules, rule)
 		}
diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
index cda871f019e..88cb3ad84d0 100644
--- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
+++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
@@ -277,7 +277,7 @@ func TestAddMultipleGroupsRule(t *testing.T) {
 	desiredRule := &CompletedRule{
 		rule:          &rule{Direction: v1beta2.DirectionIn, Services: services},
 		FromAddresses: v1beta2.NewGroupMemberSet(newAddressGroupMember("1.1.1.1"), newAddressGroupMember("2.2.2.2"), newAddressGroupMember("3.3.3.3")),
-		ToAddresses:   v1beta2.NewGroupMemberSet(),
+		ToAddresses:   nil,
 		TargetMembers: v1beta2.NewGroupMemberSet(newAppliedToGroupMember("pod1", "ns1"), newAppliedToGroupMember("pod2", "ns2")),
 	}
 	stopCh := make(chan struct{})
@@ -306,37 +306,33 @@ func TestAddMultipleGroupsRule(t *testing.T) {
 	assert.Equal(t, 1, controller.GetAddressGroupNum())
 	assert.Equal(t, 1, controller.GetAppliedToGroupNum())
 
-	// addressGroup2 comes, no rule will be synced due to missing appliedToGroup2 data.
+	// addressGroup2 comes, policy1 will be synced with the TargetMembers populated from appliedToGroup1.
 	addressGroupWatcher.Add(newAddressGroup("addressGroup2", []v1beta2.GroupMember{*newAddressGroupMember("1.1.1.1"), *newAddressGroupMember("3.3.3.3")}))
 	select {
 	case ruleID := <-reconciler.updated:
-		t.Fatalf("Expected no update, got %v", ruleID)
+		actualRule, _ := reconciler.getLastRealized(ruleID)
+		assert.Equal(t, actualRule.Direction, desiredRule.Direction)
+		assert.ElementsMatch(t, actualRule.Services, desiredRule.Services)
+		assert.Equal(t, actualRule.FromAddresses, desiredRule.FromAddresses)
+		assert.Equal(t, actualRule.ToAddresses, desiredRule.ToAddresses)
+		assert.Equal(t, actualRule.TargetMembers, v1beta2.NewGroupMemberSet(newAppliedToGroupMember("pod1", "ns1")))
 	case <-time.After(time.Millisecond * 100):
+		t.Fatal("Expected one update, got none")
 	}
 	assert.Equal(t, 1, controller.GetNetworkPolicyNum())
 	assert.Equal(t, 2, controller.GetAddressGroupNum())
 	assert.Equal(t, 1, controller.GetAppliedToGroupNum())
 
-	// appliedToGroup2 comes, policy1 will be synced.
+	// appliedToGroup2 comes, policy1 will be synced with the TargetMembers populated from appliedToGroup1 and appliedToGroup2.
 	appliedToGroupWatcher.Add(newAppliedToGroup("appliedToGroup2", []v1beta2.GroupMember{*newAppliedToGroupMember("pod2", "ns2")}))
 	select {
 	case ruleID := <-reconciler.updated:
 		actualRule, _ := reconciler.getLastRealized(ruleID)
-		if actualRule.Direction != desiredRule.Direction {
-			t.Errorf("Expected Direction %v, got %v", actualRule.Direction, desiredRule.Direction)
-		}
-		if !assert.ElementsMatch(t, actualRule.Services, desiredRule.Services) {
-			t.Errorf("Expected Services %v, got %v", actualRule.Services, desiredRule.Services)
-		}
-		if !actualRule.FromAddresses.Equal(desiredRule.FromAddresses) {
-			t.Errorf("Expected FromAddresses %v, got %v", actualRule.FromAddresses, desiredRule.FromAddresses)
-		}
-		if !actualRule.ToAddresses.Equal(desiredRule.ToAddresses) {
-			t.Errorf("Expected ToAddresses %v, got %v", actualRule.ToAddresses, desiredRule.ToAddresses)
-		}
-		if !actualRule.TargetMembers.Equal(desiredRule.TargetMembers) {
-			t.Errorf("Expected Pods %v, got %v", actualRule.TargetMembers, desiredRule.TargetMembers)
-		}
+		assert.Equal(t, actualRule.Direction, desiredRule.Direction)
+		assert.ElementsMatch(t, actualRule.Services, desiredRule.Services)
+		assert.Equal(t, actualRule.FromAddresses, desiredRule.FromAddresses)
+		assert.Equal(t, actualRule.ToAddresses, desiredRule.ToAddresses)
+		assert.Equal(t, actualRule.TargetMembers, desiredRule.TargetMembers)
 	case <-time.After(time.Millisecond * 100):
 		t.Fatal("Expected one update, got none")
 	}
diff --git a/pkg/agent/controller/networkpolicy/status_controller.go b/pkg/agent/controller/networkpolicy/status_controller.go
index 8dd9d52db8f..0d921c858d6 100644
--- a/pkg/agent/controller/networkpolicy/status_controller.go
+++ b/pkg/agent/controller/networkpolicy/status_controller.go
@@ -164,7 +164,7 @@ func (c *StatusController) syncHandler(uid types.UID) error {
 	if policy == nil {
 		return nil
 	}
-	desiredRules := c.ruleCache.getRulesByNetworkPolicy(string(uid))
+	desiredRules := c.ruleCache.getEffectiveRulesByNetworkPolicy(string(uid))
 	// The policy must have been deleted, no further processing.
 	if len(desiredRules) == 0 {
 		return nil
diff --git a/pkg/agent/controller/networkpolicy/status_controller_test.go b/pkg/agent/controller/networkpolicy/status_controller_test.go
index 6cd9358489d..6d262c73661 100644
--- a/pkg/agent/controller/networkpolicy/status_controller_test.go
+++ b/pkg/agent/controller/networkpolicy/status_controller_test.go
@@ -105,7 +105,8 @@ func TestSyncStatusForNewPolicy(t *testing.T) {
 			go statusController.Run(stopCh)
 
 			ruleCache.AddNetworkPolicy(tt.policy)
-			rules := ruleCache.getRulesByNetworkPolicy(string(tt.policy.UID))
+			ruleCache.AddAppliedToGroup(newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*newAppliedToGroupMember("pod1", "ns1")}))
+			rules := ruleCache.getEffectiveRulesByNetworkPolicy(string(tt.policy.UID))
 			for i, rule := range rules {
 				// Only make specified number of rules realized.
 				if i >= tt.realizedRules {
@@ -126,10 +127,11 @@ func TestSyncStatusUpForUpdatedPolicy(t *testing.T) {
 	defer close(stopCh)
 	go statusController.Run(stopCh)
 
+	ruleCache.AddAppliedToGroup(newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*newAppliedToGroupMember("pod1", "ns1")}))
 	policy := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, []string{}, []string{"appliedToGroup1"}, nil)
 	policy.Generation = 1
 	ruleCache.AddNetworkPolicy(policy)
-	rule1 := ruleCache.getRulesByNetworkPolicy(string(policy.UID))[0]
+	rule1 := ruleCache.getEffectiveRulesByNetworkPolicy(string(policy.UID))[0]
 	statusController.SetRuleRealization(rule1.ID, policy.UID)
 
 	matchGeneration := func(generation int64) error {
@@ -149,7 +151,7 @@ func TestSyncStatusUpForUpdatedPolicy(t *testing.T) {
 	ruleCache.UpdateNetworkPolicy(policy)
 	assert.Error(t, matchGeneration(policy.Generation), "The generation should not be updated to %v but was updated", policy.Generation)
 
-	rules := ruleCache.getRulesByNetworkPolicy(string(policy.UID))
+	rules := ruleCache.getEffectiveRulesByNetworkPolicy(string(policy.UID))
 	for _, rule := range rules {
 		// Only call SetRuleRealization for new rule.
 		if rule.ID != rule1.ID {
@@ -184,7 +186,7 @@ func BenchmarkSyncHandler(b *testing.B) {
 		policy.Rules = append(policy.Rules, newPolicyRule(v1beta2.DirectionOut, nil, []string{fmt.Sprintf("addressGroup%d", i)}, nil))
 	}
 	ruleCache.AddNetworkPolicy(policy)
-	rules := ruleCache.getRulesByNetworkPolicy(string(policy.UID))
+	rules := ruleCache.getEffectiveRulesByNetworkPolicy(string(policy.UID))
 	for _, rule := range rules {
 		statusController.SetRuleRealization(rule.ID, policy.UID)
 	}
diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go
index 9d47919460e..224c609e0a2 100644
--- a/test/e2e/antreapolicy_test.go
+++ b/test/e2e/antreapolicy_test.go
@@ -759,6 +759,65 @@ func testANPBasic(t *testing.T) {
 	executeTests(t, testCase)
 }
 
+// testANPMultipleAppliedTo tests that traffic from X/B to Y/A on port 80 will be dropped after applying an Antrea
+// NetworkPolicy that applies to multiple AppliedTos, one of which doesn't select any Pod. It also ensures the Policy is
+// updated correctly when one of its AppliedToGroups starts and stops selecting Pods.
+func testANPMultipleAppliedTo(t *testing.T) {
+	tempLabel := randName("temp-")
+	builder := &AntreaNetworkPolicySpecBuilder{}
+	// Make it apply to an extra dummy AppliedTo to ensure it handles multiple AppliedToGroups correctly.
+	// See https://github.com/vmware-tanzu/antrea/issues/2083.
+	builder = builder.SetName("y", "np-multiple-appliedto").SetPriority(1.0).
+		SetAppliedToGroup(map[string]string{"pod": "a"}, nil).
+		SetAppliedToGroup(map[string]string{tempLabel: ""}, nil).
+		AddIngress(v1.ProtocolTCP, &p80, nil, nil, map[string]string{"pod": "b"}, map[string]string{"ns": "x"}, nil, nil, secv1alpha1.RuleActionDrop)
+
+	reachability := NewReachability(allPods, true)
+	reachability.Expect(Pod("x/b"), Pod("y/a"), false)
+
+	_, err := k8sUtils.CreateOrUpdateANP(builder.Get())
+	failOnError(err, t)
+	time.Sleep(networkPolicyDelay)
+	k8sUtils.Validate(allPods, reachability, 80)
+	reachability.PrintSummary(true, true, true)
+	_, wrong, _ := reachability.Summary()
+	if wrong != 0 {
+		t.Errorf("failure -- %d wrong results", wrong)
+	}
+
+	t.Logf("Making the Policy apply to y/c by labeling it with the temporary label that matches the dummy AppliedTo")
+	podYC, _ := k8sUtils.GetPod("y", "c")
+	podYC.Labels[tempLabel] = ""
+	podYC, err = k8sUtils.clientset.CoreV1().Pods(podYC.Namespace).Update(context.TODO(), podYC, metav1.UpdateOptions{})
+	assert.NoError(t, err)
+	reachability = NewReachability(allPods, true)
+	reachability.Expect(Pod("x/b"), Pod("y/a"), false)
+	reachability.Expect(Pod("x/b"), Pod("y/c"), false)
+	time.Sleep(networkPolicyDelay)
+	k8sUtils.Validate(allPods, reachability, 80)
+	reachability.PrintSummary(true, true, true)
+	_, wrong, _ = reachability.Summary()
+	if wrong != 0 {
+		t.Errorf("failure -- %d wrong results", wrong)
+	}
+
+	t.Logf("Making the Policy not apply to y/c by removing the temporary label")
+	delete(podYC.Labels, tempLabel)
+	_, err = k8sUtils.clientset.CoreV1().Pods(podYC.Namespace).Update(context.TODO(), podYC, metav1.UpdateOptions{})
+	assert.NoError(t, err)
+	reachability = NewReachability(allPods, true)
+	reachability.Expect(Pod("x/b"), Pod("y/a"), false)
+	time.Sleep(networkPolicyDelay)
+	k8sUtils.Validate(allPods, reachability, 80)
+	reachability.PrintSummary(true, true, true)
+	_, wrong, _ = reachability.Summary()
+	if wrong != 0 {
+		t.Errorf("failure -- %d wrong results", wrong)
+	}
+
+	failOnError(k8sUtils.CleanANPs(namespaces), t)
+}
+
 // testAuditLoggingBasic tests that an audit log is generated when an egress drop is applied
 func testAuditLoggingBasic(t *testing.T, data *TestData) {
 	builder := &ClusterNetworkPolicySpecBuilder{}
@@ -919,6 +978,7 @@ func TestAntreaPolicy(t *testing.T) {
 		t.Run("Case=CNPPriorityConflictingRule", func(t *testing.T) { testCNPPriorityConflictingRule(t) })
 		t.Run("Case=CNPRulePriority", func(t *testing.T) { testCNPRulePrioirty(t) })
 		t.Run("Case=ANPBasic", func(t *testing.T) { testANPBasic(t) })
+		t.Run("Case=ANPMultipleAppliedTo", func(t *testing.T) { testANPMultipleAppliedTo(t) })
 	})
 	// print results for reachability tests
 	printResults()
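
Note: the install/wait/uninstall decision that this patch introduces in GetCompletedRule and syncRule can be condensed into a small, self-contained Go sketch. This is illustrative only — receivedAppliedToGroups, receivedAddressGroups, effective and realizable are made-up stand-ins for the agent's caches and helpers, not the actual cache fields:

package main

import "fmt"

// receivedAppliedToGroups and receivedAddressGroups stand in for the agent's
// caches of group data received from the controller; a group is present only
// if its span includes this Node. Illustrative names, not the real fields.
var receivedAppliedToGroups = map[string]bool{"appliedToGroup1": true}
var receivedAddressGroups = map[string]bool{"addressGroup1": true}

// effective reports whether any of the rule's AppliedToGroups can be populated.
func effective(appliedToGroups []string) bool {
	for _, g := range appliedToGroups {
		if receivedAppliedToGroups[g] {
			return true
		}
	}
	return false
}

// realizable reports whether the rule is effective and all of its
// AddressGroups can be populated.
func realizable(appliedToGroups, addressGroups []string) bool {
	if !effective(appliedToGroups) {
		return false
	}
	for _, g := range addressGroups {
		if !receivedAddressGroups[g] {
			return false
		}
	}
	return true
}

func main() {
	// Not effective: the rule should be uninstalled from the datapath.
	fmt.Println(effective([]string{"appliedToGroup2"})) // false

	// Effective but not realizable: wait for the missing AddressGroup.
	fmt.Println(realizable([]string{"appliedToGroup1", "appliedToGroup2"}, []string{"addressGroup2"})) // false

	// Effective and realizable: the rule should be installed.
	fmt.Println(realizable([]string{"appliedToGroup1", "appliedToGroup2"}, []string{"addressGroup1"})) // true
}

Running it prints false, false, true, matching the three outcomes described above: uninstall, wait, install.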