Fix the bug in Flow Exporter that causes intermittent errors in e2e tests (#1959)

This commit fixes a bug in the Flow Exporter code that was introduced in PR #1714. Because of that change, the IPFIX exporting process interface in the Flow Exporter is assigned a typed nil (a nil pointer typecast to the corresponding struct, ipfixExportingProcess), so the comparison against nil in the Flow Exporter's exporting process initialization code breaks.
This PR fixes that bug, along with another issue in fetching certificates that exacerbates it when the Flow Exporter cannot connect to the Flow Aggregator.

Fixes #1956
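
For context, the sketch below illustrates the Go pitfall behind this bug: an interface variable holding a nil concrete pointer is not equal to nil. It is not Antrea code; the Exporter interface, process struct, and newProcess constructor are hypothetical stand-ins, and the safer pattern at the end only mirrors the spirit of the initFlowExporter change in this commit.

```go
// Minimal, self-contained sketch of the typed-nil pitfall (hypothetical names).
package main

import "fmt"

// Exporter stands in for the IPFIX exporting process interface.
type Exporter interface {
	Send() error
}

// process stands in for the concrete exporting process struct.
type process struct{}

func (p *process) Send() error { return nil }

// newProcess simulates a constructor that fails and returns a nil *process.
func newProcess() (*process, error) {
	return nil, fmt.Errorf("cannot connect to collector")
}

func main() {
	var exp Exporter

	// Assigning the typed nil (*process)(nil) to the interface makes the
	// interface itself non-nil, because it now carries type information.
	p, err := newProcess()
	exp = p
	fmt.Println(err != nil)  // true: the constructor failed
	fmt.Println(p == nil)    // true: the concrete pointer is nil
	fmt.Println(exp == nil)  // false: an "exp == nil" guard no longer fires

	// Safer pattern: assign to the interface-typed variable only after the
	// error check, so a failed initialization leaves it as a true nil.
	var exp2 Exporter
	if p2, err2 := newProcess(); err2 == nil {
		exp2 = p2
	}
	fmt.Println(exp2 == nil) // true
}
```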
srikartati authored Mar 17, 2021
1 parent 796ea63 commit a9adf5a
Showing 4 changed files with 73 additions and 68 deletions.
111 changes: 57 additions & 54 deletions pkg/agent/flowexporter/exporter/exporter.go
@@ -80,21 +80,23 @@ const (
)

type flowExporter struct {
connStore connections.ConnectionStore
flowRecords *flowrecords.FlowRecords
process ipfix.IPFIXExportingProcess
elementsListv4 []*ipfixentities.InfoElementWithValue
elementsListv6 []*ipfixentities.InfoElementWithValue
ipfixSet ipfix.IPFIXSet
numDataSetsSent uint64 // used for unit tests.
templateIDv4 uint16
templateIDv6 uint16
registry ipfix.IPFIXRegistry
v4Enabled bool
v6Enabled bool
exporterInput exporter.ExporterInput
activeFlowTimeout time.Duration
idleFlowTimeout time.Duration
connStore connections.ConnectionStore
flowRecords *flowrecords.FlowRecords
process ipfix.IPFIXExportingProcess
elementsListv4 []*ipfixentities.InfoElementWithValue
elementsListv6 []*ipfixentities.InfoElementWithValue
ipfixSet ipfix.IPFIXSet
numDataSetsSent uint64 // used for unit tests.
templateIDv4 uint16
templateIDv6 uint16
registry ipfix.IPFIXRegistry
v4Enabled bool
v6Enabled bool
exporterInput exporter.ExporterInput
activeFlowTimeout time.Duration
idleFlowTimeout time.Duration
enableTLSToFlowAggregator bool
k8sClient kubernetes.Interface
}

func genObservationID() (uint32, error) {
@@ -107,41 +109,14 @@ func genObservationID() (uint32, error) {
return h.Sum32(), nil
}

func prepareExporterInputArgs(collectorAddr, collectorProto string, enableTLSToFlowAggregator bool, k8sClient kubernetes.Interface) (exporter.ExporterInput, error) {
func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.ExporterInput, error) {
expInput := exporter.ExporterInput{}
var err error
// Exporting process requires domain observation ID.
expInput.ObservationDomainID, err = genObservationID()
if err != nil {
return expInput, err
}

if enableTLSToFlowAggregator {
// if CA certificate, client certificate and key do not exist during initialization,
// it will retry to obtain the credentials in next export cycle
expInput.CACert, err = getCACert(k8sClient)
if err != nil {
return expInput, fmt.Errorf("cannot retrieve CA cert: %v", err)
}
expInput.ClientCert, expInput.ClientKey, err = getClientCertKey(k8sClient)
if err != nil {
return expInput, fmt.Errorf("cannot retrieve client cert and key: %v", err)
}
// TLS transport does not need any tempRefTimeout as it is applicable only
// for TCP transport, so sending 0.
expInput.TempRefTimeout = 0
expInput.IsEncrypted = true
} else if collectorProto == "tcp" {
// TCP transport does not need any tempRefTimeout, so sending 0.
// tempRefTimeout is the template refresh timeout, which specifies how often
// the exporting process should send the template again.
expInput.TempRefTimeout = 0
expInput.IsEncrypted = false
} else {
// For UDP transport, hardcoding tempRefTimeout value as 1800s.
expInput.TempRefTimeout = 1800
expInput.IsEncrypted = false
}
expInput.CollectorAddress = collectorAddr
expInput.CollectorProtocol = collectorProto
expInput.PathMTU = 0
@@ -157,20 +132,22 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
registry.LoadRegistry()

// Prepare input args for IPFIX exporting process.
expInput, err := prepareExporterInputArgs(collectorAddr, collectorProto, enableTLSToFlowAggregator, k8sClient)
expInput, err := prepareExporterInputArgs(collectorAddr, collectorProto)
if err != nil {
return nil, err
}
return &flowExporter{
connStore: connStore,
flowRecords: records,
registry: registry,
v4Enabled: v4Enabled,
v6Enabled: v6Enabled,
exporterInput: expInput,
activeFlowTimeout: activeFlowTimeout,
idleFlowTimeout: idleFlowTimeout,
ipfixSet: ipfix.NewSet(false),
connStore: connStore,
flowRecords: records,
registry: registry,
v4Enabled: v4Enabled,
v6Enabled: v6Enabled,
exporterInput: expInput,
activeFlowTimeout: activeFlowTimeout,
idleFlowTimeout: idleFlowTimeout,
ipfixSet: ipfix.NewSet(false),
enableTLSToFlowAggregator: enableTLSToFlowAggregator,
k8sClient: k8sClient,
}, nil
}

@@ -212,10 +189,36 @@ func (exp *flowExporter) Export() {

func (exp *flowExporter) initFlowExporter() error {
var err error
exp.process, err = ipfix.NewIPFIXExportingProcess(exp.exporterInput)
if exp.enableTLSToFlowAggregator {
// if CA certificate, client certificate and key do not exist during initialization,
// it will retry to obtain the credentials in next export cycle
exp.exporterInput.CACert, err = getCACert(exp.k8sClient)
if err != nil {
return fmt.Errorf("cannot retrieve CA cert: %v", err)
}
exp.exporterInput.ClientCert, exp.exporterInput.ClientKey, err = getClientCertKey(exp.k8sClient)
if err != nil {
return fmt.Errorf("cannot retrieve client cert and key: %v", err)
}
// TLS transport does not need any tempRefTimeout, so sending 0.
exp.exporterInput.TempRefTimeout = 0
exp.exporterInput.IsEncrypted = true
} else if exp.exporterInput.CollectorProtocol == "tcp" {
// TCP transport does not need any tempRefTimeout, so sending 0.
// tempRefTimeout is the template refresh timeout, which specifies how often
// the exporting process should send the template again.
exp.exporterInput.TempRefTimeout = 0
exp.exporterInput.IsEncrypted = false
} else {
// For UDP transport, hardcoding tempRefTimeout value as 1800s.
exp.exporterInput.TempRefTimeout = 1800
exp.exporterInput.IsEncrypted = false
}
expProcess, err := ipfix.NewIPFIXExportingProcess(exp.exporterInput)
if err != nil {
return fmt.Errorf("error when starting exporter: %v", err)
}
exp.process = expProcess
if exp.v4Enabled {
templateID := exp.process.NewTemplateID()
exp.templateIDv4 = templateID
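A side note on the initFlowExporter change above: because the certificates are now fetched during initialization rather than once at construction time, a failure to obtain them can be retried on the next export cycle, as the in-code comment notes. The sketch below shows that retry-until-initialized pattern in isolation; fetchCredentials, initIfNeeded, and the cycle loop are hypothetical stand-ins, not the Antrea API.

```go
// Minimal sketch, assuming initialization may fail a few times before the
// credentials become available (hypothetical names).
package main

import (
	"errors"
	"fmt"
)

// fetchCredentials stands in for getCACert/getClientCertKey; it fails until
// the certificates have been provisioned (here: after the third attempt).
func fetchCredentials(attempt int) error {
	if attempt < 3 {
		return errors.New("credentials not available yet")
	}
	return nil
}

type exporter struct {
	initialized bool
}

// initIfNeeded mirrors the idea behind retrying initFlowExporter: on failure
// it returns an error without changing state, so the next cycle tries again.
func (e *exporter) initIfNeeded(attempt int) error {
	if e.initialized {
		return nil
	}
	if err := fetchCredentials(attempt); err != nil {
		return err
	}
	e.initialized = true
	return nil
}

func main() {
	e := &exporter{}
	// Each iteration stands in for one export cycle.
	for cycle := 1; cycle <= 5; cycle++ {
		if err := e.initIfNeeded(cycle); err != nil {
			fmt.Printf("cycle %d: init failed, will retry: %v\n", cycle, err)
			continue
		}
		fmt.Printf("cycle %d: exporting flow records\n", cycle)
	}
}
```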
25 changes: 13 additions & 12 deletions test/e2e/fixtures.go
@@ -143,42 +143,43 @@ func setupTest(tb testing.TB) (*TestData, error) {
func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) {
// TODO: remove hardcoding to IPv4 after flow aggregator supports IPv6
isIPv6 := false
if _, err := setupTest(tb); err != nil {
data, err := setupTest(tb)
if err != nil {
return nil, err, isIPv6
}
// Create pod using ipfix collector image
if err := testData.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil {
if err = data.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil {
tb.Errorf("Error when creating the ipfix collector Pod: %v", err)
}
ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
ipfixCollectorIP, err := data.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
if err != nil || len(ipfixCollectorIP.ipStrings) == 0 {
tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err)
return nil, err, isIPv6
}
ipStr := ipfixCollectorIP.ipv4.String()
ipfixCollectorAddr := fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)

faClusterIPAddr := ""
tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr)
faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr)
faClusterIP, err := data.deployFlowAggregator(ipfixCollectorAddr)
if err != nil {
return testData, err, isIPv6
}

faClusterIPAddr := ""
if testOptions.providerName == "kind" {
// In Kind cluster, there are issues with DNS name resolution on worker nodes.
// Please note that CoreDNS services are forced on to control-plane Node.
faClusterIPAddr = fmt.Sprintf("%s:%s:tcp", faClusterIP, ipfixCollectorPort)
}
tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr)
if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return testData, err, isIPv6
if err = data.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return data, err, isIPv6
}

tb.Logf("Checking CoreDNS deployment")
if err = testData.checkCoreDNSPods(defaultTimeout); err != nil {
return testData, err, isIPv6
if err = data.checkCoreDNSPods(defaultTimeout); err != nil {
return data, err, isIPv6
}
return testData, nil, isIPv6
return data, nil, isIPv6
}

func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) {
@@ -304,7 +305,7 @@ func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs
}

func teardownFlowAggregator(tb testing.TB, data *TestData) {
if err := testData.gracefulExitFlowAggregator(testOptions.coverageDir); err != nil {
if err := data.gracefulExitFlowAggregator(testOptions.coverageDir); err != nil {
tb.Fatalf("Error when gracefully exiting Flow Aggregator: %v", err)
}
tb.Logf("Deleting '%s' K8s Namespace", flowAggregatorNamespace)
2 changes: 1 addition & 1 deletion test/e2e/flowaggregator_test.go
@@ -109,8 +109,8 @@ func TestFlowAggregator(t *testing.T) {
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownFlowAggregator(t, data)
defer teardownTest(t, data)
defer teardownFlowAggregator(t, data)

podAIP, podBIP, podCIP, svcB, svcC, err := createPerftestPods(data)
if err != nil {
3 changes: 2 additions & 1 deletion test/e2e/framework.go
@@ -681,7 +681,8 @@ func (data *TestData) waitForAntreaDaemonSetPods(timeout time.Duration) error {
return true, nil
})
if err == wait.ErrWaitTimeout {
return fmt.Errorf("antrea-agent DaemonSet not ready within %v", defaultTimeout)
_, stdout, _, _ := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s describe pod", antreaNamespace))
return fmt.Errorf("antrea-agent DaemonSet not ready within %v; kubectl describe pod output: %v", defaultTimeout, stdout)
} else if err != nil {
return err
}
