Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <[email protected]>
  • Loading branch information
heanlan committed Sep 21, 2021
1 parent 54cebe6 commit f391387
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 53 deletions.
23 changes: 3 additions & 20 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/proxy"
)

Expand Down Expand Up @@ -152,26 +151,10 @@ func (cs *connectionStore) ReleaseConnStoreLock() {
cs.mutex.Unlock()
}

// deleteConnWithoutLock deletes the connection from the connection map given
// the connection key without grabbing the lock. Caller is expected to grab lock.
func (cs *connectionStore) deleteConnWithoutLock(connKey flowexporter.ConnectionKey, isDenyConn bool) error {
_, exists := cs.connections[connKey]
if !exists {
return fmt.Errorf("connection with key %v doesn't exist in map", connKey)
}
delete(cs.connections, connKey)
if !isDenyConn {
metrics.TotalAntreaConnectionsInConnTrackTable.Dec()
} else {
metrics.TotalDenyConnections.Dec()
}
return nil
}

// UpdateConnAndQueue deletes the inactive connection from keyToItem map,
// also does not adding it back to the PQ. In this way, we can avoid to reset
// the item's expire time every time when we encounter it in the PQ. The method
// also updates active connection's stats fields and add it back to the PQ.
// without adding it back to the PQ. In this way, we can avoid to reset the
// item's expire time every time we encounter it in the PQ. The method also
// updates active connection's stats fields and adds it back to the PQ.
func (cs *connectionStore) UpdateConnAndQueue(pqItem *flowexporter.ItemToExpire, currTime time.Time) {
conn := pqItem.Conn
conn.LastExportTime = currTime
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {

// For testing purposes, set the metric
metrics.TotalDenyConnections.Set(1)
denyConnStore.deleteConnWithoutLock(connKey, true)
denyConnStore.deleteConnWithoutLock(connKey)
_, exists := denyConnStore.GetConnByKey(connKey)
assert.Equal(t, false, exists, "connection should be deleted in connection store")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
Expand All @@ -118,7 +118,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {
conntrackConnStore.connections[connKey] = conn

metrics.TotalAntreaConnectionsInConnTrackTable.Set(1)
conntrackConnStore.deleteConnWithoutLock(connKey, false)
conntrackConnStore.deleteConnWithoutLock(connKey)
_, exists = conntrackConnStore.GetConnByKey(connKey)
assert.Equal(t, false, exists, "connection should be deleted in connection store")
checkAntreaConnectionMetrics(t, len(conntrackConnStore.connections))
Expand Down
37 changes: 19 additions & 18 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
// Delete the connection if it is ready to delete or it was not exported
// in the time period as specified by the stale connection timeout.
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout {
if err := cs.deleteConnWithoutLock(key, false); err != nil {
if err := cs.deleteConnWithoutLock(key); err != nil {
return err
}
}
Expand Down Expand Up @@ -254,17 +254,6 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
}
}

// checkConnActive returns true if there are changes in connection's stats or
// TCP state, representing the connection is active.
func (cs *ConntrackConnectionStore) checkConnActive(conn *flowexporter.Connection) bool {
if (conn.OriginalPackets > conn.PrevPackets) ||
(conn.ReversePackets > conn.PrevReversePackets) ||
(conn.TCPState != conn.PrevTCPState) {
return true
}
return false
}

func (cs *ConntrackConnectionStore) GetExpiredConns(expiredConns []flowexporter.Connection, currTime time.Time, maxSize int) ([]flowexporter.Connection, time.Duration) {
cs.AcquireConnStoreLock()
defer cs.ReleaseConnStoreLock()
Expand All @@ -275,18 +264,30 @@ func (cs *ConntrackConnectionStore) GetExpiredConns(expiredConns []flowexporter.
}
expiredConns = append(expiredConns, *pqItem.Conn)
if flowexporter.IsConnectionDying(pqItem.Conn) {
// If a conntrack connection is in dying state or connection
// is not in conntrack table, we set the ReadyToDelete flag to
// true to do the deletion later.
// If a conntrack connection is in dying state or connection is not
// in the conntrack table, we set the ReadyToDelete flag to true to
// do the deletion later.
pqItem.Conn.ReadyToDelete = true
}
if pqItem.IdleExpireTime.Before(currTime) {
// If a conntrack connection item is idle time out, we consider
// the connection to be inactive, as no related flows coming within
// the idle time out range.
// If a conntrack connection's correspondent pqItem is idle time out,
// we consider the connection to be inactive, as no related packets
// received within the idle time out range.
pqItem.Conn.IsActive = false
}
cs.UpdateConnAndQueue(pqItem, currTime)
}
return expiredConns, cs.connectionStore.expirePriorityQueue.GetExpiryFromExpirePriorityQueue()
}

// deleteConnWithoutLock deletes the connection from the connection map given
// the connection key without grabbing the lock. Caller is expected to grab lock.
func (cs *ConntrackConnectionStore) deleteConnWithoutLock(connKey flowexporter.ConnectionKey) error {
_, exists := cs.connections[connKey]
if !exists {
return fmt.Errorf("connection with key %v doesn't exist in map", connKey)
}
delete(cs.connections, connKey)
metrics.TotalAntreaConnectionsInConnTrackTable.Dec()
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
}
// Delete the connections in connection store.
for i := 0; i < len(testFlows); i++ {
err := connStore.deleteConnWithoutLock(*testFlowKeys[i], false)
err := connStore.deleteConnWithoutLock(*testFlowKeys[i])
assert.Nil(t, err, "DeleteConnectionByKey should return nil")
_, exists := connStore.GetConnByKey(*testFlowKeys[i])
assert.Equal(t, exists, false, "connection should be deleted in connection store")
Expand Down
14 changes: 13 additions & 1 deletion pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (ds *DenyConnectionStore) RunPeriodicDeletion(stopCh <-chan struct{}) {
case <-pollTicker.C:
deleteIfStaleConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= ds.staleConnectionTimeout {
if err := ds.deleteConnWithoutLock(key, true); err != nil {
if err := ds.deleteConnWithoutLock(key); err != nil {
return err
}
}
Expand Down Expand Up @@ -125,3 +125,15 @@ func (ds *DenyConnectionStore) GetExpiredConns(expiredConns []flowexporter.Conne
}
return expiredConns, ds.connectionStore.expirePriorityQueue.GetExpiryFromExpirePriorityQueue()
}

// deleteConnWithoutLock deletes the connection from the connection map given
// the connection key without grabbing the lock. Caller is expected to grab lock.
func (ds *DenyConnectionStore) deleteConnWithoutLock(connKey flowexporter.ConnectionKey) error {
_, exists := ds.connections[connKey]
if !exists {
return fmt.Errorf("connection with key %v doesn't exist in map", connKey)
}
delete(ds.connections, connKey)
metrics.TotalDenyConnections.Dec()
return nil
}
17 changes: 8 additions & 9 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,16 @@ import (
"antrea.io/antrea/pkg/util/env"
)

// We pre-allocate a slice to store expired connections with a fixed size right
// before exporting them. The advantage is every time we export, the connection
// store lock will only be held by a bounded time. The disadvantages are: 1. the
// constant is irrespective of actual number of expired connections 2. when the
// When initializing flowExporter, a slice is allocated with a fixed size to
// store expired connections. The advantage is every time we export, the connection
// store lock will only be held for a bounded time. The disadvantages are: 1. the
// constant is independent of actual number of expired connections 2. when the
// number of expired connections goes over the constant, the export can not be
// finished in a single round. It could be delayed by conntrack connections polling
// routine, which also acquires the connection store lock. The possible solutions
// are: 1. take a fraction of the size of connection store to approximate the number
// of expired connections, while having a min and a max to handle edge cases,
// e.g. min(50 + 0.1 * connectionStore.size(), 200) 2. do some experiments to find
// out the optimized constant which has a better performance.
// routine, which also acquires the connection store lock. The possible solution
// can be taking a fraction of the size of connection store to approximate the
// number of expired connections, while having a min and a max to handle edge cases,
// e.g. min(50 + 0.1 * connectionStore.size(), 200)
const maxConnsToExport = 64

var (
Expand Down
6 changes: 5 additions & 1 deletion pkg/agent/flowexporter/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import (
"antrea.io/antrea/pkg/agent/flowexporter"
)

var minExpiryTime = 100 * time.Millisecond
// minExpiryTime provides two usages: 1. We want to avoid passing a non positive
// value to ticker 2. We want to avoid processing a single expired item per call.
// If multiple items have very close expiry time(<100ms), by adding a small constant,
// we can make these items expired and process in one call.
const minExpiryTime = 100 * time.Millisecond

type ExpirePriorityQueue struct {
items []*flowexporter.ItemToExpire
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func IsConnectionDying(conn *Connection) bool {
}

// checkConntrackConnActive returns true if there are changes in connection's stats or
// TCP state, representing the connection is active.
// TCP state, indicating that the connection is active.
func CheckConntrackConnActive(conn *Connection) bool {
if (conn.OriginalPackets > conn.PrevPackets) ||
(conn.ReversePackets > conn.PrevReversePackets) ||
Expand Down

0 comments on commit f391387

Please sign in to comment.