Skip to content

Commit

Permalink
Flow aggregator IPv6 support
Browse files Browse the repository at this point in the history
Add IPv6 support in flow aggregator implementation.
  • Loading branch information
Yongming Ding committed Feb 11, 2021
1 parent 44b884f commit b51a71f
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 86 deletions.
7 changes: 7 additions & 0 deletions ci/jenkins/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ function deliver_antrea {
docker tag "${DOCKER_REGISTRY}/antrea/sonobuoy-systemd-logs:v0.3" "sonobuoy/systemd-logs:v0.3"
fi
DOCKER_REGISTRY=${DOCKER_REGISTRY} make
DOCKER_REGISTRY=${DOCKER_REGISTRY} make flow-aggregator-ubuntu

echo "====== Delivering Antrea to all the Nodes ======"
echo "=== Fill serviceCIDRv6 and serviceCIDR ==="
Expand All @@ -309,12 +310,18 @@ function deliver_antrea {
echo "---" >> build/yamls/antrea.yml
cat build/yamls/antrea-prometheus.yml >> build/yamls/antrea.yml

# Increase verbose for flow aggregator dual stack e2e test
sed -i "s|--v=0|--v=4|g" build/yamls/antrea.yml

cp -f build/yamls/*.yml $WORKDIR
docker save -o antrea-ubuntu.tar projects.registry.vmware.com/antrea/antrea-ubuntu:latest
docker save -o flow-aggregator.tar projects.registry.vmware.com/antrea/flow-aggregator:latest

kubectl get nodes -o wide --no-headers=true | awk -v role="$CONTROL_PLANE_NODE_ROLE" '$3 != role {print $6}' | while read IP; do
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" antrea-ubuntu.tar jenkins@[${IP}]:${WORKDIR}/antrea-ubuntu.tar
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" flow-aggregator.tar jenkins@[${IP}]:${WORKDIR}/flow-aggregator.tar
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "docker images | grep 'antrea-ubuntu' | awk '{print \$3}' | xargs -r docker rmi ; docker load -i ${WORKDIR}/antrea-ubuntu.tar ; docker images | grep '<none>' | awk '{print \$3}' | xargs -r docker rmi" || true
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "docker images | grep 'flow-aggregator' | awk '{print \$3}' | xargs -r docker rmi ; docker load -i ${WORKDIR}/flow-aggregator.tar ; docker images | grep '<none>' | awk '{print \$3}' | xargs -r docker rmi" || true
if [[ "${DOCKER_REGISTRY}" != "" ]]; then
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "docker pull ${DOCKER_REGISTRY}/antrea/sonobuoy-systemd-logs:v0.3 ; docker tag ${DOCKER_REGISTRY}/antrea/sonobuoy-systemd-logs:v0.3 sonobuoy/systemd-logs:v0.3"
fi
Expand Down
10 changes: 2 additions & 8 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,8 @@ func (exp *flowExporter) initFlowExporter(collectorAddr string, collectorProto s
if err != nil {
return err
}
// Currently, supporting only IPv4 for Flow Aggregator.
ip := hostIPs[0].To4()
if ip != nil {
// Update the collector address with resolved IP of flow aggregator
collectorAddr = net.JoinHostPort(ip.String(), defaultIPFIXPort)
} else {
return fmt.Errorf("resolved Flow Aggregator address %v is not supported", hostIPs[0])
}
// Update the collector address with resolved IP of flow aggregator
collectorAddr = net.JoinHostPort(hostIPs[0].String(), defaultIPFIXPort)
}

var expInput exporter.ExporterInput
Expand Down
66 changes: 49 additions & 17 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package flowaggregator
import (
"fmt"
"hash/fnv"
"net"
"time"

"github.com/vmware/go-ipfix/pkg/collector"
Expand All @@ -30,7 +31,7 @@ import (
)

var (
ianaInfoElements = []string{
ianaInfoElementsCommon = []string{
"flowStartSeconds",
"flowEndSeconds",
"sourceTransportPort",
Expand All @@ -40,16 +41,16 @@ var (
"octetTotalCount",
"packetDeltaCount",
"octetDeltaCount",
"sourceIPv4Address",
"destinationIPv4Address",
}
ianaInfoElementsIPv4 = append(ianaInfoElementsCommon, []string{"sourceIPv4Address", "destinationIPv4Address"}...)
ianaInfoElementsIPv6 = append(ianaInfoElementsCommon, []string{"sourceIPv6Address", "destinationIPv6Address"}...)
ianaReverseInfoElements = []string{
"reversePacketTotalCount",
"reverseOctetTotalCount",
"reversePacketDeltaCount",
"reverseOctetDeltaCount",
}
antreaInfoElements = []string{
antreaInfoElementsCommon = []string{
"sourcePodName",
"sourcePodNamespace",
"sourceNodeName",
Expand All @@ -62,12 +63,14 @@ var (
"ingressNetworkPolicyNamespace",
"egressNetworkPolicyName",
"egressNetworkPolicyNamespace",
"destinationClusterIPv4",
}
aggregatorElements = []string{
"originalExporterIPv4Address",
antreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
antreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
aggregatorElementsCommon = []string{
"originalObservationDomainId",
}
aggregatorElementsIPv4 = append([]string{"originalExporterIPv4Address"}, aggregatorElementsCommon...)
aggregatorElementsIPv6 = append([]string{"originalExporterIPv6Address"}, aggregatorElementsCommon...)

nonStatsElementList = []string{
"flowEndSeconds",
Expand Down Expand Up @@ -117,6 +120,7 @@ var (
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIPv4",
"destinationClusterIPv6",
"destinationServicePort",
"destinationServicePortName",
"ingressNetworkPolicyName",
Expand Down Expand Up @@ -149,7 +153,8 @@ type flowAggregator struct {
aggregationProcess ipfix.IPFIXAggregationProcess
exportInterval time.Duration
exportingProcess ipfix.IPFIXExportingProcess
templateID uint16
templateIDv4 uint16
templateIDv6 uint16
registry ipfix.IPFIXRegistry
}

Expand All @@ -165,6 +170,7 @@ func NewFlowAggregator(externalFlowCollectorAddr string, externalFlowCollectorPr
exportInterval,
nil,
0,
0,
registry,
}
return fa
Expand Down Expand Up @@ -247,17 +253,29 @@ func (fa *flowAggregator) initExportingProcess() error {
return fmt.Errorf("got error when initializing IPFIX exporting process: %v", err)
}
fa.exportingProcess = ep
fa.templateID = fa.exportingProcess.NewTemplateID()
templateSet := ipfix.NewSet(ipfixentities.Template, fa.templateID, false)

bytesSent, err := fa.sendTemplateSet(templateSet)
fa.templateIDv4 = fa.exportingProcess.NewTemplateID()
templateSet := ipfix.NewSet(ipfixentities.Template, fa.templateIDv4, false)
bytesSent, err := fa.sendTemplateSet(templateSet, false)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
fa.templateID = 0
fa.templateIDv4 = 0
return fmt.Errorf("sending template set failed, err: %v", err)
}
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of template set", bytesSent)
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of IPv4 template set", bytesSent)

fa.templateIDv6 = fa.exportingProcess.NewTemplateID()
templateSet = ipfix.NewSet(ipfixentities.Template, fa.templateIDv6, false)
bytesSent, err = fa.sendTemplateSet(templateSet, true)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
fa.templateIDv6 = 0
return fmt.Errorf("sending template set failed, err: %v", err)
}
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of IPv6 template set", bytesSent)

return nil
}

Expand Down Expand Up @@ -302,9 +320,13 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
klog.V(4).Info("Skip sending record that is not correlated.")
return nil
}
templateID := fa.templateIDv4
if net.ParseIP(key.SourceAddress).To4() == nil || net.ParseIP(key.DestinationAddress).To4() == nil {
templateID = fa.templateIDv6
}
// TODO: more records per data set will be supported when go-ipfix supports size check when adding records
dataSet := ipfix.NewSet(ipfixentities.Data, fa.templateID, false)
err := dataSet.AddRecord(record.Record.GetOrderedElementList(), fa.templateID)
dataSet := ipfix.NewSet(ipfixentities.Data, templateID, false)
err := dataSet.AddRecord(record.Record.GetOrderedElementList(), templateID)
if err != nil {
return fmt.Errorf("error when adding the record to the set: %v", err)
}
Expand All @@ -316,8 +338,18 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
return nil
}

func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet) (int, error) {
func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet, isIPv6 bool) (int, error) {
elements := make([]*ipfixentities.InfoElementWithValue, 0)
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
templateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
aggregatorElements = aggregatorElementsIPv6
templateID = fa.templateIDv6
}
for _, ie := range ianaInfoElements {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
if err != nil {
Expand Down Expand Up @@ -366,7 +398,7 @@ func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet) (int, erro
ie := ipfixentities.NewInfoElementWithValue(element, nil)
elements = append(elements, ie)
}
err := templateSet.AddRecord(elements, fa.templateID)
err := templateSet.AddRecord(elements, templateID)
if err != nil {
return 0, fmt.Errorf("error when adding record to set, error: %v", err)
}
Expand Down
88 changes: 51 additions & 37 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

const (
testTemplateID = uint16(256)
testTemplateIDv4 = uint16(256)
testTemplateIDv6 = uint16(257)
testExportInterval = 60 * time.Second
)

Expand All @@ -51,46 +52,59 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
nil,
testExportInterval,
mockIPFIXExpProc,
testTemplateID,
testTemplateIDv4,
testTemplateIDv6,
mockIPFIXRegistry,
}

// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]*ipfixentities.InfoElementWithValue, 0)
for i, ie := range ianaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil)
}
for i, ie := range ianaReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].Element, nil)
}
for i, ie := range antreaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].Element, nil)
}
for i, ie := range aggregatorElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaSourceStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)].Element, nil)
}
for _, isIPv6 := range []bool{false, true} {
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
testTemplateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
aggregatorElements = aggregatorElementsIPv6
testTemplateID = fa.templateIDv6
}
// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]*ipfixentities.InfoElementWithValue, 0)
for i, ie := range ianaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil)
}
for i, ie := range ianaReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].Element, nil)
}
for i, ie := range antreaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].Element, nil)
}
for i, ie := range aggregatorElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaSourceStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)].Element, nil)
}

for i, ie := range antreaDestinationStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
}
var tempSet ipfixentities.Set
mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil)
mockTempSet.EXPECT().GetSet().Return(tempSet)
for i, ie := range antreaDestinationStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
}
var tempSet ipfixentities.Set
mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil)
mockTempSet.EXPECT().GetSet().Return(tempSet)

// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(tempSet).Return(0, nil)
// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(tempSet).Return(0, nil)

_, err := fa.sendTemplateSet(mockTempSet)
assert.NoErrorf(t, err, "Error in sending template record: %v", err)
_, err := fa.sendTemplateSet(mockTempSet, isIPv6)
assert.NoErrorf(t, err, "Error in sending template record: %v, isIPv6: %v", err, isIPv6)
}
}
2 changes: 1 addition & 1 deletion pkg/util/flowexport/flowexport.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func ParseFlowCollectorAddr(addr string, defaultPort string, defaultProtocol str
}
if match {
idx := strings.Index(addr, "]")
strSlice = append(strSlice, addr[:idx+1])
strSlice = append(strSlice, addr[1:idx])
strSlice = append(strSlice, strings.Split(addr[idx+2:], ":")...)
} else {
strSlice = strings.Split(addr, ":")
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/flowexport/flowexport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestParseFlowCollectorAddr(t *testing.T) {
},
{
addr: "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:80:tcp",
expectedHost: "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]",
expectedHost: "fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
expectedPort: "80",
expectedProto: "tcp",
expectedError: nil,
Expand Down
31 changes: 20 additions & 11 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package e2e

import (
"fmt"
"net"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -128,11 +129,14 @@ func setupTest(tb testing.TB) (*TestData, error) {
return testData, nil
}

func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) {
// TODO: remove hardcoding to IPv4 after flow aggregator supports IPv6
func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
isIPv6 := false
if _, err := setupTest(tb); err != nil {
return nil, err, isIPv6
if clusterInfo.podV6NetworkCIDR != "" {
isIPv6 = true
}
testData, err := setupTest(tb)
if err != nil {
return testData, isIPv6, err
}

// Create pod using ipfix collector image
Expand All @@ -142,14 +146,19 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) {
ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
if err != nil || len(ipfixCollectorIP.ipStrings) == 0 {
tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err)
return nil, err, isIPv6
return nil, isIPv6, err
}
var ipStr string
if isIPv6 && ipfixCollectorIP.ipv6 != nil {
ipStr = ipfixCollectorIP.ipv6.String()
} else {
ipStr = ipfixCollectorIP.ipv4.String()
}
ipStr := ipfixCollectorIP.ipv4.String()
ipfixCollectorAddr := fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)
ipfixCollectorAddr := fmt.Sprintf("%s:tcp", net.JoinHostPort(ipStr, ipfixCollectorPort))
tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr)
faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr)
if err != nil {
return testData, err, isIPv6
return testData, isIPv6, err
}

faClusterIPAddr := ""
Expand All @@ -160,14 +169,14 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) {
}
tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr)
if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return testData, err, isIPv6
return testData, isIPv6, err
}

tb.Logf("Checking CoreDNS deployment")
if err = testData.checkCoreDNSPods(defaultTimeout); err != nil {
return testData, err, isIPv6
return testData, isIPv6, err
}
return testData, nil, isIPv6
return testData, isIPv6, nil
}

func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) {
Expand Down
Loading

0 comments on commit b51a71f

Please sign in to comment.