diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 82d4b2d3e01..92b7ae82710 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -20,6 +20,7 @@ import ( "net" "time" + "github.com/spf13/afero" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/sets" @@ -459,6 +460,7 @@ func run(o *Options) error { antreaClientProvider, ofClient, ifaceStore, + afero.NewOsFs(), nodeKey, podUpdateChannel, externalEntityUpdateChannel, diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index efe4d254cb7..cf8f886e8de 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -551,13 +551,14 @@ func (c *ruleCache) addAddressGroupLocked(group *v1beta.AddressGroup) error { // PatchAddressGroup updates a cached *v1beta.AddressGroup. // The rules referencing it will be regarded as dirty. -func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error { +// It returns a copy of the patched AddressGroup, or an error if the AddressGroup doesn't exist. 
+func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) (*v1beta.AddressGroup, error) { c.addressSetLock.Lock() defer c.addressSetLock.Unlock() groupMemberSet, exists := c.addressSetByGroup[patch.Name] if !exists { - return fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name) + return nil, fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name) } for i := range patch.AddedGroupMembers { groupMemberSet.Insert(&patch.AddedGroupMembers[i]) @@ -567,7 +568,16 @@ func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error { } c.onAddressGroupUpdate(patch.Name) - return nil + + members := make([]v1beta.GroupMember, 0, len(groupMemberSet)) + for _, member := range groupMemberSet { + members = append(members, *member) + } + group := &v1beta.AddressGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + return group, nil } // DeleteAddressGroup deletes a cached *v1beta.AddressGroup. @@ -639,13 +649,14 @@ func (c *ruleCache) addAppliedToGroupLocked(group *v1beta.AppliedToGroup) error // PatchAppliedToGroup updates a cached *v1beta.AppliedToGroupPatch. // The rules referencing it will be regarded as dirty. -func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error { +// It returns a copy of the patched AppliedToGroup, or an error if the AppliedToGroup doesn't exist. 
+func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) (*v1beta.AppliedToGroup, error) { c.appliedToSetLock.Lock() defer c.appliedToSetLock.Unlock() memberSet, exists := c.appliedToSetByGroup[patch.Name] if !exists { - return fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name) + return nil, fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name) } for i := range patch.AddedGroupMembers { memberSet.Insert(&patch.AddedGroupMembers[i]) @@ -654,7 +665,16 @@ func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error memberSet.Delete(&patch.RemovedGroupMembers[i]) } c.onAppliedToGroupUpdate(patch.Name) - return nil + + members := make([]v1beta.GroupMember, 0, len(memberSet)) + for _, member := range memberSet { + members = append(members, *member) + } + group := &v1beta.AppliedToGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + return group, nil } // DeleteAppliedToGroup deletes a cached *v1beta.AppliedToGroup. 
diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 0ced8235e26..dc68f2f5b13 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -1039,7 +1039,7 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) { for _, rule := range tt.rules { c.rules.Add(rule) } - err := c.PatchAppliedToGroup(tt.args) + ret, err := c.PatchAppliedToGroup(tt.args) if (err == nil) == tt.expectedErr { t.Fatalf("Got error %v, expected %t", err, tt.expectedErr) } @@ -1048,6 +1048,9 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) { } actualPods, _ := c.appliedToSetByGroup[tt.args.Name] assert.ElementsMatch(t, tt.expectedPods, actualPods.Items(), "stored Pods not equal") + if !tt.expectedErr { + assert.Equal(t, len(ret.GroupMembers), len(actualPods)) + } }) } } @@ -1116,7 +1119,7 @@ func TestRuleCachePatchAddressGroup(t *testing.T) { for _, rule := range tt.rules { c.rules.Add(rule) } - err := c.PatchAddressGroup(tt.args) + ret, err := c.PatchAddressGroup(tt.args) if (err == nil) == tt.expectedErr { t.Fatalf("Got error %v, expected %t", err, tt.expectedErr) } @@ -1125,6 +1128,9 @@ func TestRuleCachePatchAddressGroup(t *testing.T) { } actualAddresses, _ := c.addressSetByGroup[tt.args.Name] assert.ElementsMatch(t, tt.expectedAddresses, actualAddresses.Items(), "stored addresses not equal") + if !tt.expectedErr { + assert.Equal(t, len(ret.GroupMembers), len(actualAddresses)) + } }) } } diff --git a/pkg/agent/controller/networkpolicy/filestore.go b/pkg/agent/controller/networkpolicy/filestore.go new file mode 100644 index 00000000000..702a6b163f8 --- /dev/null +++ b/pkg/agent/controller/networkpolicy/filestore.go @@ -0,0 +1,134 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import ( + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + + "github.com/spf13/afero" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" +) + +// fileStore encodes and stores runtime.Objects in files. Each object will be stored in a separate file under the given +// directory. +type fileStore struct { + fs afero.Fs + // The directory to store the files. + dir string + // serializer knows how to encode and decode the objects. + serializer runtime.Serializer +} + +func newFileStore(fs afero.Fs, dir string, serializer runtime.Serializer) (*fileStore, error) { + s := &fileStore{ + fs: fs, + dir: dir, + serializer: serializer, + } + klog.V(2).InfoS("Creating directory for NetworkPolicy cache", "dir", dir) + if err := s.fs.MkdirAll(dir, 0o600); err != nil { + return nil, err + } + return s, nil +} + +// save stores the given object in file with the object's UID as the file name, overwriting any existing content if the +// file already exists. Note the method may update the object's GroupVersionKind in-place during serialization. +func (s fileStore) save(item runtime.Object) error { + object := item.(metav1.Object) + path := filepath.Join(s.dir, string(object.GetUID())) + file, err := s.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("error opening file for writing object %v: %w", object.GetUID(), err) + } + defer file.Close() + // Encode may update the object's GroupVersionKind in-place during serialization. 
+ err = s.serializer.Encode(item, file) + if err != nil { + return fmt.Errorf("error writing object %v to file: %w", object.GetUID(), err) + } + return nil +} + +// delete removes the file with the object's UID as the file name if it exists. +func (s fileStore) delete(item runtime.Object) error { + object := item.(metav1.Object) + path := filepath.Join(s.dir, string(object.GetUID())) + err := s.fs.Remove(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + return nil +} + +// replaceAll replaces all files under the directory with the given objects. Existing files not in the given objects +// will be removed. Note the method may update the object's GroupVersionKind in-place during serialization. +func (s fileStore) replaceAll(items []runtime.Object) error { + if err := s.fs.RemoveAll(s.dir); err != nil { + return err + } + if err := s.fs.MkdirAll(s.dir, 0o600); err != nil { + return err + } + for _, item := range items { + if err := s.save(item); err != nil { + return err + } + } + return nil +} + +func (s fileStore) loadAll() ([]runtime.Object, error) { + var objects []runtime.Object + err := afero.Walk(s.fs, s.dir, func(path string, info fs.FileInfo, err error) error { + if info.IsDir() { + return nil + } + file, err2 := s.fs.Open(path) + if err2 != nil { + return err2 + } + defer file.Close() + data, err2 := io.ReadAll(file) + if err2 != nil { + return err2 + } + + object, gkv, err2 := s.serializer.Decode(data, nil, nil) + // If the data is corrupted somehow, we still want to load other data and continue the process. + if err2 != nil { + klog.ErrorS(err2, "Failed to decode data from file, ignore it", "file", path) + return nil + } + // Note: we haven't stored a different version so far but version conversion should be performed when the used + // version is upgraded in the future. 
+ klog.V(2).InfoS("Loaded object from file", "gkv", gkv, "object", object) + objects = append(objects, object) + return nil + }) + if err != nil { + return nil, err + } + return objects, nil +} diff --git a/pkg/agent/controller/networkpolicy/filestore_test.go b/pkg/agent/controller/networkpolicy/filestore_test.go new file mode 100644 index 00000000000..71ef6a59c13 --- /dev/null +++ b/pkg/agent/controller/networkpolicy/filestore_test.go @@ -0,0 +1,190 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import ( + "fmt" + "testing" + + "github.com/google/uuid" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" + "k8s.io/apimachinery/pkg/types" + + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" +) + +const ( + testDataPath = "/var/run/antrea-test/file-store" +) + +// Set it to NewMemMapFs as the file system may be not writable. +// Change it to NewOsFs to evaluate performance when writing to disk. +var newFS = afero.NewMemMapFs + +func newFakeFileStore(tb testing.TB, dir string) *fileStore { + serializer := protobuf.NewSerializer(scheme, scheme) + codec := codecs.CodecForVersions(serializer, serializer, v1beta2.SchemeGroupVersion, v1beta2.SchemeGroupVersion) + // Create a new FS for every fileStore in case of interaction between tests. 
+ fs := afero.NewBasePathFs(newFS(), testDataPath) + s, err := newFileStore(fs, dir, codec) + assert.NoError(tb, err) + return s +} + +func TestFileStore(t *testing.T) { + policy1 := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil) + policy2 := newNetworkPolicy("policy2", "uid2", []string{"addressGroup2"}, nil, []string{"appliedToGroup2"}, nil) + policy3 := newNetworkPolicy("policy3", "uid3", []string{"addressGroup3"}, nil, []string{"appliedToGroup3"}, nil) + updatedPolicy2 := policy2.DeepCopy() + updatedPolicy2.AppliedToGroups = []string{"foo"} + + tests := []struct { + name string + ops func(*fileStore) + expectedObjects []runtime.Object + }{ + { + name: "add", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.save(policy3) + }, + expectedObjects: []runtime.Object{policy1, policy2, policy3}, + }, + { + name: "update", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.save(updatedPolicy2) + }, + expectedObjects: []runtime.Object{policy1, updatedPolicy2}, + }, + { + name: "delete", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.delete(policy2) + }, + expectedObjects: []runtime.Object{policy1}, + }, + { + name: "replace", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.replaceAll([]runtime.Object{updatedPolicy2, policy3}) + }, + expectedObjects: []runtime.Object{updatedPolicy2, policy3}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newFakeFileStore(t, networkPoliciesDir) + tt.ops(s) + gotObjects, err := s.loadAll() + require.NoError(t, err) + assert.Equal(t, tt.expectedObjects, gotObjects) + }) + } +} + +func BenchmarkFileStoreAddNetworkPolicy(b *testing.B) { + policy := newNetworkPolicy("policy1", types.UID(uuid.New().String()), []string{uuid.New().String()}, nil, []string{uuid.New().String()}, nil) + s := newFakeFileStore(b, 
networkPoliciesDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(policy) + } +} + +func BenchmarkFileStoreAddAppliedToGroup(b *testing.B) { + members := make([]v1beta2.GroupMember, 0, 100) + for i := 0; i < 100; i++ { + members = append(members, *newAppliedToGroupMemberPod(fmt.Sprintf("pod-%d", i), "namespace")) + } + atg := newAppliedToGroup(uuid.New().String(), members) + s := newFakeFileStore(b, appliedToGroupsDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(atg) + } +} + +func BenchmarkFileStoreAddAddressGroup(b *testing.B) { + members := make([]v1beta2.GroupMember, 0, 1000) + for i := 0; i < 1000; i++ { + members = append(members, *newAddressGroupPodMember(fmt.Sprintf("pod-%d", i), "namespace", "192.168.0.1")) + } + ag := newAddressGroup(uuid.New().String(), members) + s := newFakeFileStore(b, addressGroupsDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(ag) + } +} + +func BenchmarkFileStoreReplaceAll(b *testing.B) { + nps := make([]runtime.Object, 0, 1000) + atgs := make([]runtime.Object, 0, 1000) + ags := make([]runtime.Object, 0, 1000) + for i := 0; i < 1000; i++ { + policyName := uuid.New().String() + addressGroupName := uuid.New().String() + appliedToGroupName := uuid.New().String() + nps = append(nps, newNetworkPolicy(policyName, types.UID(policyName), []string{addressGroupName}, nil, []string{appliedToGroupName}, nil)) + + var atgMembers []v1beta2.GroupMember + for j := 0; j < 100; j++ { + atgMembers = append(atgMembers, *newAppliedToGroupMemberPod(fmt.Sprintf("pod-%d", j), "namespace")) + } + atg := newAppliedToGroup(appliedToGroupName, atgMembers) + atgs = append(atgs, atg) + + var agMembers []v1beta2.GroupMember + podNum := 100 + if i < 10 { + podNum = 10000 + } else if i < 110 { + podNum = 1000 + } + for j := 0; j < podNum; j++ { + agMembers = append(agMembers, *newAddressGroupPodMember(fmt.Sprintf("pod-%d", j), "namespace", "192.168.0.1")) + } + ag := 
newAddressGroup(addressGroupName, agMembers) + ags = append(ags, ag) + } + + networkPolicyStore := newFakeFileStore(b, networkPoliciesDir) + appliedToGroupStore := newFakeFileStore(b, appliedToGroupsDir) + addressGroupStore := newFakeFileStore(b, addressGroupsDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + networkPolicyStore.replaceAll(nps) + appliedToGroupStore.replaceAll(atgs) + addressGroupStore.replaceAll(ags) + } +} diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index f25446e30d2..36d4361c281 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -23,9 +23,12 @@ import ( "time" "antrea.io/ofnet/ofctrl" + "github.com/spf13/afero" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" @@ -39,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/openflow" proxytypes "antrea.io/antrea/pkg/agent/proxy/types" "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/controlplane/install" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" @@ -58,6 +62,13 @@ const ( dnsInterceptRuleID = uint32(1) ) +const ( + dataPath = "/var/run/antrea/networkpolicy" + networkPoliciesDir = "network-policies" + appliedToGroupsDir = "applied-to-groups" + addressGroupsDir = "address-groups" +) + type L7RuleReconciler interface { AddRule(ruleID, policyName string, vlanID uint32, l7Protocols []v1beta2.L7Protocol, enableLogging bool) error DeleteRule(ruleID string, vlanID uint32) error @@ -65,6 +76,15 @@ type L7RuleReconciler interface { var emptyWatch = 
watch.NewEmptyWatch() +var ( + scheme = runtime.NewScheme() + codecs = serializer.NewCodecFactory(scheme) +) + +func init() { + install.Install(scheme) +} + type packetInAction func(*ofctrl.PacketIn) error // Controller is responsible for watching Antrea AddressGroups, AppliedToGroups, @@ -128,6 +148,12 @@ type Controller struct { tunPort uint32 nodeConfig *config.NodeConfig + // The fileStores store runtime.Objects in files and use them as the fallback data source when agent can't connect + // to antrea-controller on startup. + networkPolicyStore *fileStore + appliedToGroupStore *fileStore + addressGroupStore *fileStore + logPacketAction packetInAction rejectRequestAction packetInAction storeDenyConnectionAction packetInAction @@ -137,6 +163,7 @@ type Controller struct { func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, + fs afero.Fs, nodeName string, podUpdateSubscriber channel.Subscriber, externalEntityUpdateSubscriber channel.Subscriber, @@ -176,8 +203,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.l7VlanIDAllocator = newL7VlanIDAllocator() } + var err error if antreaPolicyEnabled { - var err error if c.fqdnController, err = newFQDNController(ofClient, idAllocator, dnsServerOverride, c.enqueueRule, v4Enabled, v6Enabled, gwPort); err != nil { return nil, err } @@ -189,6 +216,23 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled, multicastEnabled) c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalEntityUpdateSubscriber, groupIDUpdates, nodeType) + + serializer := protobuf.NewSerializer(scheme, scheme) + codec := codecs.CodecForVersions(serializer, serializer, v1beta2.SchemeGroupVersion, v1beta2.SchemeGroupVersion) + fs = 
afero.NewBasePathFs(fs, dataPath) + c.networkPolicyStore, err = newFileStore(fs, networkPoliciesDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for NetworkPolicy: %w", err) + } + c.appliedToGroupStore, err = newFileStore(fs, appliedToGroupsDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for AppliedToGroup: %w", err) + } + c.addressGroupStore, err = newFileStore(fs, addressGroupsDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for AddressGroup: %w", err) + } + if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } @@ -235,6 +279,11 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, "policyName", policy.SourceRef.ToString()) return nil } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.networkPolicyStore.save(policy); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicy to file", "policyName", policy.SourceRef.ToString()) + } c.ruleCache.AddNetworkPolicy(policy) klog.InfoS("NetworkPolicy applied to Pods on this Node", "policyName", policy.SourceRef.ToString()) return nil @@ -249,6 +298,11 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, "policyName", policy.SourceRef.ToString()) return nil } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. 
+ if err := c.networkPolicyStore.save(policy); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicy to file", "policyName", policy.SourceRef.ToString()) + } updated := c.ruleCache.UpdateNetworkPolicy(policy) // If any rule or the generation changes, we ensure statusManager will resync the policy's status once, in // case the changes don't cause any actual rule update but the whole policy's generation is changed. @@ -269,6 +323,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, } c.ruleCache.DeleteNetworkPolicy(policy) klog.InfoS("NetworkPolicy no longer applied to Pods on this Node", "policyName", policy.SourceRef.ToString()) + if err := c.networkPolicyStore.delete(policy); err != nil { + klog.ErrorS(err, "Failed to delete the NetworkPolicy from file", "policyName", policy.SourceRef.ToString()) + } return nil }, ReplaceFunc: func(objs []runtime.Object) error { @@ -293,9 +350,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.statusManager.Resync(policies[i].UID) } } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.networkPolicyStore.replaceAll(objs); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicies to files") + } c.ruleCache.ReplaceNetworkPolicies(policies) return nil }, + FallbackFunc: c.networkPolicyStore.loadAll, fullSyncWaitGroup: &c.fullSyncGroup, fullSynced: false, } @@ -314,15 +377,28 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, if !ok { return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj) } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. 
+ if err := c.appliedToGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AppliedToGroup to file", "groupName", group.Name) + } c.ruleCache.AddAppliedToGroup(group) return nil }, UpdateFunc: func(obj runtime.Object) error { - group, ok := obj.(*v1beta2.AppliedToGroupPatch) + patch, ok := obj.(*v1beta2.AppliedToGroupPatch) if !ok { - return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj) + return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroupPatch: %v", obj) + } + group, err := c.ruleCache.PatchAppliedToGroup(patch) + if err != nil { + return err + } + // It's fine to store the object to file after applying the patch to ruleCache because the returned object + // is newly created, and ruleCache itself doesn't use it. + if err := c.appliedToGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AppliedToGroup to file", "groupName", group.Name) } - c.ruleCache.PatchAppliedToGroup(group) return nil }, DeleteFunc: func(obj runtime.Object) error { @@ -331,6 +407,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj) } c.ruleCache.DeleteAppliedToGroup(group) + if err := c.appliedToGroupStore.delete(group); err != nil { + klog.ErrorS(err, "Failed to delete the AppliedToGroup from file", "groupName", group.Name) + } return nil }, ReplaceFunc: func(objs []runtime.Object) error { @@ -342,9 +421,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", objs[i]) } } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. 
+ if err := c.appliedToGroupStore.replaceAll(objs); err != nil { + klog.ErrorS(err, "Failed to store the AppliedToGroups to files") + } c.ruleCache.ReplaceAppliedToGroups(groups) return nil }, + FallbackFunc: c.appliedToGroupStore.loadAll, fullSyncWaitGroup: &c.fullSyncGroup, fullSynced: false, } @@ -363,15 +448,28 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, if !ok { return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj) } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.addressGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AddressGroup to file", "groupName", group.Name) + } c.ruleCache.AddAddressGroup(group) return nil }, UpdateFunc: func(obj runtime.Object) error { - group, ok := obj.(*v1beta2.AddressGroupPatch) + patch, ok := obj.(*v1beta2.AddressGroupPatch) if !ok { - return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj) + return fmt.Errorf("cannot convert to *v1beta1.AddressGroupPatch: %v", obj) + } + group, err := c.ruleCache.PatchAddressGroup(patch) + if err != nil { + return err + } + // It's fine to store the object to file after applying the patch to ruleCache because the returned object + // is newly created, and ruleCache itself doesn't use it. 
+ if err := c.addressGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AddressGroup to file", "groupName", group.Name) } - c.ruleCache.PatchAddressGroup(group) return nil }, DeleteFunc: func(obj runtime.Object) error { @@ -380,6 +478,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj) } c.ruleCache.DeleteAddressGroup(group) + if err := c.addressGroupStore.delete(group); err != nil { + klog.ErrorS(err, "Failed to delete the AddressGroup from file", "groupName", group.Name) + } return nil }, ReplaceFunc: func(objs []runtime.Object) error { @@ -391,9 +492,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", objs[i]) } } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.addressGroupStore.replaceAll(objs); err != nil { + klog.ErrorS(err, "Failed to store the AddressGroups to files") + } c.ruleCache.ReplaceAddressGroups(groups) return nil }, + FallbackFunc: c.addressGroupStore.loadAll, fullSyncWaitGroup: &c.fullSyncGroup, fullSynced: false, } @@ -741,6 +848,8 @@ type watcher struct { DeleteFunc func(obj runtime.Object) error // ReplaceFunc is the function that handles init events. ReplaceFunc func(objs []runtime.Object) error + // FallbackFunc is the function that provides the data when it can't start the watch successfully. + FallbackFunc func() ([]runtime.Object, error) // connected represents whether the watch has connected to apiserver successfully. connected bool // lock protects connected. @@ -763,17 +872,46 @@ func (w *watcher) setConnected(connected bool) { w.connected = connected } +// fallback gets init events from the FallbackFunc if the watcher hasn't been synced once. 
+func (w *watcher) fallback() { + // If the watcher has been synced once, the fallback data source doesn't have newer data, do nothing. + if w.fullSynced { + return + } + klog.InfoS("Getting init events from fallback", "objectType", w.objectType) + objects, err := w.FallbackFunc() + if err != nil { + klog.ErrorS(err, "Failed to get init events from fallback", "objectType", w.objectType) + return + } + if err := w.ReplaceFunc(objects); err != nil { + klog.ErrorS(err, "Failed to handle init events") + return + } + w.onFullSync() +} + +func (w *watcher) onFullSync() { + if !w.fullSynced { + w.fullSynced = true + // Notify fullSyncWaitGroup that all events before bookmark is handled + w.fullSyncWaitGroup.Done() + } +} + func (w *watcher) watch() { klog.Infof("Starting watch for %s", w.objectType) watcher, err := w.watchFunc() if err != nil { klog.Warningf("Failed to start watch for %s: %v", w.objectType, err) + w.fallback() return } // Watch method doesn't return error but "emptyWatch" in case of some partial data errors, // e.g. timeout error. Make sure that watcher is not empty and log warning otherwise. 
if reflect.TypeOf(watcher) == reflect.TypeOf(emptyWatch) { klog.Warningf("Failed to start watch for %s, please ensure antrea service is reachable for the agent", w.objectType) + w.fallback() return } @@ -814,11 +952,7 @@ loop: klog.Errorf("Failed to handle init events: %v", err) return } - if !w.fullSynced { - w.fullSynced = true - // Notify fullSyncWaitGroup that all events before bookmark is handled - w.fullSyncWaitGroup.Done() - } + w.onFullSync() for { select { diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index d2179ce6a88..1552281039d 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -15,17 +15,21 @@ package networkpolicy import ( + "encoding/base64" "fmt" "net" + "os" "strings" "sync" "testing" "time" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/watch" @@ -71,7 +75,8 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { ch2 := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator() groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, nil, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}) + fs := afero.NewMemMapFs() + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", 
podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, nil, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}) reconciler := newMockReconciler() controller.reconciler = reconciler controller.auditLogger = nil @@ -146,14 +151,16 @@ var _ Reconciler = &mockReconciler{} func newAddressGroup(name string, addresses []v1beta2.GroupMember) *v1beta2.AddressGroup { return &v1beta2.AddressGroup{ - ObjectMeta: v1.ObjectMeta{Name: name}, + TypeMeta: v1.TypeMeta{Kind: "AddressGroup", APIVersion: "controlplane.antrea.io/v1beta2"}, + ObjectMeta: v1.ObjectMeta{Name: name, UID: types.UID(name)}, GroupMembers: addresses, } } func newAppliedToGroup(name string, pods []v1beta2.GroupMember) *v1beta2.AppliedToGroup { return &v1beta2.AppliedToGroup{ - ObjectMeta: v1.ObjectMeta{Name: name}, + TypeMeta: v1.TypeMeta{Kind: "AppliedToGroup", APIVersion: "controlplane.antrea.io/v1beta2"}, + ObjectMeta: v1.ObjectMeta{Name: name, UID: types.UID(name)}, GroupMembers: pods, } } @@ -165,6 +172,7 @@ func newNetworkPolicy(name string, uid types.UID, from, to, appliedTo []string, } networkPolicyRule1 := newPolicyRule(dir, from, to, services) return &v1beta2.NetworkPolicy{ + TypeMeta: v1.TypeMeta{Kind: "NetworkPolicy", APIVersion: "controlplane.antrea.io/v1beta2"}, ObjectMeta: v1.ObjectMeta{UID: uid, Name: string(uid)}, Rules: []v1beta2.NetworkPolicyRule{networkPolicyRule1}, AppliedToGroups: appliedTo, @@ -507,6 +515,176 @@ func TestAddNetworkPolicyWithMultipleRules(t *testing.T) { assert.Equal(t, 1, controller.GetAppliedToGroupNum()) } +func writeToFile(t *testing.T, fs afero.Fs, dir, file string, base64Str string) { + data, err := base64.StdEncoding.DecodeString(base64Str) + require.NoError(t, err) + f, err := fs.OpenFile(dir+"/"+file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + require.NoError(t, err) + defer f.Close() + _, err = f.Write(data) + require.NoError(t, err) +} + +func 
TestFallbackToFileStore(t *testing.T) { + prepareMockTables() + tests := []struct { + name string + initFileStore func(networkPolicyStore, appliedToGroupStore, addressGroupStore *fileStore) + expectedRule *CompletedRule + }{ + { + name: "same storage version", + initFileStore: func(networkPolicyStore, appliedToGroupStore, addressGroupStore *fileStore) { + networkPolicyStore.save(newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil)) + appliedToGroupStore.save(newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*newAppliedToGroupMemberPod("pod1", "namespace")})) + addressGroupStore.save(newAddressGroup("addressGroup1", []v1beta2.GroupMember{*newAddressGroupPodMember("pod2", "namespace", "192.168.0.1")})) + }, + expectedRule: &CompletedRule{ + rule: &rule{ + Direction: v1beta2.DirectionIn, + From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}}, + MaxPriority: -1, + AppliedToGroups: []string{"appliedToGroup1"}, + PolicyUID: "uid1", + PolicyName: "uid1", + SourceRef: &v1beta2.NetworkPolicyReference{ + Type: v1beta2.K8sNetworkPolicy, + Namespace: testNamespace, + Name: "policy1", + UID: "uid1", + }, + }, + FromAddresses: v1beta2.NewGroupMemberSet(newAddressGroupPodMember("pod2", "namespace", "192.168.0.1")), + TargetMembers: v1beta2.NewGroupMemberSet(newAppliedToGroupMemberPod("pod1", "namespace")), + }, + }, + { + // The test is to ensure compatibility with v1beta2 storage version if one day the used version is upgraded. + name: "compatible with v1beta2", + initFileStore: func(networkPolicyStore, appliedToGroupStore, addressGroupStore *fileStore) { + // The bytes of v1beta2 objects serialized in protobuf. + // They are not supposed to be updated when bumping up the used version. 
+ base64EncodedPolicy := "azhzAAovCh5jb250cm9scGxhbmUuYW50cmVhLmlvL3YxYmV0YTISDU5ldHdvcmtQb2xpY3kSdAoYCgR1aWQxEgAaACIAKgR1aWQxMgA4AEIAEh8KAkluEg8KDWFkZHJlc3NHcm91cDEaACgAOABKAFoAGg9hcHBsaWVkVG9Hcm91cDEyJgoQSzhzTmV0d29ya1BvbGljeRIDbnMxGgdwb2xpY3kxIgR1aWQxGgAiAA==" + base64EncodedAppliedToGroup := "azhzAAowCh5jb250cm9scGxhbmUuYW50cmVhLmlvL3YxYmV0YTISDkFwcGxpZWRUb0dyb3VwEkUKLgoPYXBwbGllZFRvR3JvdXAxEgAaACIAKg9hcHBsaWVkVG9Hcm91cDEyADgAQgASEwoRCgRwb2QxEgluYW1lc3BhY2UaACIA" + base64EncodedAddressGroup := "azhzAAouCh5jb250cm9scGxhbmUuYW50cmVhLmlvL3YxYmV0YTISDEFkZHJlc3NHcm91cBJTCioKDWFkZHJlc3NHcm91cDESABoAIgAqDWFkZHJlc3NHcm91cDEyADgAQgASJQoRCgRwb2QyEgluYW1lc3BhY2UaEAAAAAAAAAAAAAD//8CoAAEaACIA" + writeToFile(t, networkPolicyStore.fs, networkPoliciesDir, "uid1", base64EncodedPolicy) + writeToFile(t, appliedToGroupStore.fs, appliedToGroupsDir, "appliedToGroup1", base64EncodedAppliedToGroup) + writeToFile(t, addressGroupStore.fs, addressGroupsDir, "addressGroup1", base64EncodedAddressGroup) + }, + expectedRule: &CompletedRule{ + rule: &rule{ + Direction: v1beta2.DirectionIn, + From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}}, + MaxPriority: -1, + AppliedToGroups: []string{"appliedToGroup1"}, + PolicyUID: "uid1", + PolicyName: "uid1", + SourceRef: &v1beta2.NetworkPolicyReference{ + Type: v1beta2.K8sNetworkPolicy, + Namespace: testNamespace, + Name: "policy1", + UID: "uid1", + }, + }, + FromAddresses: v1beta2.NewGroupMemberSet( + &v1beta2.GroupMember{ + Pod: &v1beta2.PodReference{Name: "pod2", Namespace: "namespace"}, + IPs: []v1beta2.IPAddress{v1beta2.IPAddress(net.ParseIP("192.168.0.1"))}, + }, + ), + TargetMembers: v1beta2.NewGroupMemberSet( + &v1beta2.GroupMember{ + Pod: &v1beta2.PodReference{Name: "pod1", Namespace: "namespace"}, + }, + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + controller, clientset, reconciler := newTestController() + addressGroupWatcher := watch.NewFake() + appliedToGroupWatcher := 
watch.NewFake() + networkPolicyWatcher := watch.NewFake() + clientset.AddWatchReactor("addressgroups", k8stesting.DefaultWatchReactor(addressGroupWatcher, fmt.Errorf("network unavailable"))) + clientset.AddWatchReactor("appliedtogroups", k8stesting.DefaultWatchReactor(appliedToGroupWatcher, fmt.Errorf("network unavailable"))) + clientset.AddWatchReactor("networkpolicies", k8stesting.DefaultWatchReactor(networkPolicyWatcher, fmt.Errorf("network unavailable"))) + + tt.initFileStore(controller.networkPolicyStore, controller.appliedToGroupStore, controller.addressGroupStore) + + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(stopCh) + + select { + case ruleID := <-reconciler.updated: + actualRule, _ := reconciler.getLastRealized(ruleID) + // Rule ID is a hash value, we don't care about its exact value. + actualRule.ID = "" + assert.Equal(t, tt.expectedRule, actualRule) + case <-time.After(time.Second): + t.Fatal("Expected one rule update, got timeout") + } + }) + } +} + +func TestOverrideFileStore(t *testing.T) { + prepareMockTables() + controller, clientset, reconciler := newTestController() + addressGroupWatcher := watch.NewFake() + appliedToGroupWatcher := watch.NewFake() + networkPolicyWatcher := watch.NewFake() + clientset.AddWatchReactor("addressgroups", k8stesting.DefaultWatchReactor(addressGroupWatcher, nil)) + clientset.AddWatchReactor("appliedtogroups", k8stesting.DefaultWatchReactor(appliedToGroupWatcher, nil)) + clientset.AddWatchReactor("networkpolicies", k8stesting.DefaultWatchReactor(networkPolicyWatcher, nil)) + + policy1 := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil) + policy2 := newNetworkPolicy("policy2", "uid2", []string{"addressGroup2"}, nil, []string{"appliedToGroup2"}, nil) + atgMember1 := newAppliedToGroupMemberPod("pod1", "namespace") + atgMember2 := newAppliedToGroupMemberPod("pod2", "namespace") + agMember1 := newAddressGroupPodMember("pod3", "namespace", 
"192.168.0.1") + agMember2 := newAddressGroupPodMember("pod4", "namespace", "192.168.0.2") + atg1 := newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*atgMember1}) + atg2 := newAppliedToGroup("appliedToGroup2", []v1beta2.GroupMember{*atgMember2}) + ag1 := newAddressGroup("addressGroup1", []v1beta2.GroupMember{*agMember1}) + ag2 := newAddressGroup("addressGroup2", []v1beta2.GroupMember{*agMember2}) + controller.networkPolicyStore.save(policy1) + controller.appliedToGroupStore.save(atg1) + controller.addressGroupStore.save(ag1) + + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(stopCh) + + networkPolicyWatcher.Add(policy2) + networkPolicyWatcher.Action(watch.Bookmark, nil) + addressGroupWatcher.Add(ag2) + addressGroupWatcher.Action(watch.Bookmark, nil) + appliedToGroupWatcher.Add(atg2) + appliedToGroupWatcher.Action(watch.Bookmark, nil) + + select { + case ruleID := <-reconciler.updated: + actualRule, _ := reconciler.getLastRealized(ruleID) + assert.Equal(t, v1beta2.NewGroupMemberSet(atgMember2), actualRule.TargetMembers) + assert.Equal(t, v1beta2.NewGroupMemberSet(agMember2), actualRule.FromAddresses) + assert.Equal(t, policy2.SourceRef, actualRule.SourceRef) + case <-time.After(time.Second): + t.Fatal("Expected one rule update, got timeout") + } + + objects, err := controller.appliedToGroupStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{atg2}, objects) + objects, err = controller.addressGroupStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{ag2}, objects) + objects, err = controller.networkPolicyStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{policy2}, objects) +} + func TestNetworkPolicyMetrics(t *testing.T) { prepareMockTables() // Initialize NetworkPolicy metrics (prometheus) diff --git a/test/e2e/networkpolicy_test.go b/test/e2e/networkpolicy_test.go index 70afc10b763..de169df2774 100644 --- a/test/e2e/networkpolicy_test.go +++ 
b/test/e2e/networkpolicy_test.go @@ -96,6 +96,10 @@ func TestNetworkPolicy(t *testing.T) { skipIfProxyDisabled(t, data) testAllowHairpinService(t, data) }) + t.Run("testNetworkPolicyAfterAgentRestart", func(t *testing.T) { + t.Cleanup(exportLogsForSubtest(t, data)) + testNetworkPolicyAfterAgentRestart(t, data) + }) } func testNetworkPolicyStats(t *testing.T, data *TestData) { @@ -704,6 +708,94 @@ func testNetworkPolicyResyncAfterRestart(t *testing.T, data *TestData) { } } +// The test validates that Pods can't bypass NetworkPolicy when antrea-agent restarts. +func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { + workerNode := workerNodeName(1) + var isolatedPod, deniedPod, allowedPod string + var isolatedPodIPs, deniedPodIPs, allowedPodIPs *PodIPs + var wg sync.WaitGroup + createTestPod := func(prefix string) (string, *PodIPs) { + defer wg.Done() + podName, podIPs, cleanup := createAndWaitForPod(t, data, data.createNginxPodOnNode, prefix, workerNode, data.testNamespace, false) + t.Cleanup(cleanup) + return podName, podIPs + } + wg.Add(3) + go func() { + isolatedPod, isolatedPodIPs = createTestPod("test-isolated") + }() + go func() { + deniedPod, deniedPodIPs = createTestPod("test-denied") + }() + go func() { + allowedPod, allowedPodIPs = createTestPod("test-allowed") + }() + wg.Wait() + + allowedPeer := networkingv1.NetworkPolicyPeer{ + PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"antrea-e2e": allowedPod}}, + } + netpol, err := data.createNetworkPolicy("test-isolated", &networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"antrea-e2e": isolatedPod}}, + Ingress: []networkingv1.NetworkPolicyIngressRule{{From: []networkingv1.NetworkPolicyPeer{allowedPeer}}}, + Egress: []networkingv1.NetworkPolicyEgressRule{{To: []networkingv1.NetworkPolicyPeer{allowedPeer}}}, + }) + require.NoError(t, err) + t.Cleanup(func() { data.deleteNetworkpolicy(netpol) }) + + checkFunc := func(testPod 
string, testPodIPs *PodIPs, expectErr bool) { + var wg sync.WaitGroup + checkOne := func(clientPod, serverPod string, serverIP *net.IP) { + defer wg.Done() + if serverIP != nil { + _, _, err := data.runWgetCommandFromTestPodWithRetry(clientPod, data.testNamespace, nginxContainerName, serverIP.String(), 1) + if expectErr && err == nil { + t.Errorf("Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod) + } else if !expectErr && err != nil { + t.Errorf("Pod %s should be able to connect %s, but was not able to connect, err: %v", clientPod, serverPod, err) + } + } + } + wg.Add(4) + go checkOne(isolatedPod, testPod, testPodIPs.IPv4) + go checkOne(isolatedPod, testPod, testPodIPs.IPv6) + go checkOne(testPod, isolatedPod, isolatedPodIPs.IPv4) + go checkOne(testPod, isolatedPod, isolatedPodIPs.IPv6) + wg.Wait() + } + + scaleFunc := func(replicas int32) { + scale, err := data.clientset.AppsV1().Deployments(antreaNamespace).GetScale(context.TODO(), antreaDeployment, metav1.GetOptions{}) + require.NoError(t, err) + scale.Spec.Replicas = replicas + _, err = data.clientset.AppsV1().Deployments(antreaNamespace).UpdateScale(context.TODO(), antreaDeployment, scale, metav1.UpdateOptions{}) + require.NoError(t, err) + } + + // Scale antrea-controller to 0 so antrea-agent will lose connection with antrea-controller. + scaleFunc(0) + t.Cleanup(func() { scaleFunc(1) }) + + // Restart the antrea-agent. + _, err = data.deleteAntreaAgentOnNode(workerNode, 30, defaultTimeout) + require.NoError(t, err) + antreaPod, err := data.getAntreaPodOnNode(workerNode) + require.NoError(t, err) + // Make sure the new antrea-agent disconnects from antrea-controller but connects to OVS. 
+ waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionFalse) + waitForAgentCondition(t, data, antreaPod, v1beta1.OpenflowConnectionUp, corev1.ConditionTrue) + // Even the new antrea-agent can't connect to antrea-controller, the previous policy should continue working. + checkFunc(deniedPod, deniedPodIPs, true) + checkFunc(allowedPod, allowedPodIPs, false) + + // Scale antrea-controller to 1 so antrea-agent will connect to antrea-controller. + scaleFunc(1) + // Make sure antrea-agent connects to antrea-controller. + waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionTrue) + checkFunc(deniedPod, deniedPodIPs, true) + checkFunc(allowedPod, allowedPodIPs, false) +} + func testIngressPolicyWithoutPortNumber(t *testing.T, data *TestData) { serverPort := int32(80) _, serverIPs, cleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "test-server-", "", data.testNamespace, false) @@ -1039,8 +1131,9 @@ func waitForAgentCondition(t *testing.T, data *TestData, podName string, conditi t.Logf("cmds: %s", cmds) stdout, _, err := runAntctl(podName, cmds, data) + // The server may not be available yet. if err != nil { - return true, err + return false, nil } var agentInfo agentinfo.AntreaAgentInfoResponse err = json.Unmarshal([]byte(stdout), &agentInfo)