Skip to content

Commit

Permalink
Pre-allocate expiredConns slice
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <[email protected]>
  • Loading branch information
heanlan committed Sep 14, 2021
1 parent 3872f71 commit 54cebe6
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 47 deletions.
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,10 @@ func (cs *ConntrackConnectionStore) checkConnActive(conn *flowexporter.Connectio
return false
}

func (cs *ConntrackConnectionStore) GetExpiredConns(expiredConns []flowexporter.Connection, currTime time.Time) ([]flowexporter.Connection, time.Duration) {
func (cs *ConntrackConnectionStore) GetExpiredConns(expiredConns []flowexporter.Connection, currTime time.Time, maxSize int) ([]flowexporter.Connection, time.Duration) {
cs.AcquireConnStoreLock()
defer cs.ReleaseConnStoreLock()
for {
for i := 0; i < maxSize; i++ {
pqItem := cs.connectionStore.expirePriorityQueue.GetTopExpiredItem(currTime)
if pqItem == nil {
break
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
}
}

func (ds *DenyConnectionStore) GetExpiredConns(expiredConns []flowexporter.Connection, currTime time.Time) ([]flowexporter.Connection, time.Duration) {
func (ds *DenyConnectionStore) GetExpiredConns(expiredConns []flowexporter.Connection, currTime time.Time, maxSize int) ([]flowexporter.Connection, time.Duration) {
ds.AcquireConnStoreLock()
defer ds.ReleaseConnStoreLock()
for {
for i := 0; i < maxSize; i++ {
pqItem := ds.connectionStore.expirePriorityQueue.GetTopExpiredItem(currTime)
if pqItem == nil {
break
Expand Down
26 changes: 21 additions & 5 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ import (
"antrea.io/antrea/pkg/util/env"
)

// We pre-allocate a slice to store expired connections with a fixed size right
// before exporting them. The advantage is every time we export, the connection
// store lock will only be held by a bounded time. The disadvantages are: 1. the
// constant is irrespective of actual number of expired connections 2. when the
// number of expired connections goes over the constant, the export can not be
// finished in a single round. It could be delayed by conntrack connections polling
// routine, which also acquires the connection store lock. The possible solutions
// are: 1. take a fraction of the size of connection store to approximate the number
// of expired connections, while having a min and a max to handle edge cases,
// e.g. min(50 + 0.1 * connectionStore.size(), 200) 2. do some experiments to find
// out the optimized constant which has a better performance.
const maxConnsToExport = 64

var (
IANAInfoElementsCommon = []string{
"flowStartSeconds",
Expand Down Expand Up @@ -103,6 +116,7 @@ type flowExporter struct {
nodeName string
conntrackPriorityQueue *priorityqueue.ExpirePriorityQueue
denyPriorityQueue *priorityqueue.ExpirePriorityQueue
expiredConns []flowexporter.Connection
}

func genObservationID(nodeName string) uint32 {
Expand Down Expand Up @@ -158,6 +172,7 @@ func NewFlowExporter(connStore *connections.ConntrackConnectionStore, denyConnSt
nodeName: nodeName,
conntrackPriorityQueue: conntrackPriorityQueue,
denyPriorityQueue: denyPriorityQueue,
expiredConns: make([]flowexporter.Connection, 0, maxConnsToExport*2),
}, nil
}

Expand Down Expand Up @@ -209,18 +224,19 @@ func (exp *flowExporter) Run(stopCh <-chan struct{}) {

func (exp *flowExporter) sendFlowRecords() (time.Duration, error) {
currTime := time.Now()
expiredConns := make([]flowexporter.Connection, 0)
var expireTime1, expireTime2 time.Duration
expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(expiredConns, currTime)
expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(expiredConns, currTime)
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
exp.expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// Select the shorter time out among two connection stores to do the next round of export.
nextExpireTime := getMinTime(expireTime1, expireTime2)
for i := range expiredConns {
if err := exp.exportConn(&expiredConns[i]); err != nil {
for i := range exp.expiredConns {
if err := exp.exportConn(&exp.expiredConns[i]); err != nil {
klog.ErrorS(err, "Error when sending expired flow record")
return nextExpireTime, err
}
}
// Clear expiredConns slice after exporting. Allocated memory is kept.
exp.expiredConns = exp.expiredConns[:0]
return nextExpireTime, nil
}

Expand Down
91 changes: 53 additions & 38 deletions pkg/agent/flowexporter/exporter/exporter_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/rand"
"flag"
"fmt"
"math"
"math/big"
"net"
"testing"
Expand All @@ -48,26 +49,31 @@ var recordsReceived = 0

/*
Sample output:
go test -test.v -run=BenchmarkExport -test.benchmem -bench=BenchmarkExportConntrackConns -memprofile memprofile.out -cpuprofile profile.out
goos: linux
goarch: amd64
pkg: antrea.io/antrea/pkg/agent/flowexporter/exporter
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkExportConntrackConns (truncated output)
exporter_perf_test.go:79:
Summary:
Number of conntrack connections: 20000
Number of dying conntrack connections: 2000
Total connections received: 18703
BenchmarkExportConntrackConns-2 75 13750074 ns/op 965550 B/op 22268 allocs/op
PASS
ok antrea.io/antrea/pkg/agent/flowexporter/exporter 5.494s
go test -test.v -run=BenchmarkExport -test.benchmem -bench=BenchmarkExportConntrackConns -benchtime=100x -memprofile memprofile.out -cpuprofile profile.out
goos: linux
goarch: amd64
pkg: antrea.io/antrea/pkg/agent/flowexporter/exporter
BenchmarkExportConntrackConns-2 100 3484688 ns/op 527820 B/op 8214 allocs/op
--- BENCH: BenchmarkExportConntrackConns-2
exporter_perf_test.go:92:
Summary:
Number of conntrack connections: 20000
Number of dying conntrack connections: 2000
Total connections received: 19698
exporter_perf_test.go:92:
Summary:
Number of conntrack connections: 20000
Number of dying conntrack connections: 2000
Total connections received: 18509
... [output truncated]
PASS
ok antrea.io/antrea/pkg/agent/flowexporter/exporter 1.134s
Reference value:
#conns
20000 156 8037522 ns/op 340792 B/op 13540 allocs/op
30000 61 20510362 ns/op 1082075 B/op 43304 allocs/op
40000 39 46557414 ns/op 3180649 B/op 127687 allocs/op
50000 18 55581807 ns/op 4420593 B/op 177554 allocs/op
20000 100 3484688 ns/op 527820 B/op 8214 allocs/op
30000 100 5868374 ns/op 788098 B/op 12313 allocs/op
40000 100 7300047 ns/op 1047562 B/op 16392 allocs/op
50000 100 9312464 ns/op 1308313 B/op 20503 allocs/op
*/
func BenchmarkExportConntrackConns(b *testing.B) {
disableLogToStderr()
Expand All @@ -82,33 +88,40 @@ func BenchmarkExportConntrackConns(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
exp.initFlowExporter()
exp.sendFlowRecords()
for i := 0; i < int(math.Ceil(testNumOfConns/maxConnsToExport)); i++ {
exp.sendFlowRecords()
}
}
b.Logf("\nSummary:\nNumber of conntrack connections: %d\nNumber of dying conntrack connections: %d\nTotal connections received: %d\n", testNumOfConns, testNumOfDyingConns, recordsReceived)
}

/*
Sample output:
go test -test.v -run=BenchmarkExport -test.benchmem -bench=BenchmarkExportDenyConns -memprofile memprofile.out -cpuprofile profile.out
goos: linux
goarch: amd64
pkg: antrea.io/antrea/pkg/agent/flowexporter/exporter
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkExportDenyConns (truncated output)
exporter_perf_test.go:112:
Summary:
Number of deny connections: 20000
Number of idle deny connections: 2000
Total connections received: 19124
BenchmarkExportDenyConns-2 204 6922004 ns/op 357215 B/op 12106 allocs/op
PASS
ok antrea.io/antrea/pkg/agent/flowexporter/exporter 6.189s
go test -test.v -run=BenchmarkExport -test.benchmem -bench=BenchmarkExportDenyConns -benchtime=100x -memprofile memprofile.out -cpuprofile profile.out
goos: linux
goarch: amd64
pkg: antrea.io/antrea/pkg/agent/flowexporter/exporter
BenchmarkExportDenyConns-2 100 3714699 ns/op 507942 B/op 7037 allocs/op
--- BENCH: BenchmarkExportDenyConns-2
exporter_perf_test.go:135:
Summary:
Number of deny connections: 20000
Number of idle deny connections: 2000
Total connections received: 19742
exporter_perf_test.go:135:
Summary:
Number of deny connections: 20000
Number of idle deny connections: 2000
Total connections received: 19671
... [output truncated]
PASS
ok antrea.io/antrea/pkg/agent/flowexporter/exporter 1.331s
Reference value:
#conns
20000 210 5401396 ns/op 195415 B/op 7908 allocs/op
30000 102 11793506 ns/op 555344 B/op 22770 allocs/op
40000 64 19141650 ns/op 1239398 B/op 51008 allocs/op
50000 37 27369835 ns/op 2036012 B/op 83802 allocs/op
20000 100 3714699 ns/op 507942 B/op 7037 allocs/op
30000 100 5073132 ns/op 755810 B/op 10488 allocs/op
40000 100 7874295 ns/op 1004996 B/op 13965 allocs/op
50000 100 8681581 ns/op 1257332 B/op 17527 allocs/op
*/
func BenchmarkExportDenyConns(b *testing.B) {
disableLogToStderr()
Expand All @@ -123,7 +136,9 @@ func BenchmarkExportDenyConns(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
exp.initFlowExporter()
exp.sendFlowRecords()
for i := 0; i < int(math.Ceil(testNumOfDenyConns/maxConnsToExport)); i++ {
exp.sendFlowRecords()
}
}
b.Logf("\nSummary:\nNumber of deny connections: %d\nNumber of idle deny connections: %d\nTotal connections received: %d\n", testNumOfDenyConns, testNumOfIdleDenyConns, recordsReceived)

Expand Down

0 comments on commit 54cebe6

Please sign in to comment.