Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ceclinux committed Jan 28, 2021
1 parent 25bd340 commit 2a8ecf9
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 74 deletions.
48 changes: 33 additions & 15 deletions pkg/agent/stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/vmware-tanzu/antrea/pkg/agent"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
agenttypes "github.com/vmware-tanzu/antrea/pkg/agent/types"
cpv1beta "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2"
statsv1alpha1 "github.com/vmware-tanzu/antrea/pkg/apis/stats/v1alpha1"
"github.com/vmware-tanzu/antrea/pkg/querier"
Expand Down Expand Up @@ -129,9 +130,7 @@ func (m *Collector) collect() *statsCollection {
policyStats = new(statsv1alpha1.TrafficStats)
statsMap[rule.PolicyRef.UID] = policyStats
}
policyStats.Bytes += int64(ruleStats.Bytes)
policyStats.Sessions += int64(ruleStats.Sessions)
policyStats.Packets += int64(ruleStats.Packets)
addUp(policyStats, ruleStats)
case cpv1beta.AntreaClusterNetworkPolicy:
ruleStatsMap = acnpStatsMap
lastRuleStats, exists := ruleStatsMap[rule.PolicyRef.UID]
Expand All @@ -144,9 +143,7 @@ func (m *Collector) collect() *statsCollection {
trafficStats = new(statsv1alpha1.TrafficStats)
lastRuleStats[rule.Name] = trafficStats
}
trafficStats.Bytes += int64(ruleStats.Bytes)
trafficStats.Packets += int64(ruleStats.Packets)
trafficStats.Sessions += int64(ruleStats.Sessions)
addUp(trafficStats, ruleStats)
case cpv1beta.AntreaNetworkPolicy:
ruleStatsMap = anpStatsMap
lastRuleStats, exists := ruleStatsMap[rule.PolicyRef.UID]
Expand All @@ -160,9 +157,7 @@ func (m *Collector) collect() *statsCollection {
trafficStats = new(statsv1alpha1.TrafficStats)
lastRuleStats[rule.Name] = trafficStats
}
trafficStats.Bytes += int64(ruleStats.Bytes)
trafficStats.Packets += int64(ruleStats.Packets)
trafficStats.Sessions += int64(ruleStats.Sessions)
addUp(trafficStats, ruleStats)
}
}
return &statsCollection{
Expand All @@ -172,6 +167,12 @@ func (m *Collector) collect() *statsCollection {
}
}

func addUp(stats *statsv1alpha1.TrafficStats, inc *agenttypes.RuleMetric) {
stats.Sessions += int64(inc.Sessions)
stats.Packets += int64(inc.Packets)
stats.Bytes += int64(inc.Bytes)
}

// report calculates the delta of the stats and pushes it to the antrea-controller summary API.
func (m *Collector) report(curStatsCollection *statsCollection) error {
npStats := calculateDiff(curStatsCollection.networkPolicyStats, m.lastStatsCollection.networkPolicyStats)
Expand Down Expand Up @@ -211,29 +212,46 @@ func calculateRuleDiff(curStatsMap, lastStatsMap map[types.UID]map[string]*stats
stats := make(map[string]statsv1alpha1.TrafficStats)
if !exists {
for name, curRuleStats := range curStats {
if curRuleStats.Bytes == 0 {
continue
}
stats[name] = statsv1alpha1.TrafficStats{
Packets: curRuleStats.Packets,
Sessions: curRuleStats.Sessions,
Bytes: curRuleStats.Bytes,
}
}
} else {
flag := false
for name, curRuleStats := range curStats {
lastRuleStats, ruleStatsExists := lastStats[name]
if !ruleStatsExists || curRuleStats.Bytes < lastRuleStats.Bytes {
if ruleStatsExists && curRuleStats.Bytes < lastRuleStats.Bytes {
// if one of the byte stats of last rule stats greater than
// curRuleStats's byte stats, it indicates OVS is restarted
// or NetworkPolicy is restarted
// In these cases, curStats is the delta it should report.
flag = true
}
}
for name, curRuleStats := range curStats {
lastRuleStats, ruleStatsExists := lastStats[name]
if flag || !ruleStatsExists {
stats[name] = *curRuleStats
} else {
if curRuleStats.Bytes == lastRuleStats.Bytes {
continue
}
stats[name] = statsv1alpha1.TrafficStats{
Packets: curRuleStats.Packets - lastRuleStats.Packets,
Sessions: curRuleStats.Sessions - lastRuleStats.Sessions,
Bytes: curRuleStats.Bytes - lastRuleStats.Bytes,
Sessions: curRuleStats.Sessions - lastRuleStats.Sessions,
Packets: curRuleStats.Packets - lastRuleStats.Packets,
}
}
if stats[name].Bytes == 0 {
continue
}
}
}
if len(stats) == 0 {
continue
}
policyStats := cpv1beta.AntreaNetworkPolicyStats{
NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: uid},
RuleTrafficStats: stats,
Expand Down
220 changes: 208 additions & 12 deletions pkg/agent/stats/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func TestCollect(t *testing.T) {
Sessions: 3,
},
},
antreaClusterNetworkPolicyStats: map[types.UID]*statsv1alpha1.TrafficStats{},
antreaNetworkPolicyStats: map[types.UID]*statsv1alpha1.TrafficStats{},
antreaClusterNetworkPolicyStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{},
antreaNetworkPolicyStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{},
},
},
{
Expand Down Expand Up @@ -138,18 +138,22 @@ func TestCollect(t *testing.T) {
Sessions: 1,
},
},
antreaClusterNetworkPolicyStats: map[types.UID]*statsv1alpha1.TrafficStats{
antreaClusterNetworkPolicyStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{
acnp1.UID: {
Bytes: 15,
Packets: 2,
Sessions: 1,
"rule1": {
Bytes: 15,
Packets: 2,
Sessions: 1,
},
},
},
antreaNetworkPolicyStats: map[types.UID]*statsv1alpha1.TrafficStats{
antreaNetworkPolicyStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{
anp1.UID: {
Bytes: 30,
Packets: 5,
Sessions: 3,
"rule2": {
Bytes: 30,
Packets: 5,
Sessions: 3,
},
},
},
},
Expand Down Expand Up @@ -180,8 +184,8 @@ func TestCollect(t *testing.T) {
Sessions: 1,
},
},
antreaClusterNetworkPolicyStats: map[types.UID]*statsv1alpha1.TrafficStats{},
antreaNetworkPolicyStats: map[types.UID]*statsv1alpha1.TrafficStats{},
antreaClusterNetworkPolicyStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{},
antreaNetworkPolicyStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{},
},
},
}
Expand Down Expand Up @@ -329,3 +333,195 @@ func TestCalculateDiff(t *testing.T) {
})
}
}

func TestCalculateRuleDiff(t *testing.T) {
tests := []struct {
name string
lastStats map[types.UID]map[string]*statsv1alpha1.TrafficStats
curStats map[types.UID]map[string]*statsv1alpha1.TrafficStats
expectedstatsList []cpv1beta.AntreaNetworkPolicyStats
}{
{
name: "new networkpolicy and existing networkpolicy",
lastStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{
"uid1": {
"rule1": {
Bytes: 1,
Packets: 1,
Sessions: 1,
},
},
},
curStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{
"uid1": {
"rule1": {
Bytes: 2,
Packets: 10,
Sessions: 10,
},
"rule2": {
Bytes: 5,
Packets: 5,
Sessions: 5,
},
},
"uid2": {
"rule3": {
Bytes: 1,
Packets: 1,
Sessions: 1,
},
},
},
expectedstatsList: []cpv1beta.AntreaNetworkPolicyStats{
{
NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: "uid1"},
RuleTrafficStats: map[string]statsv1alpha1.TrafficStats{
"rule1": {
Bytes: 1,
Packets: 9,
Sessions: 9,
},
"rule2": {
Bytes: 5,
Packets: 5,
Sessions: 5,
},
},
},
{
NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: "uid2"},
RuleTrafficStats: map[string]statsv1alpha1.TrafficStats{
"rule3": {
Bytes: 1,
Packets: 1,
Sessions: 1,
},
},
},
},
},
{
name: "unchanged networkpolicy",
lastStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{
"uid1": {
"rule20": {
Bytes: 0,
Packets: 0,
Sessions: 0,
},
},
"uid2": {
"rule1": {
Bytes: 1,
Packets: 10,
Sessions: 10,
},
"rule2": {
Bytes: 5,
Packets: 5,
Sessions: 5,
},
"rule3": {
Bytes: 1,
Packets: 1,
Sessions: 1,
},
},
},
curStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{
"uid1": {
"rule20": {
Bytes: 0,
Packets: 0,
Sessions: 0,
},
},
"uid2": {
"rule1": {
Bytes: 1,
Packets: 10,
Sessions: 10,
},
"rule2": {
Bytes: 5,
Packets: 5,
Sessions: 5,
},
"rule3": {
Bytes: 1,
Packets: 1,
Sessions: 1,
},
},
},
expectedstatsList: []cpv1beta.AntreaNetworkPolicyStats{},
},
{
name: "negative statistic",
lastStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{
"uid1": {
"rule1": {
Bytes: 10,
Packets: 100,
Sessions: 1,
},
},
},
curStats: map[types.UID]map[string]*statsv1alpha1.TrafficStats{
"uid1": {
"rule1": {
Bytes: 1,
Packets: 10,
Sessions: 10,
},
"rule2": {
Bytes: 5,
Packets: 5,
Sessions: 5,
},
},
"uid2": {
"rule3": {
Bytes: 1,
Packets: 1,
Sessions: 1,
},
},
},
expectedstatsList: []cpv1beta.AntreaNetworkPolicyStats{
{
NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: "uid1"},
RuleTrafficStats: map[string]statsv1alpha1.TrafficStats{
"rule1": {
Bytes: 1,
Packets: 10,
Sessions: 10,
},
"rule2": {
Bytes: 5,
Packets: 5,
Sessions: 5,
},
},
},
{
NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: "uid2"},
RuleTrafficStats: map[string]statsv1alpha1.TrafficStats{
"rule3": {
Bytes: 1,
Packets: 1,
Sessions: 1,
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualMetrics := calculateRuleDiff(tt.curStats, tt.lastStats)
assert.ElementsMatch(t, tt.expectedstatsList, actualMetrics)
})
}
}
4 changes: 2 additions & 2 deletions pkg/apis/controlplane/v1beta2/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions pkg/apis/stats/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
type AntreaClusterNetworkPolicyStats struct {
metav1.TypeMeta
metav1.ObjectMeta
TrafficStats TrafficStats

// The traffic stats of the Antrea ClusterNetworkPolicy.
TrafficStats TrafficStats
// The traffic stats of the Antrea NetworkPolicy, in rule perspective.
RuleTrafficStats []RuleTrafficStats
}

Expand All @@ -50,7 +50,7 @@ type AntreaNetworkPolicyStats struct {

// The traffic stats of the Antrea NetworkPolicy.
TrafficStats TrafficStats
// The traffic stats of the Antrea NetworkPolicy, in rule perspective
// The traffic stats of the Antrea NetworkPolicy, in rule perspective.
RuleTrafficStats []RuleTrafficStats
}

Expand Down Expand Up @@ -97,6 +97,7 @@ type TrafficStats struct {
Sessions int64
}

// RuleTrafficStats contains the traffic stats of a NetworkPolicy by separate rules.
type RuleTrafficStats struct {
Name string
TrafficStats TrafficStats
Expand Down
Loading

0 comments on commit 2a8ecf9

Please sign in to comment.