From 7ea44b57df10f62550f3e1024bf505cc0d41897b Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Fri, 17 May 2024 18:36:12 -0700 Subject: [PATCH] Delay removal of flow-restore-wait Until a set of "essential" flows has been installed. At the moment, we include NetworkPolicy flows (using podNetworkWait as the signal), Pod forwarding flows (reconciled by the CNIServer), and Node routing flows (installed by the NodeRouteController). This set can be extended in the future if desired. We leverage the wrapper around sync.WaitGroup which was introduced previously in #5777. It simplifies unit testing, and we can achieve some symmetry with podNetworkWait. We can also start leveraging this new wait group (flowRestoreCompleteWait) as the signal to delete flows from previous rounds. However, at the moment this is incomplete, as we don't wait for all controllers to signal that they have installed initial flows. Because the NodeRouteController does not have an initial "reconcile" operation (like the CNIServer) to install flows for the initial Node list, we instead rely on a different mechanims provided by upstream K8s for controllers. When registering event handlers, we can request for the ADD handler to include a boolean flag indicating whether the object is part of the initial list retrieved by the informer. Using this mechanism, we can reliably signal through flowRestoreCompleteWait when this initial list of Nodes has been synced at least once. Fixes #6338 Signed-off-by: Antonin Bas --- cmd/antrea-agent/agent.go | 48 ++++++++---- pkg/agent/agent.go | 73 ++++++++++--------- pkg/agent/cniserver/pod_configuration.go | 13 +++- pkg/agent/cniserver/server.go | 9 ++- pkg/agent/cniserver/server_test.go | 11 +-- .../noderoute/node_route_controller.go | 52 ++++++++++--- .../noderoute/node_route_controller_test.go | 21 +++++- test/e2e/connectivity_test.go | 10 +-- test/integration/agent/cniserver_test.go | 31 +++++--- 9 files changed, 182 insertions(+), 86 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 92cfae5c5be..241ba6b1fa3 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -246,6 +246,20 @@ func run(o *Options) error { // Processes that enable Pod network should wait for it. podNetworkWait := utilwait.NewGroup() + // flowRestoreCompleteWait is used to wait until "essential" flows have been installed + // successfully in OVS. These flows include NetworkPolicy flows (guaranteed by + // podNetworkWait), Pod forwarding flows and flows installed by the + // NodeRouteController. Additional requirements may be added in the future. + flowRestoreCompleteWait := utilwait.NewGroup() + // We ensure that flowRestoreCompleteWait.Wait() cannot return until podNetworkWait.Wait() + // returns. This is not strictly necessary because it is guatanteed by the CNIServer Pod + // reconciliation logic but it helps with readability. + flowRestoreCompleteWait.Increment() + go func() { + defer flowRestoreCompleteWait.Done() + podNetworkWait.Wait() + }() + // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will // cause the stopCh channel to be closed; if another signal is received before the program // exits, we will force exit. @@ -293,6 +307,7 @@ func run(o *Options) error { egressConfig, serviceConfig, podNetworkWait, + flowRestoreCompleteWait, stopCh, o.nodeType, o.config.ExternalNode.ExternalNodeNamespace, @@ -326,6 +341,7 @@ func run(o *Options) error { nodeConfig, agentInitializer.GetWireGuardClient(), ipsecCertController, + flowRestoreCompleteWait, ) } @@ -590,7 +606,8 @@ func run(o *Options) error { enableAntreaIPAM, o.config.DisableTXChecksumOffload, networkConfig, - podNetworkWait) + podNetworkWait, + flowRestoreCompleteWait) err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel) if err != nil { @@ -632,20 +649,6 @@ func run(o *Options) error { o.enableAntreaProxy) } - // TODO: we should call this after installing flows for initial node routes - // and initial NetworkPolicies so that no packets will be mishandled. - if err := agentInitializer.FlowRestoreComplete(); err != nil { - return err - } - // ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete - if connectUplinkToBridge { - // Restore network config before shutdown. ovsdbConnection must be alive when restore. - defer agentInitializer.RestoreOVSBridge() - if err := agentInitializer.ConnectUplinkToOVSBridge(); err != nil { - return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err) - } - } - if err := antreaClientProvider.RunOnce(); err != nil { return err } @@ -844,6 +847,21 @@ func run(o *Options) error { go mcStrechedNetworkPolicyController.Run(stopCh) } + klog.InfoS("Waiting for flow restoration to complete") + flowRestoreCompleteWait.Wait() + if err := agentInitializer.FlowRestoreComplete(); err != nil { + return err + } + klog.InfoS("Flow restoration complete") + // ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete + if connectUplinkToBridge { + // Restore network config before shutdown. ovsdbConnection must be alive when restore. + defer agentInitializer.RestoreOVSBridge() + if err := agentInitializer.ConnectUplinkToOVSBridge(); err != nil { + return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err) + } + } + // statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for // NetworkPolicy stats and Multicast stats. if features.DefaultFeatureGate.Enabled(features.NetworkPolicyStats) { diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index c79ecad84b5..608e48a4e30 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -132,10 +132,14 @@ type Initializer struct { enableAntreaProxy bool // podNetworkWait should be decremented once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. - podNetworkWait *utilwait.Group - stopCh <-chan struct{} - nodeType config.NodeType - externalNodeNamespace string + podNetworkWait *utilwait.Group + // flowRestoreCompleteWait is used to indicate that required flows have + // been installed. We use it to determine whether flows from previous + // rounds can be deleted. + flowRestoreCompleteWait *utilwait.Group + stopCh <-chan struct{} + nodeType config.NodeType + externalNodeNamespace string } func NewInitializer( @@ -154,6 +158,7 @@ func NewInitializer( egressConfig *config.EgressConfig, serviceConfig *config.ServiceConfig, podNetworkWait *utilwait.Group, + flowRestoreCompleteWait *utilwait.Group, stopCh <-chan struct{}, nodeType config.NodeType, externalNodeNamespace string, @@ -163,29 +168,30 @@ func NewInitializer( enableL7FlowExporter bool, ) *Initializer { return &Initializer{ - ovsBridgeClient: ovsBridgeClient, - ovsCtlClient: ovsCtlClient, - client: k8sClient, - crdClient: crdClient, - ifaceStore: ifaceStore, - ofClient: ofClient, - routeClient: routeClient, - ovsBridge: ovsBridge, - hostGateway: hostGateway, - mtu: mtu, - networkConfig: networkConfig, - wireGuardConfig: wireGuardConfig, - egressConfig: egressConfig, - serviceConfig: serviceConfig, - l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{}, - podNetworkWait: podNetworkWait, - stopCh: stopCh, - nodeType: nodeType, - externalNodeNamespace: externalNodeNamespace, - connectUplinkToBridge: connectUplinkToBridge, - enableAntreaProxy: enableAntreaProxy, - enableL7NetworkPolicy: enableL7NetworkPolicy, - enableL7FlowExporter: enableL7FlowExporter, + ovsBridgeClient: ovsBridgeClient, + ovsCtlClient: ovsCtlClient, + client: k8sClient, + crdClient: crdClient, + ifaceStore: ifaceStore, + ofClient: ofClient, + routeClient: routeClient, + ovsBridge: ovsBridge, + hostGateway: hostGateway, + mtu: mtu, + networkConfig: networkConfig, + wireGuardConfig: wireGuardConfig, + egressConfig: egressConfig, + serviceConfig: serviceConfig, + l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{}, + podNetworkWait: podNetworkWait, + flowRestoreCompleteWait: flowRestoreCompleteWait, + stopCh: stopCh, + nodeType: nodeType, + externalNodeNamespace: externalNodeNamespace, + connectUplinkToBridge: connectUplinkToBridge, + enableAntreaProxy: enableAntreaProxy, + enableL7NetworkPolicy: enableL7NetworkPolicy, + enableL7FlowExporter: enableL7FlowExporter, } } @@ -529,13 +535,14 @@ func (i *Initializer) initOpenFlowPipeline() error { // the new round number), otherwise we would disrupt the dataplane. Unfortunately, // the time required for convergence may be large and there is no simple way to // determine when is a right time to perform the cleanup task. - // TODO: introduce a deterministic mechanism through which the different entities - // responsible for installing flows can notify the agent that this deletion - // operation can take place. A waitGroup can be created here and notified when - // full sync in agent networkpolicy controller is complete. This would signal NP - // flows have been synced once. Other mechanisms are still needed for node flows - // fullSync check. + // We took a first step towards introducing a deterministic mechanism through which + // the different entities responsible for installing flows can notify the agent that + // this deletion operation can take place. i.flowRestoreCompleteWait.Wait() will + // block until some key flows (NetworkPolicy flows, Pod flows, Node route flows) + // have been installed. But not all entities responsible for installing flows + // currently use this wait group, so we block for a minimum of 10 seconds. time.Sleep(10 * time.Second) + i.flowRestoreCompleteWait.Wait() klog.Info("Deleting stale flows from previous round if any") if err := i.ofClient.DeleteStaleFlows(); err != nil { klog.Errorf("Error when deleting stale flows from previous round: %v", err) diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index d65533da3e1..e44e739fd61 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -19,6 +19,7 @@ import ( "fmt" "net" "strings" + "sync" current "github.com/containernetworking/cni/pkg/types/100" "github.com/containernetworking/cni/pkg/version" @@ -404,7 +405,7 @@ func parsePrevResult(conf *types.NetworkConfig) error { return nil } -func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error { +func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait, flowRestoreCompleteWait *wait.Group) error { // desiredPods is the set of Pods that should be present, based on the // current list of Pods got from the Kubernetes API. desiredPods := sets.New[string]() @@ -413,6 +414,8 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // knownInterfaces is the list of interfaces currently in the local cache. knownInterfaces := pc.ifaceStore.GetInterfacesByType(interfacestore.ContainerInterface) + var podWg sync.WaitGroup + for _, pod := range pods { // Skip Pods for which we are not in charge of the networking. if pod.Spec.HostNetwork { @@ -436,7 +439,9 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain missingIfConfigs = append(missingIfConfigs, containerConfig) continue } + podWg.Add(1) go func(containerID, pod, namespace string) { + defer podWg.Done() // Do not install Pod flows until all preconditions are met. podNetworkWait.Wait() // To avoid race condition with CNIServer CNI event handlers. @@ -452,7 +457,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // We rely on the interface cache / store - which is initialized from the persistent // OVSDB - to map the Pod to its interface configuration. The interface // configuration includes the parameters we need to replay the flows. - klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) + klog.InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName) if err := pc.ofClient.InstallPodFlows( containerConfig.InterfaceName, containerConfig.IPs, @@ -473,6 +478,10 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // interface should no longer be in store after the call to removeInterfaces } } + go func() { + defer flowRestoreCompleteWait.Done() + podWg.Wait() + }() if len(missingIfConfigs) > 0 { pc.reconcileMissingPods(missingIfConfigs, containerAccess) } diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index efaa1873c14..bc77dcc7224 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -123,6 +123,8 @@ type CNIServer struct { networkConfig *config.NetworkConfig // podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. podNetworkWait *wait.Group + // flowRestoreCompleteWait will be decremented and Pod reconciliation is completed. + flowRestoreCompleteWait *wait.Group } var supportedCNIVersionSet map[string]bool @@ -630,7 +632,7 @@ func New( routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, networkConfig *config.NetworkConfig, - podNetworkWait *wait.Group, + podNetworkWait, flowRestoreCompleteWait *wait.Group, ) *CNIServer { return &CNIServer{ cniSocket: cniSocket, @@ -646,6 +648,7 @@ func New( enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM, networkConfig: networkConfig, podNetworkWait: podNetworkWait, + flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(), } } @@ -748,7 +751,7 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, // installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the // K8s apiserver and replay the necessary flows. func (s *CNIServer) reconcile() error { - klog.InfoS("Reconciliation for CNI server") + klog.InfoS("Starting reconciliation for CNI server") // For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from // the watch cache in kube-apiserver. pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ @@ -759,7 +762,7 @@ func (s *CNIServer) reconcile() error { return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err) } - return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait) + return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait, s.flowRestoreCompleteWait) } func init() { diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go index b1f0f4b6699..58c10459aa2 100644 --- a/pkg/agent/cniserver/server_test.go +++ b/pkg/agent/cniserver/server_test.go @@ -761,11 +761,12 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[ func newCNIServer(t *testing.T) *CNIServer { cniServer := &CNIServer{ - cniSocket: testSocket, - nodeConfig: testNodeConfig, - serverVersion: cni.AntreaCNIVersion, - containerAccess: newContainerAccessArbitrator(), - podNetworkWait: wait.NewGroup(), + cniSocket: testSocket, + nodeConfig: testNodeConfig, + serverVersion: cni.AntreaCNIVersion, + containerAccess: newContainerAccessArbitrator(), + podNetworkWait: wait.NewGroup(), + flowRestoreCompleteWait: wait.NewGroup(), } cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} return cniServer diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 559d9b68498..914294a9c3b 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -15,6 +15,7 @@ package noderoute import ( + "context" "fmt" "net" "time" @@ -26,6 +27,7 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/cache/synctrack" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -41,6 +43,7 @@ import ( "antrea.io/antrea/pkg/ovs/ovsctl" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" + utilwait "antrea.io/antrea/pkg/util/wait" ) const ( @@ -80,6 +83,12 @@ type Controller struct { // or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate // to be configured before installing routes/flows to peer Nodes to prevent unencrypted traffic across Nodes. ipsecCertificateManager ipseccertificate.Manager + // flowRestoreCompleteWait is to be decremented after installing flows for initial Nodes. + flowRestoreCompleteWait *utilwait.Group + // hasProcessedInitialList keeps track of whether the initial informer list as been + // processed by workers. + // See https://github.com/kubernetes/apiserver/blob/v0.30.1/pkg/admission/plugin/policy/internal/generic/controller.go + hasProcessedInitialList synctrack.AsyncTracker[string] } // NewNodeRouteController instantiates a new Controller object which will process Node events @@ -95,6 +104,7 @@ func NewNodeRouteController( nodeConfig *config.NodeConfig, wireguardClient wireguard.Interface, ipsecCertificateManager ipseccertificate.Manager, + flowRestoreCompleteWait *utilwait.Group, ) *Controller { controller := &Controller{ ovsBridgeClient: ovsBridgeClient, @@ -111,21 +121,25 @@ func NewNodeRouteController( installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}), wireGuardClient: wireguardClient, ipsecCertificateManager: ipsecCertificateManager, + flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(), } - nodeInformer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(cur interface{}) { - controller.enqueueNode(cur) + registration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerDetailedFuncs{ + AddFunc: func(cur interface{}, isInInitialList bool) { + controller.enqueueNode(cur, isInInitialList) }, UpdateFunc: func(old, cur interface{}) { - controller.enqueueNode(cur) + controller.enqueueNode(cur, false) }, DeleteFunc: func(old interface{}) { - controller.enqueueNode(old) + controller.enqueueNode(old, false) }, }, nodeResyncPeriod, ) + // UpstreamHasSynced is used by hasProcessedInitialList to determine whether even handlers + // have been called for the initial list. + controller.hasProcessedInitialList.UpstreamHasSynced = registration.HasSynced return controller } @@ -153,7 +167,7 @@ type nodeRouteInfo struct { // enqueueNode adds an object to the controller work queue // obj could be a *corev1.Node, or a DeletionFinalStateUnknown item. -func (c *Controller) enqueueNode(obj interface{}) { +func (c *Controller) enqueueNode(obj interface{}, isInInitialList bool) { node, isNode := obj.(*corev1.Node) if !isNode { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) @@ -170,6 +184,9 @@ func (c *Controller) enqueueNode(obj interface{}) { // Ignore notifications for this Node, no need to establish connectivity to itself. if node.Name != c.nodeConfig.Name { + if isInInitialList { + c.hasProcessedInitialList.Start(node.Name) + } c.queue.Add(node.Name) } } @@ -327,6 +344,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { // underlying network. Therefore it needs not know the routes to // peer Pod CIDRs. if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { + c.flowRestoreCompleteWait.Done() <-stopCh return } @@ -352,6 +370,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) { for i := 0; i < defaultWorkers; i++ { go wait.Until(c.worker, time.Second, stopCh) } + + go func() { + // When the initial list of Nodes has been processed, we decrement flowRestoreCompleteWait. + defer c.flowRestoreCompleteWait.Done() + wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { + return c.hasProcessedInitialList.HasSynced(), nil + }) + }() + <-stopCh } @@ -380,14 +407,21 @@ func (c *Controller) processNextWorkItem() bool { defer c.queue.Done(obj) // We expect strings (Node name) to come off the workqueue. - if key, ok := obj.(string); !ok { + key, ok := obj.(string) + if !ok { // As the item in the workqueue is actually invalid, we call Forget here else we'd // go into a loop of attempting to process a work item that is invalid. // This should not happen: enqueueNode only enqueues strings. c.queue.Forget(obj) klog.Errorf("Expected string in work queue but got %#v", obj) return true - } else if err := c.syncNodeRoute(key); err == nil { + } + + // We call Finished unconditionally even if this only matters for the initial list of + // Nodes. There is no harm in calling Finished without a corresponding call to Start. + defer c.hasProcessedInitialList.Finished(key) + + if err := c.syncNodeRoute(key); err == nil { // If no error occurs we Forget this item so it does not get queued again until // another change happens. c.queue.Forget(key) diff --git a/pkg/agent/controller/noderoute/node_route_controller_test.go b/pkg/agent/controller/noderoute/node_route_controller_test.go index a3f6df54f3f..b48838f87e7 100644 --- a/pkg/agent/controller/noderoute/node_route_controller_test.go +++ b/pkg/agent/controller/noderoute/node_route_controller_test.go @@ -41,6 +41,7 @@ import ( ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing" utilip "antrea.io/antrea/pkg/util/ip" + utilwait "antrea.io/antrea/pkg/util/wait" ) var ( @@ -109,7 +110,7 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig, objects .. c := NewNodeRouteController(informerFactory.Core().V1().Nodes(), ofClient, ovsCtlClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{ IPv4: nil, MAC: gatewayMAC, - }}, wireguardClient, ipsecCertificateManager) + }}, wireguardClient, ipsecCertificateManager, utilwait.NewGroup()) return &fakeController{ Controller: c, clientset: clientset, @@ -733,3 +734,21 @@ func TestDeleteNodeRoute(t *testing.T) { }) } } + +func TestInitialListHasSynced(t *testing.T) { + c := newController(t, &config.NetworkConfig{}, node1) + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + + require.Error(t, c.flowRestoreCompleteWait.WaitWithTimeout(100*time.Millisecond)) + + c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) + c.processNextWorkItem() + + assert.True(t, c.hasProcessedInitialList.HasSynced()) +} diff --git a/test/e2e/connectivity_test.go b/test/e2e/connectivity_test.go index 64c69d4d9a8..6c6c33c0ff3 100644 --- a/test/e2e/connectivity_test.go +++ b/test/e2e/connectivity_test.go @@ -60,7 +60,6 @@ func TestConnectivity(t *testing.T) { t.Run("testOVSRestartSameNode", func(t *testing.T) { skipIfNotIPv4Cluster(t) skipIfHasWindowsNodes(t) - t.Skip("Skipping test for now as it fails consistently") testOVSRestartSameNode(t, data, data.testNamespace) }) t.Run("testOVSFlowReplay", func(t *testing.T) { @@ -327,12 +326,7 @@ func testPodConnectivityAfterAntreaRestart(t *testing.T, data *TestData, namespa // testOVSRestartSameNode verifies that datapath flows are not removed when the Antrea Agent Pod is // stopped gracefully (e.g. as part of a RollingUpdate). The test sends ARP requests every 1s and -// checks that there is no packet loss during the restart. This test does not apply to the userspace -// ndetdev datapath, since in this case the datapath functionality is implemented by the -// ovs-vswitchd daemon itself. When ovs-vswitchd restarts, datapath flows are flushed and it may -// take some time for the Agent to replay the flows. This will not impact this test, since we are -// just testing L2 connectivity between 2 Pods on the same Node, and the default behavior of the -// br-int bridge is to implement normal L2 forwarding. +// checks that there is no packet loss during the restart. func testOVSRestartSameNode(t *testing.T, data *TestData, namespace string) { workerNode := workerNodeName(1) t.Logf("Creating two toolbox test Pods on '%s'", workerNode) @@ -364,7 +358,7 @@ func testOVSRestartSameNode(t *testing.T, data *TestData, namespace string) { maxLossRate = 10 } if lossRate > maxLossRate { - t.Logf(stdout) + t.Log(stdout) return fmt.Errorf("arping loss rate is %f%%", lossRate) } return nil diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 290eb3cdbeb..aaf580514c6 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -27,6 +27,7 @@ import ( "strings" "testing" "text/template" + "time" "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" @@ -292,13 +293,14 @@ func ipVersion(ip net.IP) string { } type cmdAddDelTester struct { - server *cniserver.CNIServer - ctx context.Context - testNS ns.NetNS - targetNS ns.NetNS - request *cnimsg.CniCmdRequest - vethName string - podNetworkWait *wait.Group + server *cniserver.CNIServer + ctx context.Context + testNS ns.NetNS + targetNS ns.NetNS + request *cnimsg.CniCmdRequest + vethName string + podNetworkWait *wait.Group + flowRestoreCompleteWait *wait.Group } func (tester *cmdAddDelTester) setNS(testNS ns.NetNS, targetNS ns.NetNS) { @@ -568,13 +570,16 @@ func newTester() *cmdAddDelTester { tester := &cmdAddDelTester{} ifaceStore := interfacestore.NewInterfaceStore() tester.podNetworkWait = wait.NewGroup() + tester.flowRestoreCompleteWait = wait.NewGroup() tester.server = cniserver.New(testSock, "", getTestNodeConfig(false), k8sFake.NewSimpleClientset(), routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - tester.podNetworkWait.Increment()) + tester.podNetworkWait.Increment(), + tester.flowRestoreCompleteWait, + ) tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100)) ctx := context.Background() tester.ctx = ctx @@ -583,6 +588,7 @@ func newTester() *cmdAddDelTester { func cmdAddDelCheckTest(testNS ns.NetNS, tc testCase, dataDir string) { testRequire := require.New(tc.t) + testAssert := assert.New(tc.t) testRequire.Equal(cniVersion, tc.CNIVersion) @@ -611,6 +617,9 @@ func cmdAddDelCheckTest(testNS ns.NetNS, tc testCase, dataDir string) { ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, mock.Any(), mock.Any(), mock.Any(), uint16(0), nil).Return(nil) tester.podNetworkWait.Done() + + testAssert.NoError(tester.flowRestoreCompleteWait.WaitWithTimeout(1 * time.Second)) + // Test ips allocation prevResult, err := tester.cmdAddTest(tc, dataDir) testRequire.Nil(err) @@ -729,13 +738,14 @@ func setupChainTest( if newServer { routeMock = routetest.NewMockInterface(controller) podNetworkWait := wait.NewGroup() + flowRestoreCompleteWait := wait.NewGroup() server = cniserver.New(testSock, "", testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - podNetworkWait) + podNetworkWait, flowRestoreCompleteWait) } else { server = inServer } @@ -916,6 +926,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { routeMock := routetest.NewMockInterface(controller) ifaceStore := interfacestore.NewInterfaceStore() podNetworkWait := wait.NewGroup() + flowRestoreCompleteWait := wait.NewGroup() k8sClient := k8sFake.NewSimpleClientset(pod) server := cniserver.New( testSock, @@ -924,7 +935,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { k8sClient, routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - podNetworkWait, + podNetworkWait, flowRestoreCompleteWait, ) // call Initialize, which will run reconciliation and perform host-local IPAM garbage collection