Skip to content

Commit

Permalink
Fix deadlock issue and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: wgrayson <[email protected]>
  • Loading branch information
GraysonWu committed Jul 19, 2022
1 parent b72a300 commit 0cc328f
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 49 deletions.
89 changes: 46 additions & 43 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -42,7 +43,6 @@ const (
addressGroupIndex = "addressGroup"
policyIndex = "policy"
toServicesIndex = "toServices"
appliedToServicesIndex = "appliedToServices"
toIGMPReportGroupAddressIndex = "toIGMPReportGroupAddress"
)

Expand Down Expand Up @@ -377,24 +377,6 @@ func toServicesIndexFunc(obj interface{}) ([]string, error) {
return toSvcNamespacedName.UnsortedList(), nil
}

// toServicesIndexFunc knows how to get NamespacedNames of Services referred in
// ToServices field of a *rule. It's provided to cache.Indexer to build an index of
// NetworkPolicy.
func (r *ruleCache) appliedToServicesIndexFunc(obj interface{}) ([]string, error) {
rule := obj.(*rule)
appliedToSvcNamespacedName := sets.String{}
memberSet, exist := r.unionAppliedToGroups(rule.AppliedToGroups)
if !exist {
return []string{}, nil
}
for _, member := range memberSet.Items() {
if member.Service != nil {
appliedToSvcNamespacedName.Insert(k8s.NamespacedName(member.Service.Namespace, member.Service.Name))
}
}
return appliedToSvcNamespacedName.UnsortedList(), nil
}

// toIGMPReportGroupAddressIndexFunc knows how to get IGMP report groupAddresses of a *rule
// It's provided to cache.Indexer to build an index of NetworkPolicy.
func toIGMPReportGroupAddressIndexFunc(obj interface{}) ([]string, error) {
Expand All @@ -414,25 +396,24 @@ func toIGMPReportGroupAddressIndexFunc(obj interface{}) ([]string, error) {

// newRuleCache returns a new *ruleCache.
func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, serviceGroupIDUpdate <-chan string) *ruleCache {
r := &ruleCache{
appliedToSetByGroup: make(map[string]v1beta.GroupMemberSet),
addressSetByGroup: make(map[string]v1beta.GroupMemberSet),
policyMap: make(map[string]*v1beta.NetworkPolicy),
dirtyRuleHandler: dirtyRuleHandler,
groupIDUpdates: serviceGroupIDUpdate,
}
rules := cache.NewIndexer(
ruleKeyFunc,
cache.Indexers{
addressGroupIndex: addressGroupIndexFunc,
appliedToGroupIndex: appliedToGroupIndexFunc,
policyIndex: policyIndexFunc,
toServicesIndex: toServicesIndexFunc,
appliedToServicesIndex: r.appliedToServicesIndexFunc,
toIGMPReportGroupAddressIndex: toIGMPReportGroupAddressIndexFunc,
},
)
r.rules = rules
r := &ruleCache{
appliedToSetByGroup: make(map[string]v1beta.GroupMemberSet),
addressSetByGroup: make(map[string]v1beta.GroupMemberSet),
policyMap: make(map[string]*v1beta.NetworkPolicy),
rules: rules,
dirtyRuleHandler: dirtyRuleHandler,
groupIDUpdates: serviceGroupIDUpdate,
}
// Subscribe Pod update events from CNIServer.
podUpdateSubscriber.Subscribe(r.processPodUpdate)
go r.processGroupIDUpdates()
Expand Down Expand Up @@ -470,21 +451,7 @@ func (c *ruleCache) processGroupIDUpdates() {
for {
select {
case svcStr := <-c.groupIDUpdates:
toSvcRules, err := c.rules.ByIndex(toServicesIndex, svcStr)
if err != nil {
continue
}
for _, toSvcRule := range toSvcRules {
c.dirtyRuleHandler(toSvcRule.(*rule).ID)
}

appliedToSvcRules, err := c.rules.ByIndex(appliedToServicesIndex, svcStr)
if err != nil {
continue
}
for _, appliedToSvcRule := range appliedToSvcRules {
c.dirtyRuleHandler(appliedToSvcRule.(*rule).ID)
}
c.processServiceGroupIDUpdate(svcStr)
}
}
}
Expand Down Expand Up @@ -939,3 +906,39 @@ func (c *ruleCache) unionAppliedToGroups(groupNames []string) (v1beta.GroupMembe
}
return set, anyExists
}

// processServiceGroupIDUpdate gets names of AppliedToGroup by Service NamespacedName.
func (c *ruleCache) processServiceGroupIDUpdate(svcStr string) {
c.appliedToSetLock.RLock()
defer c.appliedToSetLock.RUnlock()

// Reprocess rules if the Service referred by this rule's ToServices has updated.
toSvcRules, err := c.rules.ByIndex(toServicesIndex, svcStr)
if err != nil {
return
}
for _, toSvcRule := range toSvcRules {
c.dirtyRuleHandler(toSvcRule.(*rule).ID)
}

// Reprocess rules if the Service referred by rule's AppliedToGroup has updated.
strListSvcRef := strings.Split(svcStr, "/")
var name, ns string
if len(strListSvcRef) == 2 {
ns = strListSvcRef[0]
name = strListSvcRef[1]
} else if len(strListSvcRef) == 1 {
name = strListSvcRef[1]
}
member := &v1beta.GroupMember{
Service: &v1beta.ServiceReference{
Name: name,
Namespace: ns,
},
}
for group, memberSet := range c.appliedToSetByGroup {
if memberSet.Has(member) {
c.onAppliedToGroupUpdate(group)
}
}
}
3 changes: 2 additions & 1 deletion pkg/apis/controlplane/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ type GroupMember struct {
IPs []IPAddress
// Ports is the list NamedPort of the GroupMember.
Ports []NamedPort
// Service maintains the reference to the Service.
// Service is the reference to the Service. It can only be used in an AppliedTo
// Group and only a NodePort type Service can be referred by this field.
Service *ServiceReference
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/controlplane/v1beta2/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/apis/controlplane/v1beta2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ type GroupMember struct {
Ports []NamedPort `json:"ports,omitempty" protobuf:"bytes,4,rep,name=ports"`
// Node maintains the reference to the Node.
Node *NodeReference `json:"node,omitempty" protobuf:"bytes,5,opt,name=node"`
// Service maintains the reference to the Service.
// Service is the reference to the Service. It can only be used in an AppliedTo
// Group and only a NodePort type Service can be referred by this field.
Service *ServiceReference `json:"service,omitempty" protobuf:"bytes,6,opt,name=service"`
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/crd/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,10 @@ type NetworkPolicyPeer struct {
// A NodeSelector cannot be set in AppliedTo field or set with any other selector.
// +optional
NodeSelector *metav1.LabelSelector `json:"nodeSelector,omitempty"`
// Select a certain Service which match the NamespacedName.
// Select a certain Service which matches the NamespacedName.
// A Service can only be set in either policy level AppliedTo field in a policy
// that only has ingress rules or rule level AppliedTo field in an ingress rule.
// Only a NodePort Service can be referred by this field.
// Cannot be set with any other selector.
// +optional
Service *NamespacedName `json:"service,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiserver/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/controller/types/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type AppliedToGroup struct {
// Selector describes how the group selects pods.
// Selector can't be used with Service.
Selector *GroupSelector
// Service describes the Service this group selects.
// Service refers to the Service this group selects. Only a NodePort type Service
// can be referred by this field.
// Service can't be used with Selector.
Service *controlplane.ServiceReference
// GroupMemberByNode is a mapping from nodeName to a set of GroupMembers on the Node,
Expand Down

0 comments on commit 0cc328f

Please sign in to comment.