Skip to content

Commit

Permalink
Store NetworkPolicy in filesystem as a fail-safe data source
Browse files Browse the repository at this point in the history
Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Nov 16, 2023
1 parent bdf2d6b commit 05b7d69
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 1 deletion.
221 changes: 221 additions & 0 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/fs"
"os"
"strings"
"sync"

"github.com/spf13/afero"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -47,6 +51,13 @@ const (
toIGMPReportGroupAddressIndex = "toIGMPReportGroupAddress"
)

// Locations of the on-disk copies of the objects cached by ruleCache. Each
// kind of object is kept in its own sub-directory of dataDir, one JSON file
// per object, named after the object.
const (
// dataDir is the root directory under which all NetworkPolicy data is stored.
dataDir = "/var/run/antrea/networkpolicy"
// networkPoliciesDir holds the files written by updateNetworkPolicyToFile.
networkPoliciesDir = dataDir + "/network-policies"
// appliedToGroupsDir holds the files written by updateAppliedToGroupToFile.
appliedToGroupsDir = dataDir + "/applied-to-groups"
// addressGroupsDir holds the files written by updateAddressGroupToFile.
addressGroupsDir = dataDir + "/address-groups"
)

// rule is the struct stored in ruleCache. It contains the information necessary
// to construct a complete rule that the reconciler can enforce.
// The K8s NetworkPolicy object doesn't provide ID for its rule, here we
Expand Down Expand Up @@ -185,6 +196,8 @@ func (r *CompletedRule) isIGMPEgressPolicyRule() bool {
// ruleCache caches Antrea AddressGroups, AppliedToGroups and NetworkPolicies, and
// can construct complete rules that the reconciler can use for enforcement.
type ruleCache struct {
fs afero.Fs

appliedToSetLock sync.RWMutex
// appliedToSetByGroup stores the AppliedToGroup members.
// It is a mapping from group name to a set of GroupMembers.
Expand Down Expand Up @@ -415,6 +428,7 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub
},
)
cache := &ruleCache{
fs: afero.NewOsFs(),
appliedToSetByGroup: make(map[string]v1beta.GroupMemberSet),
addressSetByGroup: make(map[string]v1beta.GroupMemberSet),
policyMap: make(map[string]*v1beta.NetworkPolicy),
Expand All @@ -430,6 +444,98 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub
externalEntityUpdateSubscriber.Subscribe(cache.processExternalEntityUpdate)
}

for _, dir := range []string{networkPoliciesDir, appliedToGroupsDir, addressGroupsDir} {
klog.InfoS("Creating directory for NetworkPolicy cache", "dir", dir)
if err := cache.fs.MkdirAll(dir, 0o755); err != nil {
klog.ErrorS(err, "Failed to create directory for NetworkPolicy cache", "dir", dir)
return nil
}
}

var networkPolicies []*v1beta.NetworkPolicy
err := afero.Walk(cache.fs, networkPoliciesDir, func(path string, info fs.FileInfo, err error) error {
if info.IsDir() {
return nil
}
file, err := cache.fs.Open(path)
if err != nil {
return err
}
defer file.Close()
data, err := io.ReadAll(file)
if err != nil {
return err
}
var object v1beta.NetworkPolicy
err = json.Unmarshal(data, &object)
if err != nil {
return err
}
networkPolicies = append(networkPolicies, &object)
return nil
})
if err != nil {
klog.ErrorS(err, "Failed to restore NetworkPolicy cache from files")
} else {
cache.ReplaceNetworkPolicies(networkPolicies)
}

var appliedToGroups []*v1beta.AppliedToGroup
err = afero.Walk(cache.fs, appliedToGroupsDir, func(path string, info fs.FileInfo, err error) error {
if info.IsDir() {
return nil
}
file, err := cache.fs.Open(path)
if err != nil {
return err
}
defer file.Close()
data, err := io.ReadAll(file)
if err != nil {
return err
}
var object v1beta.AppliedToGroup
err = json.Unmarshal(data, &object)
if err != nil {
return err
}
appliedToGroups = append(appliedToGroups, &object)
return nil
})
if err != nil {
klog.ErrorS(err, "Failed to restore AppliedToGroup cache from files")
} else {
cache.ReplaceAppliedToGroups(appliedToGroups)
}

var addressGroups []*v1beta.AddressGroup
err = afero.Walk(cache.fs, addressGroupsDir, func(path string, info fs.FileInfo, err error) error {
if info.IsDir() {
return nil
}
file, err := cache.fs.Open(path)
if err != nil {
return err
}
defer file.Close()
data, err := io.ReadAll(file)
if err != nil {
return err
}
var object v1beta.AddressGroup
err = json.Unmarshal(data, &object)
if err != nil {
return err
}
addressGroups = append(addressGroups, &object)
return nil
})
if err != nil {
klog.ErrorS(err, "Failed to restore AddressGroup cache from files")
} else {
cache.ReplaceAddressGroups(addressGroups)
}

go cache.processGroupIDUpdates()
return cache
}
Expand Down Expand Up @@ -531,6 +637,7 @@ func (c *ruleCache) AddAddressGroup(group *v1beta.AddressGroup) error {
}

func (c *ruleCache) addAddressGroupLocked(group *v1beta.AddressGroup) error {
defer c.updateAddressGroupToFile(group)
groupMemberSet := v1beta.GroupMemberSet{}
for i := range group.GroupMembers {
// Must not store address of loop iterator variable as it's the same
Expand Down Expand Up @@ -567,13 +674,25 @@ func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error {
}

c.onAddressGroupUpdate(patch.Name)

members := make([]v1beta.GroupMember, 0, len(groupMemberSet))
for _, member := range groupMemberSet {
members = append(members, *member)
}
group := &v1beta.AddressGroup{
ObjectMeta: patch.ObjectMeta,
GroupMembers: members,
}
c.updateAddressGroupToFile(group)

return nil
}

// DeleteAddressGroup deletes a cached *v1beta.AddressGroup.
// It should only happen when a group is no longer referenced by any rule, so
// no need to mark dirty rules.
func (c *ruleCache) DeleteAddressGroup(group *v1beta.AddressGroup) error {
defer c.deleteAddressGroupFromFile(group)
c.addressSetLock.Lock()
defer c.addressSetLock.Unlock()

Expand Down Expand Up @@ -623,7 +742,66 @@ func (c *ruleCache) AddAppliedToGroup(group *v1beta.AppliedToGroup) error {
return c.addAppliedToGroupLocked(group)
}

// deleteAppliedToGroupFromFile removes the file holding the persisted copy of
// the given AppliedToGroup. The file lives under appliedToGroupsDir and is
// named after the group. A removal failure is logged and otherwise ignored.
func (c *ruleCache) deleteAppliedToGroupFromFile(group *v1beta.AppliedToGroup) {
	groupFile := fmt.Sprintf("%s/%s", appliedToGroupsDir, group.Name)
	if removeErr := c.fs.Remove(groupFile); removeErr != nil {
		klog.ErrorS(removeErr, "Failed to delete AppliedToGroup file", "group", group.Name)
	}
}

// updateAppliedToGroupToFile persists the given AppliedToGroup as JSON in a
// file named after the group under appliedToGroupsDir, replacing any previous
// content of that file.
//
// Persistence is best-effort: every failure is logged and the function
// returns without reporting it to the caller.
func (c *ruleCache) updateAppliedToGroupToFile(group *v1beta.AppliedToGroup) {
	// Encode before touching the file. Opening with O_TRUNC discards the
	// previous copy, so failing to encode afterwards would leave an empty file
	// behind, which cannot be decoded when the cache is restored.
	b, err := json.Marshal(group)
	if err != nil {
		klog.ErrorS(err, "Failed to encode AppliedToGroup", "group", group.Name)
		return
	}
	path := fmt.Sprintf("%s/%s", appliedToGroupsDir, group.Name)
	file, err := c.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
	if err != nil {
		klog.ErrorS(err, "Failed to open AppliedToGroup file", "group", group.Name)
		return
	}
	_, err = file.Write(b)
	// Always close the file, and treat a close failure as a write failure: the
	// data may not have reached the file if Close reports an error.
	if closeErr := file.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		klog.ErrorS(err, "Failed to write AppliedToGroup to file", "group", group.Name)
	}
}

// deleteAddressGroupFromFile removes the file holding the persisted copy of
// the given AddressGroup. The file lives under addressGroupsDir and is named
// after the group. A removal failure is logged and otherwise ignored.
func (c *ruleCache) deleteAddressGroupFromFile(group *v1beta.AddressGroup) {
	groupFile := fmt.Sprintf("%s/%s", addressGroupsDir, group.Name)
	if removeErr := c.fs.Remove(groupFile); removeErr != nil {
		klog.ErrorS(removeErr, "Failed to delete AddressGroup file", "group", group.Name)
	}
}

// updateAddressGroupToFile persists the given AddressGroup as JSON in a file
// named after the group under addressGroupsDir, replacing any previous content
// of that file.
//
// Persistence is best-effort: every failure is logged and the function
// returns without reporting it to the caller.
func (c *ruleCache) updateAddressGroupToFile(group *v1beta.AddressGroup) {
	// Encode before touching the file. Opening with O_TRUNC discards the
	// previous copy, so failing to encode afterwards would leave an empty file
	// behind, which cannot be decoded when the cache is restored.
	b, err := json.Marshal(group)
	if err != nil {
		klog.ErrorS(err, "Failed to encode AddressGroup", "group", group.Name)
		return
	}
	path := fmt.Sprintf("%s/%s", addressGroupsDir, group.Name)
	file, err := c.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
	if err != nil {
		klog.ErrorS(err, "Failed to open AddressGroup file", "group", group.Name)
		return
	}
	_, err = file.Write(b)
	// Always close the file, and treat a close failure as a write failure: the
	// data may not have reached the file if Close reports an error.
	if closeErr := file.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		klog.ErrorS(err, "Failed to write AddressGroup to file", "group", group.Name)
	}
}

func (c *ruleCache) addAppliedToGroupLocked(group *v1beta.AppliedToGroup) error {
defer c.updateAppliedToGroupToFile(group)
memberSet := v1beta.GroupMemberSet{}
for i := range group.GroupMembers {
memberSet.Insert(&group.GroupMembers[i])
Expand Down Expand Up @@ -654,12 +832,23 @@ func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error
memberSet.Delete(&patch.RemovedGroupMembers[i])
}
c.onAppliedToGroupUpdate(patch.Name)

members := make([]v1beta.GroupMember, 0, len(memberSet))
for _, member := range memberSet {
members = append(members, *member)
}
group := &v1beta.AppliedToGroup{
ObjectMeta: patch.ObjectMeta,
GroupMembers: members,
}
c.updateAppliedToGroupToFile(group)
return nil
}

// DeleteAppliedToGroup deletes a cached *v1beta.AppliedToGroup.
// It may be called when a rule becomes ineffective, so it needs to mark dirty rules.
func (c *ruleCache) DeleteAppliedToGroup(group *v1beta.AppliedToGroup) error {
defer c.deleteAppliedToGroupFromFile(group)
c.appliedToSetLock.Lock()
defer c.appliedToSetLock.Unlock()

Expand Down Expand Up @@ -766,8 +955,38 @@ func (c *ruleCache) UpdateNetworkPolicy(policy *v1beta.NetworkPolicy) bool {
return c.updateNetworkPolicyLocked(policy)
}

// updateNetworkPolicyToFile persists the given NetworkPolicy as JSON in a file
// named after the policy under networkPoliciesDir, replacing any previous
// content of that file.
//
// Persistence is best-effort: every failure is logged and the function
// returns without reporting it to the caller.
func (c *ruleCache) updateNetworkPolicyToFile(policy *v1beta.NetworkPolicy) {
	// Encode before touching the file. Opening with O_TRUNC discards the
	// previous copy, so failing to encode afterwards would leave an empty file
	// behind, which cannot be decoded when the cache is restored.
	b, err := json.Marshal(policy)
	if err != nil {
		klog.ErrorS(err, "Failed to encode NetworkPolicy", "policy", policy.Name)
		return
	}
	path := fmt.Sprintf("%s/%s", networkPoliciesDir, policy.Name)
	file, err := c.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
	if err != nil {
		klog.ErrorS(err, "Failed to open NetworkPolicy file", "policy", policy.Name)
		return
	}
	_, err = file.Write(b)
	// Always close the file, and treat a close failure as a write failure: the
	// data may not have reached the file if Close reports an error.
	if closeErr := file.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		klog.ErrorS(err, "Failed to write NetworkPolicy to file", "policy", policy.Name)
	}
}

// deleteNetworkPolicyFromFile removes the file holding the persisted copy of
// the given NetworkPolicy. The file lives under networkPoliciesDir and is
// named after the policy. A removal failure is logged and otherwise ignored.
func (c *ruleCache) deleteNetworkPolicyFromFile(policy *v1beta.NetworkPolicy) {
	policyFile := fmt.Sprintf("%s/%s", networkPoliciesDir, policy.Name)
	if removeErr := c.fs.Remove(policyFile); removeErr != nil {
		klog.ErrorS(removeErr, "Failed to delete NetworkPolicy file", "policy", policy.Name)
	}
}

// updateNetworkPolicyLocked returns whether any rule or the generation changes.
func (c *ruleCache) updateNetworkPolicyLocked(policy *v1beta.NetworkPolicy) bool {
defer c.updateNetworkPolicyToFile(policy)
oldPolicy, exists := c.policyMap[string(policy.UID)]
generationUpdated := !exists || oldPolicy.Generation != policy.Generation
c.policyMap[string(policy.UID)] = policy
Expand Down Expand Up @@ -818,6 +1037,8 @@ func (c *ruleCache) updateNetworkPolicyLocked(policy *v1beta.NetworkPolicy) bool
// DeleteNetworkPolicy deletes a cached *v1beta.NetworkPolicy.
// All its rules will be regarded as dirty.
func (c *ruleCache) DeleteNetworkPolicy(policy *v1beta.NetworkPolicy) {
defer c.deleteNetworkPolicyFromFile(policy)

c.policyMapLock.Lock()
defer c.policyMapLock.Unlock()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
go c.fqdnController.runRuleSyncTracker(stopCh)
}
klog.Infof("Waiting for all watchers to complete full sync")
c.fullSyncGroup.Wait()
//c.fullSyncGroup.Wait()
klog.Infof("All watchers have completed full sync, installing flows for init events")
// Batch install all rules in queue after fullSync is finished.
c.processAllItemsInQueue()
Expand Down

0 comments on commit 05b7d69

Please sign in to comment.