diff --git a/cmd/flow-aggregator/flow-aggregator.go b/cmd/flow-aggregator/flow-aggregator.go index 1d17f1ae0cd..df61ce1e2ed 100644 --- a/cmd/flow-aggregator/flow-aggregator.go +++ b/cmd/flow-aggregator/flow-aggregator.go @@ -19,11 +19,9 @@ import ( "sync" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" aggregator "antrea.io/antrea/pkg/flowaggregator" @@ -50,14 +48,9 @@ func run(configFile string) error { return fmt.Errorf("error when creating K8s client: %v", err) } - podInformer := coreinformers.NewPodInformer( - k8sClient, - metav1.NamespaceAll, - informerDefaultResync, - cache.Indexers{}, - ) - - podStore := podstore.NewPodStore(podInformer) + informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync) + podInformer := informerFactory.Core().V1().Pods() + podStore := podstore.NewPodStore(podInformer.Informer()) flowAggregator, err := aggregator.NewFlowAggregator( k8sClient, @@ -88,7 +81,8 @@ func run(configFile string) error { return fmt.Errorf("error when creating flow aggregator API server: %v", err) } go apiServer.Run(stopCh) - go podInformer.Run(stopCh) + + informerFactory.Start(stopCh) <-stopCh klog.InfoS("Stopping flow aggregator") diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index 07cf32e1463..091693f3ca7 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -70,7 +70,7 @@ MOCKGEN_TARGETS=( "pkg/querier AgentNetworkPolicyInfoQuerier,AgentMulticastInfoQuerier,EgressQuerier testing" "pkg/flowaggregator/querier FlowAggregatorQuerier testing" "pkg/flowaggregator/s3uploader S3UploaderAPI testing" - "pkg/util/podstore StoreInterface testing" + "pkg/util/podstore Interface testing" "third_party/proxy Provider testing" ) diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index 45f134575d1..926d804cd90 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -34,7 +34,7 @@ const ( type connectionStore struct { connections map[flowexporter.ConnectionKey]*flowexporter.Connection - podStore podstore.StoreInterface + podStore podstore.Interface antreaProxier proxy.Proxier expirePriorityQueue *priorityqueue.ExpirePriorityQueue staleConnectionTimeout time.Duration @@ -42,7 +42,7 @@ type connectionStore struct { } func NewConnectionStore( - podStore podstore.StoreInterface, + podStore podstore.Interface, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) connectionStore { return connectionStore{ diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go index 0749597edb0..edfc4c68e8a 100644 --- a/pkg/agent/flowexporter/connections/connections_test.go +++ b/pkg/agent/flowexporter/connections/connections_test.go @@ -80,7 +80,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) { testFlowKeys[i] = &connKey } // Create connectionStore - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) connStore := NewConnectionStore(mockPodStore, nil, testFlowExporterOptions) // Add flows to the Connection store for i, flow := range testFlows { @@ -106,7 +106,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) { ctrl := gomock.NewController(t) metrics.InitializeConnectionMetrics() // test on deny connection store - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) denyConnStore := NewDenyConnectionStore(mockPodStore, nil, testFlowExporterOptions) tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} conn := &flowexporter.Connection{ diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index 2a7880405ea..1f729ff062d 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -53,7 +53,7 @@ func NewConntrackConnectionStore( v4Enabled bool, v6Enabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, - podStore podstore.StoreInterface, + podStore podstore.Interface, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions, ) *ConntrackConnectionStore { diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go index 4dc5c8c75e9..f7304b49c89 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go @@ -78,7 +78,7 @@ func BenchmarkPoll(b *testing.B) { func setupConntrackConnStore(b *testing.B) (*ConntrackConnectionStore, *connectionstest.MockConnTrackDumper) { ctrl := gomock.NewController(b) defer ctrl.Finish() - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: "pod-ns", diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_test.go index 7c3a4af3f56..65214459a37 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_test.go @@ -216,7 +216,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { }, } - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockProxier := proxytest.NewMockProxier(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) @@ -241,7 +241,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { } // testAddNewConn tests podInfo, Services, network policy mapping. -func testAddNewConn(mockPodStore *podstoretest.MockStoreInterface, mockProxier *proxytest.MockProxier, npQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier, conn flowexporter.Connection) { +func testAddNewConn(mockPodStore *podstoretest.MockInterface, mockProxier *proxytest.MockProxier, npQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier, conn flowexporter.Connection) { mockPodStore.EXPECT().GetPodByIPAndTime(conn.FlowKey.SourceAddress.String(), gomock.Any()).Return(nil, false) mockPodStore.EXPECT().GetPodByIPAndTime(conn.FlowKey.DestinationAddress.String(), gomock.Any()).Return(pod1, true) @@ -301,7 +301,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { // For testing purposes, set the metric metrics.TotalAntreaConnectionsInConnTrackTable.Set(float64(len(testFlows))) // Create connectionStore - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) connStore := NewConntrackConnectionStore(nil, true, false, nil, mockPodStore, nil, testFlowExporterOptions) // Add flows to the connection store. for i, flow := range testFlows { @@ -323,7 +323,7 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) { testFlows := make([]*flowexporter.Connection, 0) // Create connectionStore - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockPodStore, nil, testFlowExporterOptions) // Hard-coded conntrack occupancy metrics for test diff --git a/pkg/agent/flowexporter/connections/deny_connections.go b/pkg/agent/flowexporter/connections/deny_connections.go index 1234e0806a6..0f60ff0daeb 100644 --- a/pkg/agent/flowexporter/connections/deny_connections.go +++ b/pkg/agent/flowexporter/connections/deny_connections.go @@ -32,7 +32,7 @@ type DenyConnectionStore struct { connectionStore } -func NewDenyConnectionStore(podStore podstore.StoreInterface, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore { +func NewDenyConnectionStore(podStore podstore.Interface, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore { return &DenyConnectionStore{ connectionStore: NewConnectionStore(podStore, proxier, o), } diff --git a/pkg/agent/flowexporter/connections/deny_connections_test.go b/pkg/agent/flowexporter/connections/deny_connections_test.go index 9d163566cca..c3a7cb3cb9e 100644 --- a/pkg/agent/flowexporter/connections/deny_connections_test.go +++ b/pkg/agent/flowexporter/connections/deny_connections_test.go @@ -60,7 +60,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { OriginalPackets: uint64(1), IsActive: true, } - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockProxier := proxytest.NewMockProxier(ctrl) protocol, _ := lookupServiceProtocol(tuple.Protocol) serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol) diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 4820668bc9c..fb853f895c5 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -129,7 +129,7 @@ type FlowExporter struct { denyPriorityQueue *priorityqueue.ExpirePriorityQueue expiredConns []flowexporter.Connection egressQuerier querier.EgressQuerier - podStore podstore.StoreInterface + podStore podstore.Interface } func genObservationID(nodeName string) uint32 { @@ -154,7 +154,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter return expInput } -func NewFlowExporter(podStore podstore.StoreInterface, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller, +func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller, trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet, ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions, egressQuerier querier.EgressQuerier) (*FlowExporter, error) { diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 7441df7eba3..bc4b36fc714 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -109,7 +109,7 @@ type flowAggregator struct { flowAggregatorAddress string includePodLabels bool k8sClient kubernetes.Interface - podStore podstore.StoreInterface + podStore podstore.Interface numRecordsExported int64 updateCh chan *options.Options configFile string @@ -125,7 +125,7 @@ type flowAggregator struct { func NewFlowAggregator( k8sClient kubernetes.Interface, - podStore podstore.StoreInterface, + podStore podstore.Interface, configFile string, ) (*flowAggregator, error) { if len(configFile) == 0 { @@ -435,7 +435,7 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record sourceNodeName.SetStringValue(pod.Spec.NodeName) } } else { - klog.ErrorS(nil, "Cannot find pod information", "source address", key.SourceAddress, "flow start time", startTime) + klog.ErrorS(nil, "Cannot find Pod information", "sourceAddress", key.SourceAddress, "flowStartTime", startTime) } } } @@ -452,7 +452,7 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record destinationNodeName.SetStringValue(pod.Spec.NodeName) } } else { - klog.ErrorS(nil, "Cannot find pod information", "source address", key.DestinationAddress, "flow start time", startTime) + klog.ErrorS(nil, "Cannot find Pod information", "destinationAddress", key.DestinationAddress, "flowStartTime", startTime) } } } @@ -470,7 +470,7 @@ func (fa *flowAggregator) getRecordStartTime(record ipfixentities.Record) (*time func (fa *flowAggregator) fetchPodLabels(ip string, startTime time.Time) string { pod, exist := fa.podStore.GetPodByIPAndTime(ip, startTime) if !exist { - klog.ErrorS(nil, "Error when getting pod information from podInformer", "ip", ip, "startTime", startTime) + klog.ErrorS(nil, "Error when getting Pod information from podInformer", "ip", ip, "startTime", startTime) return "" } labelsJSON, err := json.Marshal(pod.GetLabels()) diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index ce1fedb869b..bc71bbe77f2 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -59,7 +59,7 @@ func init() { func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { ctrl := gomock.NewController(t) - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) @@ -444,7 +444,7 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) { func TestFlowAggregator_Run(t *testing.T) { ctrl := gomock.NewController(t) - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) mockS3Exporter := exportertesting.NewMockInterface(ctrl) @@ -687,7 +687,7 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) { client := fake.NewSimpleClientset() // Mock pod store - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) mockPodStore.EXPECT().GetPodByIPAndTime("", gomock.Any()).Return(nil, false) mockPodStore.EXPECT().GetPodByIPAndTime("192.168.1.2", gomock.Any()).Return(pod, true) @@ -847,7 +847,7 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) { ctrl := gomock.NewController(t) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) stopCh := make(chan struct{}) defer close(stopCh) diff --git a/pkg/util/podstore/podstore.go b/pkg/util/podstore/podstore.go index f66ff53d1ff..2fbca98bf44 100644 --- a/pkg/util/podstore/podstore.go +++ b/pkg/util/podstore/podstore.go @@ -20,6 +20,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -28,37 +29,41 @@ import ( ) const ( - DeleteQueueName = "pod_delete" - PodIPIndex = "PodIP" - DelayTime = time.Minute * 5 + deleteQueueName = "pod_delete" + podIPIndex = "PodIP" + delayTime = time.Minute * 5 ) type PodStore struct { - curPods cache.Indexer - prevPods cache.Indexer + pods cache.Indexer podsToDelete workqueue.DelayingInterface - // Mapping pod.uuid to PodLocalTimestamps - timestampMap map[string]*podLocalTimestamps + // Mapping pod.uuid to podTimestamps + timestampMap map[types.UID]*podTimestamps clock clock.Clock - mutex sync.Mutex + mutex sync.RWMutex } -type podLocalTimestamps struct { - // The local clock when the Pod is added +type podTimestamps struct { CreationTimestamp time.Time - // The local clock when the Pod is actually deleted from kube-apiserver. Nil if it's not deleted yet. + // DeletionTimestamp is nil if a Pod is not deleted. DeletionTimestamp *time.Time } +// Interface is a podStore interface to create local podStore for Flow Exporter and Flow Aggregator. +type Interface interface { + GetPodByIPAndTime(ip string, startTime time.Time) (*corev1.Pod, bool) + Run(stopCh <-chan struct{}) +} + // NewPodStoreWithClock creates a Pod Store with a custom clock, // which is useful when writing robust unit tests. func NewPodStoreWithClock(podInformer cache.SharedIndexInformer, clock clock.WithTicker) *PodStore { s := &PodStore{ - curPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - prevPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - podsToDelete: workqueue.NewDelayingQueueWithCustomClock(clock, DeleteQueueName), + pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), + podsToDelete: workqueue.NewDelayingQueueWithCustomClock(clock, deleteQueueName), clock: clock, - timestampMap: map[string]*podLocalTimestamps{}, + timestampMap: map[types.UID]*podTimestamps{}, + mutex: sync.RWMutex{}, } podInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ @@ -74,27 +79,20 @@ func NewPodStore(podInformer cache.SharedIndexInformer) *PodStore { } func (s *PodStore) onPodUpdate(oldObj interface{}, newObj interface{}) { - oldPod, ok := oldObj.(*corev1.Pod) - if !ok { - klog.ErrorS(nil, "Received unexpected object", "oldObj", oldObj) - return - } newPod, ok := newObj.(*corev1.Pod) if !ok { klog.ErrorS(nil, "Received unexpected object", "newObj", newObj) return } - if oldPod.UID != newPod.UID { - klog.ErrorS(nil, "Pods' UID are not matched") + err := s.pods.Update(newPod) + if err != nil { + klog.ErrorS(err, "Error when updating Pod to index") return } - s.curPods.Update(newPod) - klog.V(4).InfoS("Process Pod Update Event", "Pod", klog.KObj(newPod)) + klog.V(4).InfoS("Processed Pod Update Event", "Pod", klog.KObj(newPod)) } func (s *PodStore) onPodCreate(obj interface{}) { - // Add event is unlikely to include the IP address. When the Pod is created the IP address has not - // been assigned yet by the CNI.0. IP will be update in onPodUpdate when we have Update event. s.mutex.Lock() defer s.mutex.Unlock() timeNow := s.clock.Now() @@ -103,13 +101,18 @@ func (s *PodStore) onPodCreate(obj interface{}) { klog.ErrorS(nil, "Received unexpected object", "obj", obj) return } - err := s.curPods.Add(pod) + err := s.pods.Add(pod) if err != nil { - klog.ErrorS(err, "Error when adding pod to curPods") + klog.ErrorS(err, "Error when adding Pod to index") return } - s.timestampMap[string(pod.UID)] = &podLocalTimestamps{CreationTimestamp: timeNow} - klog.V(4).InfoS("Process Pod Create Event", "Pod", klog.KObj(pod)) + switch pod.Status.Phase { + case corev1.PodPending: + s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: timeNow} + default: + s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: pod.CreationTimestamp.Time} + } + klog.V(4).InfoS("Processed Pod Create Event", "Pod", klog.KObj(pod)) } func (s *PodStore) onPodDelete(obj interface{}) { @@ -125,19 +128,14 @@ func (s *PodStore) onPodDelete(obj interface{}) { return } } - err := s.prevPods.Add(pod) - if err != nil { - klog.ErrorS(err, "Error when adding pod to prevPods") - return - } - err = s.curPods.Delete(pod) - if err != nil { - klog.ErrorS(err, "Error when deleting pod from curPods") + timestamp, ok := s.timestampMap[pod.UID] + if !ok { + klog.ErrorS(nil, "Cannot find podTimestamps in timestampMap", "UID", pod.UID) return } - s.timestampMap[string(pod.UID)].DeletionTimestamp = &timeNow - s.podsToDelete.AddAfter(pod, DelayTime) - klog.V(4).InfoS("Process Pod Delete Event", "Pod", klog.KObj(pod)) + timestamp.DeletionTimestamp = &timeNow + s.podsToDelete.AddAfter(pod, delayTime) + klog.V(4).InfoS("Processed Pod Delete Event", "Pod", klog.KObj(pod)) } func (s *PodStore) checkDeletedPod(obj interface{}) (*corev1.Pod, error) { @@ -152,31 +150,27 @@ func (s *PodStore) checkDeletedPod(obj interface{}) (*corev1.Pod, error) { return pod, nil } -func (s *PodStore) GetPodByIPAndTime(ip string, startTime time.Time) (*corev1.Pod, bool) { - s.mutex.Lock() - defer s.mutex.Unlock() - // Query current Pods first - curPod, ok := getPods(s.curPods, ip, startTime, s.timestampMap) - if ok { - return curPod, true - } - // Query previous Pods - prevPod, ok := getPods(s.prevPods, ip, startTime, s.timestampMap) - if ok { - return prevPod, true - } - return nil, false -} - -func getPods(indexer cache.Indexer, ip string, startTime time.Time, timestampMap map[string]*podLocalTimestamps) (*corev1.Pod, bool) { - pods, _ := indexer.ByIndex(PodIPIndex, ip) +func (s *PodStore) GetPodByIPAndTime(ip string, time time.Time) (*corev1.Pod, bool) { + s.mutex.RLock() + defer s.mutex.RUnlock() + pods, _ := s.pods.ByIndex(podIPIndex, ip) if len(pods) == 0 { return nil, false + } else if len(pods) == 1 { + pod := pods[0].(*corev1.Pod) + // In case the clocks may be skewed between different Nodes in the cluster, we directly return the Pod if there is only + // one Pod in the indexer. Otherwise, we check the timestamp for Pods in the indexer. + klog.V(4).InfoS("Matched Pod IP to Pod from indexer", "ip", ip, "Pod", klog.KObj(pod)) + return pod, true } for _, pod := range pods { pod := pod.(*corev1.Pod) - if timestampMap[string(pod.UID)].CreationTimestamp.Before(startTime) && (timestampMap[string(pod.UID)].DeletionTimestamp == nil || startTime.Before(*timestampMap[string(pod.UID)].DeletionTimestamp)) { - klog.V(4).InfoS("Get Pod info from indexer", "Pod", klog.KObj(pod)) + timestamp, ok := s.timestampMap[pod.UID] + if !ok { + continue + } + if timestamp.CreationTimestamp.Before(time) && (timestamp.DeletionTimestamp == nil || time.Before(*timestamp.DeletionTimestamp)) { + klog.V(4).InfoS("Matched Pod IP and time to Pod from indexer", "ip", ip, "time", time, "Pod", klog.KObj(pod)) return pod, true } } @@ -201,14 +195,16 @@ func (s *PodStore) processDeleteQueueItem() bool { if quit { return false } - err := s.prevPods.Delete(pod) + s.mutex.Lock() + defer s.mutex.Unlock() + err := s.pods.Delete(pod) if err != nil { - klog.ErrorS(err, "Error when deleting pod from prevPods") + klog.ErrorS(err, "Error when deleting Pod from deletion workqueue", "Pod", klog.KObj(pod.(*corev1.Pod))) return false } - delete(s.timestampMap, string(pod.(*corev1.Pod).UID)) + delete(s.timestampMap, pod.(*corev1.Pod).UID) s.podsToDelete.Done(pod) - klog.V(4).InfoS("Remove Pod from Pod Store", "Pod", klog.KObj(pod.(*corev1.Pod))) + klog.V(4).InfoS("Removed Pod from Pod Store", "Pod", klog.KObj(pod.(*corev1.Pod))) return true } diff --git a/pkg/util/podstore/podstore_test.go b/pkg/util/podstore/podstore_test.go index e30b63a7e61..a9a2b6971b0 100644 --- a/pkg/util/podstore/podstore_test.go +++ b/pkg/util/podstore/podstore_test.go @@ -44,11 +44,13 @@ var ( IP: "1.2.3.4", }, }, + Phase: v1.PodPending, }, ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "pod1_ns", - UID: "pod1", + Name: "pod1", + Namespace: "pod1_ns", + UID: "pod1", + CreationTimestamp: metav1.Time{Time: refTime.Add(-5 * time.Minute)}, }, } pod2 = &v1.Pod{ @@ -65,43 +67,49 @@ var ( UID: "pod2", }, } - timestampMap = map[string]*podLocalTimestamps{ - "pod1": {CreationTimestamp: refTime}, - "pod2": {CreationTimestamp: refTime.Add(-5 * time.Minute), DeletionTimestamp: &refTime}, - } - node = &v1.Node{} -) - -func Test_onPodUpdate(t *testing.T) { - oldPod := &v1.Pod{ + pod3 = &v1.Pod{ Status: v1.PodStatus{ PodIPs: []v1.PodIP{ { - IP: "1.2.3.4", + IP: "4.3.2.1", }, }, + Phase: v1.PodRunning, }, ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "pod1_ns", - UID: "pod1", + Name: "pod3", + Namespace: "pod3_ns", + UID: "pod3", + CreationTimestamp: metav1.Time{Time: refTime.Add(-5 * time.Minute)}, }, } - newPod1 := &v1.Pod{ + pod4 = &v1.Pod{ Status: v1.PodStatus{ PodIPs: []v1.PodIP{ { - IP: "4.5.6.7", + IP: "1.2.3.4", }, }, + Phase: v1.PodSucceeded, }, ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "pod1_ns", - UID: "pod1", + Name: "pod1", + Namespace: "pod1_ns", + UID: "pod4", + CreationTimestamp: metav1.Time{Time: refTime.Add(-5 * time.Minute)}, + DeletionTimestamp: &metav1.Time{Time: refTime}, }, } - newPod2 := &v1.Pod{ + timestampMap = map[types.UID]*podTimestamps{ + "pod1": {CreationTimestamp: refTime}, + "pod2": {CreationTimestamp: refTime.Add(-5 * time.Minute), DeletionTimestamp: &refTime}, + "pod4": {CreationTimestamp: refTime.Add(-5 * time.Minute), DeletionTimestamp: &refTime}, + } + node = &v1.Node{} +) + +func Test_onPodUpdate(t *testing.T) { + newPod1 := &v1.Pod{ Status: v1.PodStatus{ PodIPs: []v1.PodIP{ { @@ -112,100 +120,71 @@ func Test_onPodUpdate(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "pod1", Namespace: "pod1_ns", - UID: "newPod2", + UID: "pod1", }, } tests := []struct { name string - podStore *PodStore oldObj interface{} newObj interface{} expectedPod *v1.Pod }{ { - name: "oldObj is not Pod", - podStore: &PodStore{ - curPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - }, - oldObj: node, - newObj: newPod1, - expectedPod: oldPod, - }, - { - name: "newObj is not Pod", - podStore: &PodStore{ - curPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - }, - oldObj: oldPod, + name: "newObj is not Pod", newObj: node, - expectedPod: oldPod, - }, - { - name: "Unmatched UID", - podStore: &PodStore{ - curPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - }, - oldObj: oldPod, - newObj: newPod2, - expectedPod: oldPod, + expectedPod: pod1, }, { - name: "Valid case", - podStore: &PodStore{ - curPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - }, - oldObj: oldPod, + name: "valid case", newObj: newPod1, expectedPod: newPod1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.podStore.curPods.Add(oldPod) - tt.podStore.onPodUpdate(tt.oldObj, tt.newObj) - assert.Equal(t, 1, len(tt.podStore.curPods.List())) - assert.Equal(t, tt.expectedPod, tt.podStore.curPods.List()[0].(*v1.Pod)) + podStore := &PodStore{ + pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), + } + err := podStore.pods.Add(pod1) + require.NoError(t, err) + podStore.onPodUpdate(tt.oldObj, tt.newObj) + require.Len(t, podStore.pods.List(), 1) + assert.Equal(t, tt.expectedPod, podStore.pods.List()[0].(*v1.Pod)) }) } } func Test_onPodCreate(t *testing.T) { tests := []struct { - name string - podStore *PodStore - obj interface{} - expectedTimestampMapLen int + name string + obj interface{} + expectedMap map[types.UID]*podTimestamps }{ { - name: "object is not Pod", - podStore: &PodStore{ - timestampMap: map[string]*podLocalTimestamps{}, - clock: clock.NewFakeClock(refTime), - curPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - }, - obj: node, - expectedTimestampMapLen: 0, + name: "object is not Pod", + obj: node, + expectedMap: map[types.UID]*podTimestamps{}, }, { - name: "Valid case", - obj: pod1, - podStore: &PodStore{ - timestampMap: map[string]*podLocalTimestamps{}, - clock: clock.NewFakeClock(refTime), - curPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - }, - expectedTimestampMapLen: 1, + name: "valid case for Pending Pod", + obj: pod1, + expectedMap: map[types.UID]*podTimestamps{"pod1": {CreationTimestamp: refTime}}, + }, + { + name: "valid case for Running Pod", + obj: pod3, + expectedMap: map[types.UID]*podTimestamps{"pod3": {CreationTimestamp: refTime.Add(-5 * time.Minute)}}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, 0, len(tt.podStore.timestampMap)) - tt.podStore.onPodCreate(tt.obj) - assert.Equal(t, tt.expectedTimestampMapLen, len(tt.podStore.timestampMap)) - if tt.expectedTimestampMapLen > 0 { - assert.Equal(t, tt.expectedTimestampMapLen, len(tt.podStore.curPods.List())) - assert.Equal(t, tt.podStore.clock.Now(), tt.podStore.timestampMap[string(tt.obj.(*v1.Pod).UID)].CreationTimestamp) + podStore := &PodStore{ + timestampMap: map[types.UID]*podTimestamps{}, + clock: clock.NewFakeClock(refTime), + pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), } + podStore.onPodCreate(tt.obj) + assert.Equal(t, tt.expectedMap, podStore.timestampMap) }) } } @@ -213,67 +192,30 @@ func Test_onPodCreate(t *testing.T) { func Test_onPodDelete(t *testing.T) { k8sClient := fake.NewSimpleClientset() podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{}) - tests := []struct { - name string - obj interface{} - timeWait time.Duration - expectedCurLength int - expectedResultInPrev *v1.Pod - expectedLength int - expectedTimestamp *podLocalTimestamps - }{ - { - name: "object is neither pod or DeletedFinalStateUnknown", - obj: node, - timeWait: 0, - expectedCurLength: 1, - expectedResultInPrev: nil, - expectedLength: 0, - expectedTimestamp: &podLocalTimestamps{CreationTimestamp: refTime}, - }, - { - name: "Pod in prevPod and in podsToDelete", - obj: pod1, - timeWait: 5 * time.Minute, - expectedCurLength: 0, - expectedResultInPrev: pod1, - expectedLength: 1, - expectedTimestamp: &podLocalTimestamps{CreationTimestamp: refTime, DeletionTimestamp: &refTime}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - fakeClock := clock.NewFakeClock(refTime) - podStore := NewPodStoreWithClock(podInformer, fakeClock) - err := podStore.curPods.Add(pod1) - assert.NoError(t, err) - podStore.timestampMap = map[string]*podLocalTimestamps{"pod1": {CreationTimestamp: refTime}} + fakeClock := clock.NewFakeClock(refTime) + podStore := NewPodStoreWithClock(podInformer, fakeClock) + err := podStore.pods.Add(pod1) + require.NoError(t, err) + podStore.timestampMap = map[types.UID]*podTimestamps{"pod1": {CreationTimestamp: refTime}} - expectedDeleteTime := refTime.Add(tt.timeWait) - podStore.onPodDelete(tt.obj) - prePods, _ := podStore.prevPods.ByIndex(PodIPIndex, "1.2.3.4") - if tt.expectedResultInPrev == nil { - assert.Equal(t, tt.expectedLength, len(prePods)) - assert.Equal(t, tt.expectedTimestamp, podStore.timestampMap["pod1"]) - assert.Equal(t, tt.expectedCurLength, len(podStore.curPods.List())) - } else { - prePod := prePods[0].(*v1.Pod) - assert.Equal(t, tt.expectedResultInPrev, prePod) - assert.NotNil(t, podStore.timestampMap[string(prePod.UID)].DeletionTimestamp) + t.Run("object is neither Pod or DeletedFinalStateUnknown", func(t *testing.T) { + podStore.onPodDelete(node) + assert.Equal(t, &podTimestamps{CreationTimestamp: refTime}, podStore.timestampMap["pod1"]) + }) - fakeClock.SetTime(expectedDeleteTime.Add(-10 * time.Millisecond)) - assert.Eventuallyf(t, func() bool { - return podStore.podsToDelete.Len() == 0 - }, 1*time.Second, 10*time.Millisecond, "Pod should not be added to PodsToDelete") - fakeClock.SetTime(expectedDeleteTime.Add(10 * time.Millisecond)) - assert.Eventuallyf(t, func() bool { - return podStore.podsToDelete.Len() == tt.expectedLength - }, 1*time.Second, 10*time.Millisecond, "Pod is not added to PodsToDelete") - assert.Equal(t, tt.expectedTimestamp, podStore.timestampMap["pod1"]) - assert.Equal(t, tt.expectedCurLength, len(podStore.curPods.List())) - } - }) - } + t.Run("Pod is in prevPod and podsToDelete", func(t *testing.T) { + expectedDeleteTime := refTime.Add(delayTime) + podStore.onPodDelete(pod1) + assert.Equal(t, &podTimestamps{CreationTimestamp: refTime, DeletionTimestamp: &refTime}, podStore.timestampMap["pod1"]) + fakeClock.SetTime(expectedDeleteTime.Add(-10 * time.Millisecond)) + assert.Eventuallyf(t, func() bool { + return podStore.podsToDelete.Len() == 0 + }, 1*time.Second, 10*time.Millisecond, "Pod should not be added to PodsToDelete") + fakeClock.SetTime(expectedDeleteTime.Add(10 * time.Millisecond)) + assert.Eventuallyf(t, func() bool { + return podStore.podsToDelete.Len() == 1 + }, 1*time.Second, 10*time.Millisecond, "Pod is not added to PodsToDelete") + }) } func Test_checkDeletedPod(t *testing.T) { @@ -289,24 +231,24 @@ func Test_checkDeletedPod(t *testing.T) { expectedErr: "received unexpected object: ", }, { - name: "object in DeletedFinalStateUnknown is not pod", + name: "object in DeletedFinalStateUnknown is not Pod", obj: cache.DeletedFinalStateUnknown{Obj: node}, expectedErr: "DeletedFinalStateUnknown object is not of type Pod", }, { - name: "Valid case", - obj: cache.DeletedFinalStateUnknown{Obj: pod2}, - expectedResult: pod2, + name: "valid case", + obj: cache.DeletedFinalStateUnknown{Obj: pod1}, + expectedResult: pod1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := (&PodStore{}).checkDeletedPod(tt.obj) + pod, err := (&PodStore{}).checkDeletedPod(tt.obj) if tt.expectedErr != "" { assert.ErrorContains(t, err, tt.expectedErr) } else { require.NoError(t, err) - assert.Equal(t, tt.expectedResult, got) + assert.Equal(t, tt.expectedResult, pod) } }) } @@ -322,32 +264,44 @@ func Test_GetPodByIPAndTime(t *testing.T) { expectedResult *v1.Pod }{ { - name: "Find pod in current pod store", - ip: "1.2.3.4", - startTime: refTime.Add(time.Minute), - expectedResult: pod1, + name: "no Pod in the Pod store", + ip: "1.3.5.7", + startTime: refTime, + expectedResult: nil, }, { - name: "Find pod in previous pod store", + name: "find only one Pod in the Pod store - correct startTime", ip: "5.6.7.8", startTime: refTime.Add(-time.Minute), expectedResult: pod2, }, { - name: "Cannot find pod in previous pod store. SearchTime < CreationTime", + name: "find only one Pod in the Pod store - incorrect startTime", ip: "5.6.7.8", - startTime: refTime.Add(-time.Minute * 10), - expectedResult: nil, + startTime: refTime.Add(time.Minute), + expectedResult: pod2, }, { - name: "Cannot find pod in previous pod store. SearchTime > DeletionTime", - ip: "5.6.7.8", - startTime: refTime, + name: "find current Pod in the Pod store", + ip: "1.2.3.4", + startTime: refTime.Add(time.Minute), + expectedResult: pod1, + }, + { + name: "find previous Pod in the Pod store", + ip: "1.2.3.4", + startTime: refTime.Add(-time.Minute), + expectedResult: pod4, + }, + { + name: "cannot find the Pod in the Pod store - SearchTime < CreationTime", + ip: "1.2.3.4", + startTime: refTime.Add(-time.Minute * 10), expectedResult: nil, }, { - name: "Empty in both pod store", - ip: "1.3.5.7", + name: "cannot find the Pod in the Pod store - SearchTime > DeletionTime", + ip: "1.2.3.4", startTime: refTime, expectedResult: nil, }, @@ -355,10 +309,12 @@ func Test_GetPodByIPAndTime(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { podStore := NewPodStore(podInformer) - err := podStore.curPods.Add(pod1) - assert.NoError(t, err) - err = podStore.prevPods.Add(pod2) - assert.NoError(t, err) + err := podStore.pods.Add(pod1) + require.NoError(t, err) + err = podStore.pods.Add(pod2) + require.NoError(t, err) + err = podStore.pods.Add(pod4) + require.NoError(t, err) podStore.timestampMap = timestampMap pod, _ := podStore.GetPodByIPAndTime(tt.ip, tt.startTime) assert.Equal(t, tt.expectedResult, pod) @@ -368,50 +324,25 @@ func Test_GetPodByIPAndTime(t *testing.T) { func Test_processDeleteQueueItem(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - tests := []struct { - name string - podStore *PodStore - expectedResult bool - shutdown bool - }{ - { - name: "podsToDelete is shutdown", - podStore: &PodStore{ - curPods: nil, - prevPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - podsToDelete: workqueue.NewDelayingQueueWithCustomClock(fakeClock, DeleteQueueName), - timestampMap: map[string]*podLocalTimestamps{"pod1": {}}, - }, - expectedResult: false, - shutdown: true, - }, - { - name: "Delete Pod in podsToDelete/prevPod/timestampMap", - podStore: &PodStore{ - curPods: nil, - prevPods: cache.NewIndexer(podKeyFunc, cache.Indexers{PodIPIndex: podIPIndexFunc}), - podsToDelete: workqueue.NewDelayingQueueWithCustomClock(fakeClock, DeleteQueueName), - timestampMap: map[string]*podLocalTimestamps{"pod1": {}}, - }, - expectedResult: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.shutdown { - tt.podStore.podsToDelete.ShutDown() - } - tt.podStore.prevPods.Add(pod1) - tt.podStore.podsToDelete.Add(pod1) - result := tt.podStore.processDeleteQueueItem() - assert.Equal(t, tt.expectedResult, result) - if result { - assert.Equal(t, 0, tt.podStore.podsToDelete.Len()) - assert.Equal(t, 0, len(tt.podStore.prevPods.List())) - assert.Equal(t, 0, len(tt.podStore.timestampMap)) - } - }) + podStore := &PodStore{ + pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), + podsToDelete: workqueue.NewDelayingQueueWithCustomClock(fakeClock, deleteQueueName), + timestampMap: map[types.UID]*podTimestamps{"pod1": {}}, } + t.Run("delete Pod in podsToDelete/prevPod/timestampMap", func(t *testing.T) { + err := podStore.pods.Add(pod1) + require.NoError(t, err) + podStore.podsToDelete.Add(pod1) + + result := podStore.processDeleteQueueItem() + assert.Equal(t, true, result) + if result { + assert.Equal(t, 0, podStore.podsToDelete.Len()) + assert.Equal(t, 0, len(podStore.pods.List())) + assert.Equal(t, 0, len(podStore.timestampMap)) + } + }) + } func Test_podKeyFunc(t *testing.T) { @@ -427,7 +358,7 @@ func Test_podKeyFunc(t *testing.T) { expectedErr: "obj is not Pod: ", }, { - name: "Valid case", + name: "valid case", obj: pod1, expectedResult: "pod1", }, @@ -455,10 +386,10 @@ func Test_podIPIndexFunc(t *testing.T) { { name: "object is not Pod", obj: node, - expectedErr: "obj is not Pod: ", + expectedErr: "obj is not Pod:", }, { - name: "Valid case", + name: "valid case", obj: pod1, expectedResult: []string{"1.2.3.4"}, }, @@ -544,7 +475,7 @@ func BenchmarkGetPodByIPAndTime(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { randomPod := podArray[rand.Intn(v.input)] - creationTime := podStore.timestampMap[string(randomPod.UID)].CreationTimestamp + creationTime := podStore.timestampMap[randomPod.UID].CreationTimestamp _, ok := podStore.GetPodByIPAndTime(randomPod.Status.PodIPs[0].IP, creationTime.Add(time.Millisecond)) total++ if ok { diff --git a/pkg/util/podstore/testing/mock_podstore.go b/pkg/util/podstore/testing/mock_podstore.go index 61d7e47d48e..42d7ce9c510 100644 --- a/pkg/util/podstore/testing/mock_podstore.go +++ b/pkg/util/podstore/testing/mock_podstore.go @@ -14,7 +14,7 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: antrea.io/antrea/pkg/util/podstore (interfaces: StoreInterface) +// Source: antrea.io/antrea/pkg/util/podstore (interfaces: Interface) // Package testing is a generated GoMock package. package testing @@ -26,31 +26,31 @@ import ( time "time" ) -// MockStoreInterface is a mock of StoreInterface interface -type MockStoreInterface struct { +// MockInterface is a mock of Interface interface +type MockInterface struct { ctrl *gomock.Controller - recorder *MockStoreInterfaceMockRecorder + recorder *MockInterfaceMockRecorder } -// MockStoreInterfaceMockRecorder is the mock recorder for MockStoreInterface -type MockStoreInterfaceMockRecorder struct { - mock *MockStoreInterface +// MockInterfaceMockRecorder is the mock recorder for MockInterface +type MockInterfaceMockRecorder struct { + mock *MockInterface } -// NewMockStoreInterface creates a new mock instance -func NewMockStoreInterface(ctrl *gomock.Controller) *MockStoreInterface { - mock := &MockStoreInterface{ctrl: ctrl} - mock.recorder = &MockStoreInterfaceMockRecorder{mock} +// NewMockInterface creates a new mock instance +func NewMockInterface(ctrl *gomock.Controller) *MockInterface { + mock := &MockInterface{ctrl: ctrl} + mock.recorder = &MockInterfaceMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use -func (m *MockStoreInterface) EXPECT() *MockStoreInterfaceMockRecorder { +func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder { return m.recorder } // GetPodByIPAndTime mocks base method -func (m *MockStoreInterface) GetPodByIPAndTime(arg0 string, arg1 time.Time) (*v1.Pod, bool) { +func (m *MockInterface) GetPodByIPAndTime(arg0 string, arg1 time.Time) (*v1.Pod, bool) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetPodByIPAndTime", arg0, arg1) ret0, _ := ret[0].(*v1.Pod) @@ -59,19 +59,19 @@ func (m *MockStoreInterface) GetPodByIPAndTime(arg0 string, arg1 time.Time) (*v1 } // GetPodByIPAndTime indicates an expected call of GetPodByIPAndTime -func (mr *MockStoreInterfaceMockRecorder) GetPodByIPAndTime(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockInterfaceMockRecorder) GetPodByIPAndTime(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodByIPAndTime", reflect.TypeOf((*MockStoreInterface)(nil).GetPodByIPAndTime), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodByIPAndTime", reflect.TypeOf((*MockInterface)(nil).GetPodByIPAndTime), arg0, arg1) } // Run mocks base method -func (m *MockStoreInterface) Run(arg0 <-chan struct{}) { +func (m *MockInterface) Run(arg0 <-chan struct{}) { m.ctrl.T.Helper() m.ctrl.Call(m, "Run", arg0) } // Run indicates an expected call of Run -func (mr *MockStoreInterfaceMockRecorder) Run(arg0 interface{}) *gomock.Call { +func (mr *MockInterfaceMockRecorder) Run(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockStoreInterface)(nil).Run), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockInterface)(nil).Run), arg0) } diff --git a/pkg/util/podstore/type.go b/pkg/util/podstore/type.go deleted file mode 100644 index cede99c9bc4..00000000000 --- a/pkg/util/podstore/type.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2023 Antrea Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package podstore - -import ( - "time" - - corev1 "k8s.io/api/core/v1" -) - -// StoreInterface is a podStore interface to create local podStore for Flow Exporter and Flow Aggregator. -type StoreInterface interface { - GetPodByIPAndTime(ip string, startTime time.Time) (*corev1.Pod, bool) - Run(stopCh <-chan struct{}) -} diff --git a/test/integration/agent/flowexporter_test.go b/test/integration/agent/flowexporter_test.go index 710146ac3e6..0885ba803e4 100644 --- a/test/integration/agent/flowexporter_test.go +++ b/test/integration/agent/flowexporter_test.go @@ -117,7 +117,7 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) { // Create connectionStore, FlowRecords and associated mocks connDumperMock := connectionstest.NewMockConnTrackDumper(ctrl) - mockPodStore := podstoretest.NewMockStoreInterface(ctrl) + mockPodStore := podstoretest.NewMockInterface(ctrl) npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) // TODO: Enhance the integration test by testing service. o := &flowexporter.FlowExporterOptions{