From 0266c6ca854f0e4fdf77cfe0ab070684b4f0893a Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu Date: Tue, 27 Jun 2023 11:24:31 -0700 Subject: [PATCH] Add Pod store for Flow Exporter and Flow Aggregator 1. Add Pod store which can be used to store current and previous Pods information. 2. Modify unit test Signed-off-by: Yun-Tang Hsu --- cmd/antrea-agent/agent.go | 13 +- cmd/flow-aggregator/flow-aggregator.go | 4 +- hack/update-codegen-dockerized.sh | 1 + .../flowexporter/connections/connections.go | 28 +- .../connections/connections_test.go | 13 +- .../connections/conntrack_connections.go | 6 +- .../conntrack_connections_perf_test.go | 23 +- .../connections/conntrack_connections_test.go | 57 +- .../connections/deny_connections.go | 6 +- .../connections/deny_connections_test.go | 10 +- pkg/agent/flowexporter/exporter/exporter.go | 11 +- pkg/flowaggregator/flowaggregator.go | 91 ++- pkg/flowaggregator/flowaggregator_test.go | 135 ++--- pkg/util/podstore/podstore.go | 232 ++++++++ pkg/util/podstore/podstore_test.go | 546 ++++++++++++++++++ pkg/util/podstore/testing/mock_podstore.go | 77 +++ test/integration/agent/flowexporter_test.go | 62 +- 17 files changed, 1057 insertions(+), 258 deletions(-) create mode 100644 pkg/util/podstore/podstore.go create mode 100644 pkg/util/podstore/podstore_test.go create mode 100644 pkg/util/podstore/testing/mock_podstore.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 0c6b5c3945e..b6b8ad0924e 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -77,6 +77,7 @@ import ( "antrea.io/antrea/pkg/signals" "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" + "antrea.io/antrea/pkg/util/podstore" "antrea.io/antrea/pkg/version" ) @@ -139,6 +140,7 @@ func run(o *Options) error { l7NetworkPolicyEnabled := features.DefaultFeatureGate.Enabled(features.L7NetworkPolicy) enableMulticlusterGW := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.EnableGateway 
enableMulticlusterNP := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.EnableStretchedNetworkPolicy + enableFLowExporter := features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable nodeIPTracker := nodeip.NewTracker(nodeInformer) // Bridging mode will connect the uplink interface to the OVS bridge. @@ -156,7 +158,7 @@ func run(o *Options) error { features.DefaultFeatureGate.Enabled(features.AntreaPolicy), l7NetworkPolicyEnabled, o.enableEgress, - features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable, + enableFLowExporter, o.config.AntreaProxy.ProxyAll, features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR), connectUplinkToBridge, @@ -317,7 +319,7 @@ func run(o *Options) error { // Initialize localPodInformer for NPLAgent, AntreaIPAMController, // StretchedNetworkPolicyController, and secondary network controller. var localPodInformer cache.SharedIndexInformer - if enableNodePortLocal || enableBridgingMode || enableMulticlusterNP || + if enableNodePortLocal || enableBridgingMode || enableMulticlusterNP || enableFLowExporter || features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) || features.DefaultFeatureGate.Enabled(features.TrafficControl) { listOptions := func(options *metav1.ListOptions) { @@ -602,7 +604,8 @@ func run(o *Options) error { } var flowExporter *exporter.FlowExporter - if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable { + if enableFLowExporter { + podStore := podstore.NewPodStore(localPodInformer) flowExporterOptions := &flowexporter.FlowExporterOptions{ FlowCollectorAddr: o.flowCollectorAddr, FlowCollectorProto: o.flowCollectorProto, @@ -612,7 +615,7 @@ func run(o *Options) error { PollInterval: o.pollInterval, ConnectUplinkToBridge: connectUplinkToBridge} flowExporter, err = exporter.NewFlowExporter( - ifaceStore, + podStore, proxier, k8sClient, nodeRouteController, @@ -877,7 
+880,7 @@ func run(o *Options) error { go ofClient.StartPacketInHandler(stopCh) // Start the goroutine to periodically export IPFIX flow records. - if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable { + if enableFLowExporter { go flowExporter.Run(stopCh) } diff --git a/cmd/flow-aggregator/flow-aggregator.go b/cmd/flow-aggregator/flow-aggregator.go index 4de82a2da22..df61ce1e2ed 100644 --- a/cmd/flow-aggregator/flow-aggregator.go +++ b/cmd/flow-aggregator/flow-aggregator.go @@ -29,6 +29,7 @@ import ( "antrea.io/antrea/pkg/log" "antrea.io/antrea/pkg/signals" "antrea.io/antrea/pkg/util/cipher" + "antrea.io/antrea/pkg/util/podstore" ) const informerDefaultResync = 12 * time.Hour @@ -49,10 +50,11 @@ func run(configFile string) error { informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync) podInformer := informerFactory.Core().V1().Pods() + podStore := podstore.NewPodStore(podInformer.Informer()) flowAggregator, err := aggregator.NewFlowAggregator( k8sClient, - podInformer, + podStore, configFile, ) diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index 0afa851b15a..091693f3ca7 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -70,6 +70,7 @@ MOCKGEN_TARGETS=( "pkg/querier AgentNetworkPolicyInfoQuerier,AgentMulticastInfoQuerier,EgressQuerier testing" "pkg/flowaggregator/querier FlowAggregatorQuerier testing" "pkg/flowaggregator/s3uploader S3UploaderAPI testing" + "pkg/util/podstore Interface testing" "third_party/proxy Provider testing" ) diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index 718cc874da1..926d804cd90 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -24,8 +24,8 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/priorityqueue" - 
"antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/proxy" + "antrea.io/antrea/pkg/util/podstore" ) const ( @@ -34,7 +34,7 @@ const ( type connectionStore struct { connections map[flowexporter.ConnectionKey]*flowexporter.Connection - ifaceStore interfacestore.InterfaceStore + podStore podstore.Interface antreaProxier proxy.Proxier expirePriorityQueue *priorityqueue.ExpirePriorityQueue staleConnectionTimeout time.Duration @@ -42,12 +42,12 @@ type connectionStore struct { } func NewConnectionStore( - ifaceStore interfacestore.InterfaceStore, + podStore podstore.Interface, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) connectionStore { return connectionStore{ connections: make(map[flowexporter.ConnectionKey]*flowexporter.Connection), - ifaceStore: ifaceStore, + podStore: podStore, antreaProxier: proxier, expirePriorityQueue: priorityqueue.NewExpirePriorityQueue(o.ActiveFlowTimeout, o.IdleFlowTimeout), staleConnectionTimeout: o.StaleConnectionTimeout, @@ -98,26 +98,26 @@ func (cs *connectionStore) AddConnToMap(connKey *flowexporter.ConnectionKey, con } func (cs *connectionStore) fillPodInfo(conn *flowexporter.Connection) { - if cs.ifaceStore == nil { - klog.V(4).Info("Interface store is not available to retrieve local Pods information.") + if cs.podStore == nil { + klog.V(4).Info("Pod store is not available to retrieve local Pods information.") return } // sourceIP/destinationIP are mapped only to local pods and not remote pods. 
srcIP := conn.FlowKey.SourceAddress.String() dstIP := conn.FlowKey.DestinationAddress.String() - sIface, srcFound := cs.ifaceStore.GetInterfaceByIP(srcIP) - dIface, dstFound := cs.ifaceStore.GetInterfaceByIP(dstIP) + srcPod, srcFound := cs.podStore.GetPodByIPAndTime(srcIP, conn.StartTime) + dstPod, dstFound := cs.podStore.GetPodByIPAndTime(dstIP, conn.StartTime) if !srcFound && !dstFound { klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", srcIP, dstIP) } - if srcFound && sIface.Type == interfacestore.ContainerInterface { - conn.SourcePodName = sIface.ContainerInterfaceConfig.PodName - conn.SourcePodNamespace = sIface.ContainerInterfaceConfig.PodNamespace + if srcFound { + conn.SourcePodName = srcPod.Name + conn.SourcePodNamespace = srcPod.Namespace } - if dstFound && dIface.Type == interfacestore.ContainerInterface { - conn.DestinationPodName = dIface.ContainerInterfaceConfig.PodName - conn.DestinationPodNamespace = dIface.ContainerInterfaceConfig.PodNamespace + if dstFound { + conn.DestinationPodName = dstPod.Name + conn.DestinationPodNamespace = dstPod.Namespace } } diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go index 4e303cb037d..edfc4c68e8a 100644 --- a/pkg/agent/flowexporter/connections/connections_test.go +++ b/pkg/agent/flowexporter/connections/connections_test.go @@ -24,7 +24,8 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing" - interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" + podstoretest "antrea.io/antrea/pkg/util/podstore/testing" + "antrea.io/antrea/pkg/agent/metrics" ) @@ -79,8 +80,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) { testFlowKeys[i] = &connKey } // Create connectionStore - mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) - connStore := NewConnectionStore(mockIfaceStore, nil, testFlowExporterOptions) + 
mockPodStore := podstoretest.NewMockInterface(ctrl) + connStore := NewConnectionStore(mockPodStore, nil, testFlowExporterOptions) // Add flows to the Connection store for i, flow := range testFlows { connStore.connections[*testFlowKeys[i]] = flow @@ -105,8 +106,8 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) { ctrl := gomock.NewController(t) metrics.InitializeConnectionMetrics() // test on deny connection store - mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) - denyConnStore := NewDenyConnectionStore(mockIfaceStore, nil, testFlowExporterOptions) + mockPodStore := podstoretest.NewMockInterface(ctrl) + denyConnStore := NewDenyConnectionStore(mockPodStore, nil, testFlowExporterOptions) tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} conn := &flowexporter.Connection{ FlowKey: tuple, @@ -123,7 +124,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) { // test on conntrack connection store mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) - conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockIfaceStore, nil, testFlowExporterOptions) + conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockPodStore, nil, testFlowExporterOptions) conntrackConnStore.connections[connKey] = conn metrics.TotalAntreaConnectionsInConnTrackTable.Set(1) diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index bfa1e174534..1f729ff062d 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -25,11 +25,11 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/priorityqueue" - "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/metrics" 
"antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/proxy" "antrea.io/antrea/pkg/querier" + "antrea.io/antrea/pkg/util/podstore" ) var serviceProtocolMap = map[uint8]corev1.Protocol{ @@ -53,7 +53,7 @@ func NewConntrackConnectionStore( v4Enabled bool, v6Enabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, - ifaceStore interfacestore.InterfaceStore, + podStore podstore.Interface, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions, ) *ConntrackConnectionStore { @@ -63,7 +63,7 @@ func NewConntrackConnectionStore( v6Enabled: v6Enabled, networkPolicyQuerier: npQuerier, pollInterval: o.PollInterval, - connectionStore: NewConnectionStore(ifaceStore, proxier, o), + connectionStore: NewConnectionStore(podStore, proxier, o), connectUplinkToBridge: o.ConnectUplinkToBridge, } } diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go index a1b823d2b13..f7304b49c89 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go @@ -28,16 +28,16 @@ import ( "github.com/golang/mock/gomock" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/flowexporter" connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing" - "antrea.io/antrea/pkg/agent/interfacestore" - interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" "antrea.io/antrea/pkg/agent/openflow" proxytest "antrea.io/antrea/pkg/agent/proxy/testing" queriertest "antrea.io/antrea/pkg/querier/testing" + podstoretest "antrea.io/antrea/pkg/util/podstore/testing" k8sproxy "antrea.io/antrea/third_party/proxy" ) @@ -78,15 +78,18 @@ func BenchmarkPoll(b *testing.B) { func setupConntrackConnStore(b *testing.B) (*ConntrackConnectionStore, *connectionstest.MockConnTrackDumper) { ctrl := 
gomock.NewController(b) defer ctrl.Finish() - mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) - testInterface := &interfacestore.InterfaceConfig{ - Type: interfacestore.ContainerInterface, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "pod", - PodNamespace: "pod-ns", + mockPodStore := podstoretest.NewMockInterface(ctrl) + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "pod-ns", + Name: "pod", + UID: "pod", + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, }, } - mockIfaceStore.EXPECT().GetInterfaceByIP(gomock.Any()).Return(testInterface, true).AnyTimes() + mockPodStore.EXPECT().GetPodByIPAndTime(gomock.Any(), gomock.Any()).Return(pod, true).AnyTimes() mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) mockConnDumper.EXPECT().GetMaxConnections().Return(100000, nil).AnyTimes() @@ -104,7 +107,7 @@ func setupConntrackConnStore(b *testing.B) (*ConntrackConnectionStore, *connecti mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true).AnyTimes() npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) - return NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, mockIfaceStore, nil, testFlowExporterOptions), mockConnDumper + return NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, mockPodStore, nil, testFlowExporterOptions), mockConnDumper } func generateConns() []*flowexporter.Connection { diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_test.go index f037aecce7f..65214459a37 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_test.go @@ -27,13 +27,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" 
"k8s.io/component-base/metrics/legacyregistry" "antrea.io/antrea/pkg/agent/flowexporter" connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing" - "antrea.io/antrea/pkg/agent/interfacestore" - interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/openflow" proxytest "antrea.io/antrea/pkg/agent/proxy/testing" @@ -41,22 +40,30 @@ import ( cpv1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" secv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" queriertest "antrea.io/antrea/pkg/querier/testing" + podstoretest "antrea.io/antrea/pkg/util/podstore/testing" k8sproxy "antrea.io/antrea/third_party/proxy" ) var ( - tuple1 = flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200} - tuple2 = flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} - tuple3 = flowexporter.Tuple{SourceAddress: net.IP{10, 10, 10, 10}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 60000, DestinationPort: 100} - podConfigFlow1 = &interfacestore.ContainerInterfaceConfig{ - ContainerID: "1", - PodName: "pod1", - PodNamespace: "ns1", - } - interfaceFlow1 = &interfacestore.InterfaceConfig{ - InterfaceName: "interface1", - IPs: []net.IP{{8, 7, 6, 5}}, - ContainerInterfaceConfig: podConfigFlow1, + tuple1 = flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200} + tuple2 = flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple3 = flowexporter.Tuple{SourceAddress: net.IP{10, 10, 10, 10}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 60000, DestinationPort: 100} + pod1 = &v1.Pod{ + Status: 
v1.PodStatus{ + PodIPs: []v1.PodIP{ + { + IP: net.IP{8, 7, 6, 5}.String(), + }, + { + IP: net.IP{4, 3, 2, 1}.String(), + }, + }, + Phase: v1.PodRunning, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "ns1", + }, } servicePortName = k8sproxy.ServicePortName{ NamespacedName: types.NamespacedName{ @@ -208,12 +215,12 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { }, }, } - // Mock interface store with one of the couple of IPs correspond to Pods - mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) + + mockPodStore := podstoretest.NewMockInterface(ctrl) mockProxier := proxytest.NewMockProxier(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) - conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, mockIfaceStore, mockProxier, testFlowExporterOptions) + conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, mockPodStore, mockProxier, testFlowExporterOptions) for _, c := range tc { t.Run(c.name, func(t *testing.T) { @@ -221,7 +228,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { if c.oldConn != nil { addConnToStore(conntrackConnStore, c.oldConn) } else { - testAddNewConn(mockIfaceStore, mockProxier, npQuerier, c.newConn) + testAddNewConn(mockPodStore, mockProxier, npQuerier, c.newConn) } conntrackConnStore.AddOrUpdateConn(&c.newConn) actualConn, exist := conntrackConnStore.GetConnByKey(flowexporter.NewConnectionKey(&c.newConn)) @@ -234,9 +241,9 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { } // testAddNewConn tests podInfo, Services, network policy mapping. 
-func testAddNewConn(mockIfaceStore *interfacestoretest.MockInterfaceStore, mockProxier *proxytest.MockProxier, npQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier, conn flowexporter.Connection) { - mockIfaceStore.EXPECT().GetInterfaceByIP(conn.FlowKey.SourceAddress.String()).Return(nil, false) - mockIfaceStore.EXPECT().GetInterfaceByIP(conn.FlowKey.DestinationAddress.String()).Return(interfaceFlow1, true) +func testAddNewConn(mockPodStore *podstoretest.MockInterface, mockProxier *proxytest.MockProxier, npQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier, conn flowexporter.Connection) { + mockPodStore.EXPECT().GetPodByIPAndTime(conn.FlowKey.SourceAddress.String(), gomock.Any()).Return(nil, false) + mockPodStore.EXPECT().GetPodByIPAndTime(conn.FlowKey.DestinationAddress.String(), gomock.Any()).Return(pod1, true) protocol, _ := lookupServiceProtocol(conn.FlowKey.Protocol) serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress.String(), conn.DestinationServicePort, protocol) @@ -294,8 +301,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { // For testing purposes, set the metric metrics.TotalAntreaConnectionsInConnTrackTable.Set(float64(len(testFlows))) // Create connectionStore - mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) - connStore := NewConntrackConnectionStore(nil, true, false, nil, mockIfaceStore, nil, testFlowExporterOptions) + mockPodStore := podstoretest.NewMockInterface(ctrl) + connStore := NewConntrackConnectionStore(nil, true, false, nil, mockPodStore, nil, testFlowExporterOptions) // Add flows to the connection store. 
for i, flow := range testFlows { connStore.connections[*testFlowKeys[i]] = flow @@ -316,9 +323,9 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) { testFlows := make([]*flowexporter.Connection, 0) // Create connectionStore - mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) - conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockIfaceStore, nil, testFlowExporterOptions) + conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockPodStore, nil, testFlowExporterOptions) // Hard-coded conntrack occupancy metrics for test TotalConnections := 0 MaxConnections := 300000 diff --git a/pkg/agent/flowexporter/connections/deny_connections.go b/pkg/agent/flowexporter/connections/deny_connections.go index 534481a249f..0f60ff0daeb 100644 --- a/pkg/agent/flowexporter/connections/deny_connections.go +++ b/pkg/agent/flowexporter/connections/deny_connections.go @@ -22,19 +22,19 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/priorityqueue" - "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/proxy" "antrea.io/antrea/pkg/util/ip" + "antrea.io/antrea/pkg/util/podstore" ) type DenyConnectionStore struct { connectionStore } -func NewDenyConnectionStore(ifaceStore interfacestore.InterfaceStore, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore { +func NewDenyConnectionStore(podStore podstore.Interface, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore { return &DenyConnectionStore{ - connectionStore: NewConnectionStore(ifaceStore, proxier, o), + connectionStore: NewConnectionStore(podStore, proxier, o), } } diff --git a/pkg/agent/flowexporter/connections/deny_connections_test.go 
b/pkg/agent/flowexporter/connections/deny_connections_test.go index 25087ec491c..c3a7cb3cb9e 100644 --- a/pkg/agent/flowexporter/connections/deny_connections_test.go +++ b/pkg/agent/flowexporter/connections/deny_connections_test.go @@ -29,9 +29,9 @@ import ( "k8s.io/component-base/metrics/legacyregistry" "antrea.io/antrea/pkg/agent/flowexporter" - interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" "antrea.io/antrea/pkg/agent/metrics" proxytest "antrea.io/antrea/pkg/agent/proxy/testing" + podstoretest "antrea.io/antrea/pkg/util/podstore/testing" k8sproxy "antrea.io/antrea/third_party/proxy" ) @@ -60,15 +60,15 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { OriginalPackets: uint64(1), IsActive: true, } - mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockProxier := proxytest.NewMockProxier(ctrl) protocol, _ := lookupServiceProtocol(tuple.Protocol) serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol) mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true) - mockIfaceStore.EXPECT().GetInterfaceByIP(tuple.SourceAddress.String()).Return(nil, false) - mockIfaceStore.EXPECT().GetInterfaceByIP(tuple.DestinationAddress.String()).Return(nil, false) + mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(nil, false) + mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(nil, false) - denyConnStore := NewDenyConnectionStore(mockIfaceStore, mockProxier, testFlowExporterOptions) + denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions) denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 20)), uint64(60)) expConn := testFlow diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 622165fdb7c..fb853f895c5 100644 --- 
a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -33,7 +33,6 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/connections" "antrea.io/antrea/pkg/agent/flowexporter/priorityqueue" - "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/proxy" @@ -42,6 +41,7 @@ import ( "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/env" k8sutil "antrea.io/antrea/pkg/util/k8s" + "antrea.io/antrea/pkg/util/podstore" ) // When initializing flowExporter, a slice is allocated with a fixed size to @@ -129,6 +129,7 @@ type FlowExporter struct { denyPriorityQueue *priorityqueue.ExpirePriorityQueue expiredConns []flowexporter.Connection egressQuerier querier.EgressQuerier + podStore podstore.Interface } func genObservationID(nodeName string) uint32 { @@ -153,7 +154,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter return expInput } -func NewFlowExporter(ifaceStore interfacestore.InterfaceStore, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller, +func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller, trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet, ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions, egressQuerier querier.EgressQuerier) (*FlowExporter, error) { @@ -169,8 +170,8 @@ func NewFlowExporter(ifaceStore interfacestore.InterfaceStore, proxier proxy.Pro expInput := prepareExporterInputArgs(o.FlowCollectorProto, nodeName) connTrackDumper := connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, 
ovsDatapathType, proxyEnabled) - denyConnStore := connections.NewDenyConnectionStore(ifaceStore, proxier, o) - conntrackConnStore := connections.NewConntrackConnectionStore(connTrackDumper, v4Enabled, v6Enabled, npQuerier, ifaceStore, proxier, o) + denyConnStore := connections.NewDenyConnectionStore(podStore, proxier, o) + conntrackConnStore := connections.NewConntrackConnectionStore(connTrackDumper, v4Enabled, v6Enabled, npQuerier, podStore, proxier, o) return &FlowExporter{ collectorAddr: o.FlowCollectorAddr, @@ -189,6 +190,7 @@ func NewFlowExporter(ifaceStore interfacestore.InterfaceStore, proxier proxy.Pro denyPriorityQueue: denyConnStore.GetPriorityQueue(), expiredConns: make([]flowexporter.Connection, 0, maxConnsToExport*2), egressQuerier: egressQuerier, + podStore: podStore, }, nil } @@ -197,6 +199,7 @@ func (exp *FlowExporter) GetDenyConnStore() *connections.DenyConnectionStore { } func (exp *FlowExporter) Run(stopCh <-chan struct{}) { + go exp.podStore.Run(stopCh) // Start the goroutine to periodically delete stale deny connections. 
go exp.denyConnStore.RunPeriodicDeletion(stopCh) diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index a1b91b84828..bc4b36fc714 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -28,11 +28,7 @@ import ( ipfixentities "github.com/vmware/go-ipfix/pkg/entities" ipfixintermediate "github.com/vmware/go-ipfix/pkg/intermediate" ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" - corev1 "k8s.io/api/core/v1" - coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator" @@ -41,6 +37,7 @@ import ( "antrea.io/antrea/pkg/flowaggregator/options" "antrea.io/antrea/pkg/flowaggregator/querier" "antrea.io/antrea/pkg/ipfix" + "antrea.io/antrea/pkg/util/podstore" ) var ( @@ -84,9 +81,6 @@ const ( udpTransport = "udp" tcpTransport = "tcp" collectorAddress = "0.0.0.0:4739" - - // PodInfo index name for Pod cache. 
- podInfoIndex = "podInfo" ) // these are used for unit testing @@ -115,7 +109,7 @@ type flowAggregator struct { flowAggregatorAddress string includePodLabels bool k8sClient kubernetes.Interface - podInformer coreinformers.PodInformer + podStore podstore.Interface numRecordsExported int64 updateCh chan *options.Options configFile string @@ -127,12 +121,11 @@ type flowAggregator struct { s3Exporter exporter.Interface logExporter exporter.Interface logTickerDuration time.Duration - podLister corelisters.PodLister } func NewFlowAggregator( k8sClient kubernetes.Interface, - podInformer coreinformers.PodInformer, + podStore podstore.Interface, configFile string, ) (*flowAggregator, error) { if len(configFile) == 0 { @@ -170,14 +163,13 @@ func NewFlowAggregator( flowAggregatorAddress: opt.Config.FlowAggregatorAddress, includePodLabels: opt.Config.RecordContents.PodLabels, k8sClient: k8sClient, - podInformer: podInformer, + podStore: podStore, updateCh: make(chan *options.Options), configFile: configFile, configWatcher: configWatcher, configData: data, APIServer: opt.Config.APIServer, logTickerDuration: time.Minute, - podLister: podInformer.Lister(), } err = fa.InitCollectingProcess() if err != nil { @@ -211,25 +203,9 @@ func NewFlowAggregator( if opt.Config.FlowCollector.Enable { fa.ipfixExporter = newIPFIXExporter(k8sClient, opt, registry) } - podInformer.Informer().AddIndexers(cache.Indexers{podInfoIndex: podInfoIndexFunc}) return fa, nil } -func podInfoIndexFunc(obj interface{}) ([]string, error) { - pod, ok := obj.(*corev1.Pod) - if !ok { - return nil, fmt.Errorf("obj is not pod: %+v", obj) - } - if len(pod.Status.PodIPs) > 0 && pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed { - indexes := make([]string, len(pod.Status.PodIPs)) - for i := range pod.Status.PodIPs { - indexes[i] = pod.Status.PodIPs[i].IP - } - return indexes, nil - } - return nil, nil -} - func (fa *flowAggregator) InitCollectingProcess() error { var cpInput 
collector.CollectorInput if fa.aggregatorTransportProtocol == flowaggregatorconfig.AggregatorTransportProtocolTLS { @@ -315,6 +291,7 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}) { if fa.logExporter != nil { fa.logExporter.Start() } + go fa.podStore.Run(stopCh) var wg sync.WaitGroup wg.Add(1) go func() { @@ -403,12 +380,16 @@ func (fa *flowAggregator) flowExportLoop(stopCh <-chan struct{}) { func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, record *ipfixintermediate.AggregationFlowRecord) error { isRecordIPv4 := fa.aggregationProcess.IsAggregatedRecordIPv4(*record) + startTime, err := fa.getRecordStartTime(record.Record) + if err != nil { + return fmt.Errorf("cannot find record start time: %v", err) + } if !fa.aggregationProcess.AreCorrelatedFieldsFilled(*record) { - fa.fillK8sMetadata(key, record.Record) + fa.fillK8sMetadata(key, record.Record, *startTime) fa.aggregationProcess.SetCorrelatedFieldsFilled(record, true) } if fa.includePodLabels && !fa.aggregationProcess.AreExternalFieldsFilled(*record) { - fa.fillPodLabels(record.Record) + fa.fillPodLabels(key, record.Record, *startTime) fa.aggregationProcess.SetExternalFieldsFilled(record, true) } if fa.ipfixExporter != nil { @@ -440,16 +421,12 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor // fillK8sMetadata fills Pod name, Pod namespace and Node name for inter-Node flows // that have incomplete info due to deny network policy. 
-func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record ipfixentities.Record) { +func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record ipfixentities.Record, startTime time.Time) { // fill source Pod info when sourcePodName is empty if sourcePodName, _, exist := record.GetInfoElementWithValue("sourcePodName"); exist { if sourcePodName.GetStringValue() == "" { - pods, err := fa.podInformer.Informer().GetIndexer().ByIndex(podInfoIndex, key.SourceAddress) - if err == nil && len(pods) > 0 { - pod, ok := pods[0].(*corev1.Pod) - if !ok { - klog.Warningf("Invalid Pod obj in cache") - } + pod, exist := fa.podStore.GetPodByIPAndTime(key.SourceAddress, startTime) + if exist { sourcePodName.SetStringValue(pod.Name) if sourcePodNamespace, _, exist := record.GetInfoElementWithValue("sourcePodNamespace"); exist { sourcePodNamespace.SetStringValue(pod.Namespace) @@ -458,19 +435,15 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record sourceNodeName.SetStringValue(pod.Spec.NodeName) } } else { - klog.Warning(err) + klog.ErrorS(nil, "Cannot find Pod information", "sourceAddress", key.SourceAddress, "flowStartTime", startTime) } } } // fill destination Pod info when destinationPodName is empty if destinationPodName, _, exist := record.GetInfoElementWithValue("destinationPodName"); exist { if destinationPodName.GetStringValue() == "" { - pods, err := fa.podInformer.Informer().GetIndexer().ByIndex(podInfoIndex, key.DestinationAddress) - if len(pods) > 0 && err == nil { - pod, ok := pods[0].(*corev1.Pod) - if !ok { - klog.Warningf("Invalid Pod obj in cache") - } + pod, exist := fa.podStore.GetPodByIPAndTime(key.DestinationAddress, startTime) + if exist { destinationPodName.SetStringValue(pod.Name) if destinationPodNamespace, _, exist := record.GetInfoElementWithValue("destinationPodNamespace"); exist { destinationPodNamespace.SetStringValue(pod.Namespace) @@ -479,16 +452,25 @@ func (fa *flowAggregator) 
fillK8sMetadata(key ipfixintermediate.FlowKey, record destinationNodeName.SetStringValue(pod.Spec.NodeName) } } else { - klog.Warning(err) + klog.ErrorS(nil, "Cannot find Pod information", "destinationAddress", key.DestinationAddress, "flowStartTime", startTime) } } } } -func (fa *flowAggregator) fetchPodLabels(podNamespace string, podName string) string { - pod, err := fa.podLister.Pods(podNamespace).Get(podName) - if err != nil { - klog.InfoS("Failed to get Pod", "namespace", podNamespace, "name", podName, "err", err) +func (fa *flowAggregator) getRecordStartTime(record ipfixentities.Record) (*time.Time, error) { + flowStartSeconds, _, exist := record.GetInfoElementWithValue("flowStartSeconds") + if !exist { + return nil, fmt.Errorf("flowStartSeconds field is empty") + } + startTime := time.Unix(int64(flowStartSeconds.GetUnsigned32Value()), 0) + return &startTime, nil +} + +func (fa *flowAggregator) fetchPodLabels(ip string, startTime time.Time) string { + pod, exist := fa.podStore.GetPodByIPAndTime(ip, startTime) + if !exist { + klog.ErrorS(nil, "Error when getting Pod information from Pod store", "ip", ip, "startTime", startTime) return "" } labelsJSON, err := json.Marshal(pod.GetLabels()) @@ -499,17 +481,18 @@ func (fa *flowAggregator) fetchPodLabels(podNamespace string, podName string) st return string(labelsJSON) } -func (fa *flowAggregator) fillPodLabelsForSide(record ipfixentities.Record, podNamespaceIEName, podNameIEName, podLabelsIEName string) error { +func (fa *flowAggregator) fillPodLabelsForSide(ip string, record ipfixentities.Record, startTime time.Time, podNamespaceIEName, podNameIEName, podLabelsIEName string) error { podLabelsString := "" if podName, _, ok := record.GetInfoElementWithValue(podNameIEName); ok { podNameString := podName.GetStringValue() if podNamespace, _, ok := record.GetInfoElementWithValue(podNamespaceIEName); ok { podNamespaceString := podNamespace.GetStringValue() if podNameString != "" && podNamespaceString != "" { - 
podLabelsString = fa.fetchPodLabels(podNamespaceString, podNameString) + podLabelsString = fa.fetchPodLabels(ip, startTime) } } } + podLabelsElement, err := fa.registry.GetInfoElement(podLabelsIEName, ipfixregistry.AntreaEnterpriseID) if err == nil { podLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(podLabelsElement, bytes.NewBufferString(podLabelsString).Bytes()) @@ -526,11 +509,11 @@ func (fa *flowAggregator) fillPodLabelsForSide(record ipfixentities.Record, podN return nil } -func (fa *flowAggregator) fillPodLabels(record ipfixentities.Record) { - if err := fa.fillPodLabelsForSide(record, "sourcePodNamespace", "sourcePodName", "sourcePodLabels"); err != nil { +func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ipfixentities.Record, startTime time.Time) { + if err := fa.fillPodLabelsForSide(key.SourceAddress, record, startTime, "sourcePodNamespace", "sourcePodName", "sourcePodLabels"); err != nil { klog.ErrorS(err, "Error when filling pod labels", "side", "source") } - if err := fa.fillPodLabelsForSide(record, "destinationPodNamespace", "destinationPodName", "destinationPodLabels"); err != nil { + if err := fa.fillPodLabelsForSide(key.DestinationAddress, record, startTime, "destinationPodNamespace", "destinationPodName", "destinationPodLabels"); err != nil { klog.ErrorS(err, "Error when filling pod labels", "side", "destination") } } diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 1caf6c26bde..bc71bbe77f2 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -18,6 +18,7 @@ import ( "bytes" "os" "path/filepath" + "strconv" "sync" "testing" "time" @@ -33,10 +34,8 @@ import ( "gopkg.in/yaml.v2" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/cache" flowaggregatorconfig 
"antrea.io/antrea/pkg/config/flowaggregator" "antrea.io/antrea/pkg/flowaggregator/exporter" @@ -45,6 +44,7 @@ import ( "antrea.io/antrea/pkg/flowaggregator/querier" "antrea.io/antrea/pkg/ipfix" ipfixtesting "antrea.io/antrea/pkg/ipfix/testing" + podstoretest "antrea.io/antrea/pkg/util/podstore/testing" ) const ( @@ -59,17 +59,14 @@ func init() { func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { ctrl := gomock.NewController(t) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl) - client := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync) - newFlowAggregator := func(includePodLabels bool) *flowAggregator { - podInformer := informerFactory.Core().V1().Pods() return &flowAggregator{ aggregatorTransportProtocol: "tcp", aggregationProcess: mockAggregationProcess, @@ -80,8 +77,7 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { registry: mockIPFIXRegistry, flowAggregatorAddress: "", includePodLabels: includePodLabels, - podInformer: podInformer, - podLister: podInformer.Lister(), + podStore: mockPodStore, } } @@ -150,9 +146,12 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { for _, exporter := range mockExporters { exporter.EXPECT().AddRecord(mockRecord, tc.isIPv6) } + emptyStr := make([]byte, 0) + mockAggregationProcess.EXPECT().ResetStatAndThroughputElementsInRecord(mockRecord).Return(nil) + flowStartSecondsElement, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("flowStartSeconds", 150, 14, ipfixregistry.IANAEnterpriseID, 4), []byte(strconv.Itoa(int(time.Now().Unix())))) + 
mockRecord.EXPECT().GetInfoElementWithValue("flowStartSeconds").Return(flowStartSecondsElement, 0, true) mockAggregationProcess.EXPECT().AreCorrelatedFieldsFilled(*tc.flowRecord).Return(false) - emptyStr := make([]byte, 0) sourcePodNameElem, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("sourcePodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), emptyStr) mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameElem, 0, false) destPodNameElem, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("destinationPodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), emptyStr) @@ -445,6 +444,7 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) { func TestFlowAggregator_Run(t *testing.T) { ctrl := gomock.NewController(t) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) mockS3Exporter := exportertesting.NewMockInterface(ctrl) @@ -499,12 +499,14 @@ func TestFlowAggregator_Run(t *testing.T) { ipfixExporter: mockIPFIXExporter, configWatcher: configWatcher, updateCh: updateCh, + podStore: mockPodStore, } mockCollectingProcess.EXPECT().Start() mockCollectingProcess.EXPECT().Stop() mockAggregationProcess.EXPECT().Start() mockAggregationProcess.EXPECT().Stop() + mockPodStore.EXPECT().Run(gomock.Any()) // Mock expectations determined by sequence of updateOptions operations below. 
mockIPFIXExporter.EXPECT().Start().Times(2) @@ -663,64 +665,8 @@ func TestFlowAggregator_closeUpdateChBeforeFlowExportLoopReturns(t *testing.T) { wg2.Wait() } -func TestFlowAggregator_podInfoIndexFunc(t *testing.T) { - node := &v1.Node{} - pendingPod := &v1.Pod{ - Status: v1.PodStatus{ - Phase: v1.PodPending, - PodIPs: []v1.PodIP{ - { - IP: "192.168.1.2", - }, - }, - }, - } - succeededPod := &v1.Pod{ - Status: v1.PodStatus{ - Phase: v1.PodSucceeded, - PodIPs: []v1.PodIP{ - { - IP: "192.168.1.3", - }, - }, - }, - } - - tests := []struct { - name string - obj interface{} - want []string - expectedErr string - }{ - { - name: "object is not pod", - obj: node, - expectedErr: "obj is not pod: ", - }, - { - name: "pod status in pending phase", - obj: pendingPod, - want: []string{"192.168.1.2"}, - }, - { - name: "pod status in succeeded phase", - obj: succeededPod, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := podInfoIndexFunc(tt.obj) - if tt.expectedErr != "" { - assert.ErrorContains(t, err, tt.expectedErr) - } else { - require.NoError(t, err) - assert.Equal(t, tt.want, got) - } - }) - } -} - func TestFlowAggregator_fetchPodLabels(t *testing.T) { + ctrl := gomock.NewController(t) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", @@ -740,46 +686,36 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) { } client := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(client, 0) - informerFactory.Core().V1().Pods().Informer().AddIndexers(cache.Indexers{podInfoIndex: podInfoIndexFunc}) - - stopCh := make(chan struct{}) - defer close(stopCh) - - informerFactory.Start(stopCh) - informerFactory.WaitForCacheSync(stopCh) - informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pod) + // Mock pod store + mockPodStore := podstoretest.NewMockInterface(ctrl) + mockPodStore.EXPECT().GetPodByIPAndTime("", gomock.Any()).Return(nil, false) + 
mockPodStore.EXPECT().GetPodByIPAndTime("192.168.1.2", gomock.Any()).Return(pod, true) tests := []struct { - name string - podName string - podNamespace string - want string + name string + ip string + want string }{ { - name: "no pod object", - podName: "", - podNamespace: "", - want: "", + name: "no pod object", + ip: "", + want: "", }, { - name: "pod with label", - podName: "testPod", - podNamespace: "default", - want: "{\"test\":\"ut\"}", + name: "pod with label", + ip: "192.168.1.2", + want: "{\"test\":\"ut\"}", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - podInformer := informerFactory.Core().V1().Pods() fa := &flowAggregator{ k8sClient: client, includePodLabels: true, - podInformer: podInformer, - podLister: podInformer.Lister(), + podStore: mockPodStore, } - got := fa.fetchPodLabels(tt.podNamespace, tt.podName) + got := fa.fetchPodLabels(tt.ip, time.Now()) assert.Equal(t, tt.want, got) }) } @@ -865,6 +801,7 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "sourcePod", + UID: "sourcePod", }, Spec: v1.PodSpec{ NodeName: "sourceNode", @@ -881,6 +818,7 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "destinationPod", + UID: "destinationPod", }, Spec: v1.PodSpec{ NodeName: "destinationNode", @@ -909,25 +847,18 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) { ctrl := gomock.NewController(t) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) - client := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(client, 0) - informerFactory.Core().V1().Pods().Informer().AddIndexers(cache.Indexers{podInfoIndex: podInfoIndexFunc}) + mockPodStore := podstoretest.NewMockInterface(ctrl) stopCh := make(chan struct{}) defer close(stopCh) - informerFactory.Start(stopCh) - informerFactory.WaitForCacheSync(stopCh) - 
informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(srcPod) - informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(dstPod) - ipv4Key := ipfixintermediate.FlowKey{ SourceAddress: "192.168.1.2", DestinationAddress: "192.168.1.3", } fa := &flowAggregator{ - podInformer: informerFactory.Core().V1().Pods(), + podStore: mockPodStore, } mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameElem, 0, true) @@ -936,6 +867,8 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) { mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destinationPodNameElem, 0, true) mockRecord.EXPECT().GetInfoElementWithValue("destinationPodNamespace").Return(destinationPodNamespaceElem, 0, true) mockRecord.EXPECT().GetInfoElementWithValue("destinationNodeName").Return(destinationNodeNameElem, 0, true) + mockPodStore.EXPECT().GetPodByIPAndTime("192.168.1.2", gomock.Any()).Return(srcPod, true) + mockPodStore.EXPECT().GetPodByIPAndTime("192.168.1.3", gomock.Any()).Return(dstPod, true) - fa.fillK8sMetadata(ipv4Key, mockRecord) + fa.fillK8sMetadata(ipv4Key, mockRecord, time.Now()) } diff --git a/pkg/util/podstore/podstore.go b/pkg/util/podstore/podstore.go new file mode 100644 index 00000000000..42d6c31d601 --- /dev/null +++ b/pkg/util/podstore/podstore.go @@ -0,0 +1,232 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package podstore + +import ( + "fmt" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +const ( + deleteQueueName = "podStorePodsToDelete" + podIPIndex = "PodIP" + delayTime = time.Minute * 5 +) + +type PodStore struct { + pods cache.Indexer + podsToDelete workqueue.DelayingInterface + // Mapping pod.uuid to podTimestamps + timestampMap map[types.UID]*podTimestamps + clock clock.Clock + mutex sync.RWMutex +} + +type podTimestamps struct { + CreationTimestamp time.Time + // DeletionTimestamp is nil if a Pod is not deleted. + DeletionTimestamp *time.Time +} + +// Interface is a podStore interface to create local podStore for Flow Exporter and Flow Aggregator. +type Interface interface { + GetPodByIPAndTime(ip string, startTime time.Time) (*corev1.Pod, bool) + Run(stopCh <-chan struct{}) +} + +// NewPodStoreWithClock creates a Pod Store with a custom clock, +// which is useful when writing robust unit tests. 
+func NewPodStoreWithClock(podInformer cache.SharedIndexInformer, clock clock.WithTicker) *PodStore { + s := &PodStore{ + pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), + podsToDelete: workqueue.NewDelayingQueueWithCustomClock(clock, deleteQueueName), + clock: clock, + timestampMap: map[types.UID]*podTimestamps{}, + mutex: sync.RWMutex{}, + } + podInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: s.onPodCreate, + UpdateFunc: s.onPodUpdate, + DeleteFunc: s.onPodDelete, + }) + return s +} + +func NewPodStore(podInformer cache.SharedIndexInformer) *PodStore { + return NewPodStoreWithClock(podInformer, clock.RealClock{}) +} + +func (s *PodStore) onPodUpdate(oldObj interface{}, newObj interface{}) { + newPod, ok := newObj.(*corev1.Pod) + if !ok { + klog.ErrorS(nil, "Received unexpected object", "newObj", newObj) + return + } + err := s.pods.Update(newPod) + if err != nil { + klog.ErrorS(err, "Error when updating Pod in index") + return + } + klog.V(4).InfoS("Processed Pod Update Event", "Pod", klog.KObj(newPod)) +} + +func (s *PodStore) onPodCreate(obj interface{}) { + s.mutex.Lock() + defer s.mutex.Unlock() + timeNow := s.clock.Now() + pod, ok := obj.(*corev1.Pod) + if !ok { + klog.ErrorS(nil, "Received unexpected object", "obj", obj) + return + } + err := s.pods.Add(pod) + if err != nil { + klog.ErrorS(err, "Error when adding Pod to index") + return + } + switch pod.Status.Phase { + case corev1.PodPending: + s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: timeNow} + default: + s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: pod.CreationTimestamp.Time} + } + klog.V(4).InfoS("Processed Pod Create Event", "Pod", klog.KObj(pod)) +} + +func (s *PodStore) onPodDelete(obj interface{}) { + s.mutex.Lock() + defer s.mutex.Unlock() + timeNow := s.clock.Now() + pod, ok := obj.(*corev1.Pod) + if !ok { + var err error + pod, err = s.checkDeletedPod(obj) + if err != nil { + klog.ErrorS(err, "Got error 
while processing Delete Event") + return + } + } + timestamp, ok := s.timestampMap[pod.UID] + if !ok { + klog.ErrorS(nil, "Cannot find podTimestamps in timestampMap", "UID", pod.UID) + return + } + timestamp.DeletionTimestamp = &timeNow + s.podsToDelete.AddAfter(pod, delayTime) + klog.V(4).InfoS("Processed Pod Delete Event", "Pod", klog.KObj(pod)) +} + +func (s *PodStore) checkDeletedPod(obj interface{}) (*corev1.Pod, error) { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("received unexpected object: %v", obj) + } + pod, ok := deletedState.Obj.(*corev1.Pod) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown object is not of type Pod: %v", deletedState.Obj) + } + return pod, nil +} + +func (s *PodStore) GetPodByIPAndTime(ip string, time time.Time) (*corev1.Pod, bool) { + s.mutex.RLock() + defer s.mutex.RUnlock() + pods, _ := s.pods.ByIndex(podIPIndex, ip) + if len(pods) == 0 { + return nil, false + } else if len(pods) == 1 { + pod := pods[0].(*corev1.Pod) + // In case the clocks may be skewed between different Nodes in the cluster, we directly return the Pod if there is only + // one Pod in the indexer. Otherwise, we check the timestamp for Pods in the indexer. 
+ klog.V(4).InfoS("Matched Pod IP to Pod from indexer", "ip", ip, "Pod", klog.KObj(pod)) + return pod, true + } + for _, pod := range pods { + pod := pod.(*corev1.Pod) + timestamp, ok := s.timestampMap[pod.UID] + if !ok { + continue + } + if timestamp.CreationTimestamp.Before(time) && (timestamp.DeletionTimestamp == nil || time.Before(*timestamp.DeletionTimestamp)) { + klog.V(4).InfoS("Matched Pod IP and time to Pod from indexer", "ip", ip, "time", time, "Pod", klog.KObj(pod)) + return pod, true + } + } + return nil, false +} + +func (s *PodStore) Run(stopCh <-chan struct{}) { + defer s.podsToDelete.ShutDown() + go wait.Until(s.worker, time.Second, stopCh) + <-stopCh +} + +// worker runs a worker thread that keeps dequeuing Pods from the deletion queue and +// removes them from the Pod store and the timestamp map. +func (s *PodStore) worker() { + for s.processDeleteQueueItem() { + } +} + +func (s *PodStore) processDeleteQueueItem() bool { + pod, quit := s.podsToDelete.Get() + if quit { + return false + } + s.mutex.Lock() + defer s.mutex.Unlock() + err := s.pods.Delete(pod) + if err != nil { + klog.ErrorS(err, "Error when deleting Pod from the Pod store", "Pod", klog.KObj(pod.(*corev1.Pod))) + return false + } + delete(s.timestampMap, pod.(*corev1.Pod).UID) + s.podsToDelete.Done(pod) + klog.V(4).InfoS("Removed Pod from Pod Store", "Pod", klog.KObj(pod.(*corev1.Pod))) + return true +} + +func podKeyFunc(obj interface{}) (string, error) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return "", fmt.Errorf("obj is not Pod: %+v", obj) + } + return string(pod.UID), nil +} + +func podIPIndexFunc(obj interface{}) ([]string, error) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return nil, fmt.Errorf("obj is not Pod: %+v", obj) + } + if len(pod.Status.PodIPs) > 0 { + indexes := make([]string, len(pod.Status.PodIPs)) + for i := range pod.Status.PodIPs { + indexes[i] = pod.Status.PodIPs[i].IP + } + return indexes, nil + } + return nil, nil +} diff --git a/pkg/util/podstore/podstore_test.go 
b/pkg/util/podstore/podstore_test.go new file mode 100644 index 00000000000..a53e42e19e5 --- /dev/null +++ b/pkg/util/podstore/podstore_test.go @@ -0,0 +1,546 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package podstore + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + clock "k8s.io/utils/clock/testing" +) + +var ( + refTime = time.Now() + refTime2 = refTime.Add(-5 * time.Minute) + pod1 = &v1.Pod{ + Status: v1.PodStatus{ + PodIPs: []v1.PodIP{ + { + IP: "1.2.3.4", + }, + }, + Phase: v1.PodPending, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "pod1_ns", + UID: "pod1", + CreationTimestamp: metav1.Time{Time: refTime2}, + }, + } + pod2 = &v1.Pod{ + Status: v1.PodStatus{ + PodIPs: []v1.PodIP{ + { + IP: "5.6.7.8", + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "pod2_ns", + UID: "pod2", + CreationTimestamp: metav1.Time{Time: refTime2}, + DeletionTimestamp: &metav1.Time{Time: refTime}, + }, + } + pod3 = &v1.Pod{ + Status: 
v1.PodStatus{ + PodIPs: []v1.PodIP{ + { + IP: "4.3.2.1", + }, + }, + Phase: v1.PodRunning, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + Namespace: "pod3_ns", + UID: "pod3", + CreationTimestamp: metav1.Time{Time: refTime2}, + }, + } + pod4 = &v1.Pod{ + Status: v1.PodStatus{ + PodIPs: []v1.PodIP{ + { + IP: "1.2.3.4", + }, + }, + Phase: v1.PodSucceeded, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "pod1_ns", + UID: "pod4", + CreationTimestamp: metav1.Time{Time: refTime2}, + DeletionTimestamp: &metav1.Time{Time: refTime}, + }, + } + timestampMap = map[types.UID]*podTimestamps{ + "pod1": {CreationTimestamp: refTime}, + "pod2": {CreationTimestamp: refTime2, DeletionTimestamp: &refTime}, + "pod4": {CreationTimestamp: refTime2, DeletionTimestamp: &refTime}, + } + node = &v1.Node{} +) + +func Test_onPodUpdate(t *testing.T) { + newPod1 := &v1.Pod{ + Status: v1.PodStatus{ + PodIPs: []v1.PodIP{ + { + IP: "4.5.6.7", + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "pod1_ns", + UID: "pod1", + }, + } + tests := []struct { + name string + oldObj interface{} + newObj interface{} + expectedPod *v1.Pod + }{ + { + name: "newObj is not Pod", + newObj: node, + expectedPod: pod1, + }, + { + name: "valid case", + newObj: newPod1, + expectedPod: newPod1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + podStore := &PodStore{ + pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), + } + require.NoError(t, podStore.pods.Add(pod1)) + podStore.onPodUpdate(tt.oldObj, tt.newObj) + require.Len(t, podStore.pods.List(), 1) + assert.Equal(t, tt.expectedPod, podStore.pods.List()[0].(*v1.Pod)) + }) + } +} + +func Test_onPodCreate(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedMap map[types.UID]*podTimestamps + }{ + { + name: "object is not Pod", + obj: node, + expectedMap: map[types.UID]*podTimestamps{}, + }, + { + name: "valid case for Pending Pod", + 
obj: pod1, + expectedMap: map[types.UID]*podTimestamps{"pod1": {CreationTimestamp: refTime}}, + }, + { + name: "valid case for Running Pod", + obj: pod3, + expectedMap: map[types.UID]*podTimestamps{"pod3": {CreationTimestamp: refTime2}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + podStore := &PodStore{ + timestampMap: map[types.UID]*podTimestamps{}, + clock: clock.NewFakeClock(refTime), + pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), + } + podStore.onPodCreate(tt.obj) + assert.Equal(t, tt.expectedMap, podStore.timestampMap) + }) + } +} + +func Test_onPodDelete(t *testing.T) { + t.Run("object is neither Pod nor DeletedFinalStateUnknown", func(t *testing.T) { + k8sClient := fake.NewSimpleClientset() + podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{}) + podStore := NewPodStore(podInformer) + require.NoError(t, podStore.pods.Add(pod1)) + podStore.timestampMap = map[types.UID]*podTimestamps{"pod1": {CreationTimestamp: refTime}} + podStore.onPodDelete(node) + assert.Equal(t, &podTimestamps{CreationTimestamp: refTime}, podStore.timestampMap["pod1"]) + }) + t.Run("Pod is in prevPod and podsToDelete", func(t *testing.T) { + k8sClient := fake.NewSimpleClientset() + podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{}) + fakeClock := clock.NewFakeClock(refTime) + podStore := NewPodStoreWithClock(podInformer, fakeClock) + require.NoError(t, podStore.pods.Add(pod1)) + podStore.timestampMap = map[types.UID]*podTimestamps{"pod1": {CreationTimestamp: refTime}} + expectedDeleteTime := refTime.Add(delayTime) + podStore.onPodDelete(pod1) + assert.Equal(t, &podTimestamps{CreationTimestamp: refTime, DeletionTimestamp: &refTime}, podStore.timestampMap["pod1"]) + fakeClock.SetTime(expectedDeleteTime.Add(-10 * time.Millisecond)) + assert.Equal(t, podStore.podsToDelete.Len(), 0) + fakeClock.SetTime(expectedDeleteTime.Add(10 * 
time.Millisecond)) + assert.Eventuallyf(t, func() bool { + return podStore.podsToDelete.Len() == 1 + }, 1*time.Second, 10*time.Millisecond, "Pod is not added to PodsToDelete") + }) +} + +func Test_checkDeletedPod(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedResult *v1.Pod + expectedErr string + }{ + { + name: "object is not DeletedFinalStateUnknown", + obj: node, + expectedErr: "received unexpected object: ", + }, + { + name: "object in DeletedFinalStateUnknown is not Pod", + obj: cache.DeletedFinalStateUnknown{Obj: node}, + expectedErr: "DeletedFinalStateUnknown object is not of type Pod", + }, + { + name: "valid case", + obj: cache.DeletedFinalStateUnknown{Obj: pod1}, + expectedResult: pod1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pod, err := (&PodStore{}).checkDeletedPod(tt.obj) + if tt.expectedErr != "" { + assert.ErrorContains(t, err, tt.expectedErr) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expectedResult, pod) + } + }) + } +} + +func Test_GetPodByIPAndTime(t *testing.T) { + tests := []struct { + name string + ip string + startTime time.Time + expectedResult *v1.Pod + }{ + { + name: "no Pod in the Pod store", + ip: "1.3.5.7", + startTime: refTime, + expectedResult: nil, + }, + { + name: "find only one Pod in the Pod store - correct startTime", + ip: "5.6.7.8", + startTime: refTime.Add(-time.Minute), + expectedResult: pod2, + }, + { + name: "find only one Pod in the Pod store - incorrect startTime", + ip: "5.6.7.8", + startTime: refTime.Add(time.Minute), + expectedResult: pod2, + }, + { + name: "find current Pod in the Pod store", + ip: "1.2.3.4", + startTime: refTime.Add(time.Minute), + expectedResult: pod1, + }, + { + name: "find previous Pod in the Pod store", + ip: "1.2.3.4", + startTime: refTime.Add(-time.Minute), + expectedResult: pod4, + }, + { + name: "cannot find the Pod in the Pod store - SearchTime < CreationTime", + ip: "1.2.3.4", + startTime: 
refTime.Add(-time.Minute * 10), + expectedResult: nil, + }, + { + name: "cannot find the Pod in the Pod store - SearchTime > DeletionTime", + ip: "1.2.3.4", + startTime: refTime, + expectedResult: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + k8sClient := fake.NewSimpleClientset() + podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{}) + podStore := NewPodStore(podInformer) + require.NoError(t, podStore.pods.Add(pod1)) + require.NoError(t, podStore.pods.Add(pod2)) + require.NoError(t, podStore.pods.Add(pod4)) + podStore.timestampMap = timestampMap + pod, ok := podStore.GetPodByIPAndTime(tt.ip, tt.startTime) + if tt.expectedResult == nil { + assert.False(t, ok) + } else { + assert.True(t, ok) + assert.Equal(t, tt.expectedResult, pod) + } + }) + } +} + +func Test_processDeleteQueueItem(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + podStore := &PodStore{ + pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), + podsToDelete: workqueue.NewDelayingQueueWithCustomClock(fakeClock, deleteQueueName), + timestampMap: map[types.UID]*podTimestamps{"pod1": {}}, + } + require.NoError(t, podStore.pods.Add(pod1)) + podStore.podsToDelete.Add(pod1) + result := podStore.processDeleteQueueItem() + require.Equal(t, true, result) + assert.Equal(t, 0, podStore.podsToDelete.Len()) + assert.Len(t, podStore.pods.List(), 0) + assert.Len(t, podStore.timestampMap, 0) +} + +func Test_podKeyFunc(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedResult string + expectedErr string + }{ + { + name: "object is not Pod", + obj: node, + expectedErr: "obj is not Pod: ", + }, + { + name: "valid case", + obj: pod1, + expectedResult: "pod1", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := podKeyFunc(tt.obj) + if tt.expectedErr != "" { + assert.ErrorContains(t, err, tt.expectedErr) + } else { + require.NoError(t, 
err) + assert.Equal(t, tt.expectedResult, got) + } + }) + } +} + +func Test_podIPIndexFunc(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedResult []string + expectedErr string + }{ + { + name: "object is not Pod", + obj: node, + expectedErr: "obj is not Pod:", + }, + { + name: "valid case", + obj: pod1, + expectedResult: []string{"1.2.3.4"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := podIPIndexFunc(tt.obj) + if tt.expectedErr != "" { + assert.ErrorContains(t, err, tt.expectedErr) + } else { + require.NoError(t, err) + assert.ElementsMatch(t, tt.expectedResult, got) + } + }) + } +} + +/* +Sample output: +goos: darwin +goarch: amd64 +pkg: antrea.io/antrea/pkg/util/podstore +cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +BenchmarkGetPodByIPAndTime +BenchmarkGetPodByIPAndTime/input_size_100 +BenchmarkGetPodByIPAndTime/input_size_100-12 2232166 538.6 ns/op + podstore_test.go:465: + Summary: + Number of initial Pods: 100 + Total times of calling GetPodByIPAndTime: 3242267 + Total times of successfully finding Pod in podStore: 3242267 +BenchmarkGetPodByIPAndTime/input_size_1000 +BenchmarkGetPodByIPAndTime/input_size_1000-12 2238074 551.0 ns/op + podstore_test.go:465: + Summary: + Number of initial Pods: 1000 + Total times of calling GetPodByIPAndTime: 3248175 + Total times of successfully finding Pod in podStore: 3248175 +BenchmarkGetPodByIPAndTime/input_size_10000 +BenchmarkGetPodByIPAndTime/input_size_10000-12 1000000 1043 ns/op + podstore_test.go:465: + Summary: + Number of initial Pods: 10000 + Total times of calling GetPodByIPAndTime: 1010101 + Total times of successfully finding Pod in podStore: 1010101 +PASS +*/ + +func BenchmarkGetPodByIPAndTime(b *testing.B) { + var PodNumber = []struct { + input int + }{ + {input: 100}, + {input: 1000}, + {input: 10000}, + } + for _, v := range PodNumber { + success := 0 + total := 0 + k8sClient := fake.NewSimpleClientset() + podInformer := 
coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{}) + podStore := NewPodStore(podInformer) + stopCh := make(chan struct{}) + go podInformer.Run(stopCh) + cache.WaitForCacheSync(stopCh, podInformer.HasSynced) + podArray, err := addPods(v.input, k8sClient) + if err != nil { + b.Fatalf("error when adding Pods: %v", err) + } + assert.Eventuallyf(b, func() bool { + return len(podInformer.GetIndexer().List()) == v.input + }, 1*time.Second, 10*time.Millisecond, "Pods should be added to podInformer") + errChan := make(chan error) + go func() { + err = deletePodsK8s(podArray, k8sClient) + if err != nil { + errChan <- err + return + } + close(errChan) + }() + b.Run(fmt.Sprintf("input_size_%d", v.input), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + randomPod := podArray[rand.Intn(v.input)] + creationTime := podStore.timestampMap[randomPod.UID].CreationTimestamp + _, ok := podStore.GetPodByIPAndTime(randomPod.Status.PodIPs[0].IP, creationTime.Add(time.Millisecond)) + total++ + if ok { + success++ + } + } + }) + close(stopCh) + err = <-errChan + if err != nil { + b.Fatalf("error when deleting Pods: %v", err) + } + b.Logf("\nSummary:\nNumber of initial Pods: %d\nTotal times of calling GetPodByIPAndTime: %d\nTotal times of successfully finding Pod in podStore: %d\n", v.input, total, success) + } +} + +func deletePodsK8s(pods []*v1.Pod, k8sClient kubernetes.Interface) error { + for _, pod := range pods { + err := k8sClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("error when deleting Pods through k8s api: %v", err) + } + // the channel will be full if no sleep time. 
+ time.Sleep(time.Millisecond) + } + return nil +} + +func addPods(number int, k8sClient kubernetes.Interface) ([]*v1.Pod, error) { + var podArray []*v1.Pod + for i := 0; i < number; i++ { + pod := generatePod() + _, err := k8sClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("error when adding Pods through k8s api: %v", err) + } + // the channel will be full if no sleep time. + time.Sleep(time.Millisecond) + podArray = append(podArray, pod) + } + return podArray, nil +} + +func generatePod() *v1.Pod { + ip := getRandomIP() + uid := uuid.New().String() + startTime := rand.Intn(360000000) + creationTime := refTime.Add(time.Duration(startTime)) + deletionTime := creationTime.Add(time.Hour) + pod := &v1.Pod{ + Status: v1.PodStatus{ + PodIPs: []v1.PodIP{ + { + IP: ip, + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: creationTime}, + DeletionTimestamp: &metav1.Time{Time: deletionTime}, + Name: "pod-" + uid, + Namespace: "pod_ns", + UID: types.UID(uid), + }, + } + return pod +} + +func getRandomIP() string { + return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256)) +} diff --git a/pkg/util/podstore/testing/mock_podstore.go b/pkg/util/podstore/testing/mock_podstore.go new file mode 100644 index 00000000000..42d7ce9c510 --- /dev/null +++ b/pkg/util/podstore/testing/mock_podstore.go @@ -0,0 +1,77 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: antrea.io/antrea/pkg/util/podstore (interfaces: Interface) + +// Package testing is a generated GoMock package. +package testing + +import ( + gomock "github.com/golang/mock/gomock" + v1 "k8s.io/api/core/v1" + reflect "reflect" + time "time" +) + +// MockInterface is a mock of Interface interface +type MockInterface struct { + ctrl *gomock.Controller + recorder *MockInterfaceMockRecorder +} + +// MockInterfaceMockRecorder is the mock recorder for MockInterface +type MockInterfaceMockRecorder struct { + mock *MockInterface +} + +// NewMockInterface creates a new mock instance +func NewMockInterface(ctrl *gomock.Controller) *MockInterface { + mock := &MockInterface{ctrl: ctrl} + mock.recorder = &MockInterfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder { + return m.recorder +} + +// GetPodByIPAndTime mocks base method +func (m *MockInterface) GetPodByIPAndTime(arg0 string, arg1 time.Time) (*v1.Pod, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPodByIPAndTime", arg0, arg1) + ret0, _ := ret[0].(*v1.Pod) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetPodByIPAndTime indicates an expected call of GetPodByIPAndTime +func (mr *MockInterfaceMockRecorder) GetPodByIPAndTime(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodByIPAndTime", reflect.TypeOf((*MockInterface)(nil).GetPodByIPAndTime), arg0, arg1) +} + +// Run mocks base method +func (m *MockInterface) Run(arg0 <-chan struct{}) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Run", arg0) +} + +// Run indicates an expected call of Run +func (mr *MockInterfaceMockRecorder) Run(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + 
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockInterface)(nil).Run), arg0) +} diff --git a/test/integration/agent/flowexporter_test.go b/test/integration/agent/flowexporter_test.go index a19a3ffc763..0885ba803e4 100644 --- a/test/integration/agent/flowexporter_test.go +++ b/test/integration/agent/flowexporter_test.go @@ -26,15 +26,17 @@ import ( mock "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/connections" connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing" - "antrea.io/antrea/pkg/agent/interfacestore" - interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/util/sysctl" queriertest "antrea.io/antrea/pkg/querier/testing" + podstoretest "antrea.io/antrea/pkg/util/podstore/testing" ) const ( @@ -82,18 +84,23 @@ func createConnsForTest() ([]*flowexporter.Connection, []*flowexporter.Connectio return testConns, testConnKeys } -func prepareInterfaceConfigs(contID, podName, podNS, ifName string, ip *net.IP) *interfacestore.InterfaceConfig { - podConfig := &interfacestore.ContainerInterfaceConfig{ - ContainerID: contID, - PodName: podName, - PodNamespace: podNS, +func preparePodInformation(podName string, podNS string, ip *net.IP) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: podNS, + Name: podName, + UID: types.UID(podName), + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + PodIPs: []v1.PodIP{ + { + IP: ip.String(), + }, + }, + }, } - iface := &interfacestore.InterfaceConfig{ - InterfaceName: ifName, - IPs: []net.IP{*ip}, - ContainerInterfaceConfig: podConfig, - } - return iface + return pod } // TestConnectionStoreAndFlowRecords covers two 
scenarios: (i.) Add connections to connection store through connectionStore.Poll @@ -102,14 +109,15 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) { // Test setup ctrl := mock.NewController(t) - // Prepare connections and interface config for test + // Prepare connections and pod store for test testConns, testConnKeys := createConnsForTest() - testIfConfigs := make([]*interfacestore.InterfaceConfig, 2) - testIfConfigs[0] = prepareInterfaceConfigs("1", "pod1", "ns1", "interface1", &testConns[0].FlowKey.SourceAddress) - testIfConfigs[1] = prepareInterfaceConfigs("2", "pod2", "ns2", "interface2", &testConns[1].FlowKey.DestinationAddress) + testPods := make([]*v1.Pod, 2) + testPods[0] = preparePodInformation("pod1", "ns1", &testConns[0].FlowKey.SourceAddress) + testPods[1] = preparePodInformation("pod2", "ns2", &testConns[1].FlowKey.DestinationAddress) + // Create connectionStore, FlowRecords and associated mocks connDumperMock := connectionstest.NewMockConnTrackDumper(ctrl) - ifStoreMock := interfacestoretest.NewMockInterfaceStore(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) // TODO: Enhance the integration test by testing service. 
o := &flowexporter.FlowExporterOptions{ @@ -117,17 +125,17 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) { IdleFlowTimeout: testIdleFlowTimeout, StaleConnectionTimeout: testStaleConnectionTimeout, PollInterval: testPollInterval} - conntrackConnStore := connections.NewConntrackConnectionStore(connDumperMock, true, false, npQuerier, ifStoreMock, nil, o) + conntrackConnStore := connections.NewConntrackConnectionStore(connDumperMock, true, false, npQuerier, mockPodStore, nil, o) // Expect calls for connStore.poll and other callees connDumperMock.EXPECT().DumpFlows(uint16(openflow.CtZone)).Return(testConns, 0, nil) connDumperMock.EXPECT().GetMaxConnections().Return(0, nil) for i, testConn := range testConns { if i == 0 { - ifStoreMock.EXPECT().GetInterfaceByIP(testConn.FlowKey.SourceAddress.String()).Return(testIfConfigs[i], true) - ifStoreMock.EXPECT().GetInterfaceByIP(testConn.FlowKey.DestinationAddress.String()).Return(nil, false) + mockPodStore.EXPECT().GetPodByIPAndTime(testConn.FlowKey.SourceAddress.String(), mock.Any()).Return(testPods[i], true) + mockPodStore.EXPECT().GetPodByIPAndTime(testConn.FlowKey.DestinationAddress.String(), mock.Any()).Return(nil, false) } else { - ifStoreMock.EXPECT().GetInterfaceByIP(testConn.FlowKey.SourceAddress.String()).Return(nil, false) - ifStoreMock.EXPECT().GetInterfaceByIP(testConn.FlowKey.DestinationAddress.String()).Return(testIfConfigs[i], true) + mockPodStore.EXPECT().GetPodByIPAndTime(testConn.FlowKey.SourceAddress.String(), mock.Any()).Return(nil, false) + mockPodStore.EXPECT().GetPodByIPAndTime(testConn.FlowKey.DestinationAddress.String(), mock.Any()).Return(testPods[i], true) } } // Execute connStore.Poll @@ -139,11 +147,11 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) { // Check if connections in connectionStore are same as testConns or not for i, expConn := range testConns { if i == 0 { - expConn.SourcePodName = testIfConfigs[i].PodName - expConn.SourcePodNamespace = testIfConfigs[i].PodNamespace + 
expConn.SourcePodName = testPods[i].ObjectMeta.Name + expConn.SourcePodNamespace = testPods[i].ObjectMeta.Namespace } else { - expConn.DestinationPodName = testIfConfigs[i].PodName - expConn.DestinationPodNamespace = testIfConfigs[i].PodNamespace + expConn.DestinationPodName = testPods[i].ObjectMeta.Name + expConn.DestinationPodNamespace = testPods[i].ObjectMeta.Namespace } actualConn, found := conntrackConnStore.GetConnByKey(*testConnKeys[i]) assert.Equal(t, found, true, "testConn should be present in connection store")