From 8b6ee1a802f0eefc54a554302d2929b451a4daa7 Mon Sep 17 00:00:00 2001 From: Srikar Tati Date: Wed, 12 May 2021 18:25:24 -0700 Subject: [PATCH] Flow Aggregator perf benchmarking Added test for IntraFlowRecords. Simulated 10 exporters and each of them send 1000 flow records. Used benchmark in-built tools like cpuprofile, memprofile, benchmem etc., to get the performance metrics. --- go.mod | 1 + .../flowaggregator_perf_test.go | 442 ++++++++++++++++++ pkg/flowaggregator/flowaggregator_test.go | 4 +- pkg/ipfix/ipfix_collector.go | 5 + 4 files changed, 451 insertions(+), 1 deletion(-) create mode 100644 pkg/flowaggregator/flowaggregator_perf_test.go diff --git a/go.mod b/go.mod index 175f7eeef3d..725fceb829b 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( k8s.io/apiserver v0.21.0 k8s.io/client-go v0.21.0 k8s.io/component-base v0.21.0 + k8s.io/klog v1.0.0 k8s.io/klog/v2 v2.8.0 k8s.io/kube-aggregator v0.21.0 k8s.io/kube-openapi v0.0.0-20210305164622-f622666832c1 diff --git a/pkg/flowaggregator/flowaggregator_perf_test.go b/pkg/flowaggregator/flowaggregator_perf_test.go new file mode 100644 index 00000000000..00457978836 --- /dev/null +++ b/pkg/flowaggregator/flowaggregator_perf_test.go @@ -0,0 +1,442 @@ +// +build !race + +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flowaggregator + +import ( + "flag" + "fmt" + "math/rand" + "net" + "strconv" + "strings" + "sync" + "testing" + "time" + + ipfixcollector "github.com/vmware/go-ipfix/pkg/collector" + ipfixentities "github.com/vmware/go-ipfix/pkg/entities" + ipfixexporter "github.com/vmware/go-ipfix/pkg/exporter" + ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" + "k8s.io/apimachinery/pkg/util/wait" + clienttest "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/flowexporter" + "antrea.io/antrea/pkg/agent/flowexporter/exporter" +) + +var ( + numRecords = 1000 + numMessages = 0 + numExporters = 10 + bufferPool sync.Pool +) + +func BenchmarkIntraNodeFlowRecords(b *testing.B) { + disableLogToStderr() + stopCh := make(chan struct{}) + + ipfixregistry.LoadRegistry() + + bufferPool = sync.Pool{ + New: func() interface{} { return make([]byte, 400) }, + } + testExportInterval = 500 * time.Millisecond + var wg sync.WaitGroup + localCollector := startLocalCollector(b, stopCh) + k8sClient := clienttest.NewSimpleClientset() + flowAgg := NewFlowAggregator( + localCollector.String(), + localCollector.Network(), + testExportInterval, + AggregatorTransportProtocolTCP, + "127.0.0.1:0:tcp", + k8sClient, + testObservationDomainID) + err := flowAgg.InitCollectingProcess() + if err != nil { + b.Fatalf("Error when creating collecting process in Flow Aggregator: %v", err) + } + err = flowAgg.InitAggregationProcess() + if err != nil { + b.Fatalf("Error when creating aggregation process in Flow Aggregator: %v", err) + } + go flowAgg.Run(stopCh) + + waitForCollectorReady(b, flowAgg.collectingProcess.GetCollectingProcess()) + // Start multiple exporters that simulate Antrea Agent Flow Exporters. + for i := 0; i < numExporters; i++ { + nodeName := "exporter-" + strconv.Itoa(i+1) + wg.Add(1) + go startExporter(b, flowAgg.collectingProcess.GetCollectingProcess(), &wg, nodeName, uint32(i)) + } + wg.Wait() + <-stopCh +} + +func startExporter(b *testing.B, cp *ipfixcollector.CollectingProcess, wg *sync.WaitGroup, nodeName string, nodeID uint32) { + defer wg.Done() + epInput := ipfixexporter.ExporterInput{ + CollectorAddress: cp.GetAddress().String(), + CollectorProtocol: cp.GetAddress().Network(), + ObservationDomainID: nodeID, + TempRefTimeout: 0, + PathMTU: 0, + IsEncrypted: false, + CACert: nil, + } + + exportingProcess, err := ipfixexporter.InitExportingProcess(epInput) + if err != nil { + b.Fatalf("Got error when connecting to %s", cp.GetAddress().String()) + } + defer exportingProcess.CloseConnToCollector() // Close exporting process + set := ipfixentities.NewSet(false) + if err = set.PrepareSet(ipfixentities.Template, testTemplateIDv4); err != nil { + b.Errorf("Error when preparing the set: %v", err) + return + } + // Send template set. + elements, err := sendTemplateSet(exportingProcess, set, false) + if err != nil { + b.Errorf("Error when sending template set: %v", err) + return + } + set.ResetSet() + + // Send multiple records + for i := 0; i < numRecords; i++ { + if i % 50 == 0 { + // Pausing after few records to simulate the behavior of Flow Exporter. + time.Sleep(250 * time.Millisecond) + } + if err = set.PrepareSet(ipfixentities.Data, testTemplateIDv4); err != nil { + b.Errorf("Error when preparing the set: %v", err) + return + } + err = sendDataSet(exportingProcess, set, elements, nodeName) + if err != nil { + b.Errorf("Error when sending data set: %v", err) + return + } + set.ResetSet() + } +} + +func startLocalCollector(b *testing.B, stopCh chan struct{}) net.Addr { + address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + if err != nil { + b.Error(err) + } + listener, err := net.ListenUDP("udp", address) + if err != nil { + b.Fatalf("Got error when creating a local udp server: %v", err) + } + listener.SetReadBuffer(1048576) + + go func() { + defer listener.Close() + for { + buff := bufferPool.Get().([]byte) + size, _, err := listener.ReadFrom(buff[0:]) + if err != nil { + if size == 0 { // received stop collector message + return + } + b.Errorf("Error in udp collecting process: %v", err) + return + } + numMessages = numMessages + 1 + if numMessages == (numRecords * numExporters) + 1 { + b.Logf("Num messages : %v", numMessages) + close(stopCh) + return + } + } + }() + return listener.LocalAddr() +} + +func sendTemplateSet(exportProcess *ipfixexporter.ExportingProcess, templateSet ipfixentities.Set, isIPv6 bool) ([]*ipfixentities.InfoElementWithValue, error) { + elements := make([]*ipfixentities.InfoElementWithValue, 0) + + IANAInfoElements := exporter.IANAInfoElementsIPv4 + AntreaInfoElements := exporter.AntreaInfoElementsIPv4 + templateID := testTemplateIDv4 + if isIPv6 { + IANAInfoElements = exporter.IANAInfoElementsIPv6 + AntreaInfoElements = exporter.AntreaInfoElementsIPv6 + templateID = testTemplateIDv6 + } + for _, ie := range IANAInfoElements { + element, err := ipfixregistry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID) + if err != nil { + return nil, fmt.Errorf("%s not present. returned error: %v", ie, err) + } + ieWithValue := ipfixentities.NewInfoElementWithValue(element, nil) + elements = append(elements, ieWithValue) + } + for _, ie := range exporter.IANAReverseInfoElements { + element, err := ipfixregistry.GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID) + if err != nil { + return nil, fmt.Errorf("%s not present. returned error: %v", ie, err) + } + ieWithValue := ipfixentities.NewInfoElementWithValue(element, nil) + elements = append(elements, ieWithValue) + } + for _, ie := range AntreaInfoElements { + element, err := ipfixregistry.GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return nil, fmt.Errorf("information element %s is not present in Antrea registry", ie) + } + ieWithValue := ipfixentities.NewInfoElementWithValue(element, nil) + elements = append(elements, ieWithValue) + } + + err := templateSet.AddRecord(elements, templateID) + if err != nil { + return nil, fmt.Errorf("error in adding record to template set: %v", err) + } + + _, err = exportProcess.SendSet(templateSet) + if err != nil { + return nil, fmt.Errorf("error in IPFIX exporting process when sending template record: %v", err) + } + + return elements, nil +} + +func sendDataSet(exportProcess *ipfixexporter.ExportingProcess, dataSet ipfixentities.Set, elements []*ipfixentities.InfoElementWithValue, nodeName string) error { + record := getFlowRecord() + // Iterate over all infoElements in the list + for _, ie := range elements { + switch ieName := ie.Element.Name; ieName { + case "flowStartSeconds": + ie.Value = uint32(record.Conn.StartTime.Unix()) + case "flowEndSeconds": + ie.Value = uint32(record.Conn.StopTime.Unix()) + case "flowEndReason": + if flowexporter.IsConnectionDying(&record.Conn) { + ie.Value = ipfixregistry.EndOfFlowReason + } else if record.IsActive { + ie.Value = ipfixregistry.ActiveTimeoutReason + } else { + ie.Value = ipfixregistry.IdleTimeoutReason + } + case "sourceIPv4Address": + ie.Value = record.Conn.TupleOrig.SourceAddress + case "destinationIPv4Address": + ie.Value = record.Conn.TupleReply.SourceAddress + case "sourceIPv6Address": + ie.Value = record.Conn.TupleOrig.SourceAddress + case "destinationIPv6Address": + ie.Value = record.Conn.TupleReply.SourceAddress + case "sourceTransportPort": + ie.Value = record.Conn.TupleOrig.SourcePort + case "destinationTransportPort": + ie.Value = record.Conn.TupleReply.SourcePort + case "protocolIdentifier": + ie.Value = record.Conn.TupleOrig.Protocol + case "packetTotalCount": + ie.Value = record.Conn.OriginalPackets + case "octetTotalCount": + ie.Value = record.Conn.OriginalBytes + case "packetDeltaCount": + deltaPkts := int64(record.Conn.OriginalPackets) - int64(record.PrevPackets) + if deltaPkts < 0 { + klog.Warningf("Packet delta count for connection should not be negative: %d", deltaPkts) + } + ie.Value = uint64(deltaPkts) + case "octetDeltaCount": + deltaBytes := int64(record.Conn.OriginalBytes) - int64(record.PrevBytes) + if deltaBytes < 0 { + klog.Warningf("Byte delta count for connection should not be negative: %d", deltaBytes) + } + ie.Value = uint64(deltaBytes) + case "reversePacketTotalCount": + ie.Value = record.Conn.ReversePackets + case "reverseOctetTotalCount": + ie.Value = record.Conn.ReverseBytes + case "reversePacketDeltaCount": + deltaPkts := int64(record.Conn.ReversePackets) - int64(record.PrevReversePackets) + if deltaPkts < 0 { + klog.Warningf("Packet delta count for connection should not be negative: %d", deltaPkts) + } + ie.Value = uint64(deltaPkts) + case "reverseOctetDeltaCount": + deltaBytes := int64(record.Conn.ReverseBytes) - int64(record.PrevReverseBytes) + if deltaBytes < 0 { + klog.Warningf("Byte delta count for connection should not be negative: %d", deltaBytes) + } + ie.Value = uint64(deltaBytes) + case "sourcePodNamespace": + ie.Value = record.Conn.SourcePodNamespace + case "sourcePodName": + ie.Value = record.Conn.SourcePodName + case "sourceNodeName": + // Add nodeName for only local pods whose pod names are resolved. + if record.Conn.SourcePodName != "" { + ie.Value = nodeName + } else { + ie.Value = "" + } + case "destinationPodNamespace": + ie.Value = record.Conn.DestinationPodNamespace + case "destinationPodName": + ie.Value = record.Conn.DestinationPodName + case "destinationNodeName": + // Add nodeName for only local pods whose pod names are resolved. + if record.Conn.DestinationPodName != "" { + ie.Value = nodeName + } else { + ie.Value = "" + } + case "destinationClusterIPv4": + if record.Conn.DestinationServicePortName != "" { + ie.Value = record.Conn.TupleOrig.DestinationAddress + } else { + // Sending dummy IP as IPFIX collector expects constant length of data for IP field. + // We should probably think of better approach as this involves customization of IPFIX collector to ignore + // this dummy IP address. + ie.Value = net.IP{0, 0, 0, 0} + } + case "destinationClusterIPv6": + if record.Conn.DestinationServicePortName != "" { + ie.Value = record.Conn.TupleOrig.DestinationAddress + } else { + // Same as destinationClusterIPv4. + ie.Value = net.ParseIP("::") + } + case "destinationServicePort": + if record.Conn.DestinationServicePortName != "" { + ie.Value = record.Conn.TupleOrig.DestinationPort + } else { + ie.Value = uint16(0) + } + case "destinationServicePortName": + if record.Conn.DestinationServicePortName != "" { + ie.Value = record.Conn.DestinationServicePortName + } else { + ie.Value = "" + } + case "ingressNetworkPolicyName": + ie.Value = record.Conn.IngressNetworkPolicyName + case "ingressNetworkPolicyNamespace": + ie.Value = record.Conn.IngressNetworkPolicyNamespace + case "egressNetworkPolicyName": + ie.Value = record.Conn.EgressNetworkPolicyName + case "egressNetworkPolicyNamespace": + ie.Value = record.Conn.EgressNetworkPolicyNamespace + case "tcpState": + ie.Value = record.Conn.TCPState + case "flowType": + ie.Value = ipfixregistry.IntraNode + } + } + + templateID := testTemplateIDv4 + if record.IsIPv6 { + templateID = testTemplateIDv6 + } + err := dataSet.AddRecord(elements, templateID) + if err != nil { + return fmt.Errorf("error in adding record to data set: %v", err) + } + _, err = exportProcess.SendSet(dataSet) + if err != nil { + return err + } + + return nil +} + +func getFlowRecord() *flowexporter.FlowRecord { + src := net.ParseIP("192.168.0.100") + dst := net.ParseIP("192.169.0.200") + srcPort := uint16(rand.Uint32()) + dstPort := uint16(rand.Uint32()) + tuple, revTuple := makeTuple(&src, &dst, 6, srcPort, dstPort) + conn := flowexporter.Connection{ + StartTime: time.Now(), + StopTime: time.Now(), + IsPresent: true, + DoneExport: false, + TupleOrig: tuple, + TupleReply: revTuple, + OriginalPackets: 100, + OriginalBytes: 10, + ReversePackets: 50, + ReverseBytes: 5, + SourcePodNamespace: "ns1", + SourcePodName: "pod1", + DestinationPodNamespace: "ns2", + DestinationPodName: "pod2", + DestinationServicePortName: "service", + TCPState: "SYN_SENT", + } + record := &flowexporter.FlowRecord{ + Conn: conn, + PrevPackets: 0, + PrevBytes: 0, + PrevReversePackets: 0, + PrevReverseBytes: 0, + LastExportTime: time.Now(), + IsActive: true, + } + return record +} + +func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (flowexporter.Tuple, flowexporter.Tuple) { + tuple := flowexporter.Tuple{ + SourceAddress: *srcIP, + DestinationAddress: *dstIP, + Protocol: protoID, + SourcePort: srcPort, + DestinationPort: dstPort, + } + revTuple := flowexporter.Tuple{ + SourceAddress: *dstIP, + DestinationAddress: *srcIP, + Protocol: protoID, + SourcePort: dstPort, + DestinationPort: srcPort, + } + return tuple, revTuple +} + +func waitForCollectorReady(b *testing.B, cp *ipfixcollector.CollectingProcess) { + checkConn := func() (bool, error) { + if strings.Split(cp.GetAddress().String(), ":")[1] == "0" { + return false, fmt.Errorf("random port is not resolved") + } + connection, err := net.Dial(cp.GetAddress().Network(), cp.GetAddress().String()) + if err != nil { + return false, err + } + connection.Close() + return true, nil + } + if err := wait.Poll(100*time.Millisecond, 1*time.Second, checkConn); err != nil { + b.Errorf("Cannot establish connection to %s", cp.GetAddress().String()) + } +} + +func disableLogToStderr() { + klogFlagSet := flag.NewFlagSet("klog", flag.ContinueOnError) + klog.InitFlags(klogFlagSet) + klogFlagSet.Parse([]string{"-logtostderr=false"}) +} diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index ed7d3e38da0..1ff4641fc67 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -32,9 +32,11 @@ import ( const ( testTemplateIDv4 = uint16(256) testTemplateIDv6 = uint16(257) - testExportInterval = 60 * time.Second testObservationDomainID = 0xabcd ) +var ( + testExportInterval = 60 * time.Second +) func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { ctrl := gomock.NewController(t) diff --git a/pkg/ipfix/ipfix_collector.go b/pkg/ipfix/ipfix_collector.go index b4f7b51d33c..ace07a35c96 100644 --- a/pkg/ipfix/ipfix_collector.go +++ b/pkg/ipfix/ipfix_collector.go @@ -28,6 +28,7 @@ type IPFIXCollectingProcess interface { Start() Stop() GetMsgChan() chan *ipfixentities.Message + GetCollectingProcess() *ipfixcollect.CollectingProcess } type ipfixCollectingProcess struct { @@ -56,3 +57,7 @@ func (cp *ipfixCollectingProcess) Stop() { func (cp *ipfixCollectingProcess) GetMsgChan() chan *ipfixentities.Message { return cp.CollectingProcess.GetMsgChan() } + +func (cp *ipfixCollectingProcess) GetCollectingProcess() *ipfixcollect.CollectingProcess { + return cp.CollectingProcess +}