Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <[email protected]>
  • Loading branch information
yuntanghsu committed Jul 24, 2023
1 parent 405d23a commit 3d54448
Show file tree
Hide file tree
Showing 17 changed files with 253 additions and 359 deletions.
18 changes: 6 additions & 12 deletions cmd/flow-aggregator/flow-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

aggregator "antrea.io/antrea/pkg/flowaggregator"
Expand All @@ -50,14 +48,9 @@ func run(configFile string) error {
return fmt.Errorf("error when creating K8s client: %v", err)
}

podInformer := coreinformers.NewPodInformer(
k8sClient,
metav1.NamespaceAll,
informerDefaultResync,
cache.Indexers{},
)

podStore := podstore.NewPodStore(podInformer)
informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync)
podInformer := informerFactory.Core().V1().Pods()
podStore := podstore.NewPodStore(podInformer.Informer())

flowAggregator, err := aggregator.NewFlowAggregator(
k8sClient,
Expand Down Expand Up @@ -88,7 +81,8 @@ func run(configFile string) error {
return fmt.Errorf("error when creating flow aggregator API server: %v", err)
}
go apiServer.Run(stopCh)
go podInformer.Run(stopCh)

informerFactory.Start(stopCh)

<-stopCh
klog.InfoS("Stopping flow aggregator")
Expand Down
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ MOCKGEN_TARGETS=(
"pkg/querier AgentNetworkPolicyInfoQuerier,AgentMulticastInfoQuerier,EgressQuerier testing"
"pkg/flowaggregator/querier FlowAggregatorQuerier testing"
"pkg/flowaggregator/s3uploader S3UploaderAPI testing"
"pkg/util/podstore StoreInterface testing"
"pkg/util/podstore Interface testing"
"third_party/proxy Provider testing"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ const (

type connectionStore struct {
connections map[flowexporter.ConnectionKey]*flowexporter.Connection
podStore podstore.StoreInterface
podStore podstore.Interface
antreaProxier proxy.Proxier
expirePriorityQueue *priorityqueue.ExpirePriorityQueue
staleConnectionTimeout time.Duration
mutex sync.Mutex
}

func NewConnectionStore(
podStore podstore.StoreInterface,
podStore podstore.Interface,
proxier proxy.Proxier,
o *flowexporter.FlowExporterOptions) connectionStore {
return connectionStore{
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
testFlowKeys[i] = &connKey
}
// Create connectionStore
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
connStore := NewConnectionStore(mockPodStore, nil, testFlowExporterOptions)
// Add flows to the Connection store
for i, flow := range testFlows {
Expand All @@ -106,7 +106,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {
ctrl := gomock.NewController(t)
metrics.InitializeConnectionMetrics()
// test on deny connection store
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
denyConnStore := NewDenyConnectionStore(mockPodStore, nil, testFlowExporterOptions)
tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
conn := &flowexporter.Connection{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewConntrackConnectionStore(
v4Enabled bool,
v6Enabled bool,
npQuerier querier.AgentNetworkPolicyInfoQuerier,
podStore podstore.StoreInterface,
podStore podstore.Interface,
proxier proxy.Proxier,
o *flowexporter.FlowExporterOptions,
) *ConntrackConnectionStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func BenchmarkPoll(b *testing.B) {
func setupConntrackConnStore(b *testing.B) (*ConntrackConnectionStore, *connectionstest.MockConnTrackDumper) {
ctrl := gomock.NewController(b)
defer ctrl.Finish()
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "pod-ns",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
},
}

mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
Expand All @@ -241,7 +241,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
}

// testAddNewConn tests podInfo, Services, network policy mapping.
func testAddNewConn(mockPodStore *podstoretest.MockStoreInterface, mockProxier *proxytest.MockProxier, npQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier, conn flowexporter.Connection) {
func testAddNewConn(mockPodStore *podstoretest.MockInterface, mockProxier *proxytest.MockProxier, npQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier, conn flowexporter.Connection) {
mockPodStore.EXPECT().GetPodByIPAndTime(conn.FlowKey.SourceAddress.String(), gomock.Any()).Return(nil, false)
mockPodStore.EXPECT().GetPodByIPAndTime(conn.FlowKey.DestinationAddress.String(), gomock.Any()).Return(pod1, true)

Expand Down Expand Up @@ -301,7 +301,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
// For testing purposes, set the metric
metrics.TotalAntreaConnectionsInConnTrackTable.Set(float64(len(testFlows)))
// Create connectionStore
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
connStore := NewConntrackConnectionStore(nil, true, false, nil, mockPodStore, nil, testFlowExporterOptions)
// Add flows to the connection store.
for i, flow := range testFlows {
Expand All @@ -323,7 +323,7 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) {

testFlows := make([]*flowexporter.Connection, 0)
// Create connectionStore
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, nil, mockPodStore, nil, testFlowExporterOptions)
// Hard-coded conntrack occupancy metrics for test
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type DenyConnectionStore struct {
connectionStore
}

func NewDenyConnectionStore(podStore podstore.StoreInterface, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
func NewDenyConnectionStore(podStore podstore.Interface, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
return &DenyConnectionStore{
connectionStore: NewConnectionStore(podStore, proxier, o),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
OriginalPackets: uint64(1),
IsActive: true,
}
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
protocol, _ := lookupServiceProtocol(tuple.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol)
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type FlowExporter struct {
denyPriorityQueue *priorityqueue.ExpirePriorityQueue
expiredConns []flowexporter.Connection
egressQuerier querier.EgressQuerier
podStore podstore.StoreInterface
podStore podstore.Interface
}

func genObservationID(nodeName string) uint32 {
Expand All @@ -154,7 +154,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter
return expInput
}

func NewFlowExporter(podStore podstore.StoreInterface, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet,
ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions,
egressQuerier querier.EgressQuerier) (*FlowExporter, error) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type flowAggregator struct {
flowAggregatorAddress string
includePodLabels bool
k8sClient kubernetes.Interface
podStore podstore.StoreInterface
podStore podstore.Interface
numRecordsExported int64
updateCh chan *options.Options
configFile string
Expand All @@ -125,7 +125,7 @@ type flowAggregator struct {

func NewFlowAggregator(
k8sClient kubernetes.Interface,
podStore podstore.StoreInterface,
podStore podstore.Interface,
configFile string,
) (*flowAggregator, error) {
if len(configFile) == 0 {
Expand Down Expand Up @@ -435,7 +435,7 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record
sourceNodeName.SetStringValue(pod.Spec.NodeName)
}
} else {
klog.ErrorS(nil, "Cannot find pod information", "source address", key.SourceAddress, "flow start time", startTime)
klog.ErrorS(nil, "Cannot find Pod information", "sourceAddress", key.SourceAddress, "flowStartTime", startTime)
}
}
}
Expand All @@ -452,7 +452,7 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record
destinationNodeName.SetStringValue(pod.Spec.NodeName)
}
} else {
klog.ErrorS(nil, "Cannot find pod information", "source address", key.DestinationAddress, "flow start time", startTime)
klog.ErrorS(nil, "Cannot find Pod information", "destinationAddress", key.DestinationAddress, "flowStartTime", startTime)
}
}
}
Expand All @@ -470,7 +470,7 @@ func (fa *flowAggregator) getRecordStartTime(record ipfixentities.Record) (*time
func (fa *flowAggregator) fetchPodLabels(ip string, startTime time.Time) string {
pod, exist := fa.podStore.GetPodByIPAndTime(ip, startTime)
if !exist {
klog.ErrorS(nil, "Error when getting pod information from podInformer", "ip", ip, "startTime", startTime)
klog.ErrorS(nil, "Error when getting Pod information from podInformer", "ip", ip, "startTime", startTime)
return ""
}
labelsJSON, err := json.Marshal(pod.GetLabels())
Expand Down
8 changes: 4 additions & 4 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func init() {

func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
ctrl := gomock.NewController(t)
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockIPFIXExporter := exportertesting.NewMockInterface(ctrl)
mockClickHouseExporter := exportertesting.NewMockInterface(ctrl)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) {

func TestFlowAggregator_Run(t *testing.T) {
ctrl := gomock.NewController(t)
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockIPFIXExporter := exportertesting.NewMockInterface(ctrl)
mockClickHouseExporter := exportertesting.NewMockInterface(ctrl)
mockS3Exporter := exportertesting.NewMockInterface(ctrl)
Expand Down Expand Up @@ -687,7 +687,7 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) {

client := fake.NewSimpleClientset()
// Mock pod store
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockPodStore.EXPECT().GetPodByIPAndTime("", gomock.Any()).Return(nil, false)
mockPodStore.EXPECT().GetPodByIPAndTime("192.168.1.2", gomock.Any()).Return(pod, true)

Expand Down Expand Up @@ -847,7 +847,7 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) {

ctrl := gomock.NewController(t)
mockRecord := ipfixentitiestesting.NewMockRecord(ctrl)
mockPodStore := podstoretest.NewMockStoreInterface(ctrl)
mockPodStore := podstoretest.NewMockInterface(ctrl)

stopCh := make(chan struct{})
defer close(stopCh)
Expand Down
Loading

0 comments on commit 3d54448

Please sign in to comment.