diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 92cfae5c5be..241ba6b1fa3 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -246,6 +246,20 @@ func run(o *Options) error { // Processes that enable Pod network should wait for it. podNetworkWait := utilwait.NewGroup() + // flowRestoreCompleteWait is used to wait until "essential" flows have been installed + // successfully in OVS. These flows include NetworkPolicy flows (guaranteed by + // podNetworkWait), Pod forwarding flows and flows installed by the + // NodeRouteController. Additional requirements may be added in the future. + flowRestoreCompleteWait := utilwait.NewGroup() + // We ensure that flowRestoreCompleteWait.Wait() cannot return until podNetworkWait.Wait() + // returns. This is not strictly necessary because it is guatanteed by the CNIServer Pod + // reconciliation logic but it helps with readability. + flowRestoreCompleteWait.Increment() + go func() { + defer flowRestoreCompleteWait.Done() + podNetworkWait.Wait() + }() + // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will // cause the stopCh channel to be closed; if another signal is received before the program // exits, we will force exit. @@ -293,6 +307,7 @@ func run(o *Options) error { egressConfig, serviceConfig, podNetworkWait, + flowRestoreCompleteWait, stopCh, o.nodeType, o.config.ExternalNode.ExternalNodeNamespace, @@ -326,6 +341,7 @@ func run(o *Options) error { nodeConfig, agentInitializer.GetWireGuardClient(), ipsecCertController, + flowRestoreCompleteWait, ) } @@ -590,7 +606,8 @@ func run(o *Options) error { enableAntreaIPAM, o.config.DisableTXChecksumOffload, networkConfig, - podNetworkWait) + podNetworkWait, + flowRestoreCompleteWait) err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel) if err != nil { @@ -632,20 +649,6 @@ func run(o *Options) error { o.enableAntreaProxy) } - // TODO: we should call this after installing flows for initial node routes - // and initial NetworkPolicies so that no packets will be mishandled. - if err := agentInitializer.FlowRestoreComplete(); err != nil { - return err - } - // ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete - if connectUplinkToBridge { - // Restore network config before shutdown. ovsdbConnection must be alive when restore. - defer agentInitializer.RestoreOVSBridge() - if err := agentInitializer.ConnectUplinkToOVSBridge(); err != nil { - return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err) - } - } - if err := antreaClientProvider.RunOnce(); err != nil { return err } @@ -844,6 +847,21 @@ func run(o *Options) error { go mcStrechedNetworkPolicyController.Run(stopCh) } + klog.InfoS("Waiting for flow restoration to complete") + flowRestoreCompleteWait.Wait() + if err := agentInitializer.FlowRestoreComplete(); err != nil { + return err + } + klog.InfoS("Flow restoration complete") + // ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete + if connectUplinkToBridge { + // Restore network config before shutdown. ovsdbConnection must be alive when restore. + defer agentInitializer.RestoreOVSBridge() + if err := agentInitializer.ConnectUplinkToOVSBridge(); err != nil { + return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err) + } + } + // statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for // NetworkPolicy stats and Multicast stats. if features.DefaultFeatureGate.Enabled(features.NetworkPolicyStats) { diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index c79ecad84b5..608e48a4e30 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -132,10 +132,14 @@ type Initializer struct { enableAntreaProxy bool // podNetworkWait should be decremented once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. - podNetworkWait *utilwait.Group - stopCh <-chan struct{} - nodeType config.NodeType - externalNodeNamespace string + podNetworkWait *utilwait.Group + // flowRestoreCompleteWait is used to indicate that required flows have + // been installed. We use it to determine whether flows from previous + // rounds can be deleted. + flowRestoreCompleteWait *utilwait.Group + stopCh <-chan struct{} + nodeType config.NodeType + externalNodeNamespace string } func NewInitializer( @@ -154,6 +158,7 @@ func NewInitializer( egressConfig *config.EgressConfig, serviceConfig *config.ServiceConfig, podNetworkWait *utilwait.Group, + flowRestoreCompleteWait *utilwait.Group, stopCh <-chan struct{}, nodeType config.NodeType, externalNodeNamespace string, @@ -163,29 +168,30 @@ func NewInitializer( enableL7FlowExporter bool, ) *Initializer { return &Initializer{ - ovsBridgeClient: ovsBridgeClient, - ovsCtlClient: ovsCtlClient, - client: k8sClient, - crdClient: crdClient, - ifaceStore: ifaceStore, - ofClient: ofClient, - routeClient: routeClient, - ovsBridge: ovsBridge, - hostGateway: hostGateway, - mtu: mtu, - networkConfig: networkConfig, - wireGuardConfig: wireGuardConfig, - egressConfig: egressConfig, - serviceConfig: serviceConfig, - l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{}, - podNetworkWait: podNetworkWait, - stopCh: stopCh, - nodeType: nodeType, - externalNodeNamespace: externalNodeNamespace, - connectUplinkToBridge: connectUplinkToBridge, - enableAntreaProxy: enableAntreaProxy, - enableL7NetworkPolicy: enableL7NetworkPolicy, - enableL7FlowExporter: enableL7FlowExporter, + ovsBridgeClient: ovsBridgeClient, + ovsCtlClient: ovsCtlClient, + client: k8sClient, + crdClient: crdClient, + ifaceStore: ifaceStore, + ofClient: ofClient, + routeClient: routeClient, + ovsBridge: ovsBridge, + hostGateway: hostGateway, + mtu: mtu, + networkConfig: networkConfig, + wireGuardConfig: wireGuardConfig, + egressConfig: egressConfig, + serviceConfig: serviceConfig, + l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{}, + podNetworkWait: podNetworkWait, + flowRestoreCompleteWait: flowRestoreCompleteWait, + stopCh: stopCh, + nodeType: nodeType, + externalNodeNamespace: externalNodeNamespace, + connectUplinkToBridge: connectUplinkToBridge, + enableAntreaProxy: enableAntreaProxy, + enableL7NetworkPolicy: enableL7NetworkPolicy, + enableL7FlowExporter: enableL7FlowExporter, } } @@ -529,13 +535,14 @@ func (i *Initializer) initOpenFlowPipeline() error { // the new round number), otherwise we would disrupt the dataplane. Unfortunately, // the time required for convergence may be large and there is no simple way to // determine when is a right time to perform the cleanup task. - // TODO: introduce a deterministic mechanism through which the different entities - // responsible for installing flows can notify the agent that this deletion - // operation can take place. A waitGroup can be created here and notified when - // full sync in agent networkpolicy controller is complete. This would signal NP - // flows have been synced once. Other mechanisms are still needed for node flows - // fullSync check. + // We took a first step towards introducing a deterministic mechanism through which + // the different entities responsible for installing flows can notify the agent that + // this deletion operation can take place. i.flowRestoreCompleteWait.Wait() will + // block until some key flows (NetworkPolicy flows, Pod flows, Node route flows) + // have been installed. But not all entities responsible for installing flows + // currently use this wait group, so we block for a minimum of 10 seconds. time.Sleep(10 * time.Second) + i.flowRestoreCompleteWait.Wait() klog.Info("Deleting stale flows from previous round if any") if err := i.ofClient.DeleteStaleFlows(); err != nil { klog.Errorf("Error when deleting stale flows from previous round: %v", err) diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index d65533da3e1..e44e739fd61 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -19,6 +19,7 @@ import ( "fmt" "net" "strings" + "sync" current "github.com/containernetworking/cni/pkg/types/100" "github.com/containernetworking/cni/pkg/version" @@ -404,7 +405,7 @@ func parsePrevResult(conf *types.NetworkConfig) error { return nil } -func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error { +func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait, flowRestoreCompleteWait *wait.Group) error { // desiredPods is the set of Pods that should be present, based on the // current list of Pods got from the Kubernetes API. desiredPods := sets.New[string]() @@ -413,6 +414,8 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // knownInterfaces is the list of interfaces currently in the local cache. knownInterfaces := pc.ifaceStore.GetInterfacesByType(interfacestore.ContainerInterface) + var podWg sync.WaitGroup + for _, pod := range pods { // Skip Pods for which we are not in charge of the networking. if pod.Spec.HostNetwork { @@ -436,7 +439,9 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain missingIfConfigs = append(missingIfConfigs, containerConfig) continue } + podWg.Add(1) go func(containerID, pod, namespace string) { + defer podWg.Done() // Do not install Pod flows until all preconditions are met. podNetworkWait.Wait() // To avoid race condition with CNIServer CNI event handlers. @@ -452,7 +457,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // We rely on the interface cache / store - which is initialized from the persistent // OVSDB - to map the Pod to its interface configuration. The interface // configuration includes the parameters we need to replay the flows. - klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) + klog.InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) if err := pc.ofClient.InstallPodFlows( containerConfig.InterfaceName, containerConfig.IPs, @@ -473,6 +478,10 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // interface should no longer be in store after the call to removeInterfaces } } + go func() { + defer flowRestoreCompleteWait.Done() + podWg.Wait() + }() if len(missingIfConfigs) > 0 { pc.reconcileMissingPods(missingIfConfigs, containerAccess) } diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index efaa1873c14..bc77dcc7224 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -123,6 +123,8 @@ type CNIServer struct { networkConfig *config.NetworkConfig // podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. podNetworkWait *wait.Group + // flowRestoreCompleteWait will be decremented and Pod reconciliation is completed. + flowRestoreCompleteWait *wait.Group } var supportedCNIVersionSet map[string]bool @@ -630,7 +632,7 @@ func New( routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, networkConfig *config.NetworkConfig, - podNetworkWait *wait.Group, + podNetworkWait, flowRestoreCompleteWait *wait.Group, ) *CNIServer { return &CNIServer{ cniSocket: cniSocket, @@ -646,6 +648,7 @@ func New( enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM, networkConfig: networkConfig, podNetworkWait: podNetworkWait, + flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(), } } @@ -748,7 +751,7 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, // installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the // K8s apiserver and replay the necessary flows. func (s *CNIServer) reconcile() error { - klog.InfoS("Reconciliation for CNI server") + klog.InfoS("Starting reconciliation for CNI server") // For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from // the watch cache in kube-apiserver. pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ @@ -759,7 +762,7 @@ func (s *CNIServer) reconcile() error { return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err) } - return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait) + return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait, s.flowRestoreCompleteWait) } func init() { diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go index b1f0f4b6699..58c10459aa2 100644 --- a/pkg/agent/cniserver/server_test.go +++ b/pkg/agent/cniserver/server_test.go @@ -761,11 +761,12 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[ func newCNIServer(t *testing.T) *CNIServer { cniServer := &CNIServer{ - cniSocket: testSocket, - nodeConfig: testNodeConfig, - serverVersion: cni.AntreaCNIVersion, - containerAccess: newContainerAccessArbitrator(), - podNetworkWait: wait.NewGroup(), + cniSocket: testSocket, + nodeConfig: testNodeConfig, + serverVersion: cni.AntreaCNIVersion, + containerAccess: newContainerAccessArbitrator(), + podNetworkWait: wait.NewGroup(), + flowRestoreCompleteWait: wait.NewGroup(), } cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} return cniServer diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 559d9b68498..914294a9c3b 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -15,6 +15,7 @@ package noderoute import ( + "context" "fmt" "net" "time" @@ -26,6 +27,7 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/cache/synctrack" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -41,6 +43,7 @@ import ( "antrea.io/antrea/pkg/ovs/ovsctl" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" + utilwait "antrea.io/antrea/pkg/util/wait" ) const ( @@ -80,6 +83,12 @@ type Controller struct { // or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate // to be configured before installing routes/flows to peer Nodes to prevent unencrypted traffic across Nodes. ipsecCertificateManager ipseccertificate.Manager + // flowRestoreCompleteWait is to be decremented after installing flows for initial Nodes. + flowRestoreCompleteWait *utilwait.Group + // hasProcessedInitialList keeps track of whether the initial informer list as been + // processed by workers. + // See https://github.com/kubernetes/apiserver/blob/v0.30.1/pkg/admission/plugin/policy/internal/generic/controller.go + hasProcessedInitialList synctrack.AsyncTracker[string] } // NewNodeRouteController instantiates a new Controller object which will process Node events @@ -95,6 +104,7 @@ func NewNodeRouteController( nodeConfig *config.NodeConfig, wireguardClient wireguard.Interface, ipsecCertificateManager ipseccertificate.Manager, + flowRestoreCompleteWait *utilwait.Group, ) *Controller { controller := &Controller{ ovsBridgeClient: ovsBridgeClient, @@ -111,21 +121,25 @@ func NewNodeRouteController( installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}), wireGuardClient: wireguardClient, ipsecCertificateManager: ipsecCertificateManager, + flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(), } - nodeInformer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(cur interface{}) { - controller.enqueueNode(cur) + registration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerDetailedFuncs{ + AddFunc: func(cur interface{}, isInInitialList bool) { + controller.enqueueNode(cur, isInInitialList) }, UpdateFunc: func(old, cur interface{}) { - controller.enqueueNode(cur) + controller.enqueueNode(cur, false) }, DeleteFunc: func(old interface{}) { - controller.enqueueNode(old) + controller.enqueueNode(old, false) }, }, nodeResyncPeriod, ) + // UpstreamHasSynced is used by hasProcessedInitialList to determine whether even handlers + // have been called for the initial list. + controller.hasProcessedInitialList.UpstreamHasSynced = registration.HasSynced return controller } @@ -153,7 +167,7 @@ type nodeRouteInfo struct { // enqueueNode adds an object to the controller work queue // obj could be a *corev1.Node, or a DeletionFinalStateUnknown item. -func (c *Controller) enqueueNode(obj interface{}) { +func (c *Controller) enqueueNode(obj interface{}, isInInitialList bool) { node, isNode := obj.(*corev1.Node) if !isNode { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) @@ -170,6 +184,9 @@ func (c *Controller) enqueueNode(obj interface{}) { // Ignore notifications for this Node, no need to establish connectivity to itself. if node.Name != c.nodeConfig.Name { + if isInInitialList { + c.hasProcessedInitialList.Start(node.Name) + } c.queue.Add(node.Name) } } @@ -327,6 +344,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { // underlying network. Therefore it needs not know the routes to // peer Pod CIDRs. if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { + c.flowRestoreCompleteWait.Done() <-stopCh return } @@ -352,6 +370,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) { for i := 0; i < defaultWorkers; i++ { go wait.Until(c.worker, time.Second, stopCh) } + + go func() { + // When the initial list of Nodes has been processed, we decrement flowRestoreCompleteWait. + defer c.flowRestoreCompleteWait.Done() + wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { + return c.hasProcessedInitialList.HasSynced(), nil + }) + }() + <-stopCh } @@ -380,14 +407,21 @@ func (c *Controller) processNextWorkItem() bool { defer c.queue.Done(obj) // We expect strings (Node name) to come off the workqueue. - if key, ok := obj.(string); !ok { + key, ok := obj.(string) + if !ok { // As the item in the workqueue is actually invalid, we call Forget here else we'd // go into a loop of attempting to process a work item that is invalid. // This should not happen: enqueueNode only enqueues strings. c.queue.Forget(obj) klog.Errorf("Expected string in work queue but got %#v", obj) return true - } else if err := c.syncNodeRoute(key); err == nil { + } + + // We call Finished unconditionally even if this only matters for the initial list of + // Nodes. There is no harm in calling Finished without a corresponding call to Start. + defer c.hasProcessedInitialList.Finished(key) + + if err := c.syncNodeRoute(key); err == nil { // If no error occurs we Forget this item so it does not get queued again until // another change happens. c.queue.Forget(key) diff --git a/pkg/agent/controller/noderoute/node_route_controller_test.go b/pkg/agent/controller/noderoute/node_route_controller_test.go index a3f6df54f3f..b48838f87e7 100644 --- a/pkg/agent/controller/noderoute/node_route_controller_test.go +++ b/pkg/agent/controller/noderoute/node_route_controller_test.go @@ -41,6 +41,7 @@ import ( ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing" utilip "antrea.io/antrea/pkg/util/ip" + utilwait "antrea.io/antrea/pkg/util/wait" ) var ( @@ -109,7 +110,7 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig, objects .. c := NewNodeRouteController(informerFactory.Core().V1().Nodes(), ofClient, ovsCtlClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{ IPv4: nil, MAC: gatewayMAC, - }}, wireguardClient, ipsecCertificateManager) + }}, wireguardClient, ipsecCertificateManager, utilwait.NewGroup()) return &fakeController{ Controller: c, clientset: clientset, @@ -733,3 +734,21 @@ func TestDeleteNodeRoute(t *testing.T) { }) } } + +func TestInitialListHasSynced(t *testing.T) { + c := newController(t, &config.NetworkConfig{}, node1) + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + + require.Error(t, c.flowRestoreCompleteWait.WaitWithTimeout(100*time.Millisecond)) + + c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) + c.processNextWorkItem() + + assert.True(t, c.hasProcessedInitialList.HasSynced()) +} diff --git a/test/e2e/connectivity_test.go b/test/e2e/connectivity_test.go index 64c69d4d9a8..6c6c33c0ff3 100644 --- a/test/e2e/connectivity_test.go +++ b/test/e2e/connectivity_test.go @@ -60,7 +60,6 @@ func TestConnectivity(t *testing.T) { t.Run("testOVSRestartSameNode", func(t *testing.T) { skipIfNotIPv4Cluster(t) skipIfHasWindowsNodes(t) - t.Skip("Skipping test for now as it fails consistently") testOVSRestartSameNode(t, data, data.testNamespace) }) t.Run("testOVSFlowReplay", func(t *testing.T) { @@ -327,12 +326,7 @@ func testPodConnectivityAfterAntreaRestart(t *testing.T, data *TestData, namespa // testOVSRestartSameNode verifies that datapath flows are not removed when the Antrea Agent Pod is // stopped gracefully (e.g. as part of a RollingUpdate). The test sends ARP requests every 1s and -// checks that there is no packet loss during the restart. This test does not apply to the userspace -// ndetdev datapath, since in this case the datapath functionality is implemented by the -// ovs-vswitchd daemon itself. When ovs-vswitchd restarts, datapath flows are flushed and it may -// take some time for the Agent to replay the flows. This will not impact this test, since we are -// just testing L2 connectivity between 2 Pods on the same Node, and the default behavior of the -// br-int bridge is to implement normal L2 forwarding. +// checks that there is no packet loss during the restart. func testOVSRestartSameNode(t *testing.T, data *TestData, namespace string) { workerNode := workerNodeName(1) t.Logf("Creating two toolbox test Pods on '%s'", workerNode) @@ -364,7 +358,7 @@ func testOVSRestartSameNode(t *testing.T, data *TestData, namespace string) { maxLossRate = 10 } if lossRate > maxLossRate { - t.Logf(stdout) + t.Log(stdout) return fmt.Errorf("arping loss rate is %f%%", lossRate) } return nil diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 290eb3cdbeb..aaf580514c6 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -27,6 +27,7 @@ import ( "strings" "testing" "text/template" + "time" "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" @@ -292,13 +293,14 @@ func ipVersion(ip net.IP) string { } type cmdAddDelTester struct { - server *cniserver.CNIServer - ctx context.Context - testNS ns.NetNS - targetNS ns.NetNS - request *cnimsg.CniCmdRequest - vethName string - podNetworkWait *wait.Group + server *cniserver.CNIServer + ctx context.Context + testNS ns.NetNS + targetNS ns.NetNS + request *cnimsg.CniCmdRequest + vethName string + podNetworkWait *wait.Group + flowRestoreCompleteWait *wait.Group } func (tester *cmdAddDelTester) setNS(testNS ns.NetNS, targetNS ns.NetNS) { @@ -568,13 +570,16 @@ func newTester() *cmdAddDelTester { tester := &cmdAddDelTester{} ifaceStore := interfacestore.NewInterfaceStore() tester.podNetworkWait = wait.NewGroup() + tester.flowRestoreCompleteWait = wait.NewGroup() tester.server = cniserver.New(testSock, "", getTestNodeConfig(false), k8sFake.NewSimpleClientset(), routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - tester.podNetworkWait.Increment()) + tester.podNetworkWait.Increment(), + tester.flowRestoreCompleteWait, + ) tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100)) ctx := context.Background() tester.ctx = ctx @@ -583,6 +588,7 @@ func newTester() *cmdAddDelTester { func cmdAddDelCheckTest(testNS ns.NetNS, tc testCase, dataDir string) { testRequire := require.New(tc.t) + testAssert := assert.New(tc.t) testRequire.Equal(cniVersion, tc.CNIVersion) @@ -611,6 +617,9 @@ func cmdAddDelCheckTest(testNS ns.NetNS, tc testCase, dataDir string) { ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, mock.Any(), mock.Any(), mock.Any(), uint16(0), nil).Return(nil) tester.podNetworkWait.Done() + + testAssert.NoError(tester.flowRestoreCompleteWait.WaitWithTimeout(1 * time.Second)) + // Test ips allocation prevResult, err := tester.cmdAddTest(tc, dataDir) testRequire.Nil(err) @@ -729,13 +738,14 @@ func setupChainTest( if newServer { routeMock = routetest.NewMockInterface(controller) podNetworkWait := wait.NewGroup() + flowRestoreCompleteWait := wait.NewGroup() server = cniserver.New(testSock, "", testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - podNetworkWait) + podNetworkWait, flowRestoreCompleteWait) } else { server = inServer } @@ -916,6 +926,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { routeMock := routetest.NewMockInterface(controller) ifaceStore := interfacestore.NewInterfaceStore() podNetworkWait := wait.NewGroup() + flowRestoreCompleteWait := wait.NewGroup() k8sClient := k8sFake.NewSimpleClientset(pod) server := cniserver.New( testSock, @@ -924,7 +935,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { k8sClient, routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - podNetworkWait, + podNetworkWait, flowRestoreCompleteWait, ) // call Initialize, which will run reconciliation and perform host-local IPAM garbage collection