Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #5554: Fix NetworkPolicy span calculation #5574

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions pkg/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ type NetworkPolicyController struct {
// to the same addressgroups/appliedtogroups.
internalNetworkPolicyMutex sync.RWMutex

// appliedToGroupNotifier is responsible for notifying subscribers of an AppliedToGroup about its update.
// The typical subscribers of AppliedToGroup are NetworkPolicies.
appliedToGroupNotifier *notifier

groupingInterface grouping.Interface
// Added as a member to the struct to allow injection for testing.
groupingInterfaceSynced func() bool
Expand Down Expand Up @@ -410,6 +414,7 @@ func NewNetworkPolicyController(kubeClient clientset.Interface,
groupingInterfaceSynced: groupingInterface.HasSynced,
labelIdentityInterface: labelIdentityInterface,
stretchNPEnabled: stretchedNPEnabled,
appliedToGroupNotifier: newNotifier(),
}
n.groupingInterface.AddEventHandler(appliedToGroupType, n.enqueueAppliedToGroup)
n.groupingInterface.AddEventHandler(addressGroupType, n.enqueueAddressGroup)
Expand Down Expand Up @@ -1308,19 +1313,9 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error {
}
}
n.appliedToGroupStore.Update(updatedAppliedToGroup)
// Get all internal NetworkPolicy objects that refers this AppliedToGroup.
// Note that this must be executed after storing the result, to ensure that
// both of the NetworkPolicies that referred it before storing it and the
// ones after storing it can get the right span.
nps, err := n.internalNetworkPolicyStore.GetByIndex(store.AppliedToGroupIndex, key)
if err != nil {
return fmt.Errorf("unable to filter internal NetworkPolicies for AppliedToGroup %s: %v", key, err)
}
// Enqueue syncInternalNetworkPolicy for each affected internal NetworkPolicy so
// that corresponding Node spans are updated.
for _, npObj := range nps {
n.enqueueInternalNetworkPolicy(npObj.(*antreatypes.NetworkPolicy).SourceRef)
}
// the notified subscribers get the latest state.
n.appliedToGroupNotifier.notify(key)
return nil
}

Expand Down Expand Up @@ -1449,6 +1444,15 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne
newInternalNetworkPolicy, newAppliedToGroups, newAddressGroups = n.processNetworkPolicy(knp)
}

// The NetworkPolicy must subscribe to the updates of AppliedToGroups before calculating span based on them,
// otherwise the calculated span may be outdated as AppliedToGroups can be updated concurrently and the
// NetworkPolicy wouldn't be notified.
for group := range newAppliedToGroups {
n.appliedToGroupNotifier.subscribe(group, internalNetworkPolicyName, func() {
n.enqueueInternalNetworkPolicy(key)
})
}

newNodeNames, err := func() (sets.String, error) {
nodeNames := sets.NewString()
// Calculate the set of Node names based on the span of the
Expand Down Expand Up @@ -1540,9 +1544,11 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne
// Enqueue AddressGroups that are affected by this NetworkPolicy.
var oldNodeNames sets.String
var oldAddressGroupNames sets.String
var oldAppliedToGroupNames sets.String
if oldInternalNetworkPolicy != nil {
oldNodeNames = oldInternalNetworkPolicy.NodeNames
oldAddressGroupNames = oldInternalNetworkPolicy.GetAddressGroups()
oldAppliedToGroupNames = oldInternalNetworkPolicy.GetAppliedToGroups()
}
var addressGroupsToSync sets.String
newAddressGroupNames := sets.StringKeySet(newAddressGroups)
Expand All @@ -1556,6 +1562,12 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne
for addressGroup := range addressGroupsToSync {
n.enqueueAddressGroup(addressGroup)
}
// Unsubscribe to the updates of the stale AppliedToGroups.
for name := range oldAppliedToGroupNames {
if _, exists := newAppliedToGroups[name]; !exists {
n.appliedToGroupNotifier.unsubscribe(name, internalNetworkPolicyName)
}
}
return nil
}

Expand All @@ -1573,6 +1585,10 @@ func (n *NetworkPolicyController) deleteInternalNetworkPolicy(name string) {
internalNetworkPolicy := obj.(*antreatypes.NetworkPolicy)
n.internalNetworkPolicyStore.Delete(internalNetworkPolicy.Name)
n.cleanupOrphanGroups(internalNetworkPolicy)
// Unsubscribe to the updates of the AppliedToGroups.
for appliedToGroup := range internalNetworkPolicy.GetAppliedToGroups() {
n.appliedToGroupNotifier.unsubscribe(appliedToGroup, name)
}
if n.stretchNPEnabled && internalNetworkPolicy.SourceRef.Type != controlplane.K8sNetworkPolicy {
n.labelIdentityInterface.DeletePolicySelectors(internalNetworkPolicy.Name)
}
Expand Down
188 changes: 188 additions & 0 deletions pkg/controller/networkpolicy/networkpolicy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func newControllerWithoutEventHandler(k8sObjects, crdObjects []runtime.Object) (
internalNetworkPolicyQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "internalNetworkPolicy"),
internalGroupQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "internalGroup"),
groupingInterface: groupEntityIndex,
appliedToGroupNotifier: newNotifier(),
}
npController.tierInformer.Informer().AddIndexers(tierIndexers)
npController.cnpInformer.Informer().AddIndexers(cnpIndexers)
Expand Down Expand Up @@ -2921,6 +2922,193 @@ func compareIPNet(ipn1, ipn2 controlplane.IPNet) bool {
return true
}

// TestMultipleNetworkPoliciesWithSameAppliedTo verifies NetworkPolicyController can create and delete
// InternalNetworkPolicy, AppliedToGroups and AddressGroups correctly when concurrently processing multiple
// NetworkPolicies that refer to the same groups.
func TestMultipleNetworkPoliciesWithSameAppliedTo(t *testing.T) {
// podA and podB will be selected by the AppliedToGroup.
podA := getPod("podA", "default", "nodeA", "10.0.0.1", false)
podA.Labels = selectorA.MatchLabels
podB := getPod("podB", "default", "nodeB", "10.0.1.1", false)
podB.Labels = selectorA.MatchLabels
// podC will be selected by the AddressGroup.
podC := getPod("podC", "default", "nodeC", "10.0.2.1", false)
podC.Labels = selectorB.MatchLabels
// policyA and policyB use the same AppliedToGroup and AddressGroup.
policyA := &networkingv1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "npA", UID: "uidA"},
Spec: networkingv1.NetworkPolicySpec{
PodSelector: selectorA,
Ingress: []networkingv1.NetworkPolicyIngressRule{
{
From: []networkingv1.NetworkPolicyPeer{{PodSelector: &selectorB}},
},
},
},
}
policyB := &networkingv1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "npB", UID: "uidB"},
Spec: networkingv1.NetworkPolicySpec{
PodSelector: selectorA,
Egress: []networkingv1.NetworkPolicyEgressRule{
{
To: []networkingv1.NetworkPolicyPeer{{PodSelector: &selectorB}},
},
},
},
}

selectorAGroup := antreatypes.NewGroupSelector("default", &selectorA, nil, nil, nil)
selectorAGroupUID := getNormalizedUID(selectorAGroup.NormalizedName)
selectorBGroup := antreatypes.NewGroupSelector("default", &selectorB, nil, nil, nil)
selectorBGroupUID := getNormalizedUID(selectorBGroup.NormalizedName)
expectedAppliedToGroup := &antreatypes.AppliedToGroup{
SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA", "nodeB")}, // according to podA and podB
UID: types.UID(selectorAGroupUID),
Name: selectorAGroupUID,
Selector: selectorAGroup,
GroupMemberByNode: map[string]controlplane.GroupMemberSet{
"nodeA": controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{
Name: podA.Name,
Namespace: podA.Namespace,
}}),
"nodeB": controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{
Name: podB.Name,
Namespace: podB.Namespace,
}}),
},
}
expectedAddressGroup := &antreatypes.AddressGroup{
SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA", "nodeB")}, // according to policyA and policyB
UID: types.UID(selectorBGroupUID),
Name: selectorBGroupUID,
Selector: *selectorBGroup,
GroupMembers: controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{
Name: podC.Name,
Namespace: podC.Namespace,
}, IPs: []controlplane.IPAddress{ipStrToIPAddress(podC.Status.PodIP)}}),
}
expectedPolicyA := &antreatypes.NetworkPolicy{
UID: "uidA",
Name: "uidA",
SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA", "nodeB")}, // according to AppliedToGroup
SourceRef: &controlplane.NetworkPolicyReference{
Type: controlplane.K8sNetworkPolicy,
Namespace: "default",
Name: "npA",
UID: "uidA",
},
Rules: []controlplane.NetworkPolicyRule{
{
Direction: controlplane.DirectionIn,
From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroupUID}},
Priority: defaultRulePriority,
Action: &defaultAction,
},
},
AppliedToGroups: []string{selectorAGroupUID},
}
expectedPolicyB := &antreatypes.NetworkPolicy{
UID: "uidB",
Name: "uidB",
SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA", "nodeB")}, // according to AppliedToGroup
SourceRef: &controlplane.NetworkPolicyReference{
Type: controlplane.K8sNetworkPolicy,
Namespace: "default",
Name: "npB",
UID: "uidB",
},
Rules: []controlplane.NetworkPolicyRule{
{
Direction: controlplane.DirectionOut,
To: controlplane.NetworkPolicyPeer{
AddressGroups: []string{selectorBGroupUID},
},
Priority: defaultRulePriority,
Action: &defaultAction,
},
},
AppliedToGroups: []string{selectorAGroupUID},
}
_, c := newController(podA, podB, podC)
stopCh := make(chan struct{})
defer close(stopCh)
c.informerFactory.Start(stopCh)
c.crdInformerFactory.Start(stopCh)
c.informerFactory.WaitForCacheSync(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)
go c.groupingInterface.Run(stopCh)
go c.groupingController.Run(stopCh)
go c.Run(stopCh)

c.kubeClient.NetworkingV1().NetworkPolicies(policyA.Namespace).Create(context.TODO(), policyA, metav1.CreateOptions{})
c.kubeClient.NetworkingV1().NetworkPolicies(policyB.Namespace).Create(context.TODO(), policyB, metav1.CreateOptions{})

checkInternalNetworkPolicyExist(t, c, expectedPolicyA)
checkInternalNetworkPolicyExist(t, c, expectedPolicyB)
checkAppliedToGroupExist(t, c, expectedAppliedToGroup)
checkAddressGroupExist(t, c, expectedAddressGroup)

c.kubeClient.NetworkingV1().NetworkPolicies(policyA.Namespace).Delete(context.TODO(), policyA.Name, metav1.DeleteOptions{})
c.kubeClient.NetworkingV1().NetworkPolicies(policyB.Namespace).Delete(context.TODO(), policyB.Name, metav1.DeleteOptions{})

checkInternalNetworkPolicyNotExist(t, c, expectedPolicyA)
checkInternalNetworkPolicyNotExist(t, c, expectedPolicyB)
checkAppliedToGroupNotExist(t, c, expectedAppliedToGroup)
checkAddressGroupNotExist(t, c, expectedAddressGroup)
}

func checkInternalNetworkPolicyExist(t *testing.T, c *networkPolicyController, policy *antreatypes.NetworkPolicy) {
assert.Eventually(t, func() bool {
obj, exists, _ := c.internalNetworkPolicyStore.Get(string(policy.UID))
if !exists {
return false
}
return reflect.DeepEqual(policy, obj.(*antreatypes.NetworkPolicy))
}, 3*time.Second, 10*time.Millisecond)
}

func checkAppliedToGroupExist(t *testing.T, c *networkPolicyController, appliedToGroup *antreatypes.AppliedToGroup) {
assert.Eventually(t, func() bool {
obj, exists, _ := c.appliedToGroupStore.Get(string(appliedToGroup.UID))
if !exists {
return false
}
return reflect.DeepEqual(appliedToGroup, obj.(*antreatypes.AppliedToGroup))
}, 3*time.Second, 10*time.Millisecond)
}

func checkAddressGroupExist(t *testing.T, c *networkPolicyController, addressGroup *antreatypes.AddressGroup) {
assert.Eventually(t, func() bool {
obj, exists, _ := c.addressGroupStore.Get(string(addressGroup.UID))
if !exists {
return false
}
return reflect.DeepEqual(addressGroup, obj.(*antreatypes.AddressGroup))
}, 3*time.Second, 10*time.Millisecond)
}

func checkInternalNetworkPolicyNotExist(t *testing.T, c *networkPolicyController, policy *antreatypes.NetworkPolicy) {
assert.Eventually(t, func() bool {
_, exists, _ := c.internalNetworkPolicyStore.Get(string(policy.UID))
return !exists
}, 3*time.Second, 10*time.Millisecond)
}

func checkAppliedToGroupNotExist(t *testing.T, c *networkPolicyController, appliedToGroup *antreatypes.AppliedToGroup) {
assert.Eventually(t, func() bool {
_, exists, _ := c.appliedToGroupStore.Get(string(appliedToGroup.UID))
return !exists
}, 3*time.Second, 10*time.Millisecond)
}

func checkAddressGroupNotExist(t *testing.T, c *networkPolicyController, addressGroup *antreatypes.AddressGroup) {
assert.Eventually(t, func() bool {
_, exists, _ := c.addressGroupStore.Get(string(addressGroup.UID))
return !exists
}, 3*time.Second, 10*time.Millisecond)
}

func TestSyncInternalNetworkPolicy(t *testing.T) {
p10 := float64(10)
allowAction := crdv1alpha1.RuleActionAllow
Expand Down
77 changes: 77 additions & 0 deletions pkg/controller/networkpolicy/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package networkpolicy

import "sync"

// notifier notifies multiple subscribers about any events that happen to the objects they have subscribed.
type notifier struct {
mutex sync.RWMutex
subscribers map[string]map[string]func()
}

func newNotifier() *notifier {
return &notifier{subscribers: map[string]map[string]func(){}}
}

// Subscribe the subscriber to the given resourceID with a callback.
// If the subscription already exists, it does nothing.
func (n *notifier) subscribe(resourceID, subscriberID string, callback func()) {
n.mutex.Lock()
defer n.mutex.Unlock()
subscribers, exists := n.subscribers[resourceID]
if !exists {
subscribers = map[string]func(){}
n.subscribers[resourceID] = subscribers
}
_, subscribed := subscribers[subscriberID]
if subscribed {
return
}
subscribers[subscriberID] = callback
}

// unsubscribe cancels the subscription.
// If the subscription does not exist, it does nothing.
func (n *notifier) unsubscribe(resourceID, subscriberID string) {
n.mutex.Lock()
defer n.mutex.Unlock()
subscribers, exists := n.subscribers[resourceID]
if !exists {
return
}
_, subscribed := subscribers[subscriberID]
if !subscribed {
return
}
delete(subscribers, subscriberID)
// If the resource is no longer subscribed by any notifier, remove its key.
if len(subscribers) == 0 {
delete(n.subscribers, resourceID)
}
}

// Notify the subscribers by calling the callbacks they registered.
func (n *notifier) notify(resourceID string) {
n.mutex.RLock()
defer n.mutex.RUnlock()
subscribers, exists := n.subscribers[resourceID]
if !exists {
return
}
for _, callback := range subscribers {
callback()
}
}
Loading