Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change flow exporter's export expiry mechanism #2360

Merged
merged 1 commit into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"antrea.io/antrea/pkg/agent/controller/traceflow"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
"antrea.io/antrea/pkg/agent/flowexporter/exporter"
"antrea.io/antrea/pkg/agent/flowexporter/flowrecords"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/metrics"
npl "antrea.io/antrea/pkg/agent/nodeportlocal"
Expand Down Expand Up @@ -199,10 +199,13 @@ func run(o *Options) error {
statusManagerEnabled := antreaPolicyEnabled

var denyConnStore *connections.DenyConnectionStore
var denyPriorityQueue *priorityqueue.ExpirePriorityQueue
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
denyConnStore = connections.NewDenyConnectionStore(ifaceStore, proxier, o.staleConnectionTimeout)
denyPriorityQueue = priorityqueue.NewExpirePriorityQueue(o.activeFlowTimeout, o.idleFlowTimeout)
denyConnStore = connections.NewDenyConnectionStore(ifaceStore, proxier, denyPriorityQueue, o.staleConnectionTimeout)
go denyConnStore.RunPeriodicDeletion(stopCh)
}

networkPolicyController, err := networkpolicy.NewNetworkPolicyController(
antreaClientProvider,
ofClient,
Expand Down Expand Up @@ -367,37 +370,37 @@ func run(o *Options) error {
}

// Initialize flow exporter to start go routines to poll conntrack flows and export IPFIX flow records
var conntrackPriorityQueue *priorityqueue.ExpirePriorityQueue
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)
isNetworkPolicyOnly := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly()

flowRecords := flowrecords.NewFlowRecords()
conntrackPriorityQueue = priorityqueue.NewExpirePriorityQueue(o.activeFlowTimeout, o.idleFlowTimeout)
conntrackConnStore := connections.NewConntrackConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
flowRecords,
ifaceStore,
v4Enabled,
v6Enabled,
proxier,
networkPolicyController,
o.pollInterval,
conntrackPriorityQueue,
o.staleConnectionTimeout)
go conntrackConnStore.Run(stopCh)

flowExporter, err := exporter.NewFlowExporter(
conntrackConnStore,
flowRecords,
denyConnStore,
o.flowCollectorAddr,
o.flowCollectorProto,
o.activeFlowTimeout,
o.idleFlowTimeout,
v4Enabled,
v6Enabled,
k8sClient,
nodeRouteController,
isNetworkPolicyOnly)
isNetworkPolicyOnly,
conntrackPriorityQueue,
denyPriorityQueue)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
}
}
}

c.denyConnStore.AddOrUpdateConn(&denyConn, time.Now(), uint64(packet.IPLength))
return nil
}
45 changes: 45 additions & 0 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package connections

import (
"container/heap"
"fmt"
"sync"
"time"
Expand All @@ -23,6 +24,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/proxy"
)
Expand All @@ -35,19 +37,22 @@ type connectionStore struct {
connections map[flowexporter.ConnectionKey]*flowexporter.Connection
ifaceStore interfacestore.InterfaceStore
antreaProxier proxy.Proxier
expirePriorityQueue *priorityqueue.ExpirePriorityQueue
staleConnectionTimeout time.Duration
mutex sync.Mutex
}

func NewConnectionStore(
ifaceStore interfacestore.InterfaceStore,
proxier proxy.Proxier,
expirePriorityQueue *priorityqueue.ExpirePriorityQueue,
staleConnectionTimeout time.Duration,
) connectionStore {
return connectionStore{
connections: make(map[flowexporter.ConnectionKey]*flowexporter.Connection),
ifaceStore: ifaceStore,
antreaProxier: proxier,
expirePriorityQueue: expirePriorityQueue,
staleConnectionTimeout: staleConnectionTimeout,
}
}
Expand Down Expand Up @@ -126,3 +131,43 @@ func lookupServiceProtocol(protoID uint8) (corev1.Protocol, error) {
}
return serviceProto, nil
}

func (cs *connectionStore) addItemToQueue(connKey flowexporter.ConnectionKey, conn *flowexporter.Connection) {
heanlan marked this conversation as resolved.
Show resolved Hide resolved
currTime := time.Now()
pqItem := &flowexporter.ItemToExpire{
Conn: conn,
ActiveExpireTime: currTime.Add(cs.expirePriorityQueue.ActiveFlowTimeout),
IdleExpireTime: currTime.Add(cs.expirePriorityQueue.IdleFlowTimeout),
}
heap.Push(cs.expirePriorityQueue, pqItem)
cs.expirePriorityQueue.KeyToItem[connKey] = pqItem
}

func (cs *connectionStore) AcquireConnStoreLock() {
cs.mutex.Lock()
}

func (cs *connectionStore) ReleaseConnStoreLock() {
cs.mutex.Unlock()
}

// UpdateConnAndQueue deletes the inactive connection from keyToItem map,
// without adding it back to the PQ. In this way, we can avoid to reset the
// item's expire time every time we encounter it in the PQ. The method also
// updates active connection's stats fields and adds it back to the PQ.
func (cs *connectionStore) UpdateConnAndQueue(pqItem *flowexporter.ItemToExpire, currTime time.Time) {
conn := pqItem.Conn
conn.LastExportTime = currTime
if conn.ReadyToDelete || !conn.IsActive {
cs.expirePriorityQueue.RemoveItemFromMap(conn)
} else {
// For active connections, we update their "prev" stats fields,
// reset active expire time and push back into the PQ.
conn.PrevBytes = conn.OriginalBytes
conn.PrevPackets = conn.OriginalPackets
conn.PrevTCPState = conn.TCPState
conn.PrevReverseBytes = conn.ReverseBytes
conn.PrevReversePackets = conn.ReversePackets
cs.expirePriorityQueue.ResetActiveExpireTimeAndPush(pqItem, currTime)
}
}
37 changes: 36 additions & 1 deletion pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/stretchr/testify/assert"

"antrea.io/antrea/pkg/agent/flowexporter"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/metrics"
)

const (
Expand Down Expand Up @@ -68,7 +70,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
}
// Create connectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
connStore := NewConnectionStore(mockIfaceStore, nil, testStaleConnectionTimeout)
connStore := NewConnectionStore(mockIfaceStore, nil, nil, testStaleConnectionTimeout)
// Add flows to the Connection store
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = flow
Expand All @@ -88,3 +90,36 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
assert.Equal(t, conn.OriginalPackets, uint64(0), "OriginalPackets should be reset")
}
}

func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
metrics.InitializeConnectionMetrics()
// test on deny connection store
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
denyConnStore := NewDenyConnectionStore(mockIfaceStore, nil, nil, testStaleConnectionTimeout)
tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
conn := &flowexporter.Connection{
FlowKey: tuple,
}
connKey := flowexporter.NewConnectionKey(conn)
denyConnStore.connections[connKey] = conn

// For testing purposes, set the metric
metrics.TotalDenyConnections.Set(1)
denyConnStore.deleteConnWithoutLock(connKey)
_, exists := denyConnStore.GetConnByKey(connKey)
assert.Equal(t, false, exists, "connection should be deleted in connection store")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))

// test on conntrack connection store
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
conntrackConnStore := NewConntrackConnectionStore(mockConnDumper, mockIfaceStore, true, false, nil, nil, testPollInterval, nil, testStaleConnectionTimeout)
conntrackConnStore.connections[connKey] = conn

metrics.TotalAntreaConnectionsInConnTrackTable.Set(1)
conntrackConnStore.deleteConnWithoutLock(connKey)
_, exists = conntrackConnStore.GetConnByKey(connKey)
assert.Equal(t, false, exists, "connection should be deleted in connection store")
checkAntreaConnectionMetrics(t, len(conntrackConnStore.connections))
}
83 changes: 54 additions & 29 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/flowrecords"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
Expand All @@ -39,7 +39,6 @@ var serviceProtocolMap = map[uint8]corev1.Protocol{
}

type ConntrackConnectionStore struct {
flowRecords *flowrecords.FlowRecords
connDumper ConnTrackDumper
v4Enabled bool
v6Enabled bool
Expand All @@ -50,23 +49,22 @@ type ConntrackConnectionStore struct {

func NewConntrackConnectionStore(
connTrackDumper ConnTrackDumper,
flowRecords *flowrecords.FlowRecords,
ifaceStore interfacestore.InterfaceStore,
v4Enabled bool,
v6Enabled bool,
proxier proxy.Proxier,
npQuerier querier.AgentNetworkPolicyInfoQuerier,
pollInterval time.Duration,
expirePriorityQueue *priorityqueue.ExpirePriorityQueue,
staleConnectionTimeout time.Duration,
) *ConntrackConnectionStore {
return &ConntrackConnectionStore{
flowRecords: flowRecords,
connDumper: connTrackDumper,
v4Enabled: v4Enabled,
v6Enabled: v6Enabled,
networkPolicyQuerier: npQuerier,
pollInterval: pollInterval,
connectionStore: NewConnectionStore(ifaceStore, proxier, staleConnectionTimeout),
connectionStore: NewConnectionStore(ifaceStore, proxier, expirePriorityQueue, staleConnectionTimeout),
}
}

Expand All @@ -88,9 +86,6 @@ func (cs *ConntrackConnectionStore) Run(stopCh <-chan struct{}) {
// TODO: Come up with a backoff/retry mechanism by increasing poll interval and adding retry timeout
klog.Errorf("Error during conntrack poll cycle: %v", err)
}
// AddOrUpdateFlowRecord method does not return any error, hence no error handling required.
cs.ForAllConnectionsDo(cs.flowRecords.AddOrUpdateFlowRecord)
klog.V(2).Infof("Flow records are successfully updated")
}
}
}
Expand All @@ -108,21 +103,12 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
// then we consider it to be stale and delete it.
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if !conn.IsPresent {
if conn.DyingAndDoneExport {
if err := cs.DeleteConnWithoutLock(key); err != nil {
// Delete the connection if it is ready to delete or it was not exported
// in the time period as specified by the stale connection timeout.
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout {
if err := cs.deleteConnWithoutLock(key); err != nil {
return err
}
} else {
record, exists := cs.flowRecords.GetFlowRecordFromMap(&key)
if exists {
// Delete the connection if it was not exported for the time
// period as specified by the stale connection timeout.
if time.Since(record.LastExportTime) >= cs.staleConnectionTimeout {
// Ignore error if flow record not found.
cs.flowRecords.DeleteFlowRecordFromMap(&key)
delete(cs.connections, key)
}
}
}
} else {
conn.IsPresent = false
Expand Down Expand Up @@ -212,11 +198,10 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
connKey := flowexporter.NewConnectionKey(conn)
cs.mutex.Lock()
defer cs.mutex.Unlock()
existingConn, exists := cs.connections[connKey]

existingConn, exists := cs.connections[connKey]
if exists {
existingConn.IsPresent = true
// avoid updating stats of the existing connection that is about to close
if flowexporter.IsConnectionDying(existingConn) {
return
}
Expand All @@ -228,15 +213,28 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
existingConn.ReverseBytes = conn.ReverseBytes
existingConn.ReversePackets = conn.ReversePackets
existingConn.TCPState = conn.TCPState
klog.V(4).Infof("Antrea flow updated: %v", existingConn)
existingConn.IsActive = flowexporter.CheckConntrackConnActive(existingConn)
if existingConn.IsActive {
existingItem, exists := cs.expirePriorityQueue.KeyToItem[connKey]
if !exists {
// If the connKey:pqItem pair does not exist in the map, it shows the
// conn was inactive, and was removed from PQ and map. Since it becomes
// active again now, we create a new pqItem and add it to PQ and map.
cs.expirePriorityQueue.AddItemToQueue(connKey, existingConn)
} else {
cs.connectionStore.expirePriorityQueue.Update(existingItem, existingItem.ActiveExpireTime,
time.Now().Add(cs.connectionStore.expirePriorityQueue.IdleFlowTimeout))
}
}
klog.V(4).InfoS("Antrea flow updated", "connection", existingConn)
} else {
cs.fillPodInfo(conn)
if conn.Mark == openflow.ServiceCTMark.GetValue() {
clusterIP := conn.DestinationServiceAddress.String()
svcPort := conn.DestinationServicePort
protocol, err := lookupServiceProtocol(conn.FlowKey.Protocol)
if err != nil {
klog.Warningf("Could not retrieve Service protocol: %v", err)
klog.InfoS("Could not retrieve Service protocol", "error", err)
} else {
serviceStr := fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol)
cs.fillServiceInfo(conn, serviceStr)
Expand All @@ -248,15 +246,42 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
conn.StopTime = time.Now()
}
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
conn.IsActive = true
// Add new antrea connection to connection store and PQ.
cs.connections[connKey] = conn
cs.expirePriorityQueue.AddItemToQueue(connKey, conn)
klog.V(4).InfoS("New Antrea flow added", "connection", conn)
}
}

func (cs *ConntrackConnectionStore) GetExpiredConns(expiredConns []flowexporter.Connection, currTime time.Time, maxSize int) ([]flowexporter.Connection, time.Duration) {
cs.AcquireConnStoreLock()
defer cs.ReleaseConnStoreLock()
for i := 0; i < maxSize; i++ {
pqItem := cs.connectionStore.expirePriorityQueue.GetTopExpiredItem(currTime)
if pqItem == nil {
break
}
expiredConns = append(expiredConns, *pqItem.Conn)
if flowexporter.IsConnectionDying(pqItem.Conn) {
// If a conntrack connection is in dying state or connection is not
// in the conntrack table, we set the ReadyToDelete flag to true to
// do the deletion later.
pqItem.Conn.ReadyToDelete = true
}
if pqItem.IdleExpireTime.Before(currTime) {
// No packets have been received during the idle timeout interval,
// the connection is therefore considered inactive.
pqItem.Conn.IsActive = false
}
cs.UpdateConnAndQueue(pqItem, currTime)
}
return expiredConns, cs.connectionStore.expirePriorityQueue.GetExpiryFromExpirePriorityQueue()
}

// DeleteConnWithoutLock deletes the connection from the connection map given
// deleteConnWithoutLock deletes the connection from the connection map given
// the connection key without grabbing the lock. Caller is expected to grab lock.
func (cs *ConntrackConnectionStore) DeleteConnWithoutLock(connKey flowexporter.ConnectionKey) error {
func (cs *ConntrackConnectionStore) deleteConnWithoutLock(connKey flowexporter.ConnectionKey) error {
_, exists := cs.connections[connKey]
if !exists {
return fmt.Errorf("connection with key %v doesn't exist in map", connKey)
Expand Down
Loading