Skip to content

Commit

Permalink
Fix unexpected reporting of empty MulticastGroupInfo
Browse files Browse the repository at this point in the history
The empty MulticastGroupInfo will be reported unexpectedly during a single report session when:
1. Statistics of NP, ANP, or ANCP are getting updated.
2. MulticastGroupInfo is not refreshed.
Fix: Always report updated MulticastGroupInfo when reporting is necessary.
Also, add a minor refactor for the collector report function.

Signed-off-by: ceclinux <[email protected]>
  • Loading branch information
Ruochen authored and ceclinux committed Oct 8, 2022
1 parent b36a884 commit a928e26
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 45 deletions.
106 changes: 61 additions & 45 deletions pkg/agent/stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,58 +204,27 @@ func isIdenticalMulticastGroupMap(a, b map[string][]cpv1beta.PodReference) bool
return true
}

// report calculates the delta of the stats and pushes it to the antrea-controller summary API.
// If multicast feature gate is enabled, it also sends the full multicast group and IGMP report stats to the antrea-controller.
func (m *Collector) report(curStatsCollection *statsCollection) error {
npStats := calculateDiff(curStatsCollection.networkPolicyStats, m.lastStatsCollection.networkPolicyStats)
acnpStats := calculateRuleDiff(curStatsCollection.antreaClusterNetworkPolicyStats, m.lastStatsCollection.antreaClusterNetworkPolicyStats)
anpStats := calculateRuleDiff(curStatsCollection.antreaNetworkPolicyStats, m.lastStatsCollection.antreaNetworkPolicyStats)
func (m *Collector) calculateNPStats(curStatsCollection *statsCollection) (npStats, acnpStats, anpStats []cpv1beta.NetworkPolicyStats) {
npStats = calculateDiff(curStatsCollection.networkPolicyStats, m.lastStatsCollection.networkPolicyStats)
acnpStats = calculateRuleDiff(curStatsCollection.antreaClusterNetworkPolicyStats, m.lastStatsCollection.antreaClusterNetworkPolicyStats)
anpStats = calculateRuleDiff(curStatsCollection.antreaNetworkPolicyStats, m.lastStatsCollection.antreaNetworkPolicyStats)
return npStats, acnpStats, anpStats
}

func (m *Collector) calculateNodeStatsSummary(curStatsCollection *statsCollection) *cpv1beta.NodeStatsSummary {
var multicastGroups []cpv1beta.MulticastGroupInfo
multicastGroupsUpdated := false
npStats, acnpStats, anpStats := m.calculateNPStats(curStatsCollection)
if m.multicastEnabled {
// multicastGroups should be reported if the multicast group Pod membership has changed since the last collect.
if !isIdenticalMulticastGroupMap(m.lastStatsCollection.multicastGroups, curStatsCollection.multicastGroups) {
multicastGroupsUpdated = true
multicastGroups = make([]cpv1beta.MulticastGroupInfo, 0, len(curStatsCollection.multicastGroups))
for group, pods := range curStatsCollection.multicastGroups {
multicastGroups = append(multicastGroups, cpv1beta.MulticastGroupInfo{Group: group, Pods: pods})
}
}

// Collect statistics of IGMP report messages hit by ANP or ACNP, and merge them to anpStats and acnpStats.
// Note IGMP reports statistics may be lost if NodeStatsSummary is not reported successfully.
multicastANPStatsMap, multicastACNPStatsMap := m.multicastQuerier.CollectIGMPReportNPStats()
mergeReportStats := func(igmpReportStatsMap map[types.UID]map[string]*agenttypes.RuleMetric, originalStatsList []cpv1beta.NetworkPolicyStats) []cpv1beta.NetworkPolicyStats {
uidIndexMap := make(map[types.UID]int)
for i, stats := range originalStatsList {
uidIndexMap[stats.NetworkPolicy.UID] = i
}
for uid, npStats := range igmpReportStatsMap {
ruleStatsList := make([]statsv1alpha1.RuleTrafficStats, 0, len(npStats))
for ruleName, ruleStats := range npStats {
ruleStatsList = append(ruleStatsList, statsv1alpha1.RuleTrafficStats{Name: ruleName, TrafficStats: statsv1alpha1.TrafficStats{Packets: int64(ruleStats.Packets), Bytes: int64(ruleStats.Bytes)}})
}
index, exist := uidIndexMap[uid]
if !exist {
originalStatsList = append(originalStatsList, cpv1beta.NetworkPolicyStats{NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: uid}, RuleTrafficStats: ruleStatsList})
} else {
originalStatsList[index].RuleTrafficStats = append(originalStatsList[index].RuleTrafficStats, ruleStatsList...)
}
}
return originalStatsList
}

acnpStats = mergeReportStats(multicastACNPStatsMap, acnpStats)
anpStats = mergeReportStats(multicastANPStatsMap, anpStats)
multicastGroupsUpdated = !isIdenticalMulticastGroupMap(curStatsCollection.multicastGroups, m.lastStatsCollection.multicastGroups)
acnpStats, anpStats = m.mergeStatsWithIGMPReports(acnpStats, anpStats)
multicastGroups = m.convertMulticastGroups(curStatsCollection.multicastGroups)
}

// Semantically, reporting networkpolicy statistics with zero length is equal to reporting the same multicastGroupInfo.
if len(npStats) == 0 && len(acnpStats) == 0 && len(anpStats) == 0 && !multicastGroupsUpdated {
klog.V(4).Info("No stats to report, skip reporting")
return nil
}

summary := &cpv1beta.NodeStatsSummary{
return &cpv1beta.NodeStatsSummary{
ObjectMeta: metav1.ObjectMeta{
Name: m.nodeName,
},
Expand All @@ -264,8 +233,55 @@ func (m *Collector) report(curStatsCollection *statsCollection) error {
AntreaNetworkPolicies: anpStats,
Multicast: multicastGroups,
}
klog.V(6).Infof("Reporting NodeStatsSummary: %v", summary)
}

// mergeStatsWithIGMPReports merges acnpStats or anpStats with IGMP report statistics.
// Unlike other networkpolicystats collection process, IGMP report statistics is not collected from OVS flows. It was collected during IGMP packetIn process by a local cache.
// IGMP report statistics collected for a rule should be merged into already defined networkpolicy statistics before reporting.
func (m *Collector) mergeStatsWithIGMPReports(acnpStats, anpStats []cpv1beta.NetworkPolicyStats) ([]cpv1beta.NetworkPolicyStats, []cpv1beta.NetworkPolicyStats) {
multicastANPStatsMap, multicastACNPStatsMap := m.multicastQuerier.CollectIGMPReportNPStats()
mergeReportStats := func(igmpReportStatsMap map[types.UID]map[string]*agenttypes.RuleMetric, originalStatsList []cpv1beta.NetworkPolicyStats) []cpv1beta.NetworkPolicyStats {
uidIndexMap := make(map[types.UID]int)
for i, stats := range originalStatsList {
uidIndexMap[stats.NetworkPolicy.UID] = i
}
for uid, npStats := range igmpReportStatsMap {
ruleStatsList := make([]statsv1alpha1.RuleTrafficStats, 0, len(npStats))
for ruleName, ruleStats := range npStats {
ruleStatsList = append(ruleStatsList, statsv1alpha1.RuleTrafficStats{Name: ruleName, TrafficStats: statsv1alpha1.TrafficStats{Packets: int64(ruleStats.Packets), Bytes: int64(ruleStats.Bytes)}})
}
index, exist := uidIndexMap[uid]
if !exist {
originalStatsList = append(originalStatsList, cpv1beta.NetworkPolicyStats{NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: uid}, RuleTrafficStats: ruleStatsList})
} else {
originalStatsList[index].RuleTrafficStats = append(originalStatsList[index].RuleTrafficStats, ruleStatsList...)
}
}
return originalStatsList
}

return mergeReportStats(multicastACNPStatsMap, acnpStats), mergeReportStats(multicastANPStatsMap, anpStats)
}

// convertMulticastGroups converts multicastGroupMap into a slice of multicastGroups.
// Calculating diff is not needed because we report full multicast group of the local node.
func (m *Collector) convertMulticastGroups(multicastGroupMap map[string][]cpv1beta.PodReference) []cpv1beta.MulticastGroupInfo {
multicastGroups := make([]cpv1beta.MulticastGroupInfo, 0, len(multicastGroupMap))
for group, pods := range multicastGroupMap {
multicastGroups = append(multicastGroups, cpv1beta.MulticastGroupInfo{Group: group, Pods: pods})
}
return multicastGroups
}

// report calculates the delta of the stats and pushes it to the antrea-controller summary API.
// If multicast feature gate is enabled, it also sends the full multicast group and IGMP report stats to the antrea-controller.
func (m *Collector) report(curStatsCollection *statsCollection) error {
summary := m.calculateNodeStatsSummary(curStatsCollection)
if summary == nil {
klog.V(4).Info("No stats to report, skip reporting")
return nil
}
klog.V(4).InfoS("Reporting NodeStatsSummary", "NodeStatsSummary", summary)
antreaClient, err := m.antreaClientProvider.GetAntreaClient()
if err != nil {
return err
Expand Down
83 changes: 83 additions & 0 deletions pkg/agent/stats/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,89 @@ func TestCalculateDiff(t *testing.T) {
}
}

func TestCalculateNodeStatsSummary(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

tests := []struct {
name string
lastStatsCollection *statsCollection
curStatsCollection *statsCollection
expectedSummary *cpv1beta.NodeStatsSummary
}{
{
name: "only multicaststats",
lastStatsCollection: &statsCollection{
multicastGroups: map[string][]cpv1beta.PodReference{
"225.3.4.5": {
{Name: "bar2", Namespace: "foo2"},
},
},
},
curStatsCollection: &statsCollection{
multicastGroups: map[string][]cpv1beta.PodReference{
"225.3.4.5": {
{Name: "bar2", Namespace: "foo2"},
},
},
},
expectedSummary: nil,
},
{
name: "anp and multicaststats",
lastStatsCollection: &statsCollection{
multicastGroups: map[string][]cpv1beta.PodReference{
"225.3.4.5": {
{Name: "bar3", Namespace: "foo3"},
},
},
},
curStatsCollection: &statsCollection{
networkPolicyStats: map[types.UID]*statsv1alpha1.TrafficStats{
np1.UID: {
Bytes: 25,
Packets: 3,
Sessions: 2,
},
},
multicastGroups: map[string][]cpv1beta.PodReference{
"225.3.4.5": {
{Name: "bar3", Namespace: "foo3"},
},
},
},
expectedSummary: &cpv1beta.NodeStatsSummary{
Multicast: []cpv1beta.MulticastGroupInfo{
{
Group: "225.3.4.5", Pods: []cpv1beta.PodReference{
{Name: "bar3", Namespace: "foo3"}},
},
},
NetworkPolicies: []cpv1beta.NetworkPolicyStats{
{
NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: "uid1"},
TrafficStats: statsv1alpha1.TrafficStats{
Bytes: 25,
Packets: 3,
Sessions: 2,
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mcQuerier := queriertest.NewMockAgentMulticastInfoQuerier(ctrl)
mcQuerier.EXPECT().CollectIGMPReportNPStats().AnyTimes()

m := &Collector{multicastQuerier: mcQuerier, lastStatsCollection: tt.lastStatsCollection, multicastEnabled: true}
summary := m.calculateNodeStatsSummary(tt.curStatsCollection)
assert.Equal(t, tt.expectedSummary, summary)
})
}
}

func TestCalculateRuleDiff(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit a928e26

Please sign in to comment.