From b97ddf6bf47c38c02dbeccb61c3304ded7b6dedc Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Wed, 22 Nov 2023 18:50:53 +0800 Subject: [PATCH] Enable Pod network after realizing initial NetworkPolicies Pod network should only be enabled after realizing initial NetworkPolicies, otherwise traffic from/to Pods could bypass NetworkPolicy when antrea-agent restarts. After commit f9fc979345bf ("Store NetworkPolicy in filesystem as fallback data source"), antrea-agent can realize either the latest NetworkPolicies received from antrea-controller or the ones loaded from the filesystem as a fallback. Therefore, waiting for NetworkPolicies to be realized should not add noticeable delay or make antrea-controller a failure point of Pod network. This commit adds an implementation of a wait group capable of waiting with a timeout, and uses it to wait for common initialization and NetworkPolicy realization before installing any flows for Pods. More preconditions can be added via the wait group if needed in the future. Signed-off-by: Quan Tian --- cmd/antrea-agent/agent.go | 15 +- pkg/agent/agent.go | 23 +-- pkg/agent/cniserver/pod_configuration.go | 46 +++--- pkg/agent/cniserver/server.go | 17 ++- pkg/agent/cniserver/server_test.go | 5 +- .../networkpolicy/networkpolicy_controller.go | 15 +- .../networkpolicy_controller_test.go | 26 +++- pkg/util/wait/wait.go | 85 +++++++++++ pkg/util/wait/wait_test.go | 133 ++++++++++++++++++ test/e2e/networkpolicy_test.go | 9 +- test/integration/agent/cniserver_test.go | 18 +-- 11 files changed, 327 insertions(+), 65 deletions(-) create mode 100644 pkg/util/wait/wait.go create mode 100644 pkg/util/wait/wait_test.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index ee294a77672..bbd6c986c2f 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -79,6 +79,7 @@ import ( "antrea.io/antrea/pkg/util/k8s" "antrea.io/antrea/pkg/util/lazy" "antrea.io/antrea/pkg/util/podstore" + utilwait "antrea.io/antrea/pkg/util/wait" 
"antrea.io/antrea/pkg/version" ) @@ -226,9 +227,12 @@ func run(o *Options) error { // Create an ifaceStore that caches network interfaces managed by this node. ifaceStore := interfacestore.NewInterfaceStore() - // networkReadyCh is used to notify that the Node's network is ready. - // Functions that rely on the Node's network should wait for the channel to close. - networkReadyCh := make(chan struct{}) + // podNetworkWait is used to wait and notify that preconditions for Pod network are ready. + // Processes that are supposed to finish before enabling Pod network should increment the wait group and decrement + // it when finished. + // Processes that enable Pod network should wait for it. + podNetworkWait := utilwait.NewGroup() + // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will // cause the stopCh channel to be closed; if another signal is received before the program // exits, we will force exit. @@ -275,7 +279,7 @@ func run(o *Options) error { wireguardConfig, egressConfig, serviceConfig, - networkReadyCh, + podNetworkWait, stopCh, o.nodeType, o.config.ExternalNode.ExternalNodeNamespace, @@ -479,6 +483,7 @@ func run(o *Options) error { gwPort, tunPort, nodeConfig, + podNetworkWait, ) if err != nil { return fmt.Errorf("error creating new NetworkPolicy controller: %v", err) @@ -550,7 +555,7 @@ func run(o *Options) error { enableAntreaIPAM, o.config.DisableTXChecksumOffload, networkConfig, - networkReadyCh) + podNetworkWait) err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel) if err != nil { diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 1b43a4ff89a..791e18b890f 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -23,7 +23,6 @@ import ( "os" "strconv" "strings" - "sync" "time" "github.com/containernetworking/plugins/pkg/ip" @@ -57,6 +56,7 @@ import ( "antrea.io/antrea/pkg/util/env" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" + utilwait 
"antrea.io/antrea/pkg/util/wait" ) const ( @@ -121,9 +121,9 @@ type Initializer struct { enableL7NetworkPolicy bool connectUplinkToBridge bool enableAntreaProxy bool - // networkReadyCh should be closed once the Node's network is ready. + // podNetworkWait should be decremented once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. - networkReadyCh chan<- struct{} + podNetworkWait *utilwait.Group stopCh <-chan struct{} nodeType config.NodeType externalNodeNamespace string @@ -144,7 +144,7 @@ func NewInitializer( wireGuardConfig *config.WireGuardConfig, egressConfig *config.EgressConfig, serviceConfig *config.ServiceConfig, - networkReadyCh chan<- struct{}, + podNetworkWait *utilwait.Group, stopCh <-chan struct{}, nodeType config.NodeType, externalNodeNamespace string, @@ -168,7 +168,7 @@ func NewInitializer( egressConfig: egressConfig, serviceConfig: serviceConfig, l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{}, - networkReadyCh: networkReadyCh, + podNetworkWait: podNetworkWait, stopCh: stopCh, nodeType: nodeType, externalNodeNamespace: externalNodeNamespace, @@ -407,9 +407,6 @@ func (i *Initializer) restorePortConfigs() error { // Initialize sets up agent initial configurations. func (i *Initializer) Initialize() error { klog.Info("Setting up node network") - // wg is used to wait for the asynchronous initialization. - var wg sync.WaitGroup - if err := i.initNodeLocalConfig(); err != nil { return err } @@ -485,10 +482,10 @@ func (i *Initializer) Initialize() error { } if i.nodeType == config.K8sNode { - wg.Add(1) + i.podNetworkWait.Increment() // routeClient.Initialize() should be after i.setupOVSBridge() which // creates the host gateway interface. 
- if err := i.routeClient.Initialize(i.nodeConfig, wg.Done); err != nil { + if err := i.routeClient.Initialize(i.nodeConfig, i.podNetworkWait.Done); err != nil { return err } @@ -496,12 +493,6 @@ func (i *Initializer) Initialize() error { if err := i.initOpenFlowPipeline(); err != nil { return err } - - // The Node's network is ready only when both synchronous and asynchronous initialization are done. - go func() { - wg.Wait() - close(i.networkReadyCh) - }() } else { // Install OpenFlow entries on OVS bridge. if err := i.initOpenFlowPipeline(); err != nil { diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 9f6398af48b..908ee6eb30c 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -36,6 +36,7 @@ import ( "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" + "antrea.io/antrea/pkg/util/wait" ) type vethPair struct { @@ -413,7 +414,7 @@ func parsePrevResult(conf *types.NetworkConfig) error { return nil } -func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator) error { +func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error { // desiredPods is the set of Pods that should be present, based on the // current list of Pods got from the Kubernetes API. desiredPods := sets.New[string]() @@ -445,21 +446,34 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain missingIfConfigs = append(missingIfConfigs, containerConfig) continue } - // This interface matches an existing Pod. - // We rely on the interface cache / store - which is initialized from the persistent - // OVSDB - to map the Pod to its interface configuration. The interface - // configuration includes the parameters we need to replay the flows. 
- klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) - if err := pc.ofClient.InstallPodFlows( - containerConfig.InterfaceName, - containerConfig.IPs, - containerConfig.MAC, - uint32(containerConfig.OFPort), - containerConfig.VLANID, - nil, - ); err != nil { - klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name)) - } + go func(containerID, pod, namespace string) { + // Do not install Pod flows until all preconditions are met. + podNetworkWait.Wait() + // To avoid race condition with CNIServer CNI event handlers. + containerAccess.lockContainer(containerID) + defer containerAccess.unlockContainer(containerID) + + containerConfig, exists := pc.ifaceStore.GetContainerInterface(containerID) + if !exists { + klog.InfoS("The container interface had been deleted, skip installing flows for Pod", "Pod", klog.KRef(namespace, name), "containerID", containerID) + return + } + // This interface matches an existing Pod. + // We rely on the interface cache / store - which is initialized from the persistent + // OVSDB - to map the Pod to its interface configuration. The interface + // configuration includes the parameters we need to replay the flows. 
+ klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) + if err := pc.ofClient.InstallPodFlows( + containerConfig.InterfaceName, + containerConfig.IPs, + containerConfig.MAC, + uint32(containerConfig.OFPort), + containerConfig.VLANID, + nil, + ); err != nil { + klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name)) + } + }(containerConfig.ContainerID, name, namespace) } else { // clean-up and delete interface klog.V(4).InfoS("Deleting interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index 1426fdc131c..b24cf00ab94 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -44,6 +44,7 @@ import ( "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -115,8 +116,8 @@ type CNIServer struct { enableSecondaryNetworkIPAM bool disableTXChecksumOffload bool networkConfig *config.NetworkConfig - // networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. - networkReadyCh <-chan struct{} + // podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. 
+ podNetworkWait *wait.Group } var supportedCNIVersionSet map[string]bool @@ -434,11 +435,9 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* return resp, err } - select { - case <-time.After(networkReadyTimeout): - klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout) + if err := s.podNetworkWait.WaitWithTimeout(networkReadyTimeout); err != nil { + klog.ErrorS(err, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout) return s.tryAgainLaterResponse(), nil - case <-s.networkReadyCh: } result := &ipam.IPAMResult{Result: current.Result{CNIVersion: current.ImplementedSpecVersion}} @@ -610,7 +609,7 @@ func New( routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, networkConfig *config.NetworkConfig, - networkReadyCh <-chan struct{}, + podNetworkWait *wait.Group, ) *CNIServer { return &CNIServer{ cniSocket: cniSocket, @@ -625,7 +624,7 @@ func New( disableTXChecksumOffload: disableTXChecksumOffload, enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM, networkConfig: networkConfig, - networkReadyCh: networkReadyCh, + podNetworkWait: podNetworkWait, } } @@ -739,7 +738,7 @@ func (s *CNIServer) reconcile() error { return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err) } - return s.podConfigurator.reconcile(pods.Items, s.containerAccess) + return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait) } func init() { diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go index f5ff7056e3f..ce6348c6203 100644 --- a/pkg/agent/cniserver/server_test.go +++ b/pkg/agent/cniserver/server_test.go @@ -43,6 +43,7 @@ import ( "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest 
"antrea.io/antrea/pkg/ovs/ovsconfig/testing" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -663,15 +664,13 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[ } func newCNIServer(t *testing.T) *CNIServer { - networkReadyCh := make(chan struct{}) cniServer := &CNIServer{ cniSocket: testSocket, nodeConfig: testNodeConfig, serverVersion: cni.AntreaCNIVersion, containerAccess: newContainerAccessArbitrator(), - networkReadyCh: networkReadyCh, + podNetworkWait: wait.NewGroup(), } - close(networkReadyCh) cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} return cniServer } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 36d4361c281..9ce8e0b4343 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -46,6 +46,7 @@ import ( "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" + utilwait "antrea.io/antrea/pkg/util/wait" ) const ( @@ -143,10 +144,11 @@ type Controller struct { fullSyncGroup sync.WaitGroup ifaceStore interfacestore.InterfaceStore // denyConnStore is for storing deny connections for flow exporter. - denyConnStore *connections.DenyConnectionStore - gwPort uint32 - tunPort uint32 - nodeConfig *config.NodeConfig + denyConnStore *connections.DenyConnectionStore + gwPort uint32 + tunPort uint32 + nodeConfig *config.NodeConfig + podNetworkWait *utilwait.Group // The fileStores store runtime.Objects in files and use them as the fallback data source when agent can't connect // to antrea-controller on startup. 
@@ -181,7 +183,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, v4Enabled bool, v6Enabled bool, gwPort, tunPort uint32, - nodeConfig *config.NodeConfig) (*Controller, error) { + nodeConfig *config.NodeConfig, + podNetworkWait *utilwait.Group) (*Controller, error) { idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID) c := &Controller{ antreaClientProvider: antreaClientGetter, @@ -196,6 +199,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, gwPort: gwPort, tunPort: tunPort, nodeConfig: nodeConfig, + podNetworkWait: podNetworkWait.Increment(), } if l7NetworkPolicyEnabled { @@ -610,6 +614,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { klog.Infof("All watchers have completed full sync, installing flows for init events") // Batch install all rules in queue after fullSync is finished. c.processAllItemsInQueue() + c.podNetworkWait.Done() klog.Infof("Starting NetworkPolicy workers now") defer c.queue.ShutDown() diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index 1552281039d..58df524a48c 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -47,6 +47,7 @@ import ( "antrea.io/antrea/pkg/client/clientset/versioned/fake" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/wait" ) const testNamespace = "ns1" @@ -76,7 +77,30 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { groupIDAllocator := openflow.NewGroupAllocator() groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} fs := afero.NewMemMapFs() - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, 
false, nil, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}) + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, + nil, + nil, + fs, + "node1", + podUpdateChannel, + nil, + groupCounters, + ch2, + true, + true, + true, + true, + false, + nil, + testAsyncDeleteInterval, + "8.8.8.8:53", + config.K8sNode, + true, + false, + config.HostGatewayOFPort, + config.DefaultTunOFPort, + &config.NodeConfig{}, + wait.NewGroup()) reconciler := newMockReconciler() controller.reconciler = reconciler controller.auditLogger = nil diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go new file mode 100644 index 00000000000..6897ec2fb24 --- /dev/null +++ b/pkg/util/wait/wait.go @@ -0,0 +1,85 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wait + +import ( + "fmt" + "sync" + "time" + + "k8s.io/utils/clock" +) + +// Group allows waiting for a collection of goroutines to finish with a timeout or a stop channel. 
+type Group struct { + wg *sync.WaitGroup + doneCh chan struct{} + once sync.Once + clock clock.Clock +} + +func NewGroup() *Group { + return newGroupWithClock(clock.RealClock{}) +} + +func newGroupWithClock(clock clock.Clock) *Group { + return &Group{ + wg: &sync.WaitGroup{}, + doneCh: make(chan struct{}), + clock: clock, + } +} + +func (g *Group) Increment() *Group { + g.wg.Add(1) + return g +} + +func (g *Group) Done() { + g.wg.Done() +} + +func (g *Group) wait() { + g.once.Do(func() { + go func() { + g.wg.Wait() + close(g.doneCh) + }() + }) +} + +func (g *Group) WaitWithTimeout(timeout time.Duration) error { + g.wait() + select { + case <-g.doneCh: + return nil + case <-g.clock.After(timeout): + return fmt.Errorf("timeout waiting for group") + } +} + +func (g *Group) WaitUntil(stopCh <-chan struct{}) error { + g.wait() + select { + case <-g.doneCh: + return nil + case <-stopCh: + return fmt.Errorf("stopCh closed, stop waiting") + } +} + +func (g *Group) Wait() { + g.wg.Wait() +} diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go new file mode 100644 index 00000000000..f471685b1d9 --- /dev/null +++ b/pkg/util/wait/wait_test.go @@ -0,0 +1,133 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package wait + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + clock "k8s.io/utils/clock/testing" +) + +func TestGroupWaitWithTimeout(t *testing.T) { + const timeout = 100 * time.Millisecond + tests := []struct { + name string + add int + processFn func(group *Group, fakeClock *clock.FakeClock) + expectWaitErr bool + }{ + { + name: "add only", + add: 1, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + fakeClock.Step(timeout) + }, + expectWaitErr: true, + }, + { + name: "add greater than done", + add: 2, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + group.Done() + fakeClock.Step(timeout) + }, + expectWaitErr: true, + }, + { + name: "add equal to done", + add: 2, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + group.Done() + fakeClock.Step(timeout / 2) + group.Done() + }, + expectWaitErr: false, + }, + { + name: "add with delay", + add: 2, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + group.Done() + fakeClock.Step(timeout * 2) + group.Done() + }, + expectWaitErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + g := newGroupWithClock(fakeClock) + for i := 0; i < tt.add; i++ { + g.Increment() + } + resCh := make(chan error, 1) + go func() { + resCh <- g.WaitWithTimeout(timeout) + }() + require.Eventually(t, func() bool { + return fakeClock.HasWaiters() + }, 1*time.Second, 10*time.Millisecond) + tt.processFn(g, fakeClock) + err := <-resCh + if tt.expectWaitErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGroupWait(t *testing.T) { + g := NewGroup() + g.Increment() + returnedCh := make(chan struct{}) + go func() { + g.Wait() + close(returnedCh) + }() + select { + case <-time.After(100 * time.Millisecond): + case <-returnedCh: + t.Errorf("Wait should not return before it's done") + } + g.Done() + select { + case 
<-time.After(500 * time.Millisecond): + t.Errorf("Wait should return after it's done") + case <-returnedCh: + } +} + +func TestGroupWaitUntil(t *testing.T) { + g := NewGroup() + g.Increment() + stopCh := make(chan struct{}) + go func() { + time.Sleep(100 * time.Millisecond) + close(stopCh) + }() + err := g.WaitUntil(stopCh) + assert.Error(t, err) + + stopCh = make(chan struct{}) + g.Done() + err = g.WaitUntil(stopCh) + assert.NoError(t, err) +} diff --git a/test/e2e/networkpolicy_test.go b/test/e2e/networkpolicy_test.go index de169df2774..56fb0845e75 100644 --- a/test/e2e/networkpolicy_test.go +++ b/test/e2e/networkpolicy_test.go @@ -748,7 +748,8 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { checkOne := func(clientPod, serverPod string, serverIP *net.IP) { defer wg.Done() if serverIP != nil { - _, _, err := data.runWgetCommandFromTestPodWithRetry(clientPod, data.testNamespace, nginxContainerName, serverIP.String(), 1) + cmd := []string{"wget", "-O", "-", serverIP.String(), "-T", "1"} + _, _, err := data.RunCommandFromPod(data.testNamespace, clientPod, nginxContainerName, cmd) if expectErr && err == nil { t.Errorf("Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod) } else if !expectErr && err != nil { @@ -779,6 +780,12 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { // Restart the antrea-agent. _, err = data.deleteAntreaAgentOnNode(workerNode, 30, defaultTimeout) require.NoError(t, err) + + // While the new antrea-agent starts, the denied Pod should never connect to the isolated Pod successfully. + for i := 0; i < 5; i++ { + checkFunc(deniedPod, deniedPodIPs, true) + } + antreaPod, err := data.getAntreaPodOnNode(workerNode) require.NoError(t, err) // Make sure the new antrea-agent disconnects from antrea-controller but connects to OVS. 
diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 1852f4dbea1..290eb3cdbeb 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -60,6 +60,7 @@ import ( "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -297,7 +298,7 @@ type cmdAddDelTester struct { targetNS ns.NetNS request *cnimsg.CniCmdRequest vethName string - networkReadyCh chan struct{} + podNetworkWait *wait.Group } func (tester *cmdAddDelTester) setNS(testNS ns.NetNS, targetNS ns.NetNS) { @@ -566,14 +567,14 @@ func (tester *cmdAddDelTester) cmdDelTest(tc testCase, dataDir string) { func newTester() *cmdAddDelTester { tester := &cmdAddDelTester{} ifaceStore := interfacestore.NewInterfaceStore() - tester.networkReadyCh = make(chan struct{}) + tester.podNetworkWait = wait.NewGroup() tester.server = cniserver.New(testSock, "", getTestNodeConfig(false), k8sFake.NewSimpleClientset(), routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - tester.networkReadyCh) + tester.podNetworkWait.Increment()) tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100)) ctx := context.Background() tester.ctx = ctx @@ -609,7 +610,7 @@ func cmdAddDelCheckTest(testNS ns.NetNS, tc testCase, dataDir string) { ovsServiceMock.EXPECT().GetOFPort(ovsPortname, false).Return(int32(10), nil).AnyTimes() ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, mock.Any(), mock.Any(), mock.Any(), uint16(0), nil).Return(nil) - close(tester.networkReadyCh) + tester.podNetworkWait.Done() // Test ips allocation prevResult, err := tester.cmdAddTest(tc, dataDir) testRequire.Nil(err) @@ -727,15 +728,14 @@ func setupChainTest( ) (server *cniserver.CNIServer, hostVeth, containerVeth net.Interface, err error) { if newServer { routeMock = 
routetest.NewMockInterface(controller) - networkReadyCh := make(chan struct{}) - close(networkReadyCh) + podNetworkWait := wait.NewGroup() server = cniserver.New(testSock, "", testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - networkReadyCh) + podNetworkWait) } else { server = inServer } @@ -915,7 +915,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { ofServiceMock := openflowtest.NewMockClient(controller) routeMock := routetest.NewMockInterface(controller) ifaceStore := interfacestore.NewInterfaceStore() - networkReadyCh := make(chan struct{}) + podNetworkWait := wait.NewGroup() k8sClient := k8sFake.NewSimpleClientset(pod) server := cniserver.New( testSock, @@ -924,7 +924,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { k8sClient, routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - networkReadyCh, + podNetworkWait, ) // call Initialize, which will run reconciliation and perform host-local IPAM garbage collection