From fb5d16a9718db065c9875e20e62081265dfe776a Mon Sep 17 00:00:00 2001
From: Yun-Tang Hsu
Date: Mon, 10 Jul 2023 19:59:06 -0700
Subject: [PATCH] Address comments

Signed-off-by: Yun-Tang Hsu
---
 cmd/antrea-agent/agent.go                      |  14 +-
 cmd/flow-aggregator/flow-aggregator.go         |   5 +-
 .../flowexporter/connections/connections.go    |   4 +-
 .../connections/connections_test.go            |   9 +-
 .../connections/conntrack_connections.go       |   2 +-
 .../conntrack_connections_perf_test.go         |  19 +-
 .../connections/conntrack_connections_test.go  |  25 +-
 .../connections/deny_connections.go            |   2 +-
 .../connections/deny_connections_test.go       |   4 +-
 pkg/agent/flowexporter/exporter/exporter.go    |   4 +-
 pkg/flowaggregator/flowaggregator.go           |   4 +-
 pkg/flowaggregator/flowaggregator_test.go      |  49 +--
 pkg/util/podstore/podstore.go                  | 103 +++---
 pkg/util/podstore/podstore_test.go             | 296 ++++++++++++++----
 test/integration/agent/flowexporter_test.go    |  25 +-
 15 files changed, 404 insertions(+), 161 deletions(-)

diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go
index 2d70695c248..892cfbd5481 100644
--- a/cmd/antrea-agent/agent.go
+++ b/cmd/antrea-agent/agent.go
@@ -139,6 +139,7 @@ func run(o *Options) error {
     l7NetworkPolicyEnabled := features.DefaultFeatureGate.Enabled(features.L7NetworkPolicy)
     enableMulticlusterGW := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.EnableGateway
     enableMulticlusterNP := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.EnableStretchedNetworkPolicy
+    enableFlowExporter := features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable
 
     // Bridging mode will connect the uplink interface to the OVS bridge.
     connectUplinkToBridge := enableBridgingMode
@@ -152,7 +153,7 @@ func run(o *Options) error {
         features.DefaultFeatureGate.Enabled(features.AntreaPolicy),
         l7NetworkPolicyEnabled,
         o.enableEgress,
-        features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable,
+        enableFlowExporter,
         o.config.AntreaProxy.ProxyAll,
         connectUplinkToBridge,
         multicastEnabled,
@@ -311,10 +312,9 @@ func run(o *Options) error {
     // Initialize localPodInformer for NPLAgent, AntreaIPAMController,
     // StretchedNetworkPolicyController, and secondary network controller.
     var localPodInformer cache.SharedIndexInformer
-    if enableNodePortLocal || enableBridgingMode || enableMulticlusterNP ||
+    if enableNodePortLocal || enableBridgingMode || enableMulticlusterNP || enableFlowExporter ||
         features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) ||
-        features.DefaultFeatureGate.Enabled(features.TrafficControl) ||
-        (features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable) {
+        features.DefaultFeatureGate.Enabled(features.TrafficControl) {
         listOptions := func(options *metav1.ListOptions) {
             options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeConfig.Name).String()
         }
@@ -596,8 +596,8 @@ func run(o *Options) error {
     }
 
     var flowExporter *exporter.FlowExporter
-    if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
-        podStore := podstore.NewPodStorage(localPodInformer)
+    if enableFlowExporter {
+        podStore := podstore.NewPodStore(localPodInformer)
         flowExporterOptions := &flowexporter.FlowExporterOptions{
             FlowCollectorAddr:  o.flowCollectorAddr,
             FlowCollectorProto: o.flowCollectorProto,
@@ -872,7 +872,7 @@ func run(o *Options) error {
     go ofClient.StartPacketInHandler(stopCh)
 
     // Start the goroutine to periodically export IPFIX flow records.
-    if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
+    if enableFlowExporter {
         go flowExporter.Run(stopCh)
     }
 
diff --git a/cmd/flow-aggregator/flow-aggregator.go b/cmd/flow-aggregator/flow-aggregator.go
index 366f28a946f..1d17f1ae0cd 100644
--- a/cmd/flow-aggregator/flow-aggregator.go
+++ b/cmd/flow-aggregator/flow-aggregator.go
@@ -50,15 +50,14 @@ func run(configFile string) error {
         return fmt.Errorf("error when creating K8s client: %v", err)
     }
 
-    podInformer := coreinformers.NewFilteredPodInformer(
+    podInformer := coreinformers.NewPodInformer(
         k8sClient,
         metav1.NamespaceAll,
         informerDefaultResync,
         cache.Indexers{},
-        nil,
     )
-    podStore := podstore.NewPodStorage(podInformer)
+    podStore := podstore.NewPodStore(podInformer)
 
     flowAggregator, err := aggregator.NewFlowAggregator(
         k8sClient,
diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go
index 963429495ec..b2d2e9543cf 100644
--- a/pkg/agent/flowexporter/connections/connections.go
+++ b/pkg/agent/flowexporter/connections/connections.go
@@ -34,7 +34,7 @@ const (
 
 type connectionStore struct {
     connections            map[flowexporter.ConnectionKey]*flowexporter.Connection
-    podStore               *podstore.PodStorage
+    podStore               *podstore.PodStore
     antreaProxier          proxy.Proxier
     expirePriorityQueue    *priorityqueue.ExpirePriorityQueue
     staleConnectionTimeout time.Duration
 }
 
 func NewConnectionStore(
-    podStore *podstore.PodStorage,
+    podStore *podstore.PodStore,
     proxier proxy.Proxier,
     o *flowexporter.FlowExporterOptions) connectionStore {
     return connectionStore{
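For context, the consumer-side wiring of the renamed PodStore follows the same pattern everywhere in this patch. The following is a minimal, hedged sketch (standalone illustration, not part of this change; the fake clientset, sample IP, and 30-second lookback are placeholders) of creating a PodStore from a shared Pod informer, starting it, and resolving a Pod by IP and flow start time:

package main

import (
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    coreinformers "k8s.io/client-go/informers/core/v1"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/tools/cache"

    "antrea.io/antrea/pkg/util/podstore"
)

func main() {
    k8sClient := fake.NewSimpleClientset()
    // Resync period 0; NewPodStore registers its own event handlers and Pod IP indexer.
    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
    podStore := podstore.NewPodStore(podInformer)

    stopCh := make(chan struct{})
    defer close(stopCh)
    go podInformer.Run(stopCh)
    // RunWorker drains the delayed-deletion queue that eventually evicts deleted Pods.
    go podStore.RunWorker(stopCh)
    cache.WaitForCacheSync(stopCh, podInformer.HasSynced)

    // Resolve the Pod that owned this IP when the flow started.
    if pod, ok := podStore.GetPodByIPAndTime("10.0.0.5", time.Now().Add(-30*time.Second)); ok {
        fmt.Printf("flow source: %s/%s\n", pod.Namespace, pod.Name)
    }
}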
diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go
index 03d202bc095..51490312eb1 100644
--- a/pkg/agent/flowexporter/connections/connections_test.go
+++ b/pkg/agent/flowexporter/connections/connections_test.go
@@ -38,7 +38,6 @@ const (
     testIdleFlowTimeout        = 1 * time.Second
     testPollInterval           = 0 // Not used in these tests, hence 0.
     testStaleConnectionTimeout = 5 * time.Minute
-    informerDefaultResync      = 12 * time.Hour
 )
 
 var testFlowExporterOptions = &flowexporter.FlowExporterOptions{
@@ -85,8 +84,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
     }
     // Create connectionStore
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     connStore := NewConnectionStore(podStore, nil, testFlowExporterOptions)
     // Add flows to the Connection store
     for i, flow := range testFlows {
@@ -113,8 +112,8 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {
     metrics.InitializeConnectionMetrics()
     // test on deny connection store
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     denyConnStore := NewDenyConnectionStore(podStore, nil, testFlowExporterOptions)
     tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
     conn := &flowexporter.Connection{
diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go
index e46e0433d87..b364c831a95 100644
--- a/pkg/agent/flowexporter/connections/conntrack_connections.go
+++ b/pkg/agent/flowexporter/connections/conntrack_connections.go
@@ -53,7 +53,7 @@ func NewConntrackConnectionStore(
     v4Enabled bool,
     v6Enabled bool,
     npQuerier querier.AgentNetworkPolicyInfoQuerier,
-    podStore *podstore.PodStorage,
+    podStore *podstore.PodStore,
     proxier proxy.Proxier,
     o *flowexporter.FlowExporterOptions,
 ) *ConntrackConnectionStore {
diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go
index 4040eda93dd..00a2ddc73a8 100644
--- a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go
+++ b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go
@@ -18,6 +18,7 @@ package connections
 
 import (
+    "context"
     "crypto/rand"
     "flag"
     "fmt"
@@ -27,9 +28,11 @@ import (
     "time"
 
     "github.com/golang/mock/gomock"
+    "github.com/stretchr/testify/require"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/wait"
     coreinformers "k8s.io/client-go/informers/core/v1"
     "k8s.io/client-go/kubernetes/fake"
     "k8s.io/client-go/tools/cache"
@@ -82,17 +85,17 @@ func setupConntrackConnStore(b *testing.B, connections []*flowexporter.Connectio
     ctrl := gomock.NewController(b)
     defer ctrl.Finish()
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     stopCh := make(chan struct{})
     defer close(stopCh)
     go podInformer.Run(stopCh)
     cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
     pod := &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
-            Namespace:         "pod-ns",
-            Name:              "pod",
-            CreationTimestamp: metav1.Time{Time: time.Now().Add(-255 * time.Second)},
+            Namespace: "pod-ns",
+            Name:      "pod",
+            UID:       "pod",
         },
         Status: v1.PodStatus{
             Phase: v1.PodPending,
@@ -101,7 +104,11 @@ func setupConntrackConnStore(b *testing.B, connections []*flowexporter.Connectio
     for _, conn := range connections {
         pod.Status.PodIPs = append(pod.Status.PodIPs, v1.PodIP{IP: conn.FlowKey.SourceAddress.String()}, v1.PodIP{IP: conn.FlowKey.DestinationAddress.String()})
     }
-    podInformer.GetIndexer().Add(pod)
+    k8sClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+    err := wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
+        return len(podInformer.GetIndexer().List()) == 1, nil
+    })
+    require.NoError(b, err, "Pod should be added to podInformer")
 
     mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
     mockConnDumper.EXPECT().GetMaxConnections().Return(100000, nil).AnyTimes()
diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_test.go
index eb4004cfdee..9444106d341 100644
--- a/pkg/agent/flowexporter/connections/conntrack_connections_test.go
+++ b/pkg/agent/flowexporter/connections/conntrack_connections_test.go
@@ -15,6 +15,7 @@ package connections
 
 import (
+    "context"
     "encoding/binary"
     "fmt"
     "net"
@@ -29,10 +30,12 @@ import (
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/wait"
     coreinformers "k8s.io/client-go/informers/core/v1"
     "k8s.io/client-go/kubernetes/fake"
     "k8s.io/client-go/tools/cache"
     "k8s.io/component-base/metrics/legacyregistry"
+    clock "k8s.io/utils/clock/testing"
 
     "antrea.io/antrea/pkg/agent/flowexporter"
     connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
@@ -223,15 +226,21 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
     mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
     npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
+    fakeClock := clock.NewFakeClock(refTime)
+    podStore := podstore.NewPodStoreWithClock(podInformer, fakeClock)
     conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, podStore, mockProxier, testFlowExporterOptions)
     stopCh := make(chan struct{})
     defer close(stopCh)
     go podInformer.Run(stopCh)
     cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
-    pod1.CreationTimestamp = metav1.Time{Time: refTime.Add(-(time.Second * 60))}
-    podInformer.GetIndexer().Add(pod1)
+    fakeClock.SetTime(refTime.Add(-(time.Second * 60)))
+    k8sClient.CoreV1().Pods(pod1.Namespace).Create(context.TODO(), pod1, metav1.CreateOptions{})
+    err := wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
+        return len(podInformer.GetIndexer().List()) == 1, nil
+    })
+    require.NoError(t, err, "Pod should be added to podInformer")
+    fakeClock.SetTime(refTime)
 
     for _, c := range tc {
         t.Run(c.name, func(t *testing.T) {
@@ -310,8 +319,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
     // Create connectionStore
     stopCh := make(chan struct{})
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     defer close(stopCh)
     go podInformer.Run(stopCh)
     cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
@@ -337,8 +346,8 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) {
     testFlows := make([]*flowexporter.Connection, 0)
     // Create connectionStore
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     stopCh := make(chan struct{})
     defer close(stopCh)
     go podInformer.Run(stopCh)
diff --git a/pkg/agent/flowexporter/connections/deny_connections.go b/pkg/agent/flowexporter/connections/deny_connections.go
index 16f4e1e6859..bcbc3c49df1 100644
--- a/pkg/agent/flowexporter/connections/deny_connections.go
+++ b/pkg/agent/flowexporter/connections/deny_connections.go
@@ -32,7 +32,7 @@ type DenyConnectionStore struct {
     connectionStore
 }
 
-func NewDenyConnectionStore(podStore *podstore.PodStorage, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
+func NewDenyConnectionStore(podStore *podstore.PodStore, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
     return &DenyConnectionStore{
         connectionStore: NewConnectionStore(podStore, proxier, o),
     }
diff --git a/pkg/agent/flowexporter/connections/deny_connections_test.go b/pkg/agent/flowexporter/connections/deny_connections_test.go
index d9cc7892835..6f19b43cdb9 100644
--- a/pkg/agent/flowexporter/connections/deny_connections_test.go
+++ b/pkg/agent/flowexporter/connections/deny_connections_test.go
@@ -65,8 +65,8 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
         IsActive: true,
     }
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     stopCh := make(chan struct{})
     defer close(stopCh)
     go podInformer.Run(stopCh)
diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go
index c4e8b324be7..2e1094a1b7a 100644
--- a/pkg/agent/flowexporter/exporter/exporter.go
+++ b/pkg/agent/flowexporter/exporter/exporter.go
@@ -129,7 +129,7 @@ type FlowExporter struct {
     denyPriorityQueue *priorityqueue.ExpirePriorityQueue
     expiredConns      []flowexporter.Connection
     egressQuerier     querier.EgressQuerier
-    podStore          *podstore.PodStorage
+    podStore          *podstore.PodStore
 }
 
 func genObservationID(nodeName string) uint32 {
@@ -154,7 +154,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter
     return expInput
 }
 
-func NewFlowExporter(podStore *podstore.PodStorage, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
+func NewFlowExporter(podStore *podstore.PodStore, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
     trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig,
     v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet,
     ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier,
     o *flowexporter.FlowExporterOptions, egressQuerier querier.EgressQuerier) (*FlowExporter, error) {
diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go
index 574985ddf87..b96d47dc19e 100644
--- a/pkg/flowaggregator/flowaggregator.go
+++ b/pkg/flowaggregator/flowaggregator.go
@@ -109,7 +109,7 @@ type flowAggregator struct {
     flowAggregatorAddress string
     includePodLabels      bool
     k8sClient             kubernetes.Interface
-    podStore              *podstore.PodStorage
+    podStore              *podstore.PodStore
     numRecordsExported    int64
     updateCh              chan *options.Options
     configFile            string
@@ -125,7 +125,7 @@ type flowAggregator struct {
 
 func NewFlowAggregator(
     k8sClient kubernetes.Interface,
-    podStore *podstore.PodStorage,
+    podStore *podstore.PodStore,
     configFile string,
 ) (*flowAggregator, error) {
     if len(configFile) == 0 {
diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go
index 777620cad87..ae2debd0727 100644
--- a/pkg/flowaggregator/flowaggregator_test.go
+++ b/pkg/flowaggregator/flowaggregator_test.go
@@ -16,6 +16,7 @@ package flowaggregator
 
 import (
     "bytes"
+    "context"
     "os"
     "path/filepath"
     "strconv"
@@ -34,6 +35,7 @@ import (
     "gopkg.in/yaml.v2"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
     coreinformers "k8s.io/client-go/informers/core/v1"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/fake"
@@ -69,8 +71,8 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
     mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl)
 
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
 
     newFlowAggregator := func(includePodLabels bool) *flowAggregator {
         return &flowAggregator{
@@ -451,8 +453,8 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) {
 
 func TestFlowAggregator_Run(t *testing.T) {
     ctrl := gomock.NewController(t)
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     mockIPFIXExporter := exportertesting.NewMockInterface(ctrl)
     mockClickHouseExporter := exportertesting.NewMockInterface(ctrl)
     mockS3Exporter := exportertesting.NewMockInterface(ctrl)
@@ -673,7 +675,6 @@ func TestFlowAggregator_closeUpdateChBeforeFlowExportLoopReturns(t *testing.T) {
 }
 
 func TestFlowAggregator_fetchPodLabels(t *testing.T) {
-    timeNow := time.Now()
     pod := &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
             Namespace: "default",
             Labels: map[string]string{
                 "test": "ut",
             },
-            CreationTimestamp: metav1.Time{Time: timeNow},
         },
         Status: v1.PodStatus{
             Phase: v1.PodPending,
@@ -694,31 +694,32 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) {
     }
 
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     stopCh := make(chan struct{})
     defer close(stopCh)
     go podInformer.Run(stopCh)
     cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
-    podInformer.GetIndexer().Add(pod)
+    k8sClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+    err := wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
+        return len(podInformer.GetIndexer().List()) == 1, nil
+    })
+    require.NoError(t, err, "Pod should be added to podInformer")
 
     tests := []struct {
         name string
         ip   string
-        time time.Time
         want string
     }{
         {
             name: "no pod object",
             ip:   "",
-            time: timeNow,
             want: "",
         },
         {
             name: "pod with label",
             ip:   "192.168.1.2",
-            time: timeNow,
             want: "{\"test\":\"ut\"}",
         },
     }
@@ -730,7 +731,7 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) {
                 includePodLabels: true,
                 podStore:         podStore,
             }
-            got := fa.fetchPodLabels(tt.ip, tt.time)
+            got := fa.fetchPodLabels(tt.ip, time.Now())
             assert.Equal(t, tt.want, got)
         })
     }
@@ -815,9 +816,8 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) {
     timeNow := time.Now()
     srcPod := &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
-            Namespace:         "default",
-            Name:              "sourcePod",
-            CreationTimestamp: metav1.Time{Time: timeNow},
+            Namespace: "default",
+            Name:      "sourcePod",
         },
         Spec: v1.PodSpec{
             NodeName: "sourceNode",
@@ -832,9 +832,8 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) {
     }
     dstPod := &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
-            Namespace:         "default",
-            Name:              "destinationPod",
-            CreationTimestamp: metav1.Time{Time: timeNow},
+            Namespace: "default",
+            Name:      "destinationPod",
         },
         Spec: v1.PodSpec{
             NodeName: "destinationNode",
@@ -864,16 +863,20 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) {
     ctrl := gomock.NewController(t)
     mockRecord := ipfixentitiestesting.NewMockRecord(ctrl)
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     stopCh := make(chan struct{})
     defer close(stopCh)
     go podInformer.Run(stopCh)
     cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
-    podInformer.GetIndexer().Add(srcPod)
-    podInformer.GetIndexer().Add(dstPod)
+    k8sClient.CoreV1().Pods(srcPod.Namespace).Create(context.TODO(), srcPod, metav1.CreateOptions{})
+    k8sClient.CoreV1().Pods(dstPod.Namespace).Create(context.TODO(), dstPod, metav1.CreateOptions{})
+    err = wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
+        return len(podInformer.GetIndexer().List()) == 2, nil
+    })
+    require.NoError(t, err, "Pods should be added to podInformer")
 
     ipv4Key := ipfixintermediate.FlowKey{
         SourceAddress: "192.168.1.2",
diff --git a/pkg/util/podstore/podstore.go b/pkg/util/podstore/podstore.go
index 42041e3a84b..a3a333d0039 100644
--- a/pkg/util/podstore/podstore.go
+++ b/pkg/util/podstore/podstore.go
@@ -16,14 +16,15 @@ package podstore
 
 import (
     "fmt"
+    "sync"
     "time"
 
     corev1 "k8s.io/api/core/v1"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/klog/v2"
+    "k8s.io/utils/clock"
 )
 
 const (
@@ -32,50 +33,79 @@ const (
     DelayTime = time.Minute * 5
 )
 
-// for testing
-var delayTime = DelayTime
-
-type PodStorage struct {
-    curPod  cache.Indexer
-    prevPod cache.Indexer
-    // Could be other lighter implementations
+type PodStore struct {
+    curPod       cache.Indexer
+    prevPod      cache.Indexer
     podsToDelete workqueue.DelayingInterface
+    // timestampMap maps a Pod's UID to the local times at which its create and delete events were observed.
+    timestampMap map[string]*LocalClock
+    clock        clock.WithTicker
+    mutex        sync.Mutex
+}
+
+type LocalClock struct {
+    // Local time at which the Pod add event was observed.
+    CreationTimestamp time.Time
+    // Local time at which the Pod delete event was observed. Nil if the Pod has not been deleted yet.
+    DeletionTimestamp *time.Time
 }
 
-func NewPodStorage(podInformer cache.SharedIndexInformer) *PodStorage {
-    s := &PodStorage{
+// NewPodStoreWithClock creates a PodStore with a custom clock,
+// which is useful for writing deterministic unit tests.
+func NewPodStoreWithClock(podInformer cache.SharedIndexInformer, clock clock.WithTicker) *PodStore {
+    s := &PodStore{
         curPod:       podInformer.GetIndexer(),
         prevPod:      cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}),
-        podsToDelete: workqueue.NewNamedDelayingQueue(DeleteQueueName),
+        podsToDelete: workqueue.NewDelayingQueueWithCustomClock(clock, DeleteQueueName),
+        clock:        clock,
+        timestampMap: map[string]*LocalClock{},
     }
     podInformer.AddEventHandler(
         cache.ResourceEventHandlerFuncs{
+            AddFunc:    s.onPodCreate,
             DeleteFunc: s.onPodDelete,
         })
     s.curPod.AddIndexers(cache.Indexers{PodIPIndex: podIPIndexFunc})
     return s
 }
 
-func (s *PodStorage) onPodDelete(obj interface{}) {
+func NewPodStore(podInformer cache.SharedIndexInformer) *PodStore {
+    return NewPodStoreWithClock(podInformer, clock.RealClock{})
+}
+
+func (s *PodStore) onPodCreate(obj interface{}) {
+    pod, ok := obj.(*corev1.Pod)
+    if !ok {
+        klog.ErrorS(nil, "Received unexpected object while processing Pod create event", "obj", obj)
+        return
+    }
+    s.mutex.Lock()
+    defer s.mutex.Unlock()
+    timeNow := s.clock.Now()
+    s.timestampMap[string(pod.UID)] = &LocalClock{CreationTimestamp: timeNow}
+    klog.V(4).InfoS("Processed Pod create event", "Pod", klog.KObj(pod))
+}
+
+func (s *PodStore) onPodDelete(obj interface{}) {
     pod, ok := obj.(*corev1.Pod)
     if !ok {
         var err error
         pod, err = s.checkDeletedPod(obj)
         if err != nil {
-            klog.ErrorS(err, "Got error while processing event delete")
+            klog.ErrorS(err, "Got error while processing Pod delete event")
             return
         }
     }
-    if pod.GetDeletionTimestamp() == nil {
-        timeNow := time.Now()
-        pod.DeletionTimestamp = &metav1.Time{Time: timeNow}
-    }
-    klog.V(4).InfoS("pod timestamp", "pod name", pod.ObjectMeta.Name, "creationTime", pod.GetCreationTimestamp().String(), "deletionTime", pod.GetDeletionTimestamp().String())
+    s.mutex.Lock()
+    defer s.mutex.Unlock()
+    timeNow := s.clock.Now()
+    // Guard against a missing entry, e.g. when the corresponding create event was never observed;
+    // dereferencing the map value unconditionally would panic.
+    if timestamps, ok := s.timestampMap[string(pod.UID)]; ok {
+        timestamps.DeletionTimestamp = &timeNow
+    } else {
+        s.timestampMap[string(pod.UID)] = &LocalClock{CreationTimestamp: timeNow, DeletionTimestamp: &timeNow}
+    }
+    klog.V(4).InfoS("Processed Pod delete event", "Pod", klog.KObj(pod))
     s.prevPod.Add(pod)
-    s.podsToDelete.AddAfter(pod, delayTime)
+    s.podsToDelete.AddAfter(pod, DelayTime)
 }
 
-func (s *PodStorage) checkDeletedPod(obj interface{}) (*corev1.Pod, error) {
+func (s *PodStore) checkDeletedPod(obj interface{}) (*corev1.Pod, error) {
     deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
     if !ok {
         return nil, fmt.Errorf("received unexpected object: %v", obj)
@@ -87,31 +117,35 @@ func (s *PodStorage) checkDeletedPod(obj interface{}) (*corev1.Pod, error) {
     return pod, nil
 }
 
-func (s *PodStorage) GetPodByIPAndTime(ip string, startTime time.Time) (*corev1.Pod, bool) {
+func (s *PodStore) GetPodByIPAndTime(ip string, startTime time.Time) (*corev1.Pod, bool) {
     // Query current Pod first
+    s.mutex.Lock()
+    defer s.mutex.Unlock()
     curPods, _ := s.curPod.ByIndex(PodIPIndex, ip)
     if len(curPods) != 0 {
         pod := curPods[0].(*corev1.Pod)
-        if pod.GetCreationTimestamp().UnixMilli() <= startTime.UnixMilli() {
-            klog.V(4).InfoS("Get pod info from current store", "pod name", pod.Name, "pod ns", pod.Namespace)
+        if timestamps, ok := s.timestampMap[string(pod.UID)]; ok && timestamps.CreationTimestamp.Before(startTime) && timestamps.DeletionTimestamp == nil {
+            klog.V(4).InfoS("Got Pod info from current store", "Pod", klog.KObj(pod))
             return pod, true
         }
     }
+    // The Pod may be missing from prevPod if prevPod.ByIndex is called immediately after the delete
+    // event fires: the informer removes the Pod from its client state before DeleteFunc is called.
     prePods, _ := s.prevPod.ByIndex(PodIPIndex, ip)
     if len(prePods) == 0 {
         return nil, false
     }
     for _, prePod := range prePods {
         pod := prePod.(*corev1.Pod)
-        if pod.GetCreationTimestamp().UnixMilli() <= startTime.UnixMilli() && startTime.UnixMilli() < pod.GetDeletionTimestamp().UnixMilli() {
-            klog.V(4).InfoS("Get pod info from pre store", "pod name", pod.Name, "pod ns", pod.Namespace)
+        // Look up the timestamps defensively: a missing entry or a still-nil DeletionTimestamp
+        // would otherwise panic here.
+        timestamps, ok := s.timestampMap[string(pod.UID)]
+        if ok && timestamps.CreationTimestamp.Before(startTime) && timestamps.DeletionTimestamp != nil && startTime.Before(*timestamps.DeletionTimestamp) {
+            klog.V(4).InfoS("Got Pod info from previous store", "Pod", klog.KObj(pod))
             return pod, true
         }
     }
     return nil, false
 }
 
-func (s *PodStorage) RunWorker(stopCh <-chan struct{}) {
+func (s *PodStore) RunWorker(stopCh <-chan struct{}) {
     defer s.podsToDelete.ShutDown()
     go wait.Until(s.worker, time.Second, stopCh)
     <-stopCh
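The lookup above matches a flow's start time against the locally recorded lifetime of each Pod, with both endpoints exclusive. A small standalone sketch of just that comparison (assumed times t0/t1; it mirrors the conditions in GetPodByIPAndTime rather than calling it):

package main

import (
    "fmt"
    "time"
)

func main() {
    t0 := time.Now()              // local time recorded by onPodCreate
    t1 := t0.Add(2 * time.Minute) // local time recorded by onPodDelete

    // Mirrors the check for a deleted Pod: creation < startTime < deletion.
    inWindow := func(startTime time.Time) bool {
        return t0.Before(startTime) && startTime.Before(t1)
    }

    fmt.Println(inWindow(t0))                  // false: a flow starting exactly at creation does not match
    fmt.Println(inWindow(t0.Add(time.Second))) // true
    fmt.Println(inWindow(t1))                  // false: the deletion instant is excluded as well
}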
@@ -119,30 +153,27 @@
 // worker runs a worker thread that just dequeues item from deleteQueue and
 // remove the item from prevPod.
-func (s *PodStorage) worker() {
+func (s *PodStore) worker() {
     for s.processDeleteQueueItem() {
     }
 }
 
-func (s *PodStorage) processDeleteQueueItem() bool {
+func (s *PodStore) processDeleteQueueItem() bool {
     pod, quit := s.podsToDelete.Get()
     if quit {
         return false
     }
     defer s.podsToDelete.Done(pod)
-
-    if err := s.prevPod.Delete(pod); err != nil {
-        klog.ErrorS(err, "Unexpected error when trying to delete pod in pod store")
-        return true
-    }
-    klog.V(4).InfoS("Remove pod from pod store", "pod uid", pod.(*corev1.Pod).UID, "pod name", pod.(*corev1.Pod).Name, "pod namespace", pod.(*corev1.Pod).Namespace)
+    // Hold the mutex: timestampMap is shared with onPodCreate, onPodDelete and GetPodByIPAndTime.
+    s.mutex.Lock()
+    defer s.mutex.Unlock()
+    s.prevPod.Delete(pod)
+    delete(s.timestampMap, string(pod.(*corev1.Pod).UID))
+    klog.V(4).InfoS("Removed Pod from Pod store", "Pod", klog.KObj(pod.(*corev1.Pod)))
     return true
 }
 
 func podKeyFunc(obj interface{}) (string, error) {
     pod, ok := obj.(*corev1.Pod)
     if !ok {
-        return "", fmt.Errorf("obj is not pod: %+v", obj)
+        return "", fmt.Errorf("obj is not Pod: %+v", obj)
     }
     return string(pod.UID), nil
 }
@@ -150,7 +181,7 @@ func podKeyFunc(obj interface{}) (string, error) {
 func podIPIndexFunc(obj interface{}) ([]string, error) {
     pod, ok := obj.(*corev1.Pod)
     if !ok {
-        return nil, fmt.Errorf("obj is not pod: %+v", obj)
+        return nil, fmt.Errorf("obj is not Pod: %+v", obj)
     }
     if len(pod.Status.PodIPs) > 0 {
         indexes := make([]string, len(pod.Status.PodIPs))
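The five-minute retention of deleted Pods hinges on client-go's delaying workqueue driven by an injectable clock. A hedged, standalone sketch of that interaction, using the same NewDelayingQueueWithCustomClock and fake clock that the tests below rely on (the "pod1" item and queue name are placeholders):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/workqueue"
    testingclock "k8s.io/utils/clock/testing"
)

func main() {
    fakeClock := testingclock.NewFakeClock(time.Now())
    q := workqueue.NewDelayingQueueWithCustomClock(fakeClock, "podsToDelete")

    q.AddAfter("pod1", 5*time.Minute)
    fmt.Println(q.Len()) // 0: the item is parked until the (fake) clock advances

    fakeClock.SetTime(fakeClock.Now().Add(5*time.Minute + time.Second))
    // The queue's background loop moves ready items over asynchronously,
    // hence the short poll; the tests below use the same idiom.
    _ = wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
        return q.Len() == 1, nil
    })
    fmt.Println(q.Len()) // 1: the item became ready once the clock passed the delay
}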
diff --git a/pkg/util/podstore/podstore_test.go b/pkg/util/podstore/podstore_test.go
index 6aa810b2637..0a1cd3452de 100644
--- a/pkg/util/podstore/podstore_test.go
+++ b/pkg/util/podstore/podstore_test.go
@@ -15,15 +15,25 @@ package podstore
 
 import (
+    "context"
+    "fmt"
     "testing"
     "time"
 
+    "github.com/google/uuid"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/rand"
+    "k8s.io/apimachinery/pkg/util/wait"
+    coreinformers "k8s.io/client-go/informers/core/v1"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/kubernetes/fake"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/workqueue"
+    clock "k8s.io/utils/clock/testing"
 )
 
 var (
@@ -37,10 +47,9 @@ var (
             },
         },
         ObjectMeta: metav1.ObjectMeta{
-            CreationTimestamp: metav1.Time{Time: refTime},
-            Name:              "pod1",
-            Namespace:         "pod1_ns",
-            UID:               "pod1",
+            Name:      "pod1",
+            Namespace: "pod1_ns",
+            UID:       "pod1",
         },
     }
     pod2 = &v1.Pod{
@@ -52,22 +61,63 @@ var (
             },
         },
         ObjectMeta: metav1.ObjectMeta{
-            CreationTimestamp: metav1.Time{Time: refTime.Add(-5 * time.Minute)},
-            DeletionTimestamp: &metav1.Time{Time: refTime},
-            Name:              "pod2",
-            Namespace:         "pod2_ns",
+            Name:      "pod2",
+            Namespace: "pod2_ns",
+            UID:       "pod2",
         },
     }
+    timestampMap = map[string]*LocalClock{
+        "pod1": {CreationTimestamp: refTime},
+        "pod2": {CreationTimestamp: refTime.Add(-5 * time.Minute), DeletionTimestamp: &refTime},
+    }
     node = &v1.Node{}
 )
 
+func Test_onPodCreate(t *testing.T) {
+    podStore := &PodStore{
+        timestampMap: map[string]*LocalClock{},
+        clock:        clock.NewFakeClock(refTime),
+    }
+    tests := []struct {
+        name                    string
+        podStore                *PodStore
+        obj                     interface{}
+        expectedTimestampMapLen int
+    }{
+        {
+            name:                    "object is not Pod",
+            podStore:                podStore,
+            obj:                     node,
+            expectedTimestampMapLen: 0,
+        },
+        {
+            name:                    "Valid case",
+            obj:                     pod1,
+            podStore:                podStore,
+            expectedTimestampMapLen: 1,
+        },
+    }
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            assert.Equal(t, 0, len(tt.podStore.timestampMap))
+            podStore.onPodCreate(tt.obj)
+            assert.Equal(t, tt.expectedTimestampMapLen, len(tt.podStore.timestampMap))
+            if tt.expectedTimestampMapLen > 0 {
+                assert.Equal(t, tt.podStore.clock.Now(), tt.podStore.timestampMap[string(tt.obj.(*v1.Pod).UID)].CreationTimestamp)
+            }
+        })
+    }
+}
+
 func Test_onPodDelete(t *testing.T) {
-    podStore := PodStorage{
+    fakeClock := clock.NewFakeClock(refTime)
+    podStore := PodStore{
         curPod:       nil,
         prevPod:      cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}),
-        podsToDelete: workqueue.NewNamedDelayingQueue(DeleteQueueName),
+        podsToDelete: workqueue.NewDelayingQueueWithCustomClock(fakeClock, DeleteQueueName),
+        timestampMap: map[string]*LocalClock{"pod1": {CreationTimestamp: refTime}},
+        clock:        fakeClock,
     }
-    delayTime = 5 * time.Second
     tests := []struct {
         name string
         obj  interface{}
@@ -82,23 +132,17 @@ func Test_onPodDelete(t *testing.T) {
             expectedResultInPrev:   nil,
             expectedResultInDelete: 0,
         },
-        {
-            name:                   "Pod in prevPod but not in podsToDelete",
-            obj:                    pod1,
-            timeWait:               1 * time.Second,
-            expectedResultInPrev:   pod1,
-            expectedResultInDelete: 0,
-        },
         {
             name:                   "Pod in prevPod and in podsToDelete",
             obj:                    pod1,
-            timeWait:               6 * time.Second,
+            timeWait:               5 * time.Minute,
             expectedResultInPrev:   pod1,
             expectedResultInDelete: 1,
         },
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
+            expectedDeleteTime := refTime.Add(tt.timeWait)
             podStore.onPodDelete(tt.obj)
             prePods, _ := podStore.prevPod.ByIndex(PodIPIndex, "1.2.3.4")
             if tt.expectedResultInPrev == nil {
@@ -106,18 +150,25 @@ func Test_onPodDelete(t *testing.T) {
             } else {
                 prePod := prePods[0].(*v1.Pod)
                 assert.Equal(t, tt.expectedResultInPrev, prePod)
-                assert.LessOrEqual(t, refTime.Unix(), prePod.DeletionTimestamp.Unix())
+                assert.NotNil(t, podStore.timestampMap[string(prePod.UID)].DeletionTimestamp)
+
+                fakeClock.SetTime(expectedDeleteTime.Add(-10 * time.Millisecond))
+                err := wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
+                    return podStore.podsToDelete.Len() == 0, nil
+                })
+                require.NoError(t, err, "Pod should not be in podsToDelete yet")
+                fakeClock.SetTime(expectedDeleteTime.Add(10 * time.Millisecond))
+                err = wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
+                    return podStore.podsToDelete.Len() > 0, nil
+                })
+                require.NoError(t, err, "Pod should be added to podsToDelete")
             }
-            podStore.podsToDelete.Len()
-            assert.Equal(t, 0, podStore.podsToDelete.Len())
-            time.Sleep(tt.timeWait)
-            assert.Equal(t, tt.expectedResultInDelete, podStore.podsToDelete.Len())
         })
     }
 }
 
 func Test_checkDeletedPod(t *testing.T) {
-    podStore := PodStorage{}
+    podStore := PodStore{}
     tests := []struct {
         name        string
         obj         interface{}
@@ -158,10 +209,11 @@ func Test_GetPodByIPAndTime(t *testing.T) {
     prevPodStore := cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc})
     curPodStore.Add(pod1)
     prevPodStore.Add(pod2)
-    podStorage := &PodStorage{
+    podStore := &PodStore{
         curPod:       curPodStore,
         prevPod:      prevPodStore,
         podsToDelete: nil,
+        timestampMap: timestampMap,
     }
 
     tests := []struct {
@@ -203,48 +255,38 @@ func Test_GetPodByIPAndTime(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            pod, _ := podStorage.GetPodByIPAndTime(tt.ip, tt.startTime)
+            pod, _ := podStore.GetPodByIPAndTime(tt.ip, tt.startTime)
             assert.Equal(t, tt.expectedResult, pod)
         })
     }
 }
 
 func Test_processDeleteQueueItem(t *testing.T) {
-    prevPodStore := cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc})
-    podsToDelete := workqueue.NewNamedDelayingQueue(DeleteQueueName)
-    prevPodStore.Add(pod1)
-
+    fakeClock := clock.NewFakeClock(time.Now())
     tests := []struct {
         name           string
-        podStorage     *PodStorage
+        podStore       *PodStore
         expectedResult bool
         shutdown       bool
     }{
         {
             name: "podsToDelete is shutdown",
-            podStorage: &PodStorage{
+            podStore: &PodStore{
                 curPod:       nil,
                 prevPod:      cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}),
-                podsToDelete: workqueue.NewNamedDelayingQueue(DeleteQueueName),
+                podsToDelete: workqueue.NewDelayingQueueWithCustomClock(fakeClock, DeleteQueueName),
+                timestampMap: map[string]*LocalClock{"pod1": {}},
             },
             expectedResult: false,
             shutdown:       true,
         },
         {
-            name: "Can't find pod in prevPod",
-            podStorage: &PodStorage{
+            name: "Delete pod in podsToDelete/prevPod/timestampMap",
+            podStore: &PodStore{
                 curPod:       nil,
                 prevPod:      cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}),
-                podsToDelete: podsToDelete,
-            },
-            expectedResult: true,
-        },
-        {
-            name: "Pod in both prevPod and podsToDelete",
-            podStorage: &PodStorage{
-                curPod:       nil,
-                prevPod:      prevPodStore,
-                podsToDelete: podsToDelete,
+                podsToDelete: workqueue.NewDelayingQueueWithCustomClock(fakeClock, DeleteQueueName),
+                timestampMap: map[string]*LocalClock{"pod1": {}},
             },
             expectedResult: true,
         },
@@ -252,11 +294,17 @@ func Test_processDeleteQueueItem(t *testing.T) {
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
             if tt.shutdown {
-                tt.podStorage.podsToDelete.ShutDown()
+                tt.podStore.podsToDelete.ShutDown()
             }
-            tt.podStorage.podsToDelete.Add(pod1)
-            result := tt.podStorage.processDeleteQueueItem()
+            tt.podStore.prevPod.Add(pod1)
+            tt.podStore.podsToDelete.Add(pod1)
+            result := tt.podStore.processDeleteQueueItem()
             assert.Equal(t, tt.expectedResult, result)
+            if result {
+                assert.Equal(t, 0, tt.podStore.podsToDelete.Len())
+                assert.Equal(t, 0, len(tt.podStore.prevPod.List()))
+                assert.Equal(t, 0, len(tt.podStore.timestampMap))
+            }
         })
     }
 }
@@ -269,9 +317,9 @@ func Test_podKeyFunc(t *testing.T) {
         expectedErr string
     }{
         {
-            name:        "object is not pod",
+            name:        "object is not Pod",
             obj:         node,
-            expectedErr: "obj is not pod: ",
+            expectedErr: "obj is not Pod: ",
         },
         {
             name: "Valid case",
@@ -300,9 +348,9 @@ func Test_podIPIndexFunc(t *testing.T) {
         expectedErr string
     }{
        {
-            name:        "object is not pod",
+            name:        "object is not Pod",
             obj:         node,
-            expectedErr: "obj is not pod: ",
+            expectedErr: "obj is not Pod: ",
         },
         {
             name: "Valid case",
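One design point worth making concrete: podKeyFunc keys prevPod by UID rather than namespace/name, so a Pod that is deleted and quickly recreated under the same name does not overwrite the retained old generation. A hedged sketch of that indexer behavior (hypothetical names and UIDs, not part of this patch):

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // Key by UID, as podKeyFunc does above.
    byUID := func(obj interface{}) (string, error) {
        return string(obj.(*v1.Pod).UID), nil
    }
    store := cache.NewIndexer(byUID, cache.Indexers{})

    oldGen := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default", UID: "uid-1"}}
    newGen := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default", UID: "uid-2"}}
    store.Add(oldGen)
    store.Add(newGen)

    // Both generations coexist until the delaying queue evicts the old one,
    // so flows that started before the recreation still resolve correctly.
    fmt.Println(len(store.List())) // 2
}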
@@ -322,3 +370,145 @@ func Test_podIPIndexFunc(t *testing.T) {
         })
     }
 }
+
+/*
+Sample output:
+goos: darwin
+goarch: amd64
+pkg: antrea.io/antrea/pkg/util/podstore
+cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+BenchmarkGetPodByIPAndTime
+BenchmarkGetPodByIPAndTime/input_size_100
+BenchmarkGetPodByIPAndTime/input_size_100-12      3307392    348.6 ns/op
+    podstore_test.go:418:
+        Summary:
+        Number of initial Pods: 100
+        Total times of calling GetPodByIPAndTime: 4317493
+        Total times of successfully finding Pod in podStore: 4317491
+BenchmarkGetPodByIPAndTime/input_size_1000
+BenchmarkGetPodByIPAndTime/input_size_1000-12     3138345    412.3 ns/op
+    podstore_test.go:418:
+        Summary:
+        Number of initial Pods: 1000
+        Total times of calling GetPodByIPAndTime: 4148446
+        Total times of successfully finding Pod in podStore: 4148446
+BenchmarkGetPodByIPAndTime/input_size_10000
+BenchmarkGetPodByIPAndTime/input_size_10000-12    1503332    764.8 ns/op
+    podstore_test.go:418:
+        Summary:
+        Number of initial Pods: 10000
+        Total times of calling GetPodByIPAndTime: 2513433
+        Total times of successfully finding Pod in podStore: 2513433
+PASS
+*/
+
+func BenchmarkGetPodByIPAndTime(b *testing.B) {
+    var PodNumber = []struct {
+        input int
+    }{
+        {input: 100},
+        {input: 1000},
+        {input: 10000},
+    }
+    for _, v := range PodNumber {
+        success := 0
+        total := 0
+        k8sClient := fake.NewSimpleClientset()
+        podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
+        podStore := NewPodStore(podInformer)
+        stopCh := make(chan struct{})
+        go podInformer.Run(stopCh)
+        cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
+        podArray, err := addPods(v.input, k8sClient)
+        if err != nil {
+            b.Fatalf("error when adding Pods: %v", err)
+        }
+        err = wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
+            return len(podInformer.GetIndexer().List()) == v.input, nil
+        })
+        require.NoError(b, err, "Pods should be added to podInformer")
+        errChan := make(chan error)
+        go func() {
+            err = deletePodsK8s(podArray, k8sClient)
+            if err != nil {
+                errChan <- err
+                return
+            }
+            close(errChan)
+        }()
+        b.Run(fmt.Sprintf("input_size_%d", v.input), func(b *testing.B) {
+            b.ResetTimer()
+            for i := 0; i < b.N; i++ {
+                randomPod := podArray[rand.Intn(v.input)]
+                creationTime := podStore.timestampMap[string(randomPod.UID)].CreationTimestamp
+                _, ok := podStore.GetPodByIPAndTime(randomPod.Status.PodIPs[0].IP, creationTime.Add(time.Millisecond))
+                total++
+                if ok {
+                    success++
+                }
+            }
+        })
+        close(stopCh)
+        err = <-errChan
+        if err != nil {
+            b.Fatalf("error when deleting Pods: %v", err)
+        }
+        b.Logf("\nSummary:\nNumber of initial Pods: %d\nTotal times of calling GetPodByIPAndTime: %d\nTotal times of successfully finding Pod in podStore: %d\n", v.input, total, success)
+    }
+}
+
+func deletePodsK8s(pods []*v1.Pod, k8sClient kubernetes.Interface) error {
+    for _, pod := range pods {
+        err := k8sClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+        if err != nil {
+            return fmt.Errorf("error when deleting Pods through k8s api: %v", err)
+        }
+        // Sleep briefly between calls so the fake clientset's watch event channel does not fill up.
+        time.Sleep(time.Millisecond)
+    }
+    return nil
+}
+
+func addPods(number int, k8sClient kubernetes.Interface) ([]*v1.Pod, error) {
+    var podArray []*v1.Pod
+    for i := 0; i < number; i++ {
+        pod := generatePod()
+        _, err := k8sClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+        if err != nil {
+            return nil, fmt.Errorf("error when adding Pods through k8s api: %v", err)
+        }
+        // Sleep briefly between calls so the fake clientset's watch event channel does not fill up.
+        time.Sleep(time.Millisecond)
+        podArray = append(podArray, pod)
+    }
+    return podArray, nil
+}
+
+func generatePod() *v1.Pod {
+    ip := getRandomIP()
+    uid := uuid.New().String()
+    startTime := rand.Intn(360000000)
+    creationTime := refTime.Add(time.Duration(startTime))
+    deletionTime := creationTime.Add(time.Hour)
+    pod := &v1.Pod{
+        Status: v1.PodStatus{
+            PodIPs: []v1.PodIP{
+                {
+                    IP: ip,
+                },
+            },
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            CreationTimestamp: metav1.Time{Time: creationTime},
+            DeletionTimestamp: &metav1.Time{Time: deletionTime},
+            Name:              "pod-" + uid,
+            Namespace:         "pod_ns",
+            UID:               types.UID(uid),
+        },
+    }
+    return pod
+}
+
+func getRandomIP() string {
+    return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256))
+}
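The poll-until-the-informer-catches-up idiom above recurs in almost every updated test. A hedged sketch of a shared helper (hypothetical name waitForPodCount, not part of this patch) that could factor it out:

package podstore

import (
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/tools/cache"
)

// waitForPodCount polls until the informer's indexer holds the expected number
// of Pods, mirroring the inline wait.PollImmediate calls in the updated tests.
func waitForPodCount(informer cache.SharedIndexInformer, expected int) error {
    return wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
        return len(informer.GetIndexer().List()) == expected, nil
    })
}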
diff --git a/test/integration/agent/flowexporter_test.go b/test/integration/agent/flowexporter_test.go
index adcaa0cc05e..4158783bb5d 100644
--- a/test/integration/agent/flowexporter_test.go
+++ b/test/integration/agent/flowexporter_test.go
@@ -18,6 +18,7 @@ package agent
 
 import (
+    "context"
     "fmt"
     "net"
     "testing"
@@ -28,6 +29,7 @@ import (
     "github.com/stretchr/testify/require"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
     coreinformers "k8s.io/client-go/informers/core/v1"
     "k8s.io/client-go/tools/cache"
 
@@ -87,12 +89,11 @@ func createConnsForTest() ([]*flowexporter.Connection, []*flowexporter.Connectio
     return testConns, testConnKeys
 }
 
-func preparePodInformation(podName string, podNS string, ip *net.IP, timeRef time.Time) *v1.Pod {
+func preparePodInformation(podName string, podNS string, ip *net.IP) *v1.Pod {
     pod := &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
-            Namespace:         podNS,
-            Name:              podName,
-            CreationTimestamp: metav1.Time{Time: timeRef},
+            Namespace: podNS,
+            Name:      podName,
         },
         Status: v1.PodStatus{
             Phase: v1.PodPending,
@@ -114,18 +115,22 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) {
     // Prepare connections and pod store for test
     testConns, testConnKeys := createConnsForTest()
-    pod1 := preparePodInformation("pod1", "ns1", &testConns[0].FlowKey.SourceAddress, testConns[0].StartTime.Add(-(time.Second * 50)))
-    pod2 := preparePodInformation("pod2", "ns2", &testConns[1].FlowKey.DestinationAddress, testConns[1].StartTime.Add(-(time.Second * 50)))
+    pod1 := preparePodInformation("pod1", "ns1", &testConns[0].FlowKey.SourceAddress)
+    pod2 := preparePodInformation("pod2", "ns2", &testConns[1].FlowKey.DestinationAddress)
     k8sClient := fake.NewSimpleClientset()
-    podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{}, nil)
-    podStore := podstore.NewPodStorage(podInformer)
+    podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
+    podStore := podstore.NewPodStore(podInformer)
     stopCh := make(chan struct{})
     defer close(stopCh)
     go podInformer.Run(stopCh)
     cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
-    podInformer.GetIndexer().Add(pod1)
-    podInformer.GetIndexer().Add(pod2)
+    k8sClient.CoreV1().Pods(pod1.Namespace).Create(context.TODO(), pod1, metav1.CreateOptions{})
+    k8sClient.CoreV1().Pods(pod2.Namespace).Create(context.TODO(), pod2, metav1.CreateOptions{})
+    err := wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
+        return len(podInformer.GetIndexer().List()) == 2, nil
+    })
+    require.NoError(t, err, "Pods should be added to podInformer")
 
     // Create connectionStore, FlowRecords and associated mocks
     connDumperMock := connectionstest.NewMockConnTrackDumper(ctrl)