Skip to content

Commit

Permalink
Add more visibility metrics for Flow Aggregator (#2668)
Browse files Browse the repository at this point in the history
This commit adds the `antrea_agent_flow_collector_reconnection_count`
Prometheus metric to expose the connection status
between the Flow Exporter and the Flow Aggregator. It also adds a
periodic log for checking the Flow Aggregator's ingest rate,
throughput and connection status.

Signed-off-by: zyiou <[email protected]>
  • Loading branch information
zyiou authored Oct 11, 2021
1 parent 95e836d commit 4d0eea7
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 16 deletions.
5 changes: 3 additions & 2 deletions docs/network-flow-visibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,9 @@ We support following connection metrics as Prometheus metrics that are exposed
through [Antrea Agent apiserver endpoint](prometheus-integration.md):
`antrea_agent_conntrack_total_connection_count`,
`antrea_agent_conntrack_antrea_connection_count`,
`antrea_agent_denied_connection_count` and
`antrea_agent_conntrack_max_connection_count`
`antrea_agent_denied_connection_count`,
`antrea_agent_conntrack_max_connection_count`, and
`antrea_agent_flow_collector_reconnection_count`

## Flow Aggregator

Expand Down
4 changes: 4 additions & 0 deletions docs/prometheus-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ detected by Flow Exporter deny connections tracking. This metric gets updated
when a flow is rejected/dropped by network policy.
- **antrea_agent_egress_networkpolicy_rule_count:** Number of egress
NetworkPolicy rules on local Node which are managed by the Antrea Agent.
- **antrea_agent_flow_collector_reconnection_count:** Number of re-connections
between Flow Exporter and flow collector. This metric gets updated whenever the
connection is re-established between the Flow Exporter and the flow collector
(e.g. the Flow Aggregator).
- **antrea_agent_ingress_networkpolicy_rule_count:** Number of ingress
NetworkPolicy rules on local Node which are managed by the Antrea Agent.
- **antrea_agent_local_pod_count:** Number of Pods on local Node which are
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/ipfix"
"antrea.io/antrea/pkg/util/env"
Expand Down Expand Up @@ -286,6 +287,7 @@ func (exp *flowExporter) initFlowExporter() error {
}
klog.V(2).Infof("Initialized flow exporter for IPv6 flow records and sent %d bytes size of template record", sentBytes)
}
metrics.ReconnectionsToFlowCollector.Inc()
return nil
}

Expand Down
38 changes: 38 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@ package exporter

import (
"net"
"strings"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing"
"github.com/vmware/go-ipfix/pkg/exporter"
"github.com/vmware/go-ipfix/pkg/registry"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/component-base/metrics/legacyregistry"

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/metrics"
ipfixtest "antrea.io/antrea/pkg/ipfix/testing"
)

Expand Down Expand Up @@ -258,6 +263,39 @@ func testSendDataSet(t *testing.T, v4Enabled bool, v6Enabled bool) {
}
}

// TestFlowExporter_initFlowExporter points a flowExporter at a local UDP
// listener, runs initFlowExporter, and then verifies the reconnection gauge
// through checkTotalReconnectionsMetric.
func TestFlowExporter_initFlowExporter(t *testing.T) {
	metrics.InitializeConnectionMetrics()

	// Port 0 lets the OS pick a free port for the stand-in collector.
	listenAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("error when resolving UDP address: %v", err)
	}
	server, err := net.ListenUDP("udp", listenAddr)
	if err != nil {
		t.Fatalf("error when creating a local server: %v", err)
	}
	defer server.Close()

	input := exporter.ExporterInput{
		CollectorProtocol: server.LocalAddr().Network(),
		CollectorAddress:  server.LocalAddr().String(),
	}
	fe := &flowExporter{
		process:       nil,
		exporterInput: input,
	}

	initErr := fe.initFlowExporter()
	assert.NoError(t, initErr)
	checkTotalReconnectionsMetric(t)
}

// checkTotalReconnectionsMetric gathers
// antrea_agent_flow_collector_reconnection_count from the default legacy
// registry and asserts that it is exposed as a gauge with value 1.
func checkTotalReconnectionsMetric(t *testing.T) {
	const metricName = "antrea_agent_flow_collector_reconnection_count"
	const want = `
# HELP antrea_agent_flow_collector_reconnection_count [ALPHA] Number of re-connections between Flow Exporter and flow collector. This metric gets updated whenever the connection is re-established between the Flow Exporter and the flow collector (e.g. the Flow Aggregator).
# TYPE antrea_agent_flow_collector_reconnection_count gauge
antrea_agent_flow_collector_reconnection_count 1
`
	compareErr := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(want), metricName)
	assert.NoError(t, compareErr)
}

func getElemList(ianaIE []string, antreaIE []string) []ipfixentities.InfoElementWithValue {
// Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals)
// Need only element name and other fields are set to dummy values
Expand Down
39 changes: 26 additions & 13 deletions pkg/agent/metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ var (
},
)

// ReconnectionsToFlowCollector is the gauge exposed as
// antrea_agent_flow_collector_reconnection_count. The flow exporter
// increments it right before initFlowExporter returns successfully.
// NOTE(review): only Inc() is visible here, so the value appears to be
// monotonically increasing; confirm whether a counter would be a better fit.
ReconnectionsToFlowCollector = metrics.NewGauge(
&metrics.GaugeOpts{
Namespace: metricNamespaceAntrea,
Subsystem: metricSubsystemAgent,
Name: "flow_collector_reconnection_count",
Help: "Number of re-connections between Flow Exporter and flow collector. This metric gets updated whenever the connection is re-established between the Flow Exporter and the flow collector (e.g. the Flow Aggregator).",
StabilityLevel: metrics.ALPHA,
},
)

MaxConnectionsInConnTrackTable = metrics.NewGauge(
&metrics.GaugeOpts{
Namespace: metricNamespaceAntrea,
Expand All @@ -168,40 +178,40 @@ func InitializePrometheusMetrics() {

// InitializePodMetrics registers the PodCount metric
// (antrea_agent_local_pod_count) with the legacy registry.
//
// A registration failure is not fatal: it is logged once, as a structured
// entry that carries the underlying error, and execution continues.
func InitializePodMetrics() {
	if err := legacyregistry.Register(PodCount); err != nil {
		klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_local_pod_count")
	}
}

// InitializeNetworkPolicyMetrics registers the NetworkPolicy-related metrics
// (egress rule count, ingress rule count and NetworkPolicy count) with the
// legacy registry.
//
// Each registration is attempted independently. A failure is logged once,
// as a structured entry that carries the underlying error, and does not
// prevent the remaining metrics from being registered.
func InitializeNetworkPolicyMetrics() {
	if err := legacyregistry.Register(EgressNetworkPolicyRuleCount); err != nil {
		klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_egress_networkpolicy_rule_count")
	}

	if err := legacyregistry.Register(IngressNetworkPolicyRuleCount); err != nil {
		klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_ingress_networkpolicy_rule_count")
	}

	if err := legacyregistry.Register(NetworkPolicyCount); err != nil {
		klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_networkpolicy_count")
	}
}

func InitializeOVSMetrics() {
if err := legacyregistry.Register(OVSTotalFlowCount); err != nil {
klog.Error("Failed to register antrea_agent_ovs_total_flow_count with Prometheus")
klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_ovs_total_flow_count")
}
if err := legacyregistry.Register(OVSFlowCount); err != nil {
klog.Error("Failed to register antrea_agent_ovs_flow_count with Prometheus")
klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_ovs_flow_count")
}

if err := legacyregistry.Register(OVSFlowOpsCount); err != nil {
klog.Error("Failed to register antrea_agent_ovs_flow_ops_count with Prometheus")
klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_ovs_flow_ops_count")
}
if err := legacyregistry.Register(OVSFlowOpsErrorCount); err != nil {
klog.Error("Failed to register antrea_agent_ovs_flow_ops_error_count with Prometheus")
klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_ovs_flow_ops_error_count")
}
if err := legacyregistry.Register(OVSFlowOpsLatency); err != nil {
klog.Error("Failed to register antrea_agent_ovs_flow_ops_latency_milliseconds with Prometheus")
klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_ovs_flow_ops_latency_milliseconds")
}
// Initialize OpenFlow operations metrics with label add, modify and delete
// since those metrics won't come out until observation.
Expand All @@ -215,15 +225,18 @@ func InitializeOVSMetrics() {

// InitializeConnectionMetrics registers the connection-related metrics
// (conntrack total, Antrea and max connection counts, denied connection
// count, and flow collector reconnection count) with the legacy registry.
//
// Each registration is attempted independently. A failure is logged once,
// as a structured entry that carries the underlying error, and does not
// prevent the remaining metrics from being registered.
func InitializeConnectionMetrics() {
	if err := legacyregistry.Register(TotalConnectionsInConnTrackTable); err != nil {
		klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_conntrack_total_connection_count")
	}
	if err := legacyregistry.Register(TotalAntreaConnectionsInConnTrackTable); err != nil {
		klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_conntrack_antrea_connection_count")
	}
	if err := legacyregistry.Register(TotalDenyConnections); err != nil {
		klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_denied_connection_count")
	}
	if err := legacyregistry.Register(ReconnectionsToFlowCollector); err != nil {
		klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_flow_collector_reconnection_count")
	}
	if err := legacyregistry.Register(MaxConnectionsInConnTrackTable); err != nil {
		klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_conntrack_max_connection_count")
	}
}
11 changes: 10 additions & 1 deletion pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ type flowAggregator struct {
observationDomainID uint32
podInformer coreinformers.PodInformer
sendJSONRecord bool
numRecordsExported int64
numRecordsReceived int64
}

func NewFlowAggregator(
Expand Down Expand Up @@ -381,7 +383,7 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {

func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) {
expireTimer := time.NewTimer(fa.activeFlowRecordTimeout)

logTicker := time.NewTicker(time.Minute)
for {
select {
case <-stopCh:
Expand Down Expand Up @@ -413,6 +415,12 @@ func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) {
}
// Get the new expiry and reset the timer.
expireTimer.Reset(fa.aggregationProcess.GetExpiryFromExpirePriorityQueue())
case <-logTicker.C:
// Add visibility of processing stats of Flow Aggregator
klog.V(4).InfoS("Total number of records received", "count", fa.collectingProcess.GetNumRecordsReceived())
klog.V(4).InfoS("Total number of records exported", "count", fa.numRecordsExported)
klog.V(4).InfoS("Total number of flows stored in Flow Aggregator", "count", fa.aggregationProcess.GetNumFlows())
klog.V(4).InfoS("Number of exporters connected with Flow Aggregator", "count", fa.collectingProcess.GetNumConnToCollector())
}
}
}
Expand Down Expand Up @@ -450,6 +458,7 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
}

klog.V(4).Infof("Data set sent successfully: %d Bytes sent", sentBytes)
fa.numRecordsExported = fa.numRecordsExported + 1
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/ipfix/ipfix_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type IPFIXCollectingProcess interface {
Start()
Stop()
GetMsgChan() chan *ipfixentities.Message
GetNumRecordsReceived() int64
GetNumConnToCollector() int64
}

type ipfixCollectingProcess struct {
Expand Down Expand Up @@ -56,3 +58,11 @@ func (cp *ipfixCollectingProcess) Stop() {
func (cp *ipfixCollectingProcess) GetMsgChan() chan *ipfixentities.Message {
return cp.CollectingProcess.GetMsgChan()
}

// GetNumRecordsReceived returns the wrapped collecting process's
// GetNumOfRecordsReceived value, converted to int64.
func (cp *ipfixCollectingProcess) GetNumRecordsReceived() int64 {
	received := cp.CollectingProcess.GetNumOfRecordsReceived()
	return int64(received)
}

// GetNumConnToCollector returns the wrapped collecting process's
// GetNumOfConnToCollector value, converted to int64.
func (cp *ipfixCollectingProcess) GetNumConnToCollector() int64 {
	conns := cp.CollectingProcess.GetNumOfConnToCollector()
	return int64(conns)
}
5 changes: 5 additions & 0 deletions pkg/ipfix/ipfix_intermediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type IPFIXAggregationProcess interface {
IsAggregatedRecordIPv4(record ipfixintermediate.AggregationFlowRecord) bool
SetExternalFieldsFilled(record *ipfixintermediate.AggregationFlowRecord)
AreExternalFieldsFilled(record ipfixintermediate.AggregationFlowRecord) bool
GetNumFlows() int64
}

type ipfixAggregationProcess struct {
Expand Down Expand Up @@ -93,3 +94,7 @@ func (ap *ipfixAggregationProcess) SetExternalFieldsFilled(record *ipfixintermed
func (ap *ipfixAggregationProcess) AreExternalFieldsFilled(record ipfixintermediate.AggregationFlowRecord) bool {
return ap.AggregationProcess.AreExternalFieldsFilled(record)
}

// GetNumFlows returns the wrapped aggregation process's GetNumberOfFlows
// value, converted to int64.
func (ap *ipfixAggregationProcess) GetNumFlows() int64 {
	flows := ap.AggregationProcess.GetNumberOfFlows()
	return int64(flows)
}
42 changes: 42 additions & 0 deletions pkg/ipfix/testing/mock_ipfix.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions test/e2e/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ var antreaAgentMetrics = []string{
"antrea_agent_conntrack_total_connection_count",
"antrea_agent_conntrack_antrea_connection_count",
"antrea_agent_conntrack_max_connection_count",
"antrea_agent_denied_connection_count",
"antrea_agent_flow_collector_reconnection_count",
"antrea_proxy_sync_proxy_rules_duration_seconds",
"antrea_proxy_total_endpoints_installed",
"antrea_proxy_total_endpoints_updates",
Expand Down
14 changes: 14 additions & 0 deletions third_party/proxy/testing/mock_proxy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4d0eea7

Please sign in to comment.