From 05b7d6928e889850a27843bd51334417bc9a10c9 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Thu, 16 Nov 2023 16:41:03 +0800 Subject: [PATCH] Store NetworkPolicy in filesystem as a fail-safe data source Signed-off-by: Quan Tian --- pkg/agent/controller/networkpolicy/cache.go | 221 ++++++++++++++++++ .../networkpolicy/networkpolicy_controller.go | 2 +- 2 files changed, 222 insertions(+), 1 deletion(-) diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index efe4d254cb7..55762a89c49 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -19,9 +19,13 @@ import ( "encoding/hex" "encoding/json" "fmt" + "io" + "io/fs" + "os" "strings" "sync" + "github.com/spf13/afero" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -47,6 +51,13 @@ const ( toIGMPReportGroupAddressIndex = "toIGMPReportGroupAddress" ) +const ( + dataDir = "/var/run/antrea/networkpolicy" + networkPoliciesDir = dataDir + "/network-policies" + appliedToGroupsDir = dataDir + "/applied-to-groups" + addressGroupsDir = dataDir + "/address-groups" +) + // rule is the struct stored in ruleCache, it contains necessary information // to construct a complete rule that can be used by reconciler to enforce. // The K8s NetworkPolicy object doesn't provide ID for its rule, here we @@ -185,6 +196,8 @@ func (r *CompletedRule) isIGMPEgressPolicyRule() bool { // ruleCache caches Antrea AddressGroups, AppliedToGroups and NetworkPolicies, // can construct complete rules that can be used by reconciler to enforce. type ruleCache struct { + fs afero.Fs + appliedToSetLock sync.RWMutex // appliedToSetByGroup stores the AppliedToGroup members. // It is a mapping from group name to a set of GroupMembers. @@ -415,6 +428,7 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub }, ) cache := &ruleCache{ + fs: afero.NewOsFs(), appliedToSetByGroup: make(map[string]v1beta.GroupMemberSet), addressSetByGroup: make(map[string]v1beta.GroupMemberSet), policyMap: make(map[string]*v1beta.NetworkPolicy), @@ -430,6 +444,98 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub externalEntityUpdateSubscriber.Subscribe(cache.processExternalEntityUpdate) } + for _, dir := range []string{networkPoliciesDir, appliedToGroupsDir, addressGroupsDir} { + klog.InfoS("Creating directory for NetworkPolicy cache", "dir", dir) + if err := cache.fs.MkdirAll(dir, 0o755); err != nil { + klog.ErrorS(err, "Failed to create directory for NetworkPolicy cache", "dir", dir) + return nil + } + } + + var networkPolicies []*v1beta.NetworkPolicy + err := afero.Walk(cache.fs, networkPoliciesDir, func(path string, info fs.FileInfo, err error) error { + if info.IsDir() { + return nil + } + file, err := cache.fs.Open(path) + if err != nil { + return err + } + defer file.Close() + data, err := io.ReadAll(file) + if err != nil { + return err + } + var object v1beta.NetworkPolicy + err = json.Unmarshal(data, &object) + if err != nil { + return err + } + networkPolicies = append(networkPolicies, &object) + return nil + }) + if err != nil { + klog.ErrorS(err, "Failed to restore NetworkPolicy cache from files") + } else { + cache.ReplaceNetworkPolicies(networkPolicies) + } + + var appliedToGroups []*v1beta.AppliedToGroup + err = afero.Walk(cache.fs, appliedToGroupsDir, func(path string, info fs.FileInfo, err error) error { + if info.IsDir() { + return nil + } + file, err := cache.fs.Open(path) + if err != nil { + return err + } + defer file.Close() + data, err := io.ReadAll(file) + if err != nil { + return err + } + var object v1beta.AppliedToGroup + err = json.Unmarshal(data, &object) + if err != nil { + return err + } + appliedToGroups = append(appliedToGroups, &object) + return nil + }) + if err != nil { + klog.ErrorS(err, "Failed to restore AppliedToGroup cache from files") + } else { + cache.ReplaceAppliedToGroups(appliedToGroups) + } + + var addressGroups []*v1beta.AddressGroup + err = afero.Walk(cache.fs, addressGroupsDir, func(path string, info fs.FileInfo, err error) error { + if info.IsDir() { + return nil + } + file, err := cache.fs.Open(path) + if err != nil { + return err + } + defer file.Close() + data, err := io.ReadAll(file) + if err != nil { + return err + } + var object v1beta.AddressGroup + err = json.Unmarshal(data, &object) + if err != nil { + return err + } + addressGroups = append(addressGroups, &object) + return nil + }) + if err != nil { + klog.ErrorS(err, "Failed to restore AddressGroup cache from files") + } else { + cache.ReplaceAddressGroups(addressGroups) + } + go cache.processGroupIDUpdates() return cache } @@ -531,6 +637,7 @@ func (c *ruleCache) AddAddressGroup(group *v1beta.AddressGroup) error { } func (c *ruleCache) addAddressGroupLocked(group *v1beta.AddressGroup) error { + defer c.updateAddressGroupToFile(group) groupMemberSet := v1beta.GroupMemberSet{} for i := range group.GroupMembers { // Must not store address of loop iterator variable as it's the same @@ -567,6 +674,17 @@ func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error { } c.onAddressGroupUpdate(patch.Name) + + members := make([]v1beta.GroupMember, 0, len(groupMemberSet)) + for _, member := range groupMemberSet { + members = append(members, *member) + } + group := &v1beta.AddressGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + c.updateAddressGroupToFile(group) + return nil } @@ -574,6 +692,7 @@ func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error { // It should only happen when a group is no longer referenced by any rule, so // no need to mark dirty rules. func (c *ruleCache) DeleteAddressGroup(group *v1beta.AddressGroup) error { + defer c.deleteAddressGroupFromFile(group) c.addressSetLock.Lock() defer c.addressSetLock.Unlock() @@ -623,7 +742,66 @@ func (c *ruleCache) AddAppliedToGroup(group *v1beta.AppliedToGroup) error { return c.addAppliedToGroupLocked(group) } +func (c *ruleCache) deleteAppliedToGroupFromFile(group *v1beta.AppliedToGroup) { + path := fmt.Sprintf("%s/%s", appliedToGroupsDir, group.Name) + err := c.fs.Remove(path) + if err != nil { + klog.ErrorS(err, "Failed to delete AppliedToGroup file", "group", group.Name) + return + } +} + +func (c *ruleCache) updateAppliedToGroupToFile(group *v1beta.AppliedToGroup) { + path := fmt.Sprintf("%s/%s", appliedToGroupsDir, group.Name) + file, err := c.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) + if err != nil { + klog.ErrorS(err, "Failed to open AppliedToGroup file", "group", group.Name) + return + } + defer file.Close() + b, err := json.Marshal(group) + if err != nil { + klog.ErrorS(err, "Failed to encode AppliedToGroup", "group", group.Name) + return + } + _, err = file.Write(b) + if err != nil { + klog.ErrorS(err, "Failed to write AppliedToGroup to file", "group", group.Name) + return + } +} + +func (c *ruleCache) deleteAddressGroupFromFile(group *v1beta.AddressGroup) { + path := fmt.Sprintf("%s/%s", addressGroupsDir, group.Name) + err := c.fs.Remove(path) + if err != nil { + klog.ErrorS(err, "Failed to delete AddressGroup file", "group", group.Name) + return + } +} + +func (c *ruleCache) updateAddressGroupToFile(group *v1beta.AddressGroup) { + path := fmt.Sprintf("%s/%s", addressGroupsDir, group.Name) + file, err := c.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) + if err != nil { + klog.ErrorS(err, "Failed to open AddressGroup file", "group", group.Name) + return + } + defer file.Close() + b, err := json.Marshal(group) + if err != nil { + klog.ErrorS(err, "Failed to encode AddressGroup", "group", group.Name) + return + } + _, err = file.Write(b) + if err != nil { + klog.ErrorS(err, "Failed to write AddressGroup to file", "group", group.Name) + return + } +} + func (c *ruleCache) addAppliedToGroupLocked(group *v1beta.AppliedToGroup) error { + defer c.updateAppliedToGroupToFile(group) memberSet := v1beta.GroupMemberSet{} for i := range group.GroupMembers { memberSet.Insert(&group.GroupMembers[i]) @@ -654,12 +832,23 @@ func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error memberSet.Delete(&patch.RemovedGroupMembers[i]) } c.onAppliedToGroupUpdate(patch.Name) + + members := make([]v1beta.GroupMember, 0, len(memberSet)) + for _, member := range memberSet { + members = append(members, *member) + } + group := &v1beta.AppliedToGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + c.updateAppliedToGroupToFile(group) return nil } // DeleteAppliedToGroup deletes a cached *v1beta.AppliedToGroup. // It may be called when a rule becomes ineffective, so it needs to mark dirty rules. func (c *ruleCache) DeleteAppliedToGroup(group *v1beta.AppliedToGroup) error { + defer c.deleteAppliedToGroupFromFile(group) c.appliedToSetLock.Lock() defer c.appliedToSetLock.Unlock() @@ -766,8 +955,38 @@ func (c *ruleCache) UpdateNetworkPolicy(policy *v1beta.NetworkPolicy) bool { return c.updateNetworkPolicyLocked(policy) } +func (c *ruleCache) updateNetworkPolicyToFile(policy *v1beta.NetworkPolicy) { + path := fmt.Sprintf("%s/%s", networkPoliciesDir, policy.Name) + file, err := c.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) + if err != nil { + klog.ErrorS(err, "Failed to open NetworkPolicy file", "policy", policy.Name) + return + } + defer file.Close() + b, err := json.Marshal(policy) + if err != nil { + klog.ErrorS(err, "Failed to encode NetworkPolicy", "policy", policy.Name) + return + } + _, err = file.Write(b) + if err != nil { + klog.ErrorS(err, "Failed to write NetworkPolicy to file", "policy", policy.Name) + return + } +} + +func (c *ruleCache) deleteNetworkPolicyFromFile(policy *v1beta.NetworkPolicy) { + path := fmt.Sprintf("%s/%s", networkPoliciesDir, policy.Name) + err := c.fs.Remove(path) + if err != nil { + klog.ErrorS(err, "Failed to delete NetworkPolicy file", "policy", policy.Name) + return + } +} + // updateNetworkPolicyLocked returns whether any rule or the generation changes. func (c *ruleCache) updateNetworkPolicyLocked(policy *v1beta.NetworkPolicy) bool { + defer c.updateNetworkPolicyToFile(policy) oldPolicy, exists := c.policyMap[string(policy.UID)] generationUpdated := !exists || oldPolicy.Generation != policy.Generation c.policyMap[string(policy.UID)] = policy @@ -818,6 +1037,8 @@ func (c *ruleCache) updateNetworkPolicyLocked(policy *v1beta.NetworkPolicy) bool // DeleteNetworkPolicy deletes a cached *v1beta.NetworkPolicy. // All its rules will be regarded as dirty. func (c *ruleCache) DeleteNetworkPolicy(policy *v1beta.NetworkPolicy) { + defer c.deleteNetworkPolicyFromFile(policy) + c.policyMapLock.Lock() defer c.policyMapLock.Unlock() diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index f25446e30d2..2c1c3bcad9d 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -499,7 +499,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { go c.fqdnController.runRuleSyncTracker(stopCh) } klog.Infof("Waiting for all watchers to complete full sync") - c.fullSyncGroup.Wait() + //c.fullSyncGroup.Wait() klog.Infof("All watchers have completed full sync, installing flows for init events") // Batch install all rules in queue after fullSync is finished. c.processAllItemsInQueue()