Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <[email protected]>
  • Loading branch information
yuntanghsu committed Jul 11, 2023
1 parent ecda17c commit 3541f81
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 96 deletions.
14 changes: 7 additions & 7 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func run(o *Options) error {
l7NetworkPolicyEnabled := features.DefaultFeatureGate.Enabled(features.L7NetworkPolicy)
enableMulticlusterGW := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.EnableGateway
enableMulticlusterNP := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.EnableStretchedNetworkPolicy
enableFLowExporter := features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable

// Bridging mode will connect the uplink interface to the OVS bridge.
connectUplinkToBridge := enableBridgingMode
Expand All @@ -152,7 +153,7 @@ func run(o *Options) error {
features.DefaultFeatureGate.Enabled(features.AntreaPolicy),
l7NetworkPolicyEnabled,
o.enableEgress,
features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable,
enableFLowExporter,
o.config.AntreaProxy.ProxyAll,
connectUplinkToBridge,
multicastEnabled,
Expand Down Expand Up @@ -311,10 +312,9 @@ func run(o *Options) error {
// Initialize localPodInformer for NPLAgent, AntreaIPAMController,
// StretchedNetworkPolicyController, and secondary network controller.
var localPodInformer cache.SharedIndexInformer
if enableNodePortLocal || enableBridgingMode || enableMulticlusterNP ||
if enableNodePortLocal || enableBridgingMode || enableMulticlusterNP || enableFLowExporter ||
features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) ||
features.DefaultFeatureGate.Enabled(features.TrafficControl) ||
(features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable) {
features.DefaultFeatureGate.Enabled(features.TrafficControl) {
listOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeConfig.Name).String()
}
Expand Down Expand Up @@ -596,8 +596,8 @@ func run(o *Options) error {
}

var flowExporter *exporter.FlowExporter
if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
podStore := podstore.NewPodStorage(localPodInformer)
if enableFLowExporter {
podStore := podstore.NewPodStore(localPodInformer)
flowExporterOptions := &flowexporter.FlowExporterOptions{
FlowCollectorAddr: o.flowCollectorAddr,
FlowCollectorProto: o.flowCollectorProto,
Expand Down Expand Up @@ -872,7 +872,7 @@ func run(o *Options) error {
go ofClient.StartPacketInHandler(stopCh)

// Start the goroutine to periodically export IPFIX flow records.
if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
if enableFLowExporter {
go flowExporter.Run(stopCh)
}

Expand Down
5 changes: 2 additions & 3 deletions cmd/flow-aggregator/flow-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@ func run(configFile string) error {
return fmt.Errorf("error when creating K8s client: %v", err)
}

podInformer := coreinformers.NewFilteredPodInformer(
podInformer := coreinformers.NewPodInformer(
k8sClient,
metav1.NamespaceAll,
informerDefaultResync,
cache.Indexers{},
nil,
)

podStore := podstore.NewPodStorage(podInformer)
podStore := podstore.NewPodStore(podInformer)

flowAggregator, err := aggregator.NewFlowAggregator(
k8sClient,
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ const (

type connectionStore struct {
connections map[flowexporter.ConnectionKey]*flowexporter.Connection
podStore *podstore.PodStorage
podStore *podstore.PodStore
antreaProxier proxy.Proxier
expirePriorityQueue *priorityqueue.ExpirePriorityQueue
staleConnectionTimeout time.Duration
mutex sync.Mutex
}

func NewConnectionStore(
podStore *podstore.PodStorage,
podStore *podstore.PodStore,
proxier proxy.Proxier,
o *flowexporter.FlowExporterOptions) connectionStore {
return connectionStore{
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
}
// Create connectionStore
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
connStore := NewConnectionStore(podStore, nil, testFlowExporterOptions)
// Add flows to the Connection store
for i, flow := range testFlows {
Expand All @@ -113,8 +113,8 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {
metrics.InitializeConnectionMetrics()
// test on deny connection store
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
denyConnStore := NewDenyConnectionStore(podStore, nil, testFlowExporterOptions)
tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
conn := &flowexporter.Connection{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewConntrackConnectionStore(
v4Enabled bool,
v6Enabled bool,
npQuerier querier.AgentNetworkPolicyInfoQuerier,
podStore *podstore.PodStorage,
podStore *podstore.PodStore,
proxier proxy.Proxier,
o *flowexporter.FlowExporterOptions,
) *ConntrackConnectionStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func setupConntrackConnStore(b *testing.B, connections []*flowexporter.Connectio
ctrl := gomock.NewController(b)
defer ctrl.Finish()
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
stopCh := make(chan struct{})
defer close(stopCh)
go podInformer.Run(stopCh)
Expand Down
12 changes: 6 additions & 6 deletions pkg/agent/flowexporter/connections/conntrack_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, podStore, mockProxier, testFlowExporterOptions)
stopCh := make(chan struct{})
defer close(stopCh)
Expand Down Expand Up @@ -310,8 +310,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
// Create connectionStore
stopCh := make(chan struct{})
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
defer close(stopCh)
go podInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
Expand All @@ -337,8 +337,8 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) {
testFlows := make([]*flowexporter.Connection, 0)
// Create connectionStore
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
stopCh := make(chan struct{})
defer close(stopCh)
go podInformer.Run(stopCh)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type DenyConnectionStore struct {
connectionStore
}

func NewDenyConnectionStore(podStore *podstore.PodStorage, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
func NewDenyConnectionStore(podStore *podstore.PodStore, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
return &DenyConnectionStore{
connectionStore: NewConnectionStore(podStore, proxier, o),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
IsActive: true,
}
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
stopCh := make(chan struct{})
defer close(stopCh)
go podInformer.Run(stopCh)
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type FlowExporter struct {
denyPriorityQueue *priorityqueue.ExpirePriorityQueue
expiredConns []flowexporter.Connection
egressQuerier querier.EgressQuerier
podStore *podstore.PodStorage
podStore *podstore.PodStore
}

func genObservationID(nodeName string) uint32 {
Expand All @@ -154,7 +154,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter
return expInput
}

func NewFlowExporter(podStore *podstore.PodStorage, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
func NewFlowExporter(podStore *podstore.PodStore, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet,
ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions,
egressQuerier querier.EgressQuerier) (*FlowExporter, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type flowAggregator struct {
flowAggregatorAddress string
includePodLabels bool
k8sClient kubernetes.Interface
podStore *podstore.PodStorage
podStore *podstore.PodStore
numRecordsExported int64
updateCh chan *options.Options
configFile string
Expand All @@ -125,7 +125,7 @@ type flowAggregator struct {

func NewFlowAggregator(
k8sClient kubernetes.Interface,
podStore *podstore.PodStorage,
podStore *podstore.PodStore,
configFile string,
) (*flowAggregator, error) {
if len(configFile) == 0 {
Expand Down
20 changes: 10 additions & 10 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl)

k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)

newFlowAggregator := func(includePodLabels bool) *flowAggregator {
return &flowAggregator{
Expand Down Expand Up @@ -451,8 +451,8 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) {
func TestFlowAggregator_Run(t *testing.T) {
ctrl := gomock.NewController(t)
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
mockIPFIXExporter := exportertesting.NewMockInterface(ctrl)
mockClickHouseExporter := exportertesting.NewMockInterface(ctrl)
mockS3Exporter := exportertesting.NewMockInterface(ctrl)
Expand Down Expand Up @@ -694,8 +694,8 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) {
}

k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)

stopCh := make(chan struct{})
defer close(stopCh)
Expand All @@ -712,13 +712,13 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) {
{
name: "no pod object",
ip: "",
time: timeNow,
time: timeNow.Add(time.Second),
want: "",
},
{
name: "pod with label",
ip: "192.168.1.2",
time: timeNow,
time: timeNow.Add(time.Second),
want: "{\"test\":\"ut\"}",
},
}
Expand Down Expand Up @@ -864,8 +864,8 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) {
ctrl := gomock.NewController(t)
mockRecord := ipfixentitiestesting.NewMockRecord(ctrl)
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)

stopCh := make(chan struct{})
defer close(stopCh)
Expand Down
Loading

0 comments on commit 3541f81

Please sign in to comment.