From e6c44e1347bae571ec07682959bf0c8b023f35b0 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Fri, 25 Feb 2022 15:29:26 +0800 Subject: [PATCH] Realize Egress for a Pod once its network is created Previously antrea-controller included a Pod in an EgressGroup only when its IP was present in the K8s API. If a Pod tries to access external networks right after it's up, the Node IP will be used as the SNAT IP even when an Egress applying to it has been created, because its Pod IP may not have been reported to the K8s API yet, or antrea-controller may not have included the Pod in the EgressGroup yet. This patch fixes it by making CNIServer notify EgressController that it has processed the CNI ADD request of a Pod, so that EgressController can reconcile the corresponding Egress immediately, instead of waiting for the Pod to be reported to the K8s API. As NetworkPolicyController relies on that event as well, we introduce a channel implementation which supports multiple subscribers. Fixes #3361 Signed-off-by: Quan Tian --- cmd/antrea-agent/agent.go | 20 ++-- pkg/agent/cniserver/pod_configuration.go | 31 +++-- .../cniserver/pod_configuration_windows.go | 6 +- pkg/agent/cniserver/server.go | 6 +- pkg/agent/cniserver/server_test.go | 8 +- .../controller/egress/egress_controller.go | 19 ++- .../egress/egress_controller_test.go | 51 ++++++++ pkg/agent/controller/networkpolicy/cache.go | 47 ++++---- .../controller/networkpolicy/cache_test.go | 26 +++-- .../networkpolicy/networkpolicy_controller.go | 5 +- .../networkpolicy_controller_test.go | 5 +- .../networkpolicy/status_controller_test.go | 4 +- pkg/agent/types/networkpolicy.go | 8 -- pkg/controller/egress/controller.go | 7 +- pkg/controller/egress/controller_test.go | 6 +- pkg/util/channel/channel.go | 95 +++++++++++++++ pkg/util/channel/channel_test.go | 109 ++++++++++++++++++ test/e2e/egress_test.go | 32 ++++- test/integration/agent/cniserver_test.go | 6 +- 19 files changed, 383 insertions(+), 108 deletions(-) create mode 100644 pkg/util/channel/channel.go create 
mode 100644 pkg/util/channel/channel_test.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index ed2d18d0274..9607f61588b 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -54,7 +54,6 @@ import ( "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" "antrea.io/antrea/pkg/agent/secondarynetwork/podwatch" "antrea.io/antrea/pkg/agent/stats" - "antrea.io/antrea/pkg/agent/types" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" "antrea.io/antrea/pkg/controller/externalippool" "antrea.io/antrea/pkg/features" @@ -63,6 +62,7 @@ import ( ofconfig "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/signals" + "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/cipher" "antrea.io/antrea/pkg/util/k8s" "antrea.io/antrea/pkg/version" @@ -261,10 +261,10 @@ func run(o *Options) error { } } - // entityUpdates is a channel for receiving entity updates from CNIServer and - // notifying NetworkPolicyController to reconcile rules related to the - // updated entities. - entityUpdates := make(chan types.EntityReference, 100) + // podUpdateChannel is a channel for receiving Pod updates from CNIServer and + // notifying NetworkPolicyController and EgressController to reconcile rules + // related to the updated Pods. + podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) // We set flow poll interval as the time interval for rule deletion in the async // rule cache, which is implemented as part of the idAllocator. 
This is to preserve // the rule info for populating NetworkPolicy fields in the Flow Exporter even @@ -282,7 +282,7 @@ func run(o *Options) error { ofClient, ifaceStore, nodeConfig.Name, - entityUpdates, + podUpdateChannel, groupCounters, groupIDUpdates, antreaPolicyEnabled, @@ -333,7 +333,7 @@ func run(o *Options) error { if features.DefaultFeatureGate.Enabled(features.Egress) { egressController, err = egress.NewEgressController( ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeTransportIP, - memberlistCluster, egressInformer, nodeInformer, localIPDetector, + memberlistCluster, egressInformer, podUpdateChannel, localIPDetector, ) if err != nil { return fmt.Errorf("error creating new Egress controller: %v", err) @@ -372,12 +372,12 @@ func run(o *Options) error { var cniPodInfoStore cnipodcache.CNIPodInfoStore if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { cniPodInfoStore = cnipodcache.NewCNIPodInfoStore() - err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, entityUpdates, cniPodInfoStore) + err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, cniPodInfoStore) if err != nil { return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err) } } else { - err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, entityUpdates, nil) + err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, nil) if err != nil { return fmt.Errorf("error initializing CNI server: %v", err) } @@ -495,6 +495,8 @@ func run(o *Options) error { log.StartLogFileNumberMonitor(stopCh) + go podUpdateChannel.Run(stopCh) + go routeClient.Run(stopCh) go cniServer.Run(stopCh) diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 930dae05f17..25930b0220c 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -30,10 +30,9 @@ import ( 
"antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" - "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" - "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" ) @@ -63,9 +62,9 @@ type podConfigurator struct { ifaceStore interfacestore.InterfaceStore gatewayMAC net.HardwareAddr ifConfigurator *ifConfigurator - // entityUpdates is a channel for notifying updates of local endpoints / entities (most notably Pod) - // to other components which may benefit from this information, i.e NetworkPolicyController. - entityUpdates chan<- types.EntityReference + // podUpdateNotifier is used for notifying updates of local Pods to other components which may benefit from this + // information, i.e. NetworkPolicyController, EgressController. + podUpdateNotifier channel.Notifier // consumed by secondary network creation. 
podInfoStore cnipodcache.CNIPodInfoStore } @@ -78,7 +77,7 @@ func newPodConfigurator( gatewayMAC net.HardwareAddr, ovsDatapathType ovsconfig.OVSDatapathType, isOvsHardwareOffloadEnabled bool, - entityUpdates chan<- types.EntityReference, + podUpdateNotifier channel.Notifier, podInfoStore cnipodcache.CNIPodInfoStore, ) (*podConfigurator, error) { ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled) @@ -86,14 +85,14 @@ func newPodConfigurator( return nil, err } return &podConfigurator{ - ovsBridgeClient: ovsBridgeClient, - ofClient: ofClient, - routeClient: routeClient, - ifaceStore: ifaceStore, - gatewayMAC: gatewayMAC, - ifConfigurator: ifConfigurator, - entityUpdates: entityUpdates, - podInfoStore: podInfoStore, + ovsBridgeClient: ovsBridgeClient, + ofClient: ofClient, + routeClient: routeClient, + ifaceStore: ifaceStore, + gatewayMAC: gatewayMAC, + ifConfigurator: ifConfigurator, + podUpdateNotifier: podUpdateNotifier, + podInfoStore: podInfoStore, }, nil } @@ -486,9 +485,7 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta // Add containerConfig into local cache pc.ifaceStore.AddInterface(containerConfig) // Notify the Pod update event to required components. 
- pc.entityUpdates <- types.EntityReference{ - Pod: &v1beta2.PodReference{Name: containerConfig.PodName, Namespace: containerConfig.PodNamespace}, - } + pc.podUpdateNotifier.Notify(k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName)) return nil } diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index d42578e1ec4..6f61e241dcb 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -24,9 +24,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/interfacestore" - "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" - "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/k8s" ) @@ -51,9 +49,7 @@ func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.I // Update interface config with the ofPort. ifConfig.OVSPortConfig.OFPort = ofPort // Notify the Pod update event to required components. 
- pc.entityUpdates <- types.EntityReference{ - Pod: &v1beta2.PodReference{Name: ifConfig.PodName, Namespace: ifConfig.PodNamespace}, - } + pc.podUpdateNotifier.Notify(k8s.NamespacedName(ifConfig.PodNamespace, ifConfig.PodName)) return nil }) } diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index 6059ebd0e3b..82c9c8d0dac 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -40,11 +40,11 @@ import ( "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" - "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" ) const ( @@ -589,7 +589,7 @@ func (s *CNIServer) Initialize( ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, - entityUpdates chan<- types.EntityReference, + podUpdateNotifier channel.Notifier, podInfoStore cnipodcache.CNIPodInfoStore, ) error { var err error @@ -602,7 +602,7 @@ func (s *CNIServer) Initialize( s.podConfigurator, err = newPodConfigurator( ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, - ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), entityUpdates, + ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), podUpdateNotifier, podInfoStore, ) if err != nil { diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go index bf4cd8a975e..afbbd2119fe 100644 --- a/pkg/agent/cniserver/server_test.go +++ b/pkg/agent/cniserver/server_test.go @@ -42,12 +42,12 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" - antreatypes "antrea.io/antrea/pkg/agent/types" 
"antrea.io/antrea/pkg/agent/util" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + "antrea.io/antrea/pkg/util/channel" ) const ( @@ -400,7 +400,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "" prevResult.Interfaces = []*current.Interface{hostIface, containerIface} - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, make(chan antreatypes.EntityReference, 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, channel.NewSubscribableChannel("PodUpdate", 100), nil) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, k8sPodArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -411,7 +411,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "0000:03:00.6" prevResult.Interfaces = []*current.Interface{hostIface, containerIface} - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, make(chan antreatypes.EntityReference, 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, channel.NewSubscribableChannel("PodUpdate", 100), nil) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, k8sPodArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -531,7 +531,7 @@ func TestRemoveInterface(t *testing.T) { ifaceStore := interfacestore.NewInterfaceStore() routeMock := routetest.NewMockInterface(controller) gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, routeMock, ifaceStore, gwMAC, "system", false, make(chan antreatypes.EntityReference, 100), nil) + podConfigurator, err := 
newPodConfigurator(mockOVSBridgeClient, mockOFClient, routeMock, ifaceStore, gwMAC, "system", false, channel.NewSubscribableChannel("PodUpdate", 100), nil) require.Nil(t, err, "No error expected in podConfigurator constructor") containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index 29f317030a2..c3cca040b20 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" @@ -48,6 +47,7 @@ import ( crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2" crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2" "antrea.io/antrea/pkg/controller/metrics" + "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" ) @@ -154,7 +154,7 @@ func NewEgressController( nodeTransportIP net.IP, cluster *memberlist.Cluster, egressInformer crdinformers.EgressInformer, - nodeInformer coreinformers.NodeInformer, + podUpdateSubscriber channel.Subscriber, localIPDetector ipassigner.LocalIPDetector, ) (*EgressController, error) { c := &EgressController{ @@ -208,11 +208,26 @@ func NewEgressController( }, resyncPeriod, ) + // Subscribe Pod update events from CNIServer to enforce Egress earlier, instead of waiting for their IPs are + // reported to kube-apiserver and processed by antrea-controller. + podUpdateSubscriber.Subscribe(c.processPodUpdate) c.localIPDetector.AddEventHandler(c.onLocalIPUpdate) c.cluster.AddClusterEventHandler(c.enqueueEgressesByExternalIPPool) return c, nil } +// processPodUpdate will be called when CNIServer publishes a Pod update event. 
+// It triggers reconciling the effective Egress of the Pod. +func (c *EgressController) processPodUpdate(pod string) { + c.egressBindingsMutex.Lock() + defer c.egressBindingsMutex.Unlock() + binding, exists := c.egressBindings[pod] + if !exists { + return + } + c.queue.Add(binding.effectiveEgress) +} + // addEgress processes Egress ADD events. func (c *EgressController) addEgress(obj interface{}) { egress := obj.(*crdv1a2.Egress) diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index 5d43a42275e..9c448dd1cf6 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -44,6 +44,7 @@ import ( "antrea.io/antrea/pkg/client/clientset/versioned" fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" ) @@ -92,6 +93,7 @@ type fakeController struct { crdClient *fakeversioned.Clientset crdInformerFactory crdinformers.SharedInformerFactory mockIPAssigner *ipassignertest.MockIPAssigner + podUpdateChannel *channel.SubscribableChannel } func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController { @@ -114,6 +116,8 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll addPodInterface(ifaceStore, "ns3", "pod3", 3) addPodInterface(ifaceStore, "ns4", "pod4", 4) + podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) + egressController := &EgressController{ ofClient: mockOFClient, routeClient: mockRouteClient, @@ -133,6 +137,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll egressIPStates: map[string]*egressIPState{}, ipAssigner: mockIPAssigner, } + podUpdateChannel.Subscribe(egressController.processPodUpdate) return &fakeController{ EgressController: egressController, mockController: 
controller, @@ -141,6 +146,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll crdClient: crdClient, crdInformerFactory: crdInformerFactory, mockIPAssigner: mockIPAssigner, + podUpdateChannel: podUpdateChannel, } } @@ -541,6 +547,51 @@ func TestSyncEgress(t *testing.T) { } } +func TestPodUpdateShouldSyncEgress(t *testing.T) { + egress := &crdv1a2.Egress{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + Spec: crdv1a2.EgressSpec{EgressIP: fakeLocalEgressIP1}, + } + egressGroup := &cpv1b2.EgressGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, + GroupMembers: []cpv1b2.GroupMember{ + {Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}}, + {Pod: &cpv1b2.PodReference{Name: "pendingPod", Namespace: "ns1"}}, + }, + } + c := newFakeController(t, []runtime.Object{egress}) + defer c.mockController.Finish() + stopCh := make(chan struct{}) + defer close(stopCh) + go c.podUpdateChannel.Run(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + c.mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + c.mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1)) + c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) + c.addEgressGroup(egressGroup) + require.Equal(t, 1, c.queue.Len()) + item, _ := c.queue.Get() + require.Equal(t, egress.Name, item) + require.NoError(t, c.syncEgress(item.(string))) + c.queue.Done(item) + + c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(10), net.ParseIP(fakeLocalEgressIP1), uint32(1)) + c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) + // Mock CNIServer + addPodInterface(c.ifaceStore, "ns1", "pendingPod", 10) + c.podUpdateChannel.Notify("ns1/pendingPod") + require.NoError(t, wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) { + 
return c.queue.Len() == 1, nil + })) + item, _ = c.queue.Get() + require.Equal(t, egress.Name, item) + require.NoError(t, c.syncEgress(item.(string))) + c.queue.Done(item) +} + func TestSyncOverlappingEgress(t *testing.T) { egress1 := &crdv1a2.Egress{ ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index 8d3ad10b9a4..1adc8866423 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -28,10 +28,10 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/metrics" - antreatypes "antrea.io/antrea/pkg/agent/types" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/querier" + "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" ) @@ -156,9 +156,6 @@ type ruleCache struct { // dirtyRuleHandler is a callback that is run upon finding a rule out-of-sync. dirtyRuleHandler func(string) - // entityUpdates is a channel for receiving entity (e.g. Pod) updates from CNIServer. - entityUpdates <-chan antreatypes.EntityReference - // groupIDUpdates is a channel for receiving groupID for Service is assigned // or released events from groupCounters. groupIDUpdates <-chan string @@ -337,7 +334,7 @@ func toServicesIndexFunc(obj interface{}) ([]string, error) { } // newRuleCache returns a new *ruleCache. 
-func newRuleCache(dirtyRuleHandler func(string), podUpdate <-chan antreatypes.EntityReference, serviceGroupIDUpdate <-chan string) *ruleCache { +func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, serviceGroupIDUpdate <-chan string) *ruleCache { rules := cache.NewIndexer( ruleKeyFunc, cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc, toServicesIndex: toServicesIndexFunc}, @@ -348,38 +345,34 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdate <-chan antreatypes.En policyMap: make(map[string]*v1beta.NetworkPolicy), rules: rules, dirtyRuleHandler: dirtyRuleHandler, - entityUpdates: podUpdate, groupIDUpdates: serviceGroupIDUpdate, } - go cache.processEntityUpdates() + // Subscribe Pod update events from CNIServer. + podUpdateSubscriber.Subscribe(cache.processPodUpdate) go cache.processGroupIDUpdates() return cache } -// processEntityUpdates is an infinite loop that takes entity (e.g. Pod) -// update events from the channel, finds out AppliedToGroups that contains -// this Pod and trigger reconciling of related rules. +// processPodUpdate will be called when CNIServer publishes a Pod update event. +// It finds out AppliedToGroups that contains this Pod and trigger reconciling +// of related rules. // It can enforce NetworkPolicies to newly added Pods right after CNI ADD is // done if antrea-controller has computed the Pods' policies and propagated // them to this Node by their labels and NodeName, instead of waiting for their // IPs are reported to kube-apiserver and processed by antrea-controller. 
-func (c *ruleCache) processEntityUpdates() { - for { - select { - case entity := <-c.entityUpdates: - func() { - member := &v1beta.GroupMember{ - Pod: entity.Pod, - ExternalEntity: entity.ExternalEntity, - } - c.appliedToSetLock.RLock() - defer c.appliedToSetLock.RUnlock() - for group, memberSet := range c.appliedToSetByGroup { - if memberSet.Has(member) { - c.onAppliedToGroupUpdate(group) - } - } - }() +func (c *ruleCache) processPodUpdate(pod string) { + namespace, name := k8s.SplitNamespacedName(pod) + member := &v1beta.GroupMember{ + Pod: &v1beta.PodReference{ + Name: name, + Namespace: namespace, + }, + } + c.appliedToSetLock.RLock() + defer c.appliedToSetLock.RUnlock() + for group, memberSet := range c.appliedToSetByGroup { + if memberSet.Has(member) { + c.onAppliedToGroupUpdate(group) } } } diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 7f47d9fc802..c57b894c315 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -25,8 +25,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" + "antrea.io/antrea/pkg/util/channel" ) var ( @@ -260,12 +260,12 @@ func TestRuleCacheAddAddressGroup(t *testing.T) { } } -func newFakeRuleCache() (*ruleCache, *dirtyRuleRecorder, chan types.EntityReference) { +func newFakeRuleCache() (*ruleCache, *dirtyRuleRecorder, *channel.SubscribableChannel) { recorder := newDirtyRuleRecorder() - ch := make(chan types.EntityReference, 100) + podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) ch2 := make(chan string, 100) - c := newRuleCache(recorder.Record, ch, ch2) - return c, recorder, ch + c := newRuleCache(recorder.Record, podUpdateChannel, ch2) + return c, recorder, podUpdateChannel } func TestRuleCacheReplaceAppliedToGroups(t *testing.T) { @@ -1134,21 +1134,21 
@@ func TestRuleCacheProcessPodUpdates(t *testing.T) { name string rules []*rule podSetByGroup map[string]v1beta2.GroupMemberSet - podUpdate types.EntityReference + podUpdate string expectedDirtyRules sets.String }{ { "non-matching-group", nil, nil, - types.EntityReference{Pod: &v1beta2.PodReference{Name: "foo", Namespace: "bar"}}, + "bar/foo", sets.NewString(), }, { "matching-one-group-affecting-one-rule", []*rule{rule1, rule2}, map[string]v1beta2.GroupMemberSet{"group2": v1beta2.NewGroupMemberSet(newAppliedToGroupMember("pod1", "ns1"))}, - types.EntityReference{Pod: &v1beta2.PodReference{Name: "pod1", Namespace: "ns1"}}, + "ns1/pod1", sets.NewString("rule2"), }, { @@ -1158,19 +1158,21 @@ func TestRuleCacheProcessPodUpdates(t *testing.T) { "group1": v1beta2.NewGroupMemberSet(newAppliedToGroupMember("pod1", "ns1")), "group2": v1beta2.NewGroupMemberSet(newAppliedToGroupMember("pod1", "ns1")), }, - types.EntityReference{Pod: &v1beta2.PodReference{Name: "pod1", Namespace: "ns1"}}, + "ns1/pod1", sets.NewString("rule1", "rule2"), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c, recorder, ch := newFakeRuleCache() + c, recorder, podUpdateNotifier := newFakeRuleCache() c.appliedToSetByGroup = tt.podSetByGroup for _, rule := range tt.rules { c.rules.Add(rule) } - ch <- tt.podUpdate - + stopCh := make(chan struct{}) + defer close(stopCh) + go podUpdateNotifier.Run(stopCh) + podUpdateNotifier.Notify(tt.podUpdate) func() { // Drain the channel with 10 ms timeout so we can know it's done. 
for { diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 2bb74f56292..4d3902bf217 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -37,6 +37,7 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/querier" + "antrea.io/antrea/pkg/util/channel" ) const ( @@ -112,7 +113,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, nodeName string, - entityUpdates <-chan types.EntityReference, + podUpdateSubscriber channel.Subscriber, groupCounters []proxytypes.GroupCounter, groupIDUpdates <-chan string, antreaPolicyEnabled bool, @@ -141,7 +142,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, } } c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters) - c.ruleCache = newRuleCache(c.enqueueRule, entityUpdates, groupIDUpdates) + c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, groupIDUpdates) if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index 545fb338ece..6b37d4ab2b6 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -38,6 +38,7 @@ import ( "antrea.io/antrea/pkg/client/clientset/versioned" "antrea.io/antrea/pkg/client/clientset/versioned/fake" "antrea.io/antrea/pkg/querier" + "antrea.io/antrea/pkg/util/channel" ) const testNamespace = "ns1" @@ -52,10 +53,10 @@ func (g *antreaClientGetter) GetAntreaClient() 
(versioned.Interface, error) { func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { clientset := &fake.Clientset{} - ch := make(chan agenttypes.EntityReference, 100) + podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) ch2 := make(chan string, 100) groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(false, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, groupCounters, ch2, + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, groupCounters, ch2, true, true, true, true, testAsyncDeleteInterval, "8.8.8.8:53") reconciler := newMockReconciler() controller.reconciler = reconciler diff --git a/pkg/agent/controller/networkpolicy/status_controller_test.go b/pkg/agent/controller/networkpolicy/status_controller_test.go index 37c9173fa4c..d719e8baa10 100644 --- a/pkg/agent/controller/networkpolicy/status_controller_test.go +++ b/pkg/agent/controller/networkpolicy/status_controller_test.go @@ -24,8 +24,8 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" + "antrea.io/antrea/pkg/util/channel" ) const ( @@ -51,7 +51,7 @@ func (c *fakeNetworkPolicyControl) getNetworkPolicyStatus() *v1beta2.NetworkPoli } func newTestStatusController() (*StatusController, *ruleCache, *fakeNetworkPolicyControl) { - ruleCache := newRuleCache(func(s string) {}, make(<-chan types.EntityReference), make(chan string, 100)) + ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), make(chan string, 100)) statusControl := &fakeNetworkPolicyControl{} statusController := newStatusController(nil, testNode1, ruleCache) statusController.statusControlInterface = statusControl diff --git a/pkg/agent/types/networkpolicy.go b/pkg/agent/types/networkpolicy.go index 
38d6e81d3fc..4bac731c221 100644 --- a/pkg/agent/types/networkpolicy.go +++ b/pkg/agent/types/networkpolicy.go @@ -149,11 +149,3 @@ type BitRange struct { Value uint16 Mask *uint16 } - -// EntityReference represents a reference to either a Pod or an ExternalEntity. -type EntityReference struct { - // Pod maintains the reference to the Pod. - Pod *v1beta2.PodReference - // ExternalEntity maintains the reference to the ExternalEntity. - ExternalEntity *v1beta2.ExternalEntityReference -} diff --git a/pkg/controller/egress/controller.go b/pkg/controller/egress/controller.go index d7df808c0a2..0cd162ffe08 100644 --- a/pkg/controller/egress/controller.go +++ b/pkg/controller/egress/controller.go @@ -358,12 +358,7 @@ func (c *EgressController) syncEgress(key string) error { for _, pod := range pods { // Ignore Pod if it's not scheduled or not running. And Egress does not support HostNetwork Pods, so also ignore // Pod if it's HostNetwork Pod. - // TODO: If a Pod is scheduled but not running, it can be included in the EgressGroup so that the agent can - // install its SNAT rule right after the Pod's CNI request is processed, which just requires a notification from - // CNIServer to the agent's EgressController. However the current notification mechanism (the entityUpdate - // channel) allows only single consumer. Once it allows multiple consumers, we can change the condition to - // include scheduled Pods that have no IPs. 
- if pod.Spec.NodeName == "" || len(pod.Status.PodIPs) == 0 || pod.Spec.HostNetwork { + if pod.Spec.NodeName == "" || pod.Spec.HostNetwork { continue } podNum++ diff --git a/pkg/controller/egress/controller_test.go b/pkg/controller/egress/controller_test.go index 025e3af3758..d5880ced740 100644 --- a/pkg/controller/egress/controller_test.go +++ b/pkg/controller/egress/controller_test.go @@ -55,7 +55,7 @@ var ( podBar1 = newPod("default", "podBar1", map[string]string{"app": "bar"}, node1, "1.1.1.2", false) podFoo1InOtherNamespace = newPod("other", "podFoo1", map[string]string{"app": "foo"}, node1, "1.1.1.3", false) podUnscheduled = newPod("default", "podUnscheduled", map[string]string{"app": "foo"}, "", "", false) - podNonIP = newPod("default", "podNonIP", map[string]string{"app": "foo"}, "node1", "", false) + podNonIP = newPod("default", "podNonIP", map[string]string{"app": "foo"}, node1, "", false) podWithHostNetwork = newPod("default", "podHostNetwork", map[string]string{"app": "bar"}, node1, "172.16.100.1", true) // Fake Namespaces nsDefault = newNamespace("default", map[string]string{"company": "default"}) @@ -188,6 +188,7 @@ func TestAddEgress(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"}, GroupMembers: []controlplane.GroupMember{ {Pod: &controlplane.PodReference{Name: podFoo1.Name, Namespace: podFoo1.Namespace}}, + {Pod: &controlplane.PodReference{Name: podNonIP.Name, Namespace: podNonIP.Namespace}}, }, }, node2: { @@ -219,6 +220,7 @@ func TestAddEgress(t *testing.T) { GroupMembers: []controlplane.GroupMember{ {Pod: &controlplane.PodReference{Name: podFoo1.Name, Namespace: podFoo1.Namespace}}, {Pod: &controlplane.PodReference{Name: podBar1.Name, Namespace: podBar1.Namespace}}, + {Pod: &controlplane.PodReference{Name: podNonIP.Name, Namespace: podNonIP.Namespace}}, }, }, node2: { @@ -250,6 +252,7 @@ func TestAddEgress(t *testing.T) { GroupMembers: []controlplane.GroupMember{ {Pod: &controlplane.PodReference{Name: podFoo1.Name, 
Namespace: podFoo1.Namespace}}, {Pod: &controlplane.PodReference{Name: podFoo1InOtherNamespace.Name, Namespace: podFoo1InOtherNamespace.Namespace}}, + {Pod: &controlplane.PodReference{Name: podNonIP.Name, Namespace: podNonIP.Namespace}}, }, }, node2: { @@ -282,6 +285,7 @@ func TestAddEgress(t *testing.T) { GroupMembers: []controlplane.GroupMember{ {Pod: &controlplane.PodReference{Name: podFoo1.Name, Namespace: podFoo1.Namespace}}, {Pod: &controlplane.PodReference{Name: podFoo1InOtherNamespace.Name, Namespace: podFoo1InOtherNamespace.Namespace}}, + {Pod: &controlplane.PodReference{Name: podNonIP.Name, Namespace: podNonIP.Namespace}}, }, }, node2: { diff --git a/pkg/util/channel/channel.go b/pkg/util/channel/channel.go new file mode 100644 index 00000000000..b10e9b4d4f3 --- /dev/null +++ b/pkg/util/channel/channel.go @@ -0,0 +1,95 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package channel + +import ( + "time" + + "k8s.io/klog/v2" +) + +const ( + // notifyTimeout is the timeout for failing to publish an event to the channel. + notifyTimeout = time.Second +) + +type eventHandler func(string) + +type Subscriber interface { + // Subscribe registers an eventHandler which will be called when an event is sent to the channel. + // It's not thread-safe and it's supposed to be called serially before first event is published. + // The eventHandler is supposed to execute quickly and not perform blocking operation. 
Blocking operation should be + // deferred to a routine that is triggered by the eventHandler. + Subscribe(h eventHandler) +} + +type Notifier interface { + // Notify sends an event to the channel. + Notify(string) bool +} + +// SubscribableChannel is different from the Go channel which dispatches every event to only single consumer regardless +// of the number of consumers. Instead, it dispatches every event to all consumers by calling the eventHandlers they +// have registered. +type SubscribableChannel struct { + // The name of the channel, used for logging purpose to differentiate multiple channels. + name string + // eventCh is the channel used for buffering the pending events. + eventCh chan string + // handlers is a slice of callbacks registered by consumers. + handlers []eventHandler +} + +func NewSubscribableChannel(name string, bufferSize int) *SubscribableChannel { + n := &SubscribableChannel{ + name: name, + eventCh: make(chan string, bufferSize), + } + return n +} + +func (n *SubscribableChannel) Subscribe(h eventHandler) { + n.handlers = append(n.handlers, h) +} + +func (n *SubscribableChannel) Notify(e string) bool { + timer := time.NewTimer(notifyTimeout) + defer timer.Stop() + select { + case n.eventCh <- e: + return true + case <-timer.C: + // This shouldn't happen as we expect handlers to execute quickly and eventCh can buffer some messages. + // If the error is ever seen, either the buffer is too small, or some handlers have improper workload blocking + // the event consumption. 
+ klog.ErrorS(nil, "Failed to send event to channel, will discard it", "name", n.name, "event", e) + return false + } +} + +func (n *SubscribableChannel) Run(stopCh <-chan struct{}) { + klog.InfoS("Starting SubscribableChannel", "name", n.name) + for { + select { + case <-stopCh: + klog.InfoS("Stopping SubscribableChannel", "name", n.name) + return + case obj := <-n.eventCh: + for _, h := range n.handlers { + h(obj) + } + } + } +} diff --git a/pkg/util/channel/channel_test.go b/pkg/util/channel/channel_test.go new file mode 100644 index 00000000000..2eee503f71f --- /dev/null +++ b/pkg/util/channel/channel_test.go @@ -0,0 +1,109 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package channel + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" +) + +type eventReceiver struct { + receivedEvents sets.String + mutex sync.RWMutex +} + +func newEventReceiver() *eventReceiver { + return &eventReceiver{ + receivedEvents: sets.NewString(), + } +} + +func (r *eventReceiver) receive(e string) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.receivedEvents.Insert(e) +} + +func (r *eventReceiver) received() sets.String { + r.mutex.RLock() + defer r.mutex.RUnlock() + // Return a copy to prevent race condition + return r.receivedEvents.Union(nil) +} + +func TestSubscribe(t *testing.T) { + c := NewSubscribableChannel("foo", 100) + stopCh := make(chan struct{}) + defer close(stopCh) + go c.Run(stopCh) + + var eventReceivers []*eventReceiver + for i := 0; i < 100; i++ { + receiver := newEventReceiver() + c.Subscribe(receiver.receive) + eventReceivers = append(eventReceivers, receiver) + } + + desiredEvents := sets.NewString() + for i := 0; i < 1000; i++ { + e := fmt.Sprintf("event-%d", i) + c.Notify(e) + desiredEvents.Insert(e) + } + + var errReceiver int + var errReceivedEvents sets.String + assert.NoError(t, wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (done bool, err error) { + for i, r := range eventReceivers { + receivedEvents := r.received() + if !receivedEvents.Equal(desiredEvents) { + errReceiver = i + errReceivedEvents = receivedEvents + return false, nil + } + } + return true, nil + }), "Receiver %d failed to receive all events, expected %d events, got %d events", errReceiver, len(desiredEvents), len(errReceivedEvents)) +} + +func TestNotify(t *testing.T) { + bufferSize := 100 + c := NewSubscribableChannel("foo", bufferSize) + stopCh := make(chan struct{}) + defer close(stopCh) + // Do not run the channel so first N events should be published successfully and later events should fail. 
+ for i := 0; i < bufferSize; i++ { + e := fmt.Sprintf("event-%d", i) + assert.True(t, c.Notify(e), "Failed to publish event when it doesn't exceed the buffer's capacity") + } + + notifyRes := make(chan bool) + defer close(notifyRes) + go func() { + notifyRes <- c.Notify("foo") + }() + select { + case res := <-notifyRes: + assert.False(t, res) + case <-time.After(notifyTimeout + time.Second): + t.Errorf("Notify() didn't return in time") + } +} diff --git a/test/e2e/egress_test.go b/test/e2e/egress_test.go index 403f0c4d64e..163bcb434ee 100644 --- a/test/e2e/egress_test.go +++ b/test/e2e/egress_test.go @@ -161,12 +161,13 @@ ip netns exec %[1]s /agnhost netexec t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", remotePod) } + serverIPStr := tt.serverIP + if utilnet.IsIPv6String(tt.localIP0) { + serverIPStr = fmt.Sprintf("[%s]", tt.serverIP) + } + // getClientIP gets the translated client IP by accessing the API that replies the request's client IP. getClientIP := func(pod string) (string, string, error) { - serverIPStr := tt.serverIP - if utilnet.IsIPv6String(tt.localIP0) { - serverIPStr = fmt.Sprintf("[%s]", tt.serverIP) - } cmd := []string{"wget", "-T", "3", "-O", "-", fmt.Sprintf("%s:8080/clientip", serverIPStr)} return data.runCommandFromPod(testNamespace, pod, busyboxContainerName, cmd) } @@ -217,7 +218,7 @@ ip netns exec %[1]s /agnhost netexec assertClientIP(localPod, tt.localIP0, tt.localIP1) assertConnError(remotePod) - t.Logf("Creating an Egress applying to both Pods") + t.Logf("Creating an Egress applying to all e2e Pods") matchExpressions := []metav1.LabelSelectorRequirement{ { Key: "antrea-e2e", @@ -239,6 +240,27 @@ ip netns exec %[1]s /agnhost netexec }) assert.NoError(t, err, "Egress failed to reach expected status") + t.Log("Checking the client IP of a Pod whose Egress has been created in advance") + initialIPChecker := "initial-ip-checker" + clientIPStr := egress.Spec.EgressIP + if utilnet.IsIPv6String(clientIPStr) { + clientIPStr 
= fmt.Sprintf("[%s]", clientIPStr) + } + cmd = fmt.Sprintf("wget -T 3 -O - %s:8080/clientip | grep %s:", serverIPStr, clientIPStr) + if err := data.createPodOnNode(initialIPChecker, testNamespace, egressNode, busyboxImage, []string{"sh", "-c", cmd}, nil, nil, nil, false, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + }); err != nil { + t.Fatalf("Failed to create Pod initial-ip-checker: %v", err) + } + defer data.deletePod(testNamespace, initialIPChecker) + _, err = data.podWaitFor(timeout, initialIPChecker, testNamespace, func(pod *v1.Pod) (bool, error) { + if pod.Status.Phase == v1.PodFailed { + return false, fmt.Errorf("Pod terminated with failure") + } + return pod.Status.Phase == v1.PodSucceeded, nil + }) + assert.NoError(t, err, "Failed to get expected client IP %s for Pod initial-ip-checker", initialIPChecker) + t.Log("Updating the Egress's AppliedTo to remotePod only") egress.Spec.AppliedTo = v1alpha2.AppliedTo{ PodSelector: &metav1.LabelSelector{ diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 7117f7cb4d6..28ef31969d4 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -53,11 +53,11 @@ import ( "antrea.io/antrea/pkg/agent/metrics" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" - antreatypes "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" cnimsg "antrea.io/antrea/pkg/apis/cni/v1beta1" "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + "antrea.io/antrea/pkg/util/channel" ) const ( @@ -575,7 +575,7 @@ func newTester() *cmdAddDelTester { false, routeMock, tester.networkReadyCh) - tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, make(chan antreatypes.EntityReference, 100), nil) + tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, 
channel.NewSubscribableChannel("PodUpdate", 100), nil) ctx := context.Background() tester.ctx = ctx return tester @@ -799,7 +799,7 @@ func TestCNIServerChaining(t *testing.T) { ifaceStore := interfacestore.NewInterfaceStore() ovsServiceMock.EXPECT().IsHardwareOffloadEnabled().Return(false).AnyTimes() ovsServiceMock.EXPECT().GetOVSDatapathType().Return(ovsconfig.OVSDatapathSystem).AnyTimes() - err = server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, make(chan antreatypes.EntityReference, 100), nil) + err = server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) testRequire.Nil(err) }