diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index e65ac84bc8d..bdbb7d52a8c 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -1200,9 +1200,10 @@ data: # Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener. #enablePrometheusMetrics: true - # Provide flow collector address as string with format :[:], where proto is tcp or udp. This also enables - # the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, - # we consider tcp as default. + # Provide flow collector address as string with format :[:], where proto is tcp or udp. + # IP can be either IPv4 or IPv6. However, IPv6 address should be wrapped with []. + # This also enables the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. + # If no L4 transport proto is given, we consider tcp as default. #flowCollectorAddr: "" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. @@ -1270,7 +1271,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-t4t2mdfhkc + name: antrea-config-mdmtkcfh59 namespace: kube-system --- apiVersion: v1 @@ -1377,7 +1378,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-t4t2mdfhkc + name: antrea-config-mdmtkcfh59 name: antrea-config - name: antrea-controller-tls secret: @@ -1641,7 +1642,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-t4t2mdfhkc + name: antrea-config-mdmtkcfh59 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 9559970640c..2c4a426cdd1 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -1200,9 +1200,10 @@ data: # Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener. #enablePrometheusMetrics: true - # Provide flow collector address as string with format :[:], where proto is tcp or udp. This also enables - # the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, - # we consider tcp as default. + # Provide flow collector address as string with format :[:], where proto is tcp or udp. + # IP can be either IPv4 or IPv6. However, IPv6 address should be wrapped with []. + # This also enables the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. + # If no L4 transport proto is given, we consider tcp as default. #flowCollectorAddr: "" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. @@ -1270,7 +1271,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-t4t2mdfhkc + name: antrea-config-mdmtkcfh59 namespace: kube-system --- apiVersion: v1 @@ -1377,7 +1378,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-t4t2mdfhkc + name: antrea-config-mdmtkcfh59 name: antrea-config - name: antrea-controller-tls secret: @@ -1643,7 +1644,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-t4t2mdfhkc + name: antrea-config-mdmtkcfh59 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index dd2fff2b487..5e38e9bf11e 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -1200,9 +1200,10 @@ data: # Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener. #enablePrometheusMetrics: true - # Provide flow collector address as string with format :[:], where proto is tcp or udp. This also enables - # the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, - # we consider tcp as default. + # Provide flow collector address as string with format :[:], where proto is tcp or udp. + # IP can be either IPv4 or IPv6. However, IPv6 address should be wrapped with []. + # This also enables the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. + # If no L4 transport proto is given, we consider tcp as default. #flowCollectorAddr: "" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. @@ -1270,7 +1271,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-gmt86d9t68 + name: antrea-config-b5dkk776t2 namespace: kube-system --- apiVersion: v1 @@ -1377,7 +1378,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-gmt86d9t68 + name: antrea-config-b5dkk776t2 name: antrea-config - name: antrea-controller-tls secret: @@ -1641,7 +1642,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-gmt86d9t68 + name: antrea-config-b5dkk776t2 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 5e84502b745..dec09d5e46f 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -1205,9 +1205,10 @@ data: # Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener. #enablePrometheusMetrics: true - # Provide flow collector address as string with format :[:], where proto is tcp or udp. This also enables - # the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, - # we consider tcp as default. + # Provide flow collector address as string with format :[:], where proto is tcp or udp. + # IP can be either IPv4 or IPv6. However, IPv6 address should be wrapped with []. + # This also enables the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. + # If no L4 transport proto is given, we consider tcp as default. #flowCollectorAddr: "" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. @@ -1275,7 +1276,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-2k6g59bdkg + name: antrea-config-6kg9kdbg49 namespace: kube-system --- apiVersion: v1 @@ -1391,7 +1392,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-2k6g59bdkg + name: antrea-config-6kg9kdbg49 name: antrea-config - name: antrea-controller-tls secret: @@ -1690,7 +1691,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-2k6g59bdkg + name: antrea-config-6kg9kdbg49 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-windows.yml b/build/yamls/antrea-windows.yml index 1d1ca6ab6ae..6c9bbfed1a4 100644 --- a/build/yamls/antrea-windows.yml +++ b/build/yamls/antrea-windows.yml @@ -56,9 +56,10 @@ data: # Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener. #enablePrometheusMetrics: true - # Provide flow collector address as string with format :[:], where proto is tcp or udp. This also enables - # the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, - # we consider tcp as default. + # Provide flow collector address as string with format :[:], where proto is tcp or udp. + # IP can be either IPv4 or IPv6. However, IPv6 address should be wrapped with []. + # This also enables the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. + # If no L4 transport proto is given, we consider tcp as default. #flowCollectorAddr: "" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. @@ -88,7 +89,7 @@ kind: ConfigMap metadata: labels: app: antrea - name: antrea-windows-config-6d4gc5kdc8 + name: antrea-windows-config-kc6bfhk4mg namespace: kube-system --- apiVersion: apps/v1 @@ -176,7 +177,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-windows-config-6d4gc5kdc8 + name: antrea-windows-config-kc6bfhk4mg name: antrea-windows-config - configMap: defaultMode: 420 diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index c9cb7c4a851..0efed9096ba 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -1205,9 +1205,10 @@ data: # Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener. #enablePrometheusMetrics: true - # Provide flow collector address as string with format :[:], where proto is tcp or udp. This also enables - # the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, - # we consider tcp as default. + # Provide flow collector address as string with format :[:], where proto is tcp or udp. + # IP can be either IPv4 or IPv6. However, IPv6 address should be wrapped with []. + # This also enables the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. + # If no L4 transport proto is given, we consider tcp as default. #flowCollectorAddr: "" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. @@ -1275,7 +1276,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-9c7h568bgf + name: antrea-config-669cb7d7kt namespace: kube-system --- apiVersion: v1 @@ -1382,7 +1383,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-9c7h568bgf + name: antrea-config-669cb7d7kt name: antrea-config - name: antrea-controller-tls secret: @@ -1646,7 +1647,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-9c7h568bgf + name: antrea-config-669cb7d7kt name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index a72f353f448..39f268f68ef 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -89,9 +89,10 @@ featureGates: # Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener. #enablePrometheusMetrics: true -# Provide flow collector address as string with format :[:], where proto is tcp or udp. This also enables -# the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, -# we consider tcp as default. +# Provide flow collector address as string with format :[:], where proto is tcp or udp. +# IP can be either IPv4 or IPv6. However, IPv6 address should be wrapped with []. +# This also enables the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. +# If no L4 transport proto is given, we consider tcp as default. #flowCollectorAddr: "" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. diff --git a/build/yamls/windows/base/conf/antrea-agent.conf b/build/yamls/windows/base/conf/antrea-agent.conf index a272a5a407b..c1fc62f0914 100644 --- a/build/yamls/windows/base/conf/antrea-agent.conf +++ b/build/yamls/windows/base/conf/antrea-agent.conf @@ -38,9 +38,10 @@ featureGates: # Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener. #enablePrometheusMetrics: true -# Provide flow collector address as string with format :[:], where proto is tcp or udp. This also enables -# the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, -# we consider tcp as default. +# Provide flow collector address as string with format :[:], where proto is tcp or udp. +# IP can be either IPv4 or IPv6. However, IPv6 address should be wrapped with []. +# This also enables the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. +# If no L4 transport proto is given, we consider tcp as default. #flowCollectorAddr: "" # Provide flow poll interval as a duration string. This determines how often the flow exporter dumps connections from the conntrack module. diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 221c4d2ffa6..1a6fb9a6d21 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -17,6 +17,7 @@ package main import ( "fmt" "net" + "regexp" "time" "k8s.io/apimachinery/pkg/util/wait" @@ -298,9 +299,20 @@ func run(o *Options) error { // Initialize flow exporter to start go routines to poll conntrack flows and export IPFIX flow records if features.DefaultFeatureGate.Enabled(features.FlowExporter) { + match, err := regexp.MatchString("\\[.*\\]:.*", o.config.FlowCollectorAddr) + if err != nil { + return fmt.Errorf("Failed to parse FlowCollectorAddr: %s", o.config.FlowCollectorAddr) + } + svcCIDR := serviceCIDRNet + addrFamily := "ipv4" + if match { + svcCIDR = serviceCIDRNetv6 + addrFamily = "ipv6" + } connStore := connections.NewConnectionStore( - connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, o.config.OVSDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)), + connections.InitializeConnTrackDumper(nodeConfig, svcCIDR, o.config.OVSDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)), ifaceStore, + addrFamily, proxier, networkPolicyController, o.pollInterval) @@ -309,7 +321,8 @@ func run(o *Options) error { flowExporter := exporter.NewFlowExporter( flowrecords.NewFlowRecords(connStore), - o.config.FlowExportFrequency) + o.config.FlowExportFrequency, + addrFamily) go wait.Until(func() { flowExporter.Export(o.flowCollector, stopCh, pollDone) }, 0, stopCh) } diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 79ff70ab856..4a6dfb8678d 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -18,6 +18,7 @@ import ( "fmt" "io/ioutil" "net" + "regexp" "strings" "time" @@ -194,7 +195,10 @@ func (o *Options) validateFlowExporterConfig() error { return fmt.Errorf("IPFIX flow collector address should be provided") } else { // Check if it is TCP or UDP - strSlice := strings.Split(o.config.FlowCollectorAddr, ":") + strSlice, err := parseFlowCollectorAddr(o.config.FlowCollectorAddr) + if err != nil { + return err + } var proto string if len(strSlice) == 2 { // If no separator ":" and proto is given, then default to TCP. @@ -210,7 +214,7 @@ func (o *Options) validateFlowExporterConfig() error { // Convert the string input in net.Addr format hostPortAddr := strSlice[0] + ":" + strSlice[1] - _, _, err := net.SplitHostPort(hostPortAddr) + _, _, err = net.SplitHostPort(hostPortAddr) if err != nil { return fmt.Errorf("IPFIX flow collector is given in invalid format: %v", err) } @@ -239,3 +243,19 @@ func (o *Options) validateFlowExporterConfig() error { } return nil } + +func parseFlowCollectorAddr(addr string) ([]string, error) { + var strSlice []string + match, err := regexp.MatchString("\\[.*\\]:.*", addr) + if err != nil { + return strSlice, fmt.Errorf("Failed to parse FlowCollectorAddr: %s", addr) + } + if match { + idx := strings.Index(addr, "]") + strSlice = append(strSlice, addr[:idx+1]) + strSlice = append(strSlice, strings.Split(addr[idx+2:], ":")...) + } else { + strSlice = strings.Split(addr, ":") + } + return strSlice, nil +} diff --git a/cmd/antrea-agent/options_test.go b/cmd/antrea-agent/options_test.go index db0dfd183db..1a689b6c92a 100644 --- a/cmd/antrea-agent/options_test.go +++ b/cmd/antrea-agent/options_test.go @@ -51,3 +51,28 @@ func TestOptions_validateFlowExporterConfig(t *testing.T) { } } + +func TestParseFlowCollectorAddr(t *testing.T) { + testcases := []struct { + addr string + expected []string + }{ + { + "1.2.3.4:80:udp", + []string{"1.2.3.4", "80", "udp"}, + }, + { + "1.2.3.4:80", + []string{"1.2.3.4", "80"}, + }, + { + "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:80:tcp", + []string{"[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]", "80", "tcp"}, + }, + } + for _, tc := range testcases { + res, err := parseFlowCollectorAddr(tc.addr) + assert.Nil(t, err) + assert.Equal(t, tc.expected, res) + } +} diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index f09780f635e..e6e796651c1 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -41,6 +41,7 @@ type ConnectionStore struct { connections map[flowexporter.ConnectionKey]flowexporter.Connection connDumper ConnTrackDumper ifaceStore interfacestore.InterfaceStore + addrFamily string antreaProxier proxy.Provider networkPolicyQuerier querier.AgentNetworkPolicyInfoQuerier pollInterval time.Duration @@ -50,6 +51,7 @@ type ConnectionStore struct { func NewConnectionStore( connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, + addrFamily string, proxier proxy.Provider, npQuerier querier.AgentNetworkPolicyInfoQuerier, pollInterval time.Duration, @@ -58,6 +60,7 @@ func NewConnectionStore( connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection), connDumper: connTrackDumper, ifaceStore: ifaceStore, + addrFamily: addrFamily, antreaProxier: proxier, networkPolicyQuerier: npQuerier, pollInterval: pollInterval, @@ -234,7 +237,11 @@ func (cs *ConnectionStore) Poll() (int, error) { // We do not expect any error as resetConn is not returning any error cs.ForAllConnectionsDo(resetConn) - filteredConnsList, totalConns, err := cs.connDumper.DumpFlows(openflow.CtZone) + var zone uint16 = openflow.CtZone + if cs.addrFamily == "ipv6" { + zone = openflow.CtZoneV6 + } + filteredConnsList, totalConns, err := cs.connDumper.DumpFlows(zone) if err != nil { return 0, err } diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go index f7b73ca5123..6ba30434f8c 100644 --- a/pkg/agent/flowexporter/connections/connections_test.go +++ b/pkg/agent/flowexporter/connections/connections_test.go @@ -165,7 +165,7 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) { mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) mockProxier := k8proxytest.NewMockProvider(ctrl) npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) - connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, mockProxier, npQuerier, testPollInterval) + connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, "ipv4", mockProxier, npQuerier, testPollInterval) // Add flow1conn to the Connection map testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1) @@ -268,7 +268,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) { // Create ConnectionStore mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) - connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval) + connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, "ipv4", nil, nil, testPollInterval) // Add flows to the Connection store for i, flow := range testFlows { connStore.connections[*testFlowKeys[i]] = *flow @@ -333,7 +333,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { // Create ConnectionStore mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) - connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval) + connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, "ipv4", nil, nil, testPollInterval) // Add flows to the connection store. for i, flow := range testFlows { connStore.connections[*testFlowKeys[i]] = *flow @@ -357,7 +357,7 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) { // Create ConnectionStore mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) - connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval) + connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, "ipv4", nil, nil, testPollInterval) // Hard-coded conntrack occupancy metrics for test TotalConnections := 0 MaxConnections := 300000 diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index a76f7644011..236fe4b3471 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -30,7 +30,7 @@ import ( ) var ( - IANAInfoElements = []string{ + IANAInfoElementsIPv4 = []string{ "flowStartSeconds", "flowEndSeconds", "sourceIPv4Address", @@ -43,13 +43,27 @@ var ( "packetDeltaCount", "octetDeltaCount", } + IANAInfoElementsIPv6 = []string{ + "flowStartSeconds", + "flowEndSeconds", + "sourceIPv6Address", + "destinationIPv6Address", + "sourceTransportPort", + "destinationTransportPort", + "protocolIdentifier", + "packetTotalCount", + "octetTotalCount", + "packetDeltaCount", + "octetDeltaCount", + } + // Substring "reverse" is an indication to get reverse element of go-ipfix library. IANAReverseInfoElements = []string{ "reversePacketTotalCount", "reverseOctetTotalCount", "reversePacketDeltaCount", "reverseOctetDeltaCount", } - AntreaInfoElements = []string{ + AntreaInfoElementsIPv4 = []string{ "sourcePodName", "sourcePodNamespace", "sourceNodeName", @@ -63,6 +77,20 @@ var ( "egressNetworkPolicyName", "egressNetworkPolicyNamespace", } + AntreaInfoElementsIPv6 = []string{ + "sourcePodName", + "sourcePodNamespace", + "sourceNodeName", + "destinationPodName", + "destinationPodNamespace", + "destinationNodeName", + "destinationClusterIPv6", + "destinationServicePortName", + "ingressNetworkPolicyName", + "ingressNetworkPolicyNamespace", + "egressNetworkPolicyName", + "egressNetworkPolicyNamespace", + } ) type flowExporter struct { @@ -73,6 +101,7 @@ type flowExporter struct { pollCycle uint templateID uint16 registry ipfix.IPFIXRegistry + addrFamily string } func genObservationID() (uint32, error) { @@ -85,7 +114,7 @@ func genObservationID() (uint32, error) { return h.Sum32(), nil } -func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *flowExporter { +func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint, addrFamily string) *flowExporter { registry := ipfix.NewIPFIXRegistry() registry.LoadRegistry() return &flowExporter{ @@ -96,6 +125,7 @@ func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *fl 0, 0, registry, + addrFamily, } } @@ -165,6 +195,12 @@ func (exp *flowExporter) initFlowExporter(collector net.Addr) error { exp.process = expProcess exp.templateID = expProcess.NewTemplateID() + IANAInfoElements := IANAInfoElementsIPv4 + AntreaInfoElements := AntreaInfoElementsIPv4 + if exp.addrFamily == "ipv6" { + IANAInfoElements = IANAInfoElementsIPv6 + AntreaInfoElements = AntreaInfoElementsIPv6 + } templateRec := ipfix.NewIPFIXTemplateRecord(uint16(len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements)), exp.templateID) sentBytes, err := exp.sendTemplateRecord(templateRec) @@ -201,6 +237,12 @@ func (exp *flowExporter) sendTemplateRecord(templateRec ipfix.IPFIXRecord) (int, return 0, fmt.Errorf("error when writing template header: %v", err) } + IANAInfoElements := IANAInfoElementsIPv4 + AntreaInfoElements := AntreaInfoElementsIPv4 + if exp.addrFamily == "ipv6" { + IANAInfoElements = IANAInfoElementsIPv6 + AntreaInfoElements = AntreaInfoElementsIPv6 + } for _, ie := range IANAInfoElements { element, err := exp.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID) if err != nil { @@ -254,6 +296,10 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex _, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourceAddress) case "destinationIPv4Address": _, err = dataRec.AddInfoElement(ie, record.Conn.TupleReply.SourceAddress) + case "sourceIPv6Address": + _, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourceAddress) + case "destinationIPv6Address": + _, err = dataRec.AddInfoElement(ie, record.Conn.TupleReply.SourceAddress) case "sourceTransportPort": _, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourcePort) case "destinationTransportPort": @@ -335,6 +381,13 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex // this dummy IP address. _, err = dataRec.AddInfoElement(ie, net.IP{0, 0, 0, 0}) } + case "destinationClusterIPv6": + if record.Conn.DestinationServicePortName != "" { + _, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.DestinationAddress) + } else { + // Same as destinationClusterIPv4. + _, err = dataRec.AddInfoElement(ie, net.IP{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) + } case "destinationServicePortName": _, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServicePortName) case "ingressNetworkPolicyName": diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 0f8ea4663ce..6a25a783649 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -34,6 +34,19 @@ const ( ) func TestFlowExporter_sendTemplateRecord(t *testing.T) { + for _, tc := range []struct { + ianaIE []string + antreaIE []string + addrFamily string + }{ + {IANAInfoElementsIPv4, AntreaInfoElementsIPv4, "ipv4"}, + {IANAInfoElementsIPv6, AntreaInfoElementsIPv6, "ipv6"}, + } { + testFlowExporter_sendTemplateRecord(t, tc.ianaIE, tc.antreaIE, tc.addrFamily) + } +} + +func testFlowExporter_sendTemplateRecord(t *testing.T, ianaIE []string, antreaIE []string, addrFamily string) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -48,17 +61,18 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) { 0, testTemplateID, mockIPFIXRegistry, + addrFamily, } // Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals) // Only the element name is needed, other arguments have dummy values. elemList := make([]*ipfixentities.InfoElement, 0) - for _, ie := range IANAInfoElements { + for _, ie := range ianaIE { elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0)) } for _, ie := range IANAReverseInfoElements { elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0)) } - for _, ie := range AntreaInfoElements { + for _, ie := range antreaIE { elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0)) } // Expect calls for different mock objects @@ -66,17 +80,17 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) { var templateRecord ipfixentities.Record mockTempRec.EXPECT().PrepareRecord().Return(tempBytes, nil) - for i, ie := range IANAInfoElements { + for i, ie := range ianaIE { mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i], nil) mockTempRec.EXPECT().AddInfoElement(elemList[i], nil).Return(tempBytes, nil) } for i, ie := range IANAReverseInfoElements { - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(IANAInfoElements)], nil) - mockTempRec.EXPECT().AddInfoElement(elemList[i+len(IANAInfoElements)], nil).Return(tempBytes, nil) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaIE)], nil) + mockTempRec.EXPECT().AddInfoElement(elemList[i+len(ianaIE)], nil).Return(tempBytes, nil) } - for i, ie := range AntreaInfoElements { - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)], nil) - mockTempRec.EXPECT().AddInfoElement(elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)], nil).Return(tempBytes, nil) + for i, ie := range antreaIE { + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaIE)+len(IANAReverseInfoElements)], nil) + mockTempRec.EXPECT().AddInfoElement(elemList[i+len(ianaIE)+len(IANAReverseInfoElements)], nil).Return(tempBytes, nil) } mockTempRec.EXPECT().GetRecord().Return(templateRecord) mockTempRec.EXPECT().GetTemplateElements().Return(elemList) @@ -89,12 +103,25 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) { t.Errorf("Error in sending templated record: %v", err) } - assert.Equal(t, len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements), len(flowExp.elementsList), flowExp.elementsList, "flowExp.elementsList and template record should have same number of elements") + assert.Equal(t, len(ianaIE)+len(IANAReverseInfoElements)+len(antreaIE), len(flowExp.elementsList), flowExp.elementsList, "flowExp.elementsList and template record should have same number of elements") } // TestFlowExporter_sendDataRecord tests essentially if element names in the switch-case matches globals // IANAInfoElements and AntreaInfoElements. func TestFlowExporter_sendDataRecord(t *testing.T) { + for _, tc := range []struct { + ianaIE []string + antreaIE []string + addrFamily string + }{ + {IANAInfoElementsIPv4, AntreaInfoElementsIPv4, "ipv4"}, + {IANAInfoElementsIPv6, AntreaInfoElementsIPv6, "ipv6"}, + } { + testFlowExporter_sendDataRecord(t, tc.ianaIE, tc.antreaIE, tc.addrFamily) + } +} + +func testFlowExporter_sendDataRecord(t *testing.T, ianaIE []string, antreaIE []string, addrFamily string) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -134,15 +161,15 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { } // Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals) // Need only element name and other are dummys - elemList := make([]*ipfixentities.InfoElement, len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements)) - for i, ie := range IANAInfoElements { + elemList := make([]*ipfixentities.InfoElement, len(ianaIE)+len(IANAReverseInfoElements)+len(antreaIE)) + for i, ie := range ianaIE { elemList[i] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0) } for i, ie := range IANAReverseInfoElements { - elemList[i+len(IANAInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0) + elemList[i+len(ianaIE)] = ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0) } - for i, ie := range AntreaInfoElements { - elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0) + for i, ie := range antreaIE { + elemList[i+len(ianaIE)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0) } mockIPFIXExpProc := ipfixtest.NewMockIPFIXExportingProcess(ctrl) @@ -156,6 +183,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { 0, testTemplateID, mockIPFIXRegistry, + addrFamily, } // Expect calls required var dataRecord ipfixentities.Record @@ -164,10 +192,12 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { switch ieName := ie.Name; ieName { case "flowStartSeconds", "flowEndSeconds": mockDataRec.EXPECT().AddInfoElement(ie, uint32(time.Time{}.Unix())).Return(tempBytes, nil) - case "sourceIPv4Address", "destinationIPv4Address": + case "sourceIPv4Address", "destinationIPv4Address", "sourceIPv6Address", "destinationIPv6Address": mockDataRec.EXPECT().AddInfoElement(ie, nil).Return(tempBytes, nil) case "destinationClusterIPv4": mockDataRec.EXPECT().AddInfoElement(ie, net.IP{0, 0, 0, 0}).Return(tempBytes, nil) + case "destinationClusterIPv6": + mockDataRec.EXPECT().AddInfoElement(ie, net.IP{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}).Return(tempBytes, nil) case "sourceTransportPort", "destinationTransportPort": mockDataRec.EXPECT().AddInfoElement(ie, uint16(0)).Return(tempBytes, nil) case "protocolIdentifier": diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index de73a763094..36533faa8f3 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -16,6 +16,7 @@ package e2e import ( "fmt" + "net" "os" "path/filepath" "sync" @@ -118,39 +119,43 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error) { +func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) { data := &TestData{} + isIPv6 := false if err := data.setupLogDirectoryForTest(tb.Name()); err != nil { tb.Errorf("Error creating logs directory '%s': %v", data.logsDirForTestCase, err) - return nil, err + return nil, err, isIPv6 } tb.Logf("Creating K8s clientset") if err := data.createClient(); err != nil { - return nil, err + return nil, err, isIPv6 } tb.Logf("Creating '%s' K8s Namespace", testNamespace) if err := data.createTestNamespace(); err != nil { - return nil, err + return nil, err, isIPv6 } // Create pod using ipfix collector image if err := data.createPodOnNode("ipfix-collector", masterNodeName(), ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil { tb.Fatalf("Error when creating the ipfix collector Pod: %v", err) } ipfixCollectorIP, err := data.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace) - if err != nil { + if err != nil || len(ipfixCollectorIP.ipStrings) == 0 { tb.Fatalf("Error when waiting to get ipfix collector Pod IP: %v", err) } tb.Logf("Applying Antrea YAML with ipfix collector address") - // TODO: Deploy the collector using IPv6 address after flow_exporter supports IPv6. - ipStr := ipfixCollectorIP.ipv4.String() - if err := data.deployAntreaFlowExporter(ipStr + ":" + ipfixCollectorPort + ":tcp"); err != nil { - return data, err + ipStr := ipfixCollectorIP.ipStrings[0] + if net.ParseIP(ipStr).To4() == nil { + ipStr = fmt.Sprintf("[%s]", ipStr) + isIPv6 = true + } + if err := data.deployAntreaFlowExporter(fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)); err != nil { + return data, err, isIPv6 } tb.Logf("Checking CoreDNS deployment") if err := data.checkCoreDNSPods(defaultTimeout); err != nil { - return data, err + return data, err, isIPv6 } - return data, nil + return data, nil, isIPv6 } func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) { diff --git a/test/e2e/flowexporter_test.go b/test/e2e/flowexporter_test.go index 57e88af2da2..522aa51d847 100644 --- a/test/e2e/flowexporter_test.go +++ b/test/e2e/flowexporter_test.go @@ -32,10 +32,8 @@ import ( // TestFlowExporter runs flow exporter to export flow records for flows. // Flows are deployed between Pods on same node. func TestFlowExporter(t *testing.T) { - // TODO: remove this limitation after flow_exporter supports IPv6 - skipIfIPv6Cluster(t) - skipIfNotIPv4Cluster(t) - data, err := setupTestWithIPFIXCollector(t) + // Should I add skipBenchmark as this runs iperf? + data, err, isIPv6 := setupTestWithIPFIXCollector(t) if err != nil { t.Fatalf("Error when setting up test: %v", err) } @@ -104,9 +102,12 @@ func TestFlowExporter(t *testing.T) { t.Fatalf("Error when waiting for network policy to be realized: %v", err) } t.Log("Network policies are realized.") - podAIPStr := podAIP.ipv4.String() - podBIPStr := podBIP.ipv4.String() - checkRecordsWithPodIPs(t, data, podAIPStr, podBIPStr, false) + + if !isIPv6 { + checkRecordsWithPodIPs(t, data, podAIP.ipv4.String(), podBIP.ipv4.String(), isIPv6) + } else { + checkRecordsWithPodIPs(t, data, podAIP.ipv6.String(), podBIP.ipv6.String(), isIPv6) + } } func checkRecordsWithPodIPs(t *testing.T, data *TestData, podAIP string, podBIP string, isIPv6 bool) { @@ -181,7 +182,7 @@ func checkRecordsWithPodIPs(t *testing.T, data *TestData, podAIP string, podBIP templateRecords = templateRecords + 1 } - if strings.Contains(record, podAIP) && strings.Contains(record, podBIP) { + if strings.Contains(record, lengthenIPv6Addr(isIPv6, podAIP)) && strings.Contains(record, lengthenIPv6Addr(isIPv6, podBIP)) { dataRecordsIntraNode = dataRecordsIntraNode + 1 // Check if records have both Pod name and Pod namespace or not. if !strings.Contains(record, hex.EncodeToString([]byte("perftest-a"))) { @@ -236,8 +237,23 @@ func checkRecordsWithPodIPs(t *testing.T, data *TestData, podAIP string, podBIP } } } - assert.Equal(t, templateRecords, clusterInfo.numNodes, "Each agent should send out template record") + assert.Equal(t, clusterInfo.numNodes, templateRecords, "Each agent should send out template record") // Single iperf resulting in two connections with separate ports. Suspecting second flow to be control flow to exchange // stats info. As 5s is export interval and iperf traffic runs for 10s, we expect 4 records. assert.GreaterOrEqual(t, dataRecordsIntraNode, 4, "Iperf flow should have expected number of flow records") } + +// IPv6 address is collected as like fd74:ca9b:172:16:2:0:0:66, instead of fd74:ca9b:172:16:2::66. +// go-ipfix will fix this issue later. +func lengthenIPv6Addr(isIPv6 bool, ip string) string { + if !isIPv6 { + return ip + } + l := "0" + for i := 0; i < (7 - strings.Count(ip, ":")); i++ { + l = fmt.Sprintf("%s%s", l, ":0") + + } + p := strings.Index(ip, "::") + return fmt.Sprintf("%s%s%s", ip[:p+1], l, ip[p+1:]) +} diff --git a/test/integration/agent/flowexporter_test.go b/test/integration/agent/flowexporter_test.go index c901b240076..68ffacbcd10 100644 --- a/test/integration/agent/flowexporter_test.go +++ b/test/integration/agent/flowexporter_test.go @@ -145,7 +145,7 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) { ifStoreMock := interfacestoretest.NewMockInterfaceStore(ctrl) npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) // TODO: Enhance the integration test by testing service. - connStore := connections.NewConnectionStore(connDumperMock, ifStoreMock, nil, npQuerier, testPollInterval) + connStore := connections.NewConnectionStore(connDumperMock, ifStoreMock, "ipv4", nil, npQuerier, testPollInterval) // Expect calls for connStore.poll and other callees connDumperMock.EXPECT().DumpFlows(uint16(openflow.CtZone)).Return(testConns, 0, nil) connDumperMock.EXPECT().GetMaxConnections().Return(0, nil)