Skip to content

Commit

Permalink
Avoid error logs in BenchmarkMultipleExportersToCollector (#323)
Browse files Browse the repository at this point in the history
* Gracefully shutdown exporter connections before stopping collector.
* Do not log an error for graceful client shutdown (EOF error).

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Sep 29, 2023
1 parent a7df05f commit a678292
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func getMessageLength(reader *bufio.Reader) (int, error) {
var msgLen uint16
err = util.Decode(bytes.NewBuffer(partialHeader[2:]), binary.BigEndian, &msgLen)
if err != nil {
return 0, fmt.Errorf("cannot decode message: %v", err)
return 0, fmt.Errorf("cannot decode message: %w", err)
}
return int(msgLen), nil
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/collector/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -70,25 +71,29 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) {
reader := bufio.NewReader(conn)
for {
length, err := getMessageLength(reader)
if errors.Is(err, io.EOF) {
klog.V(2).InfoS("Connection was closed by client")
return
}
if err != nil {
klog.Errorf("error when retrieving message length: %v", err)
klog.ErrorS(err, "Error when retrieving message length")
cp.deleteClient(address)
return
}
buff := make([]byte, length)
_, err = io.ReadFull(reader, buff)
if err != nil {
klog.Errorf("error when reading the message: %v", err)
klog.ErrorS(err, "Error when reading the message")
cp.deleteClient(address)
return
}
message, err := cp.decodePacket(bytes.NewBuffer(buff), address)
if err != nil {
klog.Error(err)
klog.ErrorS(err, "Error when decoding packet")
continue
}
klog.V(4).Infof("Processed message from exporter with observation domain ID: %v ser type: %v number of records: %v",
message.GetObsDomainID(), message.GetSet().GetSetType(), message.GetSet().GetNumberOfRecords())
klog.V(4).InfoS("Processed message from exporter",
"observationDomainID", message.GetObsDomainID(), "setType", message.GetSet().GetSetType(), "numRecords", message.GetSet().GetNumberOfRecords())
}
}()
<-cp.stopChan
Expand Down
11 changes: 10 additions & 1 deletion pkg/test/collector_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) {
}
go cp.Start()
waitForCollectorStatus(b, cp, true)
exporters := make([]*exporter.ExportingProcess, 0, numOfExporters)
b.ResetTimer()
for i := 0; i < numOfExporters; i++ {
b.StartTimer()
Expand All @@ -89,17 +90,25 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) {
exporter.SendSet(createDataSet(templateID, true, false, false))
}
b.StopTimer()
exporters = append(exporters, exporter)
time.Sleep(time.Millisecond)
}
b.StartTimer()
count := 0
for range cp.GetMsgChan() {
count++
if count == numOfRecords*numOfExporters {
cp.Stop()
break
}
}
b.StopTimer()
// Gracefully shutdown all the exporters to avoid "use of closed network connection" error
// logs.
for i := 0; i < numOfExporters; i++ {
exporters[i].CloseConnToCollector()
}
b.StartTimer()
cp.Stop()
waitForCollectorStatus(b, cp, false)
}

Expand Down

0 comments on commit a678292

Please sign in to comment.