Skip to content

Commit

Permalink
Fix collector close issue and add perf test for multiple exporters to…
Browse files Browse the repository at this point in the history
… collector
  • Loading branch information
zyiou committed Aug 6, 2021
1 parent c65cc6d commit adaf81e
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 235 deletions.
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -518,7 +517,6 @@ golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -576,8 +574,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down
9 changes: 3 additions & 6 deletions pkg/collector/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,18 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) {
length, err := getMessageLength(bytes.NewBuffer(buffBytes))
if err != nil {
klog.Error(err)
cp.deleteClient(address)
return
break
}
if size < length {
klog.Errorf("Message length %v is larger than size read from buffer %v", length, size)
cp.deleteClient(address)
return
break
}
size = size - length
// get the message here
message, err := cp.decodePacket(bytes.NewBuffer(buffBytes[0:length]), address)
if err != nil {
klog.Error(err)
cp.deleteClient(address)
return
break
}
klog.V(4).Infof("Processed message from exporter %v, number of records: %v, observation domain ID: %v",
message.GetExportAddress(), message.GetSet().GetNumberOfRecords(), message.GetObsDomainID())
Expand Down
1 change: 1 addition & 0 deletions pkg/exporter/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (ep *ExportingProcess) createAndSendMsg(set entities.Set) (int, error) {
ep.connMutex.Lock()
defer ep.connMutex.Unlock()
bytesSent, err := ep.connToCollector.Write(bytesSlice)

if err != nil {
return bytesSent, fmt.Errorf("error when sending message on the connection: %v", err)
} else if bytesSent != msgLen {
Expand Down
2 changes: 1 addition & 1 deletion pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

var (
MaxRetries = 5
MaxRetries = 2
MinExpiryTime = 100 * time.Millisecond
)

Expand Down
130 changes: 130 additions & 0 deletions pkg/test/collector_perf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2021 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !race

package test

import (
"flag"
"net"
"testing"
"time"

"github.com/golang/mock/gomock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"github.com/vmware/go-ipfix/pkg/collector"
"github.com/vmware/go-ipfix/pkg/exporter"
"github.com/vmware/go-ipfix/pkg/registry"
)

func init() {
// Load the global registry
registry.LoadRegistry()
}

const (
numOfExporters = 1000
numOfRecords = 5
)

/*
Sample output:
go test -test.v -run=Benchmark -test.benchmem -bench=BenchmarkMultipleExportersToCollector -memprofile memprofile.out -cpuprofile profile.out
goos: darwin
goarch: amd64
pkg: github.com/vmware/go-ipfix/pkg/test
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkMultipleExportersToCollector
BenchmarkMultipleExportersToCollector-12 103 38734531 ns/op 283923 B/op 8547 allocs/op
PASS
ok github.com/vmware/go-ipfix/pkg/test 7.090s
*/
func BenchmarkMultipleExportersToCollector(b *testing.B) {
disableLogToStderr()
ctrl := gomock.NewController(b)
defer ctrl.Finish()
collectorInput := collector.CollectorInput{
Address: "127.0.0.1:0",
Protocol: "tcp",
MaxBufferSize: 1024,
TemplateTTL: 0,
}
cp, err := collector.InitCollectingProcess(collectorInput)
if err != nil {
b.Fatalf("cannot start collecting process on %s: %v", cp.GetAddress().String(), err)
}
go cp.Start()
waitForCollectorStatus(b, cp, true)
b.ResetTimer()
for i := 0; i < numOfExporters; i++ {
exporterInput := exporter.ExporterInput{
CollectorAddress: cp.GetAddress().String(),
CollectorProtocol: cp.GetAddress().Network(),
ObservationDomainID: uint32(i),
}
exporter, err := exporter.InitExportingProcess(exporterInput)
if err != nil {
b.Errorf("cannot start exporting process: %v", err)
}
templateID := exporter.NewTemplateID()
go func() {
exporter.SendSet(createTemplateSet(templateID, false))
for j := 0; j < numOfRecords; j++ {
exporter.SendSet(createDataSet(templateID, true, false, false))
}
}()
}
count := 0
for range cp.GetMsgChan() {
count++
if count == numOfRecords*numOfExporters {
cp.Stop()
break
}
}
waitForCollectorStatus(b, cp, false)
b.StopTimer()
}

func disableLogToStderr() {
klogFlagSet := flag.NewFlagSet("klog", flag.ContinueOnError)
klog.InitFlags(klogFlagSet)
klogFlagSet.Parse([]string{"-logtostderr=false"})
}

func waitForCollectorStatus(b *testing.B, cp *collector.CollectingProcess, checkReady bool) {
checkConn := func() (bool, error) {
if conn, err := net.Dial(cp.GetAddress().Network(), cp.GetAddress().String()); err != nil {
if checkReady {
return false, err
} else {
return true, nil
}
} else {
if checkReady {
conn.Close()
return true, nil
} else {
return false, err
}
}
}
if err := wait.Poll(100*time.Millisecond, 500*time.Millisecond, checkConn); err != nil {
b.Fatalf("cannot establish connection to %s", cp.GetAddress().String())
}
}
Loading

0 comments on commit adaf81e

Please sign in to comment.