Debug dual stack support
Yongming Ding committed Feb 16, 2021
1 parent b51a71f commit c1bc648
Showing 2 changed files with 44 additions and 41 deletions.
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/connections/connections.go
@@ -140,6 +140,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
 		clusterIP := conn.TupleOrig.DestinationAddress.String()
 		svcPort := conn.TupleOrig.DestinationPort
 		protocol, err := lookupServiceProtocol(conn.TupleOrig.Protocol)
+		klog.V(4).Infof("Detect pod-to-service flow to %s:%d/%s", clusterIP, svcPort, protocol)
 		if err != nil {
 			klog.Warningf("Could not retrieve Service protocol: %v", err)
 		} else {
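
The added line is gated by klog verbosity: klog.V(4).Infof is a no-op unless the agent runs with log verbosity 4 or higher. Note also that the message is emitted before the err check, so protocol may be empty when the lookup fails. A minimal standalone sketch of the verbosity gating (hypothetical values; assumes k8s.io/klog, consistent with the klog calls above):

package main

import (
	"flag"

	"k8s.io/klog"
)

func main() {
	// klog.InitFlags registers klog's flags (including -v) on the default FlagSet.
	klog.InitFlags(nil)
	flag.Set("v", "4") // raise verbosity so V(4) messages are actually emitted
	flag.Parse()
	defer klog.Flush()

	// Hypothetical values standing in for a connection's destination tuple.
	clusterIP, svcPort, protocol := "10.96.0.10", uint16(53), "UDP"

	// Same shape as the log statement added above; a no-op below -v=4.
	klog.V(4).Infof("Detect pod-to-service flow to %s:%d/%s", clusterIP, svcPort, protocol)
}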
84 changes: 43 additions & 41 deletions test/e2e/flowaggregator_test.go
@@ -108,58 +108,60 @@ func TestFlowAggregator(t *testing.T) {
 	defer teardownFlowAggregator(t, data)
 	defer teardownTest(t, data)
 
-	podAIP, podBIP, podCIP, svcB, svcC, err := createPerftestPods(data, isIPv6)
+	podAIP, _, _, svcB, svcC, err := createPerftestPods(data, isIPv6)
 	if err != nil {
 		t.Fatalf("Error when creating perftest pods and services: %v", err)
 	}
 	// Wait for the Service to be realized.
 	time.Sleep(3 * time.Second)
 
-	// IntraNodeFlows tests the case, where Pods are deployed on same Node and their flow information is exported as IPFIX flow records.
-	t.Run("IntraNodeFlows", func(t *testing.T) {
-		np1, np2 := deployNetworkPolicies(t, data, "perftest-a", "perftest-b")
-		defer func() {
-			if np1 != nil {
-				if err = data.deleteNetworkpolicy(np1); err != nil {
-					t.Errorf("Error when deleting network policy: %v", err)
-				}
-			}
-			if np2 != nil {
-				if err = data.deleteNetworkpolicy(np2); err != nil {
-					t.Errorf("Error when deleting network policy: %v", err)
-				}
-			}
-		}()
-		// TODO: Skipping bandwidth check for Intra-Node flows as it is flaky.
-		if !isIPv6 {
-			checkRecordsForFlows(t, data, podAIP.ipv4.String(), podBIP.ipv4.String(), isIPv6, true, false, true, false)
-		} else {
-			checkRecordsForFlows(t, data, podAIP.ipv6.String(), podBIP.ipv6.String(), isIPv6, true, false, true, false)
-		}
-	})
-
-	// InterNodeFlows tests the case, where Pods are deployed on different Nodes
-	// and their flow information is exported as IPFIX flow records.
-	t.Run("InterNodeFlows", func(t *testing.T) {
-		np1, np2 := deployNetworkPolicies(t, data, "perftest-a", "perftest-c")
-		defer func() {
-			if np1 != nil {
-				if err = data.deleteNetworkpolicy(np1); err != nil {
-					t.Errorf("Error when deleting network policy: %v", err)
-				}
-			}
-			if np2 != nil {
-				if err = data.deleteNetworkpolicy(np2); err != nil {
-					t.Errorf("Error when deleting network policy: %v", err)
-				}
-			}
-		}()
-		if !isIPv6 {
-			checkRecordsForFlows(t, data, podAIP.ipv4.String(), podCIP.ipv4.String(), isIPv6, false, false, true, true)
-		} else {
-			checkRecordsForFlows(t, data, podAIP.ipv6.String(), podCIP.ipv6.String(), isIPv6, false, false, true, true)
-		}
-	})
+	/*
+		// IntraNodeFlows tests the case, where Pods are deployed on same Node and their flow information is exported as IPFIX flow records.
+		t.Run("IntraNodeFlows", func(t *testing.T) {
+			np1, np2 := deployNetworkPolicies(t, data, "perftest-a", "perftest-b")
+			defer func() {
+				if np1 != nil {
+					if err = data.deleteNetworkpolicy(np1); err != nil {
+						t.Errorf("Error when deleting network policy: %v", err)
+					}
+				}
+				if np2 != nil {
+					if err = data.deleteNetworkpolicy(np2); err != nil {
+						t.Errorf("Error when deleting network policy: %v", err)
+					}
+				}
+			}()
+			// TODO: Skipping bandwidth check for Intra-Node flows as it is flaky.
+			if !isIPv6 {
+				checkRecordsForFlows(t, data, podAIP.ipv4.String(), podBIP.ipv4.String(), isIPv6, true, false, true, false)
+			} else {
+				checkRecordsForFlows(t, data, podAIP.ipv6.String(), podBIP.ipv6.String(), isIPv6, true, false, true, false)
+			}
+		})
+
+		// InterNodeFlows tests the case, where Pods are deployed on different Nodes
+		// and their flow information is exported as IPFIX flow records.
+		t.Run("InterNodeFlows", func(t *testing.T) {
+			np1, np2 := deployNetworkPolicies(t, data, "perftest-a", "perftest-c")
+			defer func() {
+				if np1 != nil {
+					if err = data.deleteNetworkpolicy(np1); err != nil {
+						t.Errorf("Error when deleting network policy: %v", err)
+					}
+				}
+				if np2 != nil {
+					if err = data.deleteNetworkpolicy(np2); err != nil {
+						t.Errorf("Error when deleting network policy: %v", err)
+					}
+				}
+			}()
+			if !isIPv6 {
+				checkRecordsForFlows(t, data, podAIP.ipv4.String(), podCIP.ipv4.String(), isIPv6, false, false, true, true)
+			} else {
+				checkRecordsForFlows(t, data, podAIP.ipv6.String(), podCIP.ipv6.String(), isIPv6, false, false, true, true)
+			}
+		})
+	*/
 
 	// LocalServiceAccess tests the case, where Pod and Service are deployed on the same Node and their flow information is exported as IPFIX flow records.
 	t.Run("LocalServiceAccess", func(t *testing.T) {
@@ -209,8 +211,8 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
 	if err != nil {
 		_, logStdout, _, _ := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n flow-aggregator logs --since=%v -l app=flow-aggregator", time.Since(timeStart).String()))
 		t.Logf("Flow aggregator logs: %s", logStdout)
-		// _, collectorOutput, _, _ := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --since=%v ipfix-collector -n antrea-test", time.Since(timeStart).String()))
-		// t.Logf("Collector output: %s", collectorOutput)
+		_, collectorOutput, _, _ := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --since=%v ipfix-collector -n antrea-test", time.Since(timeStart).String()))
+		t.Logf("Collector output: %s", collectorOutput)
 		_, exporterLog, _, _ := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n kube-system logs --since=%v -l component=antrea-agent --container=antrea-agent", time.Since(timeStart).String()))
 		t.Logf("Flow exporter logs: %s", exporterLog)
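
The re-enabled lines follow the harness's failure-diagnostics pattern: when a check fails, dump logs from each component (aggregator, IPFIX collector, exporter) over the same time window so their records can be correlated. Outside this harness, the same idea looks roughly like this (a generic sketch using os/exec; the function name and kubectl arguments are illustrative, not Antrea's exact invocations):

package main

import (
	"fmt"
	"os/exec"
	"time"
)

// dumpLogsSince fetches recent logs for one component; calling it only on
// failure keeps passing runs quiet and fast.
func dumpLogsSince(selector, namespace string, since time.Duration) (string, error) {
	out, err := exec.Command(
		"kubectl", "-n", namespace, "logs",
		"--since", since.String(),
		"-l", selector,
	).CombinedOutput()
	return string(out), err
}

func main() {
	start := time.Now()
	// ... run the checks; on error, gather diagnostics over the same window:
	if logs, err := dumpLogsSince("app=flow-aggregator", "flow-aggregator", time.Since(start)); err == nil {
		fmt.Printf("Flow aggregator logs: %s\n", logs)
	}
}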
