Skip to content

Commit

Permalink
Fix race conditions in NetworkPolicyController (#4028)
Browse files Browse the repository at this point in the history
There were a few race conditions in NetworkPolicyController:
* An AppliedToGroup or AddressGroup in use may be removed if situations
like this happens:
1. addANP creates a group for ANP A;
2. addNetworkPolicy reuses the group for KNP B, is going to create an
   internal NetworkPolicy;
3. deleteANP deletes the group for ANP A because at that moment no other
   internal NetworkPolicies are using the group;
4. addNetworkPolicy commits the internal NetworkPolicy for KNP B to
   storage, but the group no longer exists.

* An Antrea-native NetworkPolicy may be out-of-date if situations like
this happens:
1. An ACNP event is received, `updateCNP` calculates the new internal
   NetworkPolicy for the ACNP, is going to commit it to storage;
2. A ClusterGroup event triggers update of the ACNP via
   triggerCNPUpdates
3. triggerCNPUpdates calls reprocessCNP which updates the new internal
   NetworkPolicy for the ACNP and commits it to storage;
4. updateCNP in the first step commits its internal NetworkPolicy to
   storage which overrides the update of the ClusterGroup event.

The second one caused test flake of the test case
"TestGroupNoK8sNP/Case=ACNPNestedClusterGroup".

To resolve the race conditions completely and make NetworkPolicy
handling less error prone, this patch refactors NetworkPolicyController:
* Event handlers no longer update the storage of internal NetworkPolicy
  directly and only triggers resync of affected policies, which ensures
  that there is at most one worker handling an internal NetworkPolicy at
  any moment.
* Ensure atomicity when updating internal NetworkPolicy and creating or
  deleting AddressGroups and AppliedToGroups.

Duplicate code and tests are deleted with the refactoring.

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn authored Aug 16, 2022
1 parent cab72fc commit d1c6a43
Show file tree
Hide file tree
Showing 15 changed files with 1,504 additions and 2,735 deletions.
176 changes: 67 additions & 109 deletions pkg/controller/networkpolicy/antreanetworkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,70 +24,38 @@ import (
"antrea.io/antrea/pkg/apis/controlplane"
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
antreatypes "antrea.io/antrea/pkg/controller/types"
"antrea.io/antrea/pkg/util/k8s"
)

// addANP receives AntreaNetworkPolicy ADD events and creates resources
// which can be consumed by agents to configure corresponding rules on the Nodes.
func getANPReference(anp *crdv1alpha1.NetworkPolicy) *controlplane.NetworkPolicyReference {
return &controlplane.NetworkPolicyReference{
Type: controlplane.AntreaNetworkPolicy,
Namespace: anp.Namespace,
Name: anp.Name,
UID: anp.UID,
}
}

// addANP receives AntreaNetworkPolicy ADD events and enqueues a reference of
// the AntreaNetworkPolicy to trigger its process.
func (n *NetworkPolicyController) addANP(obj interface{}) {
defer n.heartbeat("addANP")
np := obj.(*crdv1alpha1.NetworkPolicy)
klog.Infof("Processing Antrea NetworkPolicy %s/%s ADD event", np.Namespace, np.Name)
// Create an internal NetworkPolicy object corresponding to this
// NetworkPolicy and enqueue task to internal NetworkPolicy Workqueue.
internalNP := n.processAntreaNetworkPolicy(np)
klog.V(2).Infof("Creating new internal NetworkPolicy %s for %s", internalNP.Name, internalNP.SourceRef.ToString())
n.internalNetworkPolicyStore.Create(internalNP)
key := internalNetworkPolicyKeyFunc(np)
n.enqueueInternalNetworkPolicy(key)
n.enqueueInternalNetworkPolicy(getANPReference(np))
}

// updateANP receives AntreaNetworkPolicy UPDATE events and updates resources
// which can be consumed by agents to configure corresponding rules on the Nodes.
// updateANP receives AntreaNetworkPolicy UPDATE events and enqueues a reference
// of the AntreaNetworkPolicy to trigger its process.
func (n *NetworkPolicyController) updateANP(old, cur interface{}) {
defer n.heartbeat("updateANP")
curNP := cur.(*crdv1alpha1.NetworkPolicy)
klog.Infof("Processing Antrea NetworkPolicy %s/%s UPDATE event", curNP.Namespace, curNP.Name)
// Update an internal NetworkPolicy, corresponding to this NetworkPolicy and
// enqueue task to internal NetworkPolicy Workqueue.
curInternalNP := n.processAntreaNetworkPolicy(curNP)
klog.V(2).Infof("Updating existing internal NetworkPolicy %s for %s", curInternalNP.Name, curInternalNP.SourceRef.ToString())
// Retrieve old crdv1alpha1.NetworkPolicy object.
oldNP := old.(*crdv1alpha1.NetworkPolicy)
// Old and current NetworkPolicy share the same key.
key := internalNetworkPolicyKeyFunc(oldNP)
// Lock access to internal NetworkPolicy store such that concurrent access
// to an internal NetworkPolicy is not allowed. This will avoid the
// case in which an Update to an internal NetworkPolicy object may
// cause the SpanMeta member to be overridden with stale SpanMeta members
// from an older internal NetworkPolicy.
n.internalNetworkPolicyMutex.Lock()
oldInternalNPObj, _, _ := n.internalNetworkPolicyStore.Get(key)
oldInternalNP := oldInternalNPObj.(*antreatypes.NetworkPolicy)
// Must preserve old internal NetworkPolicy Span.
curInternalNP.SpanMeta = oldInternalNP.SpanMeta
n.internalNetworkPolicyStore.Update(curInternalNP)
// Unlock the internal NetworkPolicy store.
n.internalNetworkPolicyMutex.Unlock()
// Enqueue addressGroup keys to update their Node span.
for _, rule := range curInternalNP.Rules {
for _, addrGroupName := range rule.From.AddressGroups {
n.enqueueAddressGroup(addrGroupName)
}
for _, addrGroupName := range rule.To.AddressGroups {
n.enqueueAddressGroup(addrGroupName)
}
}
n.enqueueInternalNetworkPolicy(key)
for _, atg := range oldInternalNP.AppliedToGroups {
// Delete the old AppliedToGroup object if it is not referenced
// by any internal NetworkPolicy.
n.deleteDereferencedAppliedToGroup(atg)
}
n.deleteDereferencedAddressGroups(oldInternalNP)
n.enqueueInternalNetworkPolicy(getANPReference(curNP))
}

// deleteANP receives AntreaNetworkPolicy DELETED events and deletes resources
// which can be consumed by agents to delete corresponding rules on the Nodes.
// deleteANP receives AntreaNetworkPolicy DELETE events and enqueues a reference
// of the AntreaNetworkPolicy to trigger its process.
func (n *NetworkPolicyController) deleteANP(old interface{}) {
np, ok := old.(*crdv1alpha1.NetworkPolicy)
if !ok {
Expand All @@ -104,33 +72,20 @@ func (n *NetworkPolicyController) deleteANP(old interface{}) {
}
defer n.heartbeat("deleteANP")
klog.Infof("Processing Antrea NetworkPolicy %s/%s DELETE event", np.Namespace, np.Name)
key := internalNetworkPolicyKeyFunc(np)
oldInternalNPObj, _, _ := n.internalNetworkPolicyStore.Get(key)
oldInternalNP := oldInternalNPObj.(*antreatypes.NetworkPolicy)
klog.V(2).Infof("Deleting internal NetworkPolicy %s for %s", oldInternalNP.Name, oldInternalNP.SourceRef.ToString())
err := n.internalNetworkPolicyStore.Delete(key)
if err != nil {
klog.Errorf("Error deleting internal NetworkPolicy during Antrea NetworkPolicy %s delete: %v", np.Name, err)
return
}
for _, atg := range oldInternalNP.AppliedToGroups {
n.deleteDereferencedAppliedToGroup(atg)
}
n.deleteDereferencedAddressGroups(oldInternalNP)
n.enqueueInternalNetworkPolicy(getANPReference(np))
}

// processAntreaNetworkPolicy creates an internal NetworkPolicy instance
// corresponding to the crdv1alpha1.NetworkPolicy object. This method
// does not commit the internal NetworkPolicy in store, instead returns an
// instance to the caller wherein, it will be either stored as a new Object
// in case of ADD event or modified and store the updated instance, in case
// of an UPDATE event.
func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.NetworkPolicy) *antreatypes.NetworkPolicy {
// instance to the caller.
func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.NetworkPolicy) (*antreatypes.NetworkPolicy, map[string]*antreatypes.AppliedToGroup, map[string]*antreatypes.AddressGroup) {
appliedToPerRule := len(np.Spec.AppliedTo) == 0
// appliedToGroupNames tracks all distinct appliedToGroups referred to by the Antrea NetworkPolicy,
// appliedToGroups tracks all distinct appliedToGroups referred to by the Antrea NetworkPolicy,
// either in the spec section or in ingress/egress rules.
// The span calculation and stale appliedToGroup cleanup logic would work seamlessly for both cases.
appliedToGroupNamesSet := sets.String{}
appliedToGroups := map[string]*antreatypes.AppliedToGroup{}
addressGroups := map[string]*antreatypes.AddressGroup{}
rules := make([]controlplane.NetworkPolicyRule, 0, len(np.Spec.Ingress)+len(np.Spec.Egress))
newUnrealizableInternalNetworkPolicy := func(err error) *antreatypes.NetworkPolicy {
return &antreatypes.NetworkPolicy{
Expand All @@ -147,54 +102,61 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net
}
}
// Create AppliedToGroup for each AppliedTo present in AntreaNetworkPolicy spec.
_, err := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo, appliedToGroupNamesSet)
atgs, err := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo)
if err != nil {
return newUnrealizableInternalNetworkPolicy(err)
return newUnrealizableInternalNetworkPolicy(err), nil, nil
}
appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...)
// Compute NetworkPolicyRule for Ingress Rule.
for idx, ingressRule := range np.Spec.Ingress {
// Set default action to ALLOW to allow traffic.
services, namedPortExists := toAntreaServicesForCRD(ingressRule.Ports, ingressRule.Protocols)
// Create AppliedToGroup for each AppliedTo present in the ingress rule.
atGroups, err := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo, appliedToGroupNamesSet)
atgs, err := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo)
if err != nil {
return newUnrealizableInternalNetworkPolicy(err)
return newUnrealizableInternalNetworkPolicy(err), nil, nil
}
appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...)
peer, ags := n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists)
addressGroups = mergeAddressGroups(addressGroups, ags...)
rules = append(rules, controlplane.NetworkPolicyRule{
Direction: controlplane.DirectionIn,
From: *n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists),
From: *peer,
Services: services,
Name: ingressRule.Name,
Action: ingressRule.Action,
Priority: int32(idx),
EnableLogging: ingressRule.EnableLogging,
AppliedToGroups: atGroups,
AppliedToGroups: getAppliedToGroupNames(atgs),
})
}
// Compute NetworkPolicyRule for Egress Rule.
for idx, egressRule := range np.Spec.Egress {
// Set default action to ALLOW to allow traffic.
services, namedPortExists := toAntreaServicesForCRD(egressRule.Ports, egressRule.Protocols)
// Create AppliedToGroup for each AppliedTo present in the egress rule.
atGroups, err := n.processAppliedTo(np.Namespace, egressRule.AppliedTo, appliedToGroupNamesSet)
atgs, err := n.processAppliedTo(np.Namespace, egressRule.AppliedTo)
if err != nil {
return newUnrealizableInternalNetworkPolicy(err)
return newUnrealizableInternalNetworkPolicy(err), nil, nil
}
var peers *controlplane.NetworkPolicyPeer
appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...)
var peer *controlplane.NetworkPolicyPeer
if egressRule.ToServices != nil {
peers = n.svcRefToPeerForCRD(egressRule.ToServices, np.Namespace)
peer = n.svcRefToPeerForCRD(egressRule.ToServices, np.Namespace)
} else {
peers = n.toAntreaPeerForCRD(egressRule.To, np, controlplane.DirectionOut, namedPortExists)
var ags []*antreatypes.AddressGroup
peer, ags = n.toAntreaPeerForCRD(egressRule.To, np, controlplane.DirectionOut, namedPortExists)
addressGroups = mergeAddressGroups(addressGroups, ags...)
}
rules = append(rules, controlplane.NetworkPolicyRule{
Direction: controlplane.DirectionOut,
To: *peers,
To: *peer,
Services: services,
Name: egressRule.Name,
Action: egressRule.Action,
Priority: int32(idx),
EnableLogging: egressRule.EnableLogging,
AppliedToGroups: atGroups,
AppliedToGroups: getAppliedToGroupNames(atgs),
})
}
tierPriority := n.getTierPriority(np.Spec.Tier)
Expand All @@ -208,34 +170,33 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net
Name: internalNetworkPolicyKeyFunc(np),
UID: np.UID,
Generation: np.Generation,
AppliedToGroups: appliedToGroupNamesSet.List(),
AppliedToGroups: sets.StringKeySet(appliedToGroups).List(),
Rules: rules,
Priority: &np.Spec.Priority,
TierPriority: &tierPriority,
AppliedToPerRule: appliedToPerRule,
}
return internalNetworkPolicy
return internalNetworkPolicy, appliedToGroups, addressGroups
}

func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo []crdv1alpha1.NetworkPolicyPeer, appliedToGroupNamesSet sets.String) ([]string, error) {
var appliedToGroupNames []string
func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo []crdv1alpha1.NetworkPolicyPeer) ([]*antreatypes.AppliedToGroup, error) {
var appliedToGroups []*antreatypes.AppliedToGroup
for _, at := range appliedTo {
var atg string
var atg *antreatypes.AppliedToGroup
if at.Group != "" {
var err error
atg, err = n.processAppliedToGroupForNamespacedGroup(namespace, at.Group)
atg, err = n.createAppliedToGroupForNamespacedGroup(namespace, at.Group)
if err != nil {
return appliedToGroupNames, err
return nil, err
}
} else {
atg = n.createAppliedToGroup(namespace, at.PodSelector, at.NamespaceSelector, at.ExternalEntitySelector)
}
if atg != "" {
appliedToGroupNames = append(appliedToGroupNames, atg)
appliedToGroupNamesSet.Insert(atg)
if atg != nil {
appliedToGroups = append(appliedToGroups, atg)
}
}
return appliedToGroupNames, nil
return appliedToGroups, nil
}

// ErrNetworkPolicyAppliedToUnsupportedGroup is an error response when
Expand All @@ -249,26 +210,23 @@ func (e ErrNetworkPolicyAppliedToUnsupportedGroup) Error() string {
return fmt.Sprintf("group %s/%s with IPBlocks or NamespaceSelector can not be used as AppliedTo", e.namespace, e.groupName)
}

func (n *NetworkPolicyController) processAppliedToGroupForNamespacedGroup(namespace, groupName string) (string, error) {
// Retrieve Group for corresponding entry in the AppliedToGroup.
g, err := n.grpLister.Groups(namespace).Get(groupName)
if err != nil {
// The Group referred to has not been created yet.
return "", nil
}
key := internalGroupKeyFunc(g)
// Find the internal Group corresponding to this Group
func (n *NetworkPolicyController) createAppliedToGroupForNamespacedGroup(namespace, groupName string) (*antreatypes.AppliedToGroup, error) {
// Namespaced group uses NAMESPACE/NAME as the key of the corresponding internal group.
key := k8s.NamespacedName(namespace, groupName)
// Find the internal Group corresponding to this Group.
// There is no need to check if the namespaced group exists in groupLister because its existence will eventually be
// reflected in internalGroupStore.
ig, found, _ := n.internalGroupStore.Get(key)
if !found {
// Internal Group was not found. Once the internal Group is created, the sync
// worker for internal group will re-enqueue the ClusterNetworkPolicy processing
// which will trigger the creation of AddressGroup.
return "", nil
// Internal Group is not found, which means the corresponding namespaced group is either not created yet or not
// processed yet. Once the internal Group is created and processed, the sync worker for internal group will
// re-enqueue the ClusterNetworkPolicy processing which will trigger the creation of AppliedToGroup.
return nil, nil
}
intGrp := ig.(*antreatypes.Group)
if len(intGrp.IPBlocks) > 0 || (intGrp.Selector != nil && intGrp.Selector.NamespaceSelector != nil) {
klog.V(2).InfoS("Group with IPBlocks or NamespaceSelector can not be used as AppliedTo", "Group", g)
return "", ErrNetworkPolicyAppliedToUnsupportedGroup{namespace: namespace, groupName: groupName}
klog.V(2).InfoS("Group with IPBlocks or NamespaceSelector can not be used as AppliedTo", "Group", key)
return nil, ErrNetworkPolicyAppliedToUnsupportedGroup{namespace: namespace, groupName: groupName}
}
return n.createAppliedToGroupForInternalGroup(intGrp), nil
return &antreatypes.AppliedToGroup{UID: intGrp.UID, Name: key}, nil
}
Loading

0 comments on commit d1c6a43

Please sign in to comment.