Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <[email protected]>
  • Loading branch information
yuntanghsu committed Jul 18, 2023
1 parent acc0be2 commit fb5d16a
Show file tree
Hide file tree
Showing 15 changed files with 404 additions and 161 deletions.
14 changes: 7 additions & 7 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func run(o *Options) error {
l7NetworkPolicyEnabled := features.DefaultFeatureGate.Enabled(features.L7NetworkPolicy)
enableMulticlusterGW := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.EnableGateway
enableMulticlusterNP := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.EnableStretchedNetworkPolicy
enableFLowExporter := features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable

// Bridging mode will connect the uplink interface to the OVS bridge.
connectUplinkToBridge := enableBridgingMode
Expand All @@ -152,7 +153,7 @@ func run(o *Options) error {
features.DefaultFeatureGate.Enabled(features.AntreaPolicy),
l7NetworkPolicyEnabled,
o.enableEgress,
features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable,
enableFLowExporter,
o.config.AntreaProxy.ProxyAll,
connectUplinkToBridge,
multicastEnabled,
Expand Down Expand Up @@ -311,10 +312,9 @@ func run(o *Options) error {
// Initialize localPodInformer for NPLAgent, AntreaIPAMController,
// StretchedNetworkPolicyController, and secondary network controller.
var localPodInformer cache.SharedIndexInformer
if enableNodePortLocal || enableBridgingMode || enableMulticlusterNP ||
if enableNodePortLocal || enableBridgingMode || enableMulticlusterNP || enableFLowExporter ||
features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) ||
features.DefaultFeatureGate.Enabled(features.TrafficControl) ||
(features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable) {
features.DefaultFeatureGate.Enabled(features.TrafficControl) {
listOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeConfig.Name).String()
}
Expand Down Expand Up @@ -596,8 +596,8 @@ func run(o *Options) error {
}

var flowExporter *exporter.FlowExporter
if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
podStore := podstore.NewPodStorage(localPodInformer)
if enableFLowExporter {
podStore := podstore.NewPodStore(localPodInformer)
flowExporterOptions := &flowexporter.FlowExporterOptions{
FlowCollectorAddr: o.flowCollectorAddr,
FlowCollectorProto: o.flowCollectorProto,
Expand Down Expand Up @@ -872,7 +872,7 @@ func run(o *Options) error {
go ofClient.StartPacketInHandler(stopCh)

// Start the goroutine to periodically export IPFIX flow records.
if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
if enableFLowExporter {
go flowExporter.Run(stopCh)
}

Expand Down
5 changes: 2 additions & 3 deletions cmd/flow-aggregator/flow-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@ func run(configFile string) error {
return fmt.Errorf("error when creating K8s client: %v", err)
}

podInformer := coreinformers.NewFilteredPodInformer(
podInformer := coreinformers.NewPodInformer(
k8sClient,
metav1.NamespaceAll,
informerDefaultResync,
cache.Indexers{},
nil,
)

podStore := podstore.NewPodStorage(podInformer)
podStore := podstore.NewPodStore(podInformer)

flowAggregator, err := aggregator.NewFlowAggregator(
k8sClient,
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ const (

type connectionStore struct {
connections map[flowexporter.ConnectionKey]*flowexporter.Connection
podStore *podstore.PodStorage
podStore *podstore.PodStore
antreaProxier proxy.Proxier
expirePriorityQueue *priorityqueue.ExpirePriorityQueue
staleConnectionTimeout time.Duration
mutex sync.Mutex
}

func NewConnectionStore(
podStore *podstore.PodStorage,
podStore *podstore.PodStore,
proxier proxy.Proxier,
o *flowexporter.FlowExporterOptions) connectionStore {
return connectionStore{
Expand Down
9 changes: 4 additions & 5 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
testIdleFlowTimeout = 1 * time.Second
testPollInterval = 0 // Not used in these tests, hence 0.
testStaleConnectionTimeout = 5 * time.Minute
informerDefaultResync = 12 * time.Hour
)

var testFlowExporterOptions = &flowexporter.FlowExporterOptions{
Expand Down Expand Up @@ -85,8 +84,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
}
// Create connectionStore
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
connStore := NewConnectionStore(podStore, nil, testFlowExporterOptions)
// Add flows to the Connection store
for i, flow := range testFlows {
Expand All @@ -113,8 +112,8 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {
metrics.InitializeConnectionMetrics()
// test on deny connection store
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
denyConnStore := NewDenyConnectionStore(podStore, nil, testFlowExporterOptions)
tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
conn := &flowexporter.Connection{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewConntrackConnectionStore(
v4Enabled bool,
v6Enabled bool,
npQuerier querier.AgentNetworkPolicyInfoQuerier,
podStore *podstore.PodStorage,
podStore *podstore.PodStore,
proxier proxy.Proxier,
o *flowexporter.FlowExporterOptions,
) *ConntrackConnectionStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package connections

import (
"context"
"crypto/rand"
"flag"
"fmt"
Expand All @@ -27,9 +28,11 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -82,17 +85,17 @@ func setupConntrackConnStore(b *testing.B, connections []*flowexporter.Connectio
ctrl := gomock.NewController(b)
defer ctrl.Finish()
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
stopCh := make(chan struct{})
defer close(stopCh)
go podInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "pod-ns",
Name: "pod",
CreationTimestamp: metav1.Time{Time: time.Now().Add(-255 * time.Second)},
Namespace: "pod-ns",
Name: "pod",
UID: "pod",
},
Status: v1.PodStatus{
Phase: v1.PodPending,
Expand All @@ -101,7 +104,11 @@ func setupConntrackConnStore(b *testing.B, connections []*flowexporter.Connectio
for _, conn := range connections {
pod.Status.PodIPs = append(pod.Status.PodIPs, v1.PodIP{IP: conn.FlowKey.SourceAddress.String()}, v1.PodIP{IP: conn.FlowKey.DestinationAddress.String()})
}
podInformer.GetIndexer().Add(pod)
k8sClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
err := wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
return len(podInformer.GetIndexer().List()) == 1, nil
})
require.NoError(b, err, "Pod should be added to podInformer")

mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
mockConnDumper.EXPECT().GetMaxConnections().Return(100000, nil).AnyTimes()
Expand Down
25 changes: 17 additions & 8 deletions pkg/agent/flowexporter/connections/conntrack_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package connections

import (
"context"
"encoding/binary"
"fmt"
"net"
Expand All @@ -29,10 +30,12 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/metrics/legacyregistry"
clock "k8s.io/utils/clock/testing"

"antrea.io/antrea/pkg/agent/flowexporter"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
Expand Down Expand Up @@ -223,15 +226,21 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
fakeClock := clock.NewFakeClock(refTime)
podStore := podstore.NewPodStoreWithClock(podInformer, fakeClock)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, true, false, npQuerier, podStore, mockProxier, testFlowExporterOptions)
stopCh := make(chan struct{})
defer close(stopCh)
go podInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
pod1.CreationTimestamp = metav1.Time{Time: refTime.Add(-(time.Second * 60))}
podInformer.GetIndexer().Add(pod1)
fakeClock.SetTime(refTime.Add(-(time.Second * 60)))
k8sClient.CoreV1().Pods(pod1.Namespace).Create(context.TODO(), pod1, metav1.CreateOptions{})
err := wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
return len(podInformer.GetIndexer().List()) == 1, nil
})
require.NoError(t, err, "Pod should be added to podInformer")
fakeClock.SetTime(refTime)

for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
Expand Down Expand Up @@ -310,8 +319,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
// Create connectionStore
stopCh := make(chan struct{})
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
defer close(stopCh)
go podInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
Expand All @@ -337,8 +346,8 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) {
testFlows := make([]*flowexporter.Connection, 0)
// Create connectionStore
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
stopCh := make(chan struct{})
defer close(stopCh)
go podInformer.Run(stopCh)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type DenyConnectionStore struct {
connectionStore
}

func NewDenyConnectionStore(podStore *podstore.PodStorage, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
func NewDenyConnectionStore(podStore *podstore.PodStore, proxier proxy.Proxier, o *flowexporter.FlowExporterOptions) *DenyConnectionStore {
return &DenyConnectionStore{
connectionStore: NewConnectionStore(podStore, proxier, o),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
IsActive: true,
}
k8sClient := fake.NewSimpleClientset()
podInformer := coreinformers.NewFilteredPodInformer(k8sClient, metav1.NamespaceAll, informerDefaultResync, cache.Indexers{}, nil)
podStore := podstore.NewPodStorage(podInformer)
podInformer := coreinformers.NewPodInformer(k8sClient, metav1.NamespaceAll, 0, cache.Indexers{})
podStore := podstore.NewPodStore(podInformer)
stopCh := make(chan struct{})
defer close(stopCh)
go podInformer.Run(stopCh)
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type FlowExporter struct {
denyPriorityQueue *priorityqueue.ExpirePriorityQueue
expiredConns []flowexporter.Connection
egressQuerier querier.EgressQuerier
podStore *podstore.PodStorage
podStore *podstore.PodStore
}

func genObservationID(nodeName string) uint32 {
Expand All @@ -154,7 +154,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter
return expInput
}

func NewFlowExporter(podStore *podstore.PodStorage, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
func NewFlowExporter(podStore *podstore.PodStore, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet,
ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions,
egressQuerier querier.EgressQuerier) (*FlowExporter, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type flowAggregator struct {
flowAggregatorAddress string
includePodLabels bool
k8sClient kubernetes.Interface
podStore *podstore.PodStorage
podStore *podstore.PodStore
numRecordsExported int64
updateCh chan *options.Options
configFile string
Expand All @@ -125,7 +125,7 @@ type flowAggregator struct {

func NewFlowAggregator(
k8sClient kubernetes.Interface,
podStore *podstore.PodStorage,
podStore *podstore.PodStore,
configFile string,
) (*flowAggregator, error) {
if len(configFile) == 0 {
Expand Down
Loading

0 comments on commit fb5d16a

Please sign in to comment.