Skip to content

Commit

Permalink
Support Rule Level NetworkPolicy statistics
Browse files Browse the repository at this point in the history
This PR supports collecting and querying the NetworkPolicy statistics for
Antrea Networkpolicies. Native k8s Networkpolicies are not supported
  • Loading branch information
ceclinux committed Mar 11, 2021
1 parent cdfaeb4 commit 8aa142c
Show file tree
Hide file tree
Showing 33 changed files with 1,795 additions and 370 deletions.
3 changes: 3 additions & 0 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type rule struct {
To v1beta.NetworkPolicyPeer
// Protocols and Ports of this rule.
Services []v1beta.Service
// Name of this rule. Empty for k8s NetworkPolicy.
Name string
// Action of this rule. nil for k8s NetworkPolicy.
Action *secv1alpha1.RuleAction
// Priority of this rule within the NetworkPolicy. Defaults to -1 for K8s NetworkPolicy.
Expand Down Expand Up @@ -548,6 +550,7 @@ func toRule(r *v1beta.NetworkPolicyRule, policy *v1beta.NetworkPolicy, maxPriori
PolicyPriority: policy.Priority,
TierPriority: policy.TierPriority,
AppliedToGroups: appliedToGroups,
Name: r.Name,
PolicyUID: policy.UID,
SourceRef: policy.SourceRef,
EnableLogging: r.EnableLogging,
Expand Down
10 changes: 9 additions & 1 deletion pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,14 @@ func (c *Controller) GetAppliedToGroups() []v1beta2.AppliedToGroup {
}

func (c *Controller) GetNetworkPolicyByRuleFlowID(ruleFlowID uint32) *v1beta2.NetworkPolicyReference {
rule := c.GetRuleByFlowID(ruleFlowID)
if rule == nil {
return nil
}
return rule.PolicyRef
}

func (c *Controller) GetRuleByFlowID(ruleFlowID uint32) *types.PolicyRule {
rule, exists, err := c.reconciler.GetRuleByFlowID(ruleFlowID)
if err != nil {
klog.Errorf("Error when getting network policy by rule flow ID: %v", err)
Expand All @@ -354,7 +362,7 @@ func (c *Controller) GetNetworkPolicyByRuleFlowID(ruleFlowID uint32) *v1beta2.Ne
if !exists {
return nil
}
return rule.PolicyRef
return rule
}

func (c *Controller) GetControllerConnectionStatus() bool {
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
To: ofPortsToOFAddresses(ofPorts),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: rule.Action,
Name: rule.Name,
Priority: ofPriority,
TableID: table,
PolicyRef: rule.SourceRef,
Expand All @@ -450,6 +451,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: rule.Action,
Priority: ofPriority,
Name: rule.Name,
TableID: table,
PolicyRef: rule.SourceRef,
EnableLogging: rule.EnableLogging,
Expand Down
122 changes: 98 additions & 24 deletions pkg/agent/stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/vmware-tanzu/antrea/pkg/agent"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
agenttypes "github.com/vmware-tanzu/antrea/pkg/agent/types"
cpv1beta "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2"
statsv1alpha1 "github.com/vmware-tanzu/antrea/pkg/apis/stats/v1alpha1"
"github.com/vmware-tanzu/antrea/pkg/querier"
Expand All @@ -40,9 +41,9 @@ type statsCollection struct {
// networkPolicyStats is a mapping from K8s NetworkPolicy UIDs to their traffic stats.
networkPolicyStats map[types.UID]*statsv1alpha1.TrafficStats
// antreaClusterNetworkPolicyStats is a mapping from Antrea ClusterNetworkPolicy UIDs to their traffic stats.
antreaClusterNetworkPolicyStats map[types.UID]*statsv1alpha1.TrafficStats
antreaClusterNetworkPolicyStats map[types.UID]map[string]*statsv1alpha1.TrafficStats
// antreaNetworkPolicyStats is a mapping from Antrea NetworkPolicy UIDs to their traffic stats.
antreaNetworkPolicyStats map[types.UID]*statsv1alpha1.TrafficStats
antreaNetworkPolicyStats map[types.UID]map[string]*statsv1alpha1.TrafficStats
}

// Collector is responsible for collecting stats from the Openflow client, calculating the delta compared with the last
Expand Down Expand Up @@ -104,36 +105,27 @@ func (m *Collector) Run(stopCh <-chan struct{}) {
func (m *Collector) collect() *statsCollection {
ruleStatsMap := m.ofClient.NetworkPolicyMetrics()
npStatsMap := map[types.UID]*statsv1alpha1.TrafficStats{}
acnpStatsMap := map[types.UID]*statsv1alpha1.TrafficStats{}
anpStatsMap := map[types.UID]*statsv1alpha1.TrafficStats{}
acnpStatsMap := map[types.UID]map[string]*statsv1alpha1.TrafficStats{}
anpStatsMap := map[types.UID]map[string]*statsv1alpha1.TrafficStats{}

for ofID, ruleStats := range ruleStatsMap {
policyRef := m.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(ofID)
if policyRef == nil {
rule := m.networkPolicyQuerier.GetRuleByFlowID(ofID)
if rule == nil {
// This should not happen because the rule flow ID to rule mapping is
// preserved for at least 5 seconds even after the rule deletion.
klog.Warningf("Cannot find NetworkPolicy that has ofID %v", ofID)
klog.Warningf("Cannot find NetworkPolicy Rule that has ofID %v", ofID)
continue
}
klog.V(4).Infof("Converting ofID %v to policy %s", ofID, policyRef.ToString())

var statsMap map[types.UID]*statsv1alpha1.TrafficStats
switch policyRef.Type {
klog.V(4).Infof("Converting ofID %v to policy %s", ofID, rule.PolicyRef.ToString())
switch rule.PolicyRef.Type {
case cpv1beta.K8sNetworkPolicy:
statsMap = npStatsMap
addPolicyStatsUp(npStatsMap, ruleStats, rule)
case cpv1beta.AntreaClusterNetworkPolicy:
statsMap = acnpStatsMap
addRuleStatsUp(acnpStatsMap, ruleStats, rule)
case cpv1beta.AntreaNetworkPolicy:
statsMap = anpStatsMap
}

policyStats, exists := statsMap[policyRef.UID]
if !exists {
policyStats = new(statsv1alpha1.TrafficStats)
statsMap[policyRef.UID] = policyStats
addRuleStatsUp(anpStatsMap, ruleStats, rule)
}
policyStats.Bytes += int64(ruleStats.Bytes)
policyStats.Sessions += int64(ruleStats.Sessions)
policyStats.Packets += int64(ruleStats.Packets)
}
return &statsCollection{
networkPolicyStats: npStatsMap,
Expand All @@ -142,11 +134,40 @@ func (m *Collector) collect() *statsCollection {
}
}

func addPolicyStatsUp(statsMap map[types.UID]*statsv1alpha1.TrafficStats, ruleStats *agenttypes.RuleMetric, rule *agenttypes.PolicyRule) {
policyStats, exists := statsMap[rule.PolicyRef.UID]
if !exists {
policyStats = new(statsv1alpha1.TrafficStats)
statsMap[rule.PolicyRef.UID] = policyStats
}
addUp(policyStats, ruleStats)
}

func addRuleStatsUp(ruleStatsMap map[types.UID]map[string]*statsv1alpha1.TrafficStats, ruleStats *agenttypes.RuleMetric, rule *agenttypes.PolicyRule) {
lastRuleStats, exists := ruleStatsMap[rule.PolicyRef.UID]
if !exists {
lastRuleStats = make(map[string]*statsv1alpha1.TrafficStats)
ruleStatsMap[rule.PolicyRef.UID] = lastRuleStats
}
trafficStats, trafficStatsExists := lastRuleStats[rule.Name]
if !trafficStatsExists {
trafficStats = new(statsv1alpha1.TrafficStats)
lastRuleStats[rule.Name] = trafficStats
}
addUp(trafficStats, ruleStats)
}

func addUp(stats *statsv1alpha1.TrafficStats, inc *agenttypes.RuleMetric) {
stats.Sessions += int64(inc.Sessions)
stats.Packets += int64(inc.Packets)
stats.Bytes += int64(inc.Bytes)
}

// report calculates the delta of the stats and pushes it to the antrea-controller summary API.
func (m *Collector) report(curStatsCollection *statsCollection) error {
npStats := calculateDiff(curStatsCollection.networkPolicyStats, m.lastStatsCollection.networkPolicyStats)
acnpStats := calculateDiff(curStatsCollection.antreaClusterNetworkPolicyStats, m.lastStatsCollection.antreaClusterNetworkPolicyStats)
anpStats := calculateDiff(curStatsCollection.antreaNetworkPolicyStats, m.lastStatsCollection.antreaNetworkPolicyStats)
acnpStats := calculateRuleDiff(curStatsCollection.antreaClusterNetworkPolicyStats, m.lastStatsCollection.antreaClusterNetworkPolicyStats)
anpStats := calculateRuleDiff(curStatsCollection.antreaNetworkPolicyStats, m.lastStatsCollection.antreaNetworkPolicyStats)
if len(npStats) == 0 && len(acnpStats) == 0 && len(anpStats) == 0 {
klog.V(4).Info("No stats to report, skip reporting")
return nil
Expand All @@ -173,6 +194,59 @@ func (m *Collector) report(curStatsCollection *statsCollection) error {
return nil
}

func calculateRuleDiff(curStatsMap, lastStatsMap map[types.UID]map[string]*statsv1alpha1.TrafficStats) []cpv1beta.NetworkPolicyStats {
if len(curStatsMap) == 0 {
return nil
}
statsList := make([]cpv1beta.NetworkPolicyStats, 0, len(curStatsMap))
for uid, curStats := range curStatsMap {
lastStats, exists := lastStatsMap[uid]
stats := make([]statsv1alpha1.RuleTrafficStats, 0, len(curStats))
if !exists {
for name, curRuleStats := range curStats {
if curRuleStats.Bytes != 0 {
ruleTrafficStats := statsv1alpha1.RuleTrafficStats{
Name: name,
TrafficStats: *curRuleStats,
}
stats = append(stats, ruleTrafficStats)
}
}
} else {
for name, curRuleStats := range curStats {
lastRuleStats, ruleStatsExists := lastStats[name]
// curRuleStats.Bytes < lastRuleStats.Bytes could happen
// as rules with same name can be deleted and recreated later.
if (!ruleStatsExists || curRuleStats.Bytes < lastRuleStats.Bytes) && curRuleStats.Bytes != 0 {
ruleTrafficStats := statsv1alpha1.RuleTrafficStats{
Name: name,
TrafficStats: *curRuleStats,
}
stats = append(stats, ruleTrafficStats)
} else if curRuleStats.Bytes > lastRuleStats.Bytes {
ruleTrafficStats := statsv1alpha1.RuleTrafficStats{
Name: name,
TrafficStats: statsv1alpha1.TrafficStats{
Bytes: curRuleStats.Bytes - lastRuleStats.Bytes,
Sessions: curRuleStats.Sessions - lastRuleStats.Sessions,
Packets: curRuleStats.Packets - lastRuleStats.Packets,
},
}
stats = append(stats, ruleTrafficStats)
}
}
}
if len(stats) != 0 {
policyStats := cpv1beta.NetworkPolicyStats{
NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: uid},
RuleTrafficStats: stats,
}
statsList = append(statsList, policyStats)
}
}
return statsList
}

func calculateDiff(curStatsMap, lastStatsMap map[types.UID]*statsv1alpha1.TrafficStats) []cpv1beta.NetworkPolicyStats {
if len(curStatsMap) == 0 {
return nil
Expand Down
Loading

0 comments on commit 8aa142c

Please sign in to comment.