Skip to content

Commit

Permalink
Fix NetworkPolicy span calculation
Browse files Browse the repository at this point in the history
A NetworkPolicy's span is calculated in internalNetworkPolicyWorker,
based on the span of the AppliedToGroups it refers to, while the span of
AppliedToGroup is calculated in appliedToGroupWorker which runs in
parallel with internalNetworkPolicyWorker. It could happen that the
calcuated span is out of date if AppliedToGroups' span is updated after
internalNetworkPolicyWorker calculates a NetworkPolicy's span, and the
NetworkPolicy wouldn't be enqueued for another sync if it's not
committed to the storage yet.

On the other hand, if we commit the NetworkPolicy to the storage before
calculating the NetworkPolicy's span, it would have to use a stale span
first and might need to update the NetworkPolicy twice and generate two
update events in one sync.

To fix the issue without generating extra events, we introduce a
separate subscription mechanism that allows subscribing to update of
AppliedToGroup for NetworkPolicy. With the subscription, we can still
calculate the NetworkPolicy's span first, then commit it to the storage.
If any of the subscribed AppliedToGroups are updated, the NetworkPolicy
will be notified and resynced.

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Oct 13, 2023
1 parent cb64db3 commit 8874d57
Show file tree
Hide file tree
Showing 4 changed files with 388 additions and 12 deletions.
40 changes: 28 additions & 12 deletions pkg/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ type NetworkPolicyController struct {
// to the same addressgroups/appliedtogroups.
internalNetworkPolicyMutex sync.RWMutex

// appliedToGroupNotifier is responsible for notifying subscribers of an AppliedToGroup about its update.
// The typical subscribers of AppliedToGroup are NetworkPolicies.
appliedToGroupNotifier *notifier

groupingInterface grouping.Interface
// Added as a member to the struct to allow injection for testing.
groupingInterfaceSynced func() bool
Expand Down Expand Up @@ -410,6 +414,7 @@ func NewNetworkPolicyController(kubeClient clientset.Interface,
groupingInterfaceSynced: groupingInterface.HasSynced,
labelIdentityInterface: labelIdentityInterface,
stretchNPEnabled: stretchedNPEnabled,
appliedToGroupNotifier: newNotifier(),
}
n.groupingInterface.AddEventHandler(appliedToGroupType, n.enqueueAppliedToGroup)
n.groupingInterface.AddEventHandler(addressGroupType, n.enqueueAddressGroup)
Expand Down Expand Up @@ -1308,19 +1313,9 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error {
}
}
n.appliedToGroupStore.Update(updatedAppliedToGroup)
// Get all internal NetworkPolicy objects that refers this AppliedToGroup.
// Note that this must be executed after storing the result, to ensure that
// both of the NetworkPolicies that referred it before storing it and the
// ones after storing it can get the right span.
nps, err := n.internalNetworkPolicyStore.GetByIndex(store.AppliedToGroupIndex, key)
if err != nil {
return fmt.Errorf("unable to filter internal NetworkPolicies for AppliedToGroup %s: %v", key, err)
}
// Enqueue syncInternalNetworkPolicy for each affected internal NetworkPolicy so
// that corresponding Node spans are updated.
for _, npObj := range nps {
n.enqueueInternalNetworkPolicy(npObj.(*antreatypes.NetworkPolicy).SourceRef)
}
// the notified subscribers get the latest state.
n.appliedToGroupNotifier.notify(key)
return nil
}

Expand Down Expand Up @@ -1449,6 +1444,15 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne
newInternalNetworkPolicy, newAppliedToGroups, newAddressGroups = n.processNetworkPolicy(knp)
}

// The NetworkPolicy must subscribe to the updates of AppliedToGroups before calculating span based on them,
// otherwise the calculated span may be outdated as AppliedToGroups can be updated concurrently and the
// NetworkPolicy wouldn't be notified.
for group := range newAppliedToGroups {
n.appliedToGroupNotifier.subscribe(group, internalNetworkPolicyName, func() {
n.enqueueInternalNetworkPolicy(key)
})
}

newNodeNames, err := func() (sets.String, error) {
nodeNames := sets.NewString()
// Calculate the set of Node names based on the span of the
Expand Down Expand Up @@ -1540,9 +1544,11 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne
// Enqueue AddressGroups that are affected by this NetworkPolicy.
var oldNodeNames sets.String
var oldAddressGroupNames sets.String
var oldAppliedToGroupNames sets.String
if oldInternalNetworkPolicy != nil {
oldNodeNames = oldInternalNetworkPolicy.NodeNames
oldAddressGroupNames = oldInternalNetworkPolicy.GetAddressGroups()
oldAppliedToGroupNames = oldInternalNetworkPolicy.GetAppliedToGroups()
}
var addressGroupsToSync sets.String
newAddressGroupNames := sets.StringKeySet(newAddressGroups)
Expand All @@ -1556,6 +1562,12 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne
for addressGroup := range addressGroupsToSync {
n.enqueueAddressGroup(addressGroup)
}
// Unsubscribe to the updates of the stale AppliedToGroups.
for name := range oldAppliedToGroupNames {
if _, exists := newAppliedToGroups[name]; !exists {
n.appliedToGroupNotifier.unsubscribe(name, internalNetworkPolicyName)
}
}
return nil
}

Expand All @@ -1573,6 +1585,10 @@ func (n *NetworkPolicyController) deleteInternalNetworkPolicy(name string) {
internalNetworkPolicy := obj.(*antreatypes.NetworkPolicy)
n.internalNetworkPolicyStore.Delete(internalNetworkPolicy.Name)
n.cleanupOrphanGroups(internalNetworkPolicy)
// Unsubscribe to the updates of the AppliedToGroups.
for appliedToGroup := range internalNetworkPolicy.GetAppliedToGroups() {
n.appliedToGroupNotifier.unsubscribe(appliedToGroup, name)
}
if n.stretchNPEnabled && internalNetworkPolicy.SourceRef.Type != controlplane.K8sNetworkPolicy {
n.labelIdentityInterface.DeletePolicySelectors(internalNetworkPolicy.Name)
}
Expand Down
188 changes: 188 additions & 0 deletions pkg/controller/networkpolicy/networkpolicy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func newControllerWithoutEventHandler(k8sObjects, crdObjects []runtime.Object) (
internalNetworkPolicyQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "internalNetworkPolicy"),
internalGroupQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "internalGroup"),
groupingInterface: groupEntityIndex,
appliedToGroupNotifier: newNotifier(),
}
npController.tierInformer.Informer().AddIndexers(tierIndexers)
npController.cnpInformer.Informer().AddIndexers(cnpIndexers)
Expand Down Expand Up @@ -2921,6 +2922,193 @@ func compareIPNet(ipn1, ipn2 controlplane.IPNet) bool {
return true
}

// TestMultipleNetworkPoliciesWithSameAppliedTo verifies NetworkPolicyController can create and delete
// InternalNetworkPolicy, AppliedToGroups and AddressGroups correctly when concurrently processing multiple
// NetworkPolicies that refer to the same groups.
func TestMultipleNetworkPoliciesWithSameAppliedTo(t *testing.T) {
// podA and podB will be selected by the AppliedToGroup.
podA := getPod("podA", "default", "nodeA", "10.0.0.1", false)
podA.Labels = selectorA.MatchLabels
podB := getPod("podB", "default", "nodeB", "10.0.1.1", false)
podB.Labels = selectorA.MatchLabels
// podC will be selected by the AddressGroup.
podC := getPod("podC", "default", "nodeC", "10.0.2.1", false)
podC.Labels = selectorB.MatchLabels
// policyA and policyB use the same AppliedToGroup and AddressGroup.
policyA := &networkingv1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "npA", UID: "uidA"},
Spec: networkingv1.NetworkPolicySpec{
PodSelector: selectorA,
Ingress: []networkingv1.NetworkPolicyIngressRule{
{
From: []networkingv1.NetworkPolicyPeer{{PodSelector: &selectorB}},
},
},
},
}
policyB := &networkingv1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "npB", UID: "uidB"},
Spec: networkingv1.NetworkPolicySpec{
PodSelector: selectorA,
Egress: []networkingv1.NetworkPolicyEgressRule{
{
To: []networkingv1.NetworkPolicyPeer{{PodSelector: &selectorB}},
},
},
},
}

selectorAGroup := antreatypes.NewGroupSelector("default", &selectorA, nil, nil, nil)
selectorAGroupUID := getNormalizedUID(selectorAGroup.NormalizedName)
selectorBGroup := antreatypes.NewGroupSelector("default", &selectorB, nil, nil, nil)
selectorBGroupUID := getNormalizedUID(selectorBGroup.NormalizedName)
expectedAppliedToGroup := &antreatypes.AppliedToGroup{
SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA", "nodeB")}, // according to podA and podB
UID: types.UID(selectorAGroupUID),
Name: selectorAGroupUID,
Selector: selectorAGroup,
GroupMemberByNode: map[string]controlplane.GroupMemberSet{
"nodeA": controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{
Name: podA.Name,
Namespace: podA.Namespace,
}}),
"nodeB": controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{
Name: podB.Name,
Namespace: podB.Namespace,
}}),
},
}
expectedAddressGroup := &antreatypes.AddressGroup{
SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA", "nodeB")}, // according to policyA and policyB
UID: types.UID(selectorBGroupUID),
Name: selectorBGroupUID,
Selector: *selectorBGroup,
GroupMembers: controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{
Name: podC.Name,
Namespace: podC.Namespace,
}, IPs: []controlplane.IPAddress{ipStrToIPAddress(podC.Status.PodIP)}}),
}
expectedPolicyA := &antreatypes.NetworkPolicy{
UID: "uidA",
Name: "uidA",
SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA", "nodeB")}, // according to AppliedToGroup
SourceRef: &controlplane.NetworkPolicyReference{
Type: controlplane.K8sNetworkPolicy,
Namespace: "default",
Name: "npA",
UID: "uidA",
},
Rules: []controlplane.NetworkPolicyRule{
{
Direction: controlplane.DirectionIn,
From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroupUID}},
Priority: defaultRulePriority,
Action: &defaultAction,
},
},
AppliedToGroups: []string{selectorAGroupUID},
}
expectedPolicyB := &antreatypes.NetworkPolicy{
UID: "uidB",
Name: "uidB",
SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA", "nodeB")}, // according to AppliedToGroup
SourceRef: &controlplane.NetworkPolicyReference{
Type: controlplane.K8sNetworkPolicy,
Namespace: "default",
Name: "npB",
UID: "uidB",
},
Rules: []controlplane.NetworkPolicyRule{
{
Direction: controlplane.DirectionOut,
To: controlplane.NetworkPolicyPeer{
AddressGroups: []string{selectorBGroupUID},
},
Priority: defaultRulePriority,
Action: &defaultAction,
},
},
AppliedToGroups: []string{selectorAGroupUID},
}
_, c := newController(podA, podB, podC)
stopCh := make(chan struct{})
defer close(stopCh)
c.informerFactory.Start(stopCh)
c.crdInformerFactory.Start(stopCh)
c.informerFactory.WaitForCacheSync(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)
go c.groupingInterface.Run(stopCh)
go c.groupingController.Run(stopCh)
go c.Run(stopCh)

c.kubeClient.NetworkingV1().NetworkPolicies(policyA.Namespace).Create(context.TODO(), policyA, metav1.CreateOptions{})
c.kubeClient.NetworkingV1().NetworkPolicies(policyB.Namespace).Create(context.TODO(), policyB, metav1.CreateOptions{})

checkInternalNetworkPolicyExist(t, c, expectedPolicyA)
checkInternalNetworkPolicyExist(t, c, expectedPolicyB)
checkAppliedToGroupExist(t, c, expectedAppliedToGroup)
checkAddressGroupExist(t, c, expectedAddressGroup)

c.kubeClient.NetworkingV1().NetworkPolicies(policyA.Namespace).Delete(context.TODO(), policyA.Name, metav1.DeleteOptions{})
c.kubeClient.NetworkingV1().NetworkPolicies(policyB.Namespace).Delete(context.TODO(), policyB.Name, metav1.DeleteOptions{})

checkInternalNetworkPolicyNotExist(t, c, expectedPolicyA)
checkInternalNetworkPolicyNotExist(t, c, expectedPolicyB)
checkAppliedToGroupNotExist(t, c, expectedAppliedToGroup)
checkAddressGroupNotExist(t, c, expectedAddressGroup)
}

func checkInternalNetworkPolicyExist(t *testing.T, c *networkPolicyController, policy *antreatypes.NetworkPolicy) {
assert.Eventually(t, func() bool {
obj, exists, _ := c.internalNetworkPolicyStore.Get(string(policy.UID))
if !exists {
return false
}
return reflect.DeepEqual(policy, obj.(*antreatypes.NetworkPolicy))
}, 3*time.Second, 10*time.Millisecond)
}

func checkAppliedToGroupExist(t *testing.T, c *networkPolicyController, appliedToGroup *antreatypes.AppliedToGroup) {
assert.Eventually(t, func() bool {
obj, exists, _ := c.appliedToGroupStore.Get(string(appliedToGroup.UID))
if !exists {
return false
}
return reflect.DeepEqual(appliedToGroup, obj.(*antreatypes.AppliedToGroup))
}, 3*time.Second, 10*time.Millisecond)
}

func checkAddressGroupExist(t *testing.T, c *networkPolicyController, addressGroup *antreatypes.AddressGroup) {
assert.Eventually(t, func() bool {
obj, exists, _ := c.addressGroupStore.Get(string(addressGroup.UID))
if !exists {
return false
}
return reflect.DeepEqual(addressGroup, obj.(*antreatypes.AddressGroup))
}, 3*time.Second, 10*time.Millisecond)
}

func checkInternalNetworkPolicyNotExist(t *testing.T, c *networkPolicyController, policy *antreatypes.NetworkPolicy) {
assert.Eventually(t, func() bool {
_, exists, _ := c.internalNetworkPolicyStore.Get(string(policy.UID))
return !exists
}, 3*time.Second, 10*time.Millisecond)
}

func checkAppliedToGroupNotExist(t *testing.T, c *networkPolicyController, appliedToGroup *antreatypes.AppliedToGroup) {
assert.Eventually(t, func() bool {
_, exists, _ := c.appliedToGroupStore.Get(string(appliedToGroup.UID))
return !exists
}, 3*time.Second, 10*time.Millisecond)
}

func checkAddressGroupNotExist(t *testing.T, c *networkPolicyController, addressGroup *antreatypes.AddressGroup) {
assert.Eventually(t, func() bool {
_, exists, _ := c.addressGroupStore.Get(string(addressGroup.UID))
return !exists
}, 3*time.Second, 10*time.Millisecond)
}

func TestSyncInternalNetworkPolicy(t *testing.T) {
p10 := float64(10)
allowAction := crdv1alpha1.RuleActionAllow
Expand Down
77 changes: 77 additions & 0 deletions pkg/controller/networkpolicy/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package networkpolicy

import "sync"

// notifier notifies multiple subscribers about any events that happen to the objects they have subscribed.
type notifier struct {
mutex sync.RWMutex
subscribers map[string]map[string]func()
}

func newNotifier() *notifier {
return &notifier{subscribers: map[string]map[string]func(){}}
}

// Subscribe the subscriber to the given resourceID with a callback.
// If the subscription already exists, it does nothing.
func (n *notifier) subscribe(resourceID, subscriberID string, callback func()) {
n.mutex.Lock()
defer n.mutex.Unlock()
subscribers, exists := n.subscribers[resourceID]
if !exists {
subscribers = map[string]func(){}
n.subscribers[resourceID] = subscribers
}
_, subscribed := subscribers[subscriberID]
if subscribed {
return
}
subscribers[subscriberID] = callback
}

// unsubscribe cancels the subscription.
// If the subscription does not exist, it does nothing.
func (n *notifier) unsubscribe(resourceID, subscriberID string) {
n.mutex.Lock()
defer n.mutex.Unlock()
subscribers, exists := n.subscribers[resourceID]
if !exists {
return
}
_, subscribed := subscribers[subscriberID]
if !subscribed {
return
}
delete(subscribers, subscriberID)
// If the resource is no longer subscribed by any notifier, remove its key.
if len(subscribers) == 0 {
delete(n.subscribers, resourceID)
}
}

// Notify the subscribers by calling the callbacks they registered.
func (n *notifier) notify(resourceID string) {
n.mutex.RLock()
defer n.mutex.RUnlock()
subscribers, exists := n.subscribers[resourceID]
if !exists {
return
}
for _, callback := range subscribers {
callback()
}
}
Loading

0 comments on commit 8874d57

Please sign in to comment.