Skip to content

Commit

Permalink
Fix aggregation bug for throughput common fields (#324)
Browse files Browse the repository at this point in the history
Just like for stats, throughput common fields should only be updated for
the "latest" record (based on "flowEndSeconds").

This bug explains why the TestCollectorToIntermediate integration test
has been flaky for a long time. In some cases, the record from the
destination was processed before the record from the source (we have 2
aggregation workers, so records are not guaranteed to be processed in
the order in which they are received). While the destination record has
a larger "flowEndSeconds", the "throughput" and "reverseThroughput"
common elements were overriden by values from the source record.

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Oct 2, 2023
1 parent a678292 commit f5c5793
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
8 changes: 5 additions & 3 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,8 +637,10 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent
ie, _, _ := existingRecord.GetInfoElementWithValue(antreaDestinationThroughputElements[i])
ie.SetUnsigned64Value(throughputVals[i])
}
ie, _, _ := existingRecord.GetInfoElementWithValue(element)
ie.SetUnsigned64Value(throughputVals[i])
if isLatest {
ie, _, _ := existingRecord.GetInfoElementWithValue(element)
ie.SetUnsigned64Value(throughputVals[i])
}
}
return nil
}
Expand Down Expand Up @@ -756,7 +758,7 @@ func (a *AggregationProcess) addFieldsForThroughputCalculation(record entities.R
return err
}
value := uint32(0)
if fillSrcStats && strings.Contains(ieName, "Source") || fillDstStats && strings.Contains(ieName, "Destination") {
if (fillSrcStats && strings.Contains(ieName, "Source")) || (fillDstStats && strings.Contains(ieName, "Destination")) {
value = timeEnd
}
if err = record.AddInfoElement(entities.NewUnsigned32InfoElement(ie, value)); err != nil {
Expand Down
10 changes: 6 additions & 4 deletions pkg/test/collector_intermediate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// Run TestSingleRecordTCPTransport and TestSingleRecordTCPTransportIPv6 along with
// debug log for the message in pkg/exporter/process.go before sending it to get following
// raw bytes for template and data packets.
// Following data packets are generated with getTestRecord in exporter_collector_test.go
// Following data packets are generated with getTestRecord in util.go
// dataPacket1IPv4: getTestRecord(true, false)
// dataPacket2IPv4: getTestRecord(false, false)
// dataPacket1IPv6: getTestRecord(true, true)
Expand Down Expand Up @@ -164,7 +164,8 @@ func testCollectorToIntermediate(t *testing.T, address net.Addr, isIPv6 bool) {
ap, _ := intermediate.InitAggregationProcess(apInput)
go cp.Start()
waitForCollectorReady(t, cp)
go func() {
go ap.Start()
func() {
collectorAddr, _ := net.ResolveTCPAddr("tcp", cp.GetAddress().String())
conn, err := net.DialTCP("tcp", nil, collectorAddr)
if err != nil {
Expand All @@ -181,7 +182,6 @@ func testCollectorToIntermediate(t *testing.T, address net.Addr, isIPv6 bool) {
conn.Write(dataPacket2IPv4)
}
}()
go ap.Start()
if isIPv6 {
waitForAggregationToFinish(t, ap, flowKey2)
} else {
Expand Down Expand Up @@ -276,9 +276,11 @@ func waitForCollectorReady(t *testing.T, cp *collector.CollectingProcess) {
if strings.Split(cp.GetAddress().String(), ":")[1] == "0" {
return false, fmt.Errorf("random port is not resolved")
}
if _, err := net.Dial(cp.GetAddress().Network(), cp.GetAddress().String()); err != nil {
conn, err := net.Dial(cp.GetAddress().Network(), cp.GetAddress().String())
if err != nil {
return false, err
}
conn.Close()
return true, nil
}
if err := wait.Poll(100*time.Millisecond, 500*time.Millisecond, checkConn); err != nil {
Expand Down

0 comments on commit f5c5793

Please sign in to comment.