diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go
index 9f2eaa63eda..4de729b0c59 100644
--- a/cmd/antrea-agent/agent.go
+++ b/cmd/antrea-agent/agent.go
@@ -79,6 +79,7 @@ import (
 	"antrea.io/antrea/pkg/util/k8s"
 	"antrea.io/antrea/pkg/util/lazy"
 	"antrea.io/antrea/pkg/util/podstore"
+	utilwait "antrea.io/antrea/pkg/util/wait"
 	"antrea.io/antrea/pkg/version"
 )
 
@@ -226,9 +227,12 @@ func run(o *Options) error {
 	// Create an ifaceStore that caches network interfaces managed by this node.
 	ifaceStore := interfacestore.NewInterfaceStore()
 
-	// networkReadyCh is used to notify that the Node's network is ready.
-	// Functions that rely on the Node's network should wait for the channel to close.
-	networkReadyCh := make(chan struct{})
+	// podNetworkWait is used to wait and notify that preconditions for Pod network are ready.
+	// Processes that are supposed to finish before enabling Pod network should increment the wait group and decrement
+	// it when finished.
+	// Processes that enable Pod network should wait for it.
+	podNetworkWait := utilwait.NewGroup()
+
 	// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
 	// cause the stopCh channel to be closed; if another signal is received before the program
 	// exits, we will force exit.
@@ -275,7 +279,7 @@ func run(o *Options) error {
 		wireguardConfig,
 		egressConfig,
 		serviceConfig,
-		networkReadyCh,
+		podNetworkWait,
 		stopCh,
 		o.nodeType,
 		o.config.ExternalNode.ExternalNodeNamespace,
@@ -479,6 +483,7 @@ func run(o *Options) error {
 		gwPort,
 		tunPort,
 		nodeConfig,
+		podNetworkWait,
 	)
 	if err != nil {
 		return fmt.Errorf("error creating new NetworkPolicy controller: %v", err)
 	}
@@ -550,7 +555,7 @@ func run(o *Options) error {
 		enableAntreaIPAM,
 		o.config.DisableTXChecksumOffload,
 		networkConfig,
-		networkReadyCh)
+		podNetworkWait)
 
 	err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel)
 	if err != nil {
diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index 1b43a4ff89a..791e18b890f 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -23,7 +23,6 @@ import (
 	"os"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/containernetworking/plugins/pkg/ip"
@@ -57,6 +56,7 @@ import (
 	"antrea.io/antrea/pkg/util/env"
 	utilip "antrea.io/antrea/pkg/util/ip"
 	"antrea.io/antrea/pkg/util/k8s"
+	utilwait "antrea.io/antrea/pkg/util/wait"
 )
 
 const (
@@ -121,9 +121,9 @@ type Initializer struct {
 	enableL7NetworkPolicy bool
 	connectUplinkToBridge bool
 	enableAntreaProxy     bool
-	// networkReadyCh should be closed once the Node's network is ready.
+	// podNetworkWait should be decremented once the Node's network is ready.
 	// The CNI server will wait for it before handling any CNI Add requests.
-	networkReadyCh        chan<- struct{}
+	podNetworkWait        *utilwait.Group
 	stopCh                <-chan struct{}
 	nodeType              config.NodeType
 	externalNodeNamespace string
@@ -144,7 +144,7 @@ func NewInitializer(
 	wireGuardConfig *config.WireGuardConfig,
 	egressConfig *config.EgressConfig,
 	serviceConfig *config.ServiceConfig,
-	networkReadyCh chan<- struct{},
+	podNetworkWait *utilwait.Group,
 	stopCh <-chan struct{},
 	nodeType config.NodeType,
 	externalNodeNamespace string,
@@ -168,7 +168,7 @@ func NewInitializer(
 		egressConfig:          egressConfig,
 		serviceConfig:         serviceConfig,
 		l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{},
-		networkReadyCh:        networkReadyCh,
+		podNetworkWait:        podNetworkWait,
 		stopCh:                stopCh,
 		nodeType:              nodeType,
 		externalNodeNamespace: externalNodeNamespace,
@@ -407,9 +407,6 @@ func (i *Initializer) restorePortConfigs() error {
 // Initialize sets up agent initial configurations.
 func (i *Initializer) Initialize() error {
 	klog.Info("Setting up node network")
-	// wg is used to wait for the asynchronous initialization.
-	var wg sync.WaitGroup
-
 	if err := i.initNodeLocalConfig(); err != nil {
 		return err
 	}
@@ -485,10 +482,10 @@ func (i *Initializer) Initialize() error {
 	}
 
 	if i.nodeType == config.K8sNode {
-		wg.Add(1)
+		i.podNetworkWait.Increment()
 		// routeClient.Initialize() should be after i.setupOVSBridge() which
 		// creates the host gateway interface.
-		if err := i.routeClient.Initialize(i.nodeConfig, wg.Done); err != nil {
+		if err := i.routeClient.Initialize(i.nodeConfig, i.podNetworkWait.Done); err != nil {
 			return err
 		}
 
@@ -496,12 +493,6 @@ func (i *Initializer) Initialize() error {
 		if err := i.initOpenFlowPipeline(); err != nil {
 			return err
 		}
-
-		// The Node's network is ready only when both synchronous and asynchronous initialization are done.
-		go func() {
-			wg.Wait()
-			close(i.networkReadyCh)
-		}()
 	} else {
 		// Install OpenFlow entries on OVS bridge.
 		if err := i.initOpenFlowPipeline(); err != nil {
diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go
index 9f6398af48b..908ee6eb30c 100644
--- a/pkg/agent/cniserver/pod_configuration.go
+++ b/pkg/agent/cniserver/pod_configuration.go
@@ -36,6 +36,7 @@ import (
 	"antrea.io/antrea/pkg/ovs/ovsconfig"
 	"antrea.io/antrea/pkg/util/channel"
 	"antrea.io/antrea/pkg/util/k8s"
+	"antrea.io/antrea/pkg/util/wait"
 )
 
 type vethPair struct {
@@ -413,7 +414,7 @@ func parsePrevResult(conf *types.NetworkConfig) error {
 	return nil
 }
 
-func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator) error {
+func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error {
 	// desiredPods is the set of Pods that should be present, based on the
 	// current list of Pods got from the Kubernetes API.
 	desiredPods := sets.New[string]()
@@ -445,21 +446,34 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
 			missingIfConfigs = append(missingIfConfigs, containerConfig)
 			continue
 		}
-		// This interface matches an existing Pod.
-		// We rely on the interface cache / store - which is initialized from the persistent
-		// OVSDB - to map the Pod to its interface configuration. The interface
-		// configuration includes the parameters we need to replay the flows.
- klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) - if err := pc.ofClient.InstallPodFlows( - containerConfig.InterfaceName, - containerConfig.IPs, - containerConfig.MAC, - uint32(containerConfig.OFPort), - containerConfig.VLANID, - nil, - ); err != nil { - klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name)) - } + go func(containerID, pod, namespace string) { + // Do not install Pod flows until all preconditions are met. + podNetworkWait.Wait() + // To avoid race condition with CNIServer CNI event handlers. + containerAccess.lockContainer(containerID) + defer containerAccess.unlockContainer(containerID) + + containerConfig, exists := pc.ifaceStore.GetContainerInterface(containerID) + if !exists { + klog.InfoS("The container interface had been deleted, skip installing flows for Pod", "Pod", klog.KRef(namespace, name), "containerID", containerID) + return + } + // This interface matches an existing Pod. + // We rely on the interface cache / store - which is initialized from the persistent + // OVSDB - to map the Pod to its interface configuration. The interface + // configuration includes the parameters we need to replay the flows. + klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) + if err := pc.ofClient.InstallPodFlows( + containerConfig.InterfaceName, + containerConfig.IPs, + containerConfig.MAC, + uint32(containerConfig.OFPort), + containerConfig.VLANID, + nil, + ); err != nil { + klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name)) + } + }(containerConfig.ContainerID, name, namespace) } else { // clean-up and delete interface klog.V(4).InfoS("Deleting interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index 1426fdc131c..b24cf00ab94 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -44,6 +44,7 @@ import ( "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -115,8 +116,8 @@ type CNIServer struct { enableSecondaryNetworkIPAM bool disableTXChecksumOffload bool networkConfig *config.NetworkConfig - // networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. - networkReadyCh <-chan struct{} + // podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. 
+	podNetworkWait *wait.Group
 }
 
 var supportedCNIVersionSet map[string]bool
@@ -434,11 +435,9 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
 		return resp, err
 	}
 
-	select {
-	case <-time.After(networkReadyTimeout):
-		klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
+	if err := s.podNetworkWait.WaitWithTimeout(networkReadyTimeout); err != nil {
+		klog.ErrorS(err, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
 		return s.tryAgainLaterResponse(), nil
-	case <-s.networkReadyCh:
 	}
 
 	result := &ipam.IPAMResult{Result: current.Result{CNIVersion: current.ImplementedSpecVersion}}
@@ -610,7 +609,7 @@ func New(
 	routeClient route.Interface,
 	isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool,
 	networkConfig *config.NetworkConfig,
-	networkReadyCh <-chan struct{},
+	podNetworkWait *wait.Group,
 ) *CNIServer {
 	return &CNIServer{
 		cniSocket: cniSocket,
@@ -625,7 +624,7 @@ func New(
 		disableTXChecksumOffload:   disableTXChecksumOffload,
 		enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM,
 		networkConfig:              networkConfig,
-		networkReadyCh:             networkReadyCh,
+		podNetworkWait:             podNetworkWait,
 	}
 }
 
@@ -739,7 +738,7 @@ func (s *CNIServer) reconcile() error {
 		return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
 	}
 
-	return s.podConfigurator.reconcile(pods.Items, s.containerAccess)
+	return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait)
 }
 
 func init() {
diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go
index f5ff7056e3f..ce6348c6203 100644
--- a/pkg/agent/cniserver/server_test.go
+++ b/pkg/agent/cniserver/server_test.go
@@ -43,6 +43,7 @@ import (
 	"antrea.io/antrea/pkg/cni"
 	"antrea.io/antrea/pkg/ovs/ovsconfig"
 	ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
+	"antrea.io/antrea/pkg/util/wait"
 )
 
 const (
@@ -663,15 +664,13 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[
 }
 
 func newCNIServer(t *testing.T) *CNIServer {
-	networkReadyCh := make(chan struct{})
 	cniServer := &CNIServer{
 		cniSocket:       testSocket,
 		nodeConfig:      testNodeConfig,
 		serverVersion:   cni.AntreaCNIVersion,
 		containerAccess: newContainerAccessArbitrator(),
-		networkReadyCh:  networkReadyCh,
+		podNetworkWait:  wait.NewGroup(),
 	}
-	close(networkReadyCh)
 	cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450}
 	return cniServer
 }
diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go
index 36d4361c281..9ce8e0b4343 100644
--- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go
+++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go
@@ -46,6 +46,7 @@ import (
 	"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
 	"antrea.io/antrea/pkg/querier"
 	"antrea.io/antrea/pkg/util/channel"
+	utilwait "antrea.io/antrea/pkg/util/wait"
 )
 
 const (
@@ -143,10 +144,11 @@ type Controller struct {
 	fullSyncGroup sync.WaitGroup
 	ifaceStore    interfacestore.InterfaceStore
 	// denyConnStore is for storing deny connections for flow exporter.
-	denyConnStore *connections.DenyConnectionStore
-	gwPort        uint32
-	tunPort       uint32
-	nodeConfig    *config.NodeConfig
+	denyConnStore  *connections.DenyConnectionStore
+	gwPort         uint32
+	tunPort        uint32
+	nodeConfig     *config.NodeConfig
+	podNetworkWait *utilwait.Group
 
 	// The fileStores store runtime.Objects in files and use them as the fallback data source when agent can't connect
 	// to antrea-controller on startup.
@@ -181,7 +183,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 	v4Enabled bool,
 	v6Enabled bool,
 	gwPort, tunPort uint32,
-	nodeConfig *config.NodeConfig) (*Controller, error) {
+	nodeConfig *config.NodeConfig,
+	podNetworkWait *utilwait.Group) (*Controller, error) {
 	idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID)
 	c := &Controller{
 		antreaClientProvider: antreaClientGetter,
@@ -196,6 +199,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 		gwPort:               gwPort,
 		tunPort:              tunPort,
 		nodeConfig:           nodeConfig,
+		podNetworkWait:       podNetworkWait.Increment(),
 	}
 
 	if l7NetworkPolicyEnabled {
@@ -610,6 +614,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
 	klog.Infof("All watchers have completed full sync, installing flows for init events")
 	// Batch install all rules in queue after fullSync is finished.
 	c.processAllItemsInQueue()
+	c.podNetworkWait.Done()
 
 	klog.Infof("Starting NetworkPolicy workers now")
 	defer c.queue.ShutDown()
diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
index 1552281039d..58df524a48c 100644
--- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
+++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
@@ -47,6 +47,7 @@ import (
 	"antrea.io/antrea/pkg/client/clientset/versioned/fake"
 	"antrea.io/antrea/pkg/querier"
 	"antrea.io/antrea/pkg/util/channel"
+	"antrea.io/antrea/pkg/util/wait"
 )
 
 const testNamespace = "ns1"
@@ -76,7 +77,30 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
 	groupIDAllocator := openflow.NewGroupAllocator()
 	groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)}
 	fs := afero.NewMemMapFs()
-	controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, nil, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{})
+	controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset},
+		nil,
+		nil,
+		fs,
+		"node1",
+		podUpdateChannel,
+		nil,
+		groupCounters,
+		ch2,
+		true,
+		true,
+		true,
+		true,
+		false,
+		nil,
+		testAsyncDeleteInterval,
+		"8.8.8.8:53",
+		config.K8sNode,
+		true,
+		false,
+		config.HostGatewayOFPort,
+		config.DefaultTunOFPort,
+		&config.NodeConfig{},
+		wait.NewGroup())
 	reconciler := newMockReconciler()
 	controller.reconciler = reconciler
 	controller.auditLogger = nil
diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go
new file mode 100644
index 00000000000..6897ec2fb24
--- /dev/null
+++ b/pkg/util/wait/wait.go
@@ -0,0 +1,85 @@
+// Copyright 2023 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wait
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"k8s.io/utils/clock"
+)
+
+// Group allows waiting for a collection of goroutines to finish, with a timeout or a stop channel.
+type Group struct {
+	wg     *sync.WaitGroup
+	doneCh chan struct{}
+	once   sync.Once
+	clock  clock.Clock
+}
+
+func NewGroup() *Group {
+	return newGroupWithClock(clock.RealClock{})
+}
+
+func newGroupWithClock(clock clock.Clock) *Group {
+	return &Group{
+		wg:     &sync.WaitGroup{},
+		doneCh: make(chan struct{}),
+		clock:  clock,
+	}
+}
+
+func (g *Group) Increment() *Group {
+	g.wg.Add(1)
+	return g
+}
+
+func (g *Group) Done() {
+	g.wg.Done()
+}
+
+func (g *Group) wait() {
+	g.once.Do(func() {
+		go func() {
+			g.wg.Wait()
+			close(g.doneCh)
+		}()
+	})
+}
+
+func (g *Group) WaitWithTimeout(timeout time.Duration) error {
+	g.wait()
+	select {
+	case <-g.doneCh:
+		return nil
+	case <-g.clock.After(timeout):
+		return fmt.Errorf("timeout waiting for group")
+	}
+}
+
+func (g *Group) WaitUntil(stopCh <-chan struct{}) error {
+	g.wait()
+	select {
+	case <-g.doneCh:
+		return nil
+	case <-stopCh:
+		return fmt.Errorf("stopCh closed, stop waiting")
+	}
+}
+
+func (g *Group) Wait() {
+	g.wg.Wait()
+}
diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go
new file mode 100644
index 00000000000..f471685b1d9
--- /dev/null
+++ b/pkg/util/wait/wait_test.go
@@ -0,0 +1,133 @@
+// Copyright 2023 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wait
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	clock "k8s.io/utils/clock/testing"
+)
+
+func TestGroupWaitWithTimeout(t *testing.T) {
+	const timeout = 100 * time.Millisecond
+	tests := []struct {
+		name          string
+		add           int
+		processFn     func(group *Group, fakeClock *clock.FakeClock)
+		expectWaitErr bool
+	}{
+		{
+			name: "add only",
+			add:  1,
+			processFn: func(group *Group, fakeClock *clock.FakeClock) {
+				fakeClock.Step(timeout)
+			},
+			expectWaitErr: true,
+		},
+		{
+			name: "add greater than done",
+			add:  2,
+			processFn: func(group *Group, fakeClock *clock.FakeClock) {
+				group.Done()
+				fakeClock.Step(timeout)
+			},
+			expectWaitErr: true,
+		},
+		{
+			name: "add equal to done",
+			add:  2,
+			processFn: func(group *Group, fakeClock *clock.FakeClock) {
+				group.Done()
+				fakeClock.Step(timeout / 2)
+				group.Done()
+			},
+			expectWaitErr: false,
+		},
+		{
+			name: "add with delay",
+			add:  2,
+			processFn: func(group *Group, fakeClock *clock.FakeClock) {
+				group.Done()
+				fakeClock.Step(timeout * 2)
+				group.Done()
+			},
+			expectWaitErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			fakeClock := clock.NewFakeClock(time.Now())
+			g := newGroupWithClock(fakeClock)
+			for i := 0; i < tt.add; i++ {
+				g.Increment()
+			}
+			resCh := make(chan error, 1)
+			go func() {
+				resCh <- g.WaitWithTimeout(timeout)
+			}()
+			require.Eventually(t, func() bool {
+				return fakeClock.HasWaiters()
+			}, 1*time.Second, 10*time.Millisecond)
+			tt.processFn(g, fakeClock)
+			err := <-resCh
+			if tt.expectWaitErr {
+				assert.Error(t, err)
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
+
+func TestGroupWait(t *testing.T) {
+	g := NewGroup()
+	g.Increment()
+	returnedCh := make(chan struct{})
+	go func() {
+		g.Wait()
+		close(returnedCh)
+	}()
+	select {
+	case <-time.After(100 * time.Millisecond):
+	case <-returnedCh:
+		t.Errorf("Wait should not return before it's done")
+	}
+	g.Done()
+	select {
+	case <-time.After(500 * time.Millisecond):
+		t.Errorf("Wait should return after it's done")
+	case <-returnedCh:
+	}
+}
+
+func TestGroupWaitUntil(t *testing.T) {
+	g := NewGroup()
+	g.Increment()
+	stopCh := make(chan struct{})
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		close(stopCh)
+	}()
+	err := g.WaitUntil(stopCh)
+	assert.Error(t, err)
+
+	stopCh = make(chan struct{})
+	g.Done()
+	err = g.WaitUntil(stopCh)
+	assert.NoError(t, err)
+}
diff --git a/test/e2e/networkpolicy_test.go b/test/e2e/networkpolicy_test.go
index de169df2774..56fb0845e75 100644
--- a/test/e2e/networkpolicy_test.go
+++ b/test/e2e/networkpolicy_test.go
@@ -748,7 +748,8 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) {
 	checkOne := func(clientPod, serverPod string, serverIP *net.IP) {
 		defer wg.Done()
 		if serverIP != nil {
-			_, _, err := data.runWgetCommandFromTestPodWithRetry(clientPod, data.testNamespace, nginxContainerName, serverIP.String(), 1)
+			cmd := []string{"wget", "-O", "-", serverIP.String(), "-T", "1"}
+			_, _, err := data.RunCommandFromPod(data.testNamespace, clientPod, nginxContainerName, cmd)
 			if expectErr && err == nil {
 				t.Errorf("Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod)
 			} else if !expectErr && err != nil {
@@ -779,6 +780,12 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) {
 	// Restart the antrea-agent.
 	_, err = data.deleteAntreaAgentOnNode(workerNode, 30, defaultTimeout)
 	require.NoError(t, err)
+
+	// While the new antrea-agent starts, the denied Pod should never connect to the isolated Pod successfully.
+	for i := 0; i < 5; i++ {
+		checkFunc(deniedPod, deniedPodIPs, true)
+	}
+
 	antreaPod, err := data.getAntreaPodOnNode(workerNode)
 	require.NoError(t, err)
 	// Make sure the new antrea-agent disconnects from antrea-controller but connects to OVS.
diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go
index 1852f4dbea1..290eb3cdbeb 100644
--- a/test/integration/agent/cniserver_test.go
+++ b/test/integration/agent/cniserver_test.go
@@ -60,6 +60,7 @@ import (
 	"antrea.io/antrea/pkg/ovs/ovsconfig"
 	ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
 	"antrea.io/antrea/pkg/util/channel"
+	"antrea.io/antrea/pkg/util/wait"
 )
 
 const (
@@ -297,7 +298,7 @@ type cmdAddDelTester struct {
 	targetNS       ns.NetNS
 	request        *cnimsg.CniCmdRequest
 	vethName       string
-	networkReadyCh chan struct{}
+	podNetworkWait *wait.Group
 }
 
 func (tester *cmdAddDelTester) setNS(testNS ns.NetNS, targetNS ns.NetNS) {
@@ -566,14 +567,14 @@ func (tester *cmdAddDelTester) cmdDelTest(tc testCase, dataDir string) {
 func newTester() *cmdAddDelTester {
 	tester := &cmdAddDelTester{}
 	ifaceStore := interfacestore.NewInterfaceStore()
-	tester.networkReadyCh = make(chan struct{})
+	tester.podNetworkWait = wait.NewGroup()
 	tester.server = cniserver.New(testSock,
 		"",
 		getTestNodeConfig(false),
 		k8sFake.NewSimpleClientset(),
 		routeMock,
 		false, false, false, false,
 		&config.NetworkConfig{InterfaceMTU: 1450},
-		tester.networkReadyCh)
+		tester.podNetworkWait.Increment())
 	tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100))
 	ctx := context.Background()
 	tester.ctx = ctx
@@ -609,7 +610,7 @@ func cmdAddDelCheckTest(testNS ns.NetNS, tc testCase, dataDir string) {
 	ovsServiceMock.EXPECT().GetOFPort(ovsPortname, false).Return(int32(10), nil).AnyTimes()
 	ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, mock.Any(), mock.Any(), mock.Any(), uint16(0), nil).Return(nil)
 
-	close(tester.networkReadyCh)
+	tester.podNetworkWait.Done()
 	// Test ips allocation
 	prevResult, err := tester.cmdAddTest(tc, dataDir)
 	testRequire.Nil(err)
@@ -727,15 +728,14 @@ func setupChainTest(
 ) (server *cniserver.CNIServer, hostVeth, containerVeth net.Interface, err error) {
 	if newServer {
 		routeMock = routetest.NewMockInterface(controller)
-		networkReadyCh := make(chan struct{})
-		close(networkReadyCh)
+		podNetworkWait := wait.NewGroup()
 		server = cniserver.New(testSock,
 			"",
 			testNodeConfig,
 			k8sFake.NewSimpleClientset(),
 			routeMock,
 			true, false, false, false,
 			&config.NetworkConfig{InterfaceMTU: 1450},
-			networkReadyCh)
+			podNetworkWait)
 	} else {
 		server = inServer
 	}
@@ -915,7 +915,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) {
 	ofServiceMock := openflowtest.NewMockClient(controller)
 	routeMock := routetest.NewMockInterface(controller)
 	ifaceStore := interfacestore.NewInterfaceStore()
-	networkReadyCh := make(chan struct{})
+	podNetworkWait := wait.NewGroup()
 	k8sClient := k8sFake.NewSimpleClientset(pod)
 	server := cniserver.New(
 		testSock,
 		"",
 		nodeConfig,
 		k8sClient,
 		routeMock,
 		false, false, false, false,
 		&config.NetworkConfig{InterfaceMTU: 1450},
-		networkReadyCh,
+		podNetworkWait,
 	)
 	// call Initialize, which will run reconciliation and perform host-local IPAM garbage collection