Skip to content

Commit

Permalink
Port Antrea Flow Visibility Improvments
Browse files Browse the repository at this point in the history
This change ports various improvments for the current flow visibility
e2e test in antrea main repo:
antrea-io/antrea#3673

Other minor changes:
- removed coverage related code and option from the test
- changed busybox image to the correct one

Signed-off-by: Shawn Wang <[email protected]>
  • Loading branch information
wsquan171 committed May 10, 2022
1 parent 2600d8f commit be708e1
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 111 deletions.
12 changes: 7 additions & 5 deletions test/e2e/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func ensureAntreaRunning(data *TestData) error {
}

func teardownTest(tb testing.TB, data *TestData) {
// exportLogs(tb, data, "beforeTeardown", true)
exportLogs(tb, data, "beforeTeardown", true)
if empty, _ := IsDirEmpty(data.logsDirForTestCase); empty {
_ = os.Remove(data.logsDirForTestCase)
}
Expand Down Expand Up @@ -194,6 +194,9 @@ func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs
// dump the logs for flow-visibility Pods to disk.
data.forAllMatchingPodsInNamespace("", flowVisibilityNamespace, writePodLogs)

// dump the logs for clickhouse operator Pods to disk.
data.forAllMatchingPodsInNamespace("app=clickhouse-operator", kubeNamespace, writePodLogs)

// dump the output of "kubectl describe" for Antrea pods to disk.
data.forAllMatchingPodsInNamespace("app=antrea", antreaNamespace, func(nodeName, podName, nsName string) error {
w := getPodWriter(nodeName, podName, "describe")
Expand Down Expand Up @@ -274,7 +277,7 @@ func setupTest(tb testing.TB) (*TestData, error) {
return testData, nil
}

func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, bool, error) {
func setupTestForFlowAggregator(tb testing.TB) (*TestData, bool, bool, error) {
v4Enabled := clusterInfo.podV4NetworkCIDR != ""
v6Enabled := clusterInfo.podV6NetworkCIDR != ""
testData, err := setupTest(tb)
Expand All @@ -283,12 +286,11 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, bool, error) {
}

tb.Logf("Deploying ClickHouse")
chPodIPs, err := testData.deployFlowVisibilityClickHouse()
chSvcIP, err := testData.deployFlowVisibilityClickHouse()
if err != nil {
return testData, v4Enabled, v6Enabled, err
}
tb.Logf("ClickHouse Pod running on address: %s", chPodIPs.String())

tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP)
tb.Logf("Applying flow aggregator YAML")
if err := testData.deployFlowAggregator(); err != nil {
return testData, v4Enabled, v6Enabled, err
Expand Down
34 changes: 3 additions & 31 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type testFlow struct {
}

func TestFlowAggregator(t *testing.T) {
data, v4Enabled, v6Enabled, err := setupTestWithIPFIXCollector(t)
data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
Expand Down Expand Up @@ -495,13 +495,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
}
podName := flowAggPod.Name
for _, args := range antctl.CommandList.GetDebugCommands(runtime.ModeFlowAggregator) {
command := []string{}
if testOptions.enableCoverage {
antctlCovArgs := antctlCoverageArgs("antctl-coverage")
command = append(antctlCovArgs, args...)
} else {
command = append([]string{"antctl", "-v"}, args...)
}
command := append([]string{"antctl", "-v"}, args...)
t.Logf("Run command: %s", command)

t.Run(strings.Join(command, " "), func(t *testing.T) {
Expand Down Expand Up @@ -534,14 +528,8 @@ func checkAntctlGetFlowRecordsJson(t *testing.T, data *TestData, podName string,
_, srcPort, dstPort := getBandwidthAndPorts(stdout)

// run antctl command on flow aggregator to get flow records
var command []string
args := []string{"get", "flowrecords", "-o", "json", "--srcip", srcIP, "--srcport", srcPort}
if testOptions.enableCoverage {
antctlCovArgs := antctlCoverageArgs("antctl-coverage")
command = append(antctlCovArgs, args...)
} else {
command = append([]string{"antctl"}, args...)
}
command := append([]string{"antctl"}, args...)
t.Logf("Run command: %s", command)
stdout, stderr, err := runAntctl(podName, command, data)
require.NoErrorf(t, err, "Error when running 'antctl get flowrecords -o json' from %s: %v\n%s", podName, err, antctlOutput(stdout, stderr))
Expand Down Expand Up @@ -793,22 +781,6 @@ func checkFlowTypeClickHouse(t *testing.T, record *ClickHouseFullRow, flowType u
assert.Equal(t, record.FlowType, flowType, "Record does not have correct flowType")
}

func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64 {
if strings.Contains(record, "TEMPLATE SET") {
return 0
}
splitLines := strings.Split(record, "\n")
for _, line := range splitLines {
if strings.Contains(line, field) {
lineSlice := strings.Split(line, ":")
value, err := strconv.ParseUint(strings.TrimSpace(lineSlice[1]), 10, 64)
require.NoError(t, err, "Error when converting %s to uint64 type", field)
return value
}
}
return 0
}

// getClickHouseOutput queries clickhouse with built-in client and checks if we have
// received all the expected records for a given flow with source IP, destination IP
// and source port. We send source port to ignore the control flows during the iperf test.
Expand Down
102 changes: 54 additions & 48 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
crdclientset "antrea.io/antrea/pkg/client/clientset/versioned"

"antrea.io/theia/test/e2e/providers"
)

Expand All @@ -66,26 +67,25 @@ const (
flowVisibilityNamespace string = "flow-visibility"
testNamespace string = "antrea-test"
iperfPort int32 = 5201
clickHouseHTTPPort string = "8123"
busyboxContainerName string = "busybox"
defaultBridgeName string = "br-int"
antreaYML string = "antrea.yml"
antreaDaemonSet string = "antrea-agent"
antreaDeployment string = "antrea-controller"
flowAggregatorDeployment string = "flow-aggregator"
flowAggregatorYML string = "flow-aggregator.yml"
flowAggregatorCovYML string = "flow-aggregator-coverage.yml"
flowVisibilityYML string = "flow-visibility.yml"
chOperatorYML string = "clickhouse-operator-install-bundle.yml"
flowVisibilityCHPodName string = "chi-clickhouse-clickhouse-0-0-0"

agnhostImage = "k8s.gcr.io/e2e-test-images/agnhost:2.29"
busyboxImage = "projects.registry.vmware.com/library/busybox"
busyboxImage = "projects.registry.vmware.com/antrea/busybox"
perftoolImage = "projects.registry.vmware.com/antrea/perftool"

exporterActiveFlowExportTimeout = 2 * time.Second
exporterIdleFlowExportTimeout = 1 * time.Second
aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond
aggregatorInactiveFlowRecordTimeout = 6 * time.Second
aggregatorClickHouseCommitInterval = 1 * time.Second
exporterActiveFlowExportTimeout = 2 * time.Second
aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond
aggregatorClickHouseCommitInterval = 1 * time.Second
)

type ClusterNode struct {
Expand Down Expand Up @@ -143,15 +143,13 @@ type TestOptions struct {
providerConfigPath string
logsExportDir string
logsExportOnSuccess bool
enableCoverage bool
coverageDir string
skipCases string
}

var testOptions TestOptions

var (
pods, namespaces []string
namespaces []string
)

type PodIPs struct {
Expand Down Expand Up @@ -624,6 +622,11 @@ func (data *TestData) DeletePod(namespace, name string) error {
return nil
}

// GetService is a convenience function for getting Service
func (data *TestData) GetService(namespace, name string) (*corev1.Service, error) {
return data.clientset.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}

type PodCondition func(*corev1.Pod) (bool, error)

// PodWaitFor polls the K8s apiserver until the specified Pod is found (in the test Namespace) and
Expand Down Expand Up @@ -719,6 +722,20 @@ func (data *TestData) podWaitForIPs(timeout time.Duration, name, namespace strin
return ips, nil
}

// podWaitForReady polls the k8s apiserver until the specified Pod is in the "Ready" status (or
// until the provided timeout expires).
func (data *TestData) podWaitForReady(timeout time.Duration, name, namespace string) error {
_, err := data.PodWaitFor(timeout, name, namespace, func(p *corev1.Pod) (bool, error) {
for _, condition := range p.Status.Conditions {
if condition.Type == corev1.PodReady {
return condition.Status == corev1.ConditionTrue, nil
}
}
return false, nil
})
return err
}

// getImageName gets the image name from the fully qualified URI.
// For example: "gcr.io/kubernetes-e2e-test-images/agnhost:2.8" gets "agnhost".
func getImageName(uri string) string {
Expand Down Expand Up @@ -1073,12 +1090,16 @@ func (data *TestData) getClickHouseOperator() (*corev1.Pod, error) {
}

// deployFlowVisibilityClickHouse deploys ClickHouse operator and DB.
func (data *TestData) deployFlowVisibilityClickHouse() (*PodIPs, error) {
func (data *TestData) deployFlowVisibilityClickHouse() (string, error) {
err := data.CreateNamespace(flowVisibilityNamespace, nil)
if err != nil {
return nil, err
return "", err
}

rc, _, _, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", chOperatorYML))
if err != nil || rc != 0 {
return "", fmt.Errorf("error when deploying the ClickHouse Operator YML; %s not available on the control-plane Node", chOperatorYML)
}
if err := wait.Poll(2*time.Second, 10*time.Second, func() (bool, error) {
rc, stdout, stderr, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", flowVisibilityYML))
if err != nil || rc != 0 {
Expand All @@ -1092,54 +1113,39 @@ func (data *TestData) deployFlowVisibilityClickHouse() (*PodIPs, error) {
}
return true, nil
}); err != nil {
return nil, err
return "", err
}

// check for clickhouse operator running
chOperator, err := data.getClickHouseOperator()
if err != nil {
return nil, fmt.Errorf("error when getting clickhouse-operator Pod: %v", err)
// check for clickhouse pod Ready. Wait for 2x timeout as ch operator needs to be running first to handle chi
if err = data.podWaitForReady(2*defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace); err != nil {
return "", err
}
podName := chOperator.Name
_, err = data.PodWaitFor(defaultTimeout, podName, kubeNamespace, func(p *corev1.Pod) (bool, error) {
for _, condition := range p.Status.Conditions {
if condition.Type == corev1.PodReady {
return condition.Status == corev1.ConditionTrue, nil
}
}
return false, nil
})

// check clickhouse service http port for service connectivity
chSvc, err := data.GetService("flow-visibility", "clickhouse-clickhouse")
if err != nil {
_, stdout, stderr, podErr := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl get po %s -n %s -o yaml", podName, kubeNamespace))
return nil, fmt.Errorf("error when waiting for ClickHouse Operator Ready: %v; stdout: %s, stderr: %s, %v", err, stdout, stderr, podErr)
return "", err
}
// check for clickhouse pod running
_, err = data.PodWaitFor(defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace, func(p *corev1.Pod) (bool, error) {
for _, condition := range p.Status.Conditions {
if condition.Type == corev1.PodReady {
return condition.Status == corev1.ConditionTrue, nil
}
if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (bool, error) {
rc, stdout, stderr, err := testData.RunCommandOnNode(controlPlaneNodeName(),
fmt.Sprintf("curl -Ss %s:%s", chSvc.Spec.ClusterIP, clickHouseHTTPPort))
if rc != 0 || err != nil {
log.Infof("Failed to curl clickhouse Service: %s", strings.Trim(stderr, "\n"))
return false, nil
} else {
log.Infof("Successfully curl'ed clickhouse Service: %s", strings.Trim(stdout, "\n"))
return true, nil
}
return false, nil
})
if err != nil {
_, stdout, stderr, podErr := data.provider.RunCommandOnNode(controlPlaneNodeName(),
fmt.Sprintf("kubectl get po %s -n %s -o yaml", flowVisibilityCHPodName, flowVisibilityNamespace))
return nil, fmt.Errorf("error when waiting for ClickHouse Ready: %v; stdout: %s; stderr: %s; %v", err, stdout, stderr, podErr)
}
podIPs, err := data.podWaitForIPs(defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace)
if err != nil {
return nil, fmt.Errorf("unable to get clickhouse server IP %s: %v", flowVisibilityCHPodName, err)
}); err != nil {
return "", fmt.Errorf("timeout checking http port connectivity of clickhouse service: %v", err)
}
return podIPs, nil

return chSvc.Spec.ClusterIP, nil
}

// deployFlowAggregator deploys the Flow Aggregator with ipfix collector and clickHouse address.
func (data *TestData) deployFlowAggregator() error {
flowAggYaml := flowAggregatorYML
if testOptions.enableCoverage {
flowAggYaml = flowAggregatorCovYML
}
rc, _, _, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", flowAggYaml))
if err != nil || rc != 0 {
return fmt.Errorf("error when deploying the Flow Aggregator; %s not available on the control-plane Node", flowAggYaml)
Expand Down
20 changes: 0 additions & 20 deletions test/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,12 @@ func (tOptions *TestOptions) setupLogging() func() {
return func() {}
}

// setupCoverage checks if the directory provided by the user exists.
func (tOptions *TestOptions) setupCoverage() func() {
if tOptions.coverageDir != "" {
fInfo, err := os.Stat(tOptions.coverageDir)
if err != nil {
log.Fatalf("Cannot stat provided directory '%s': %v", tOptions.coverageDir, err)
}
if !fInfo.Mode().IsDir() {
log.Fatalf("'%s' is not a valid directory", tOptions.coverageDir)
}

}
// no-op cleanup function
return func() {}

}

// testMain is meant to be called by TestMain and enables the use of defer statements.
func testMain(m *testing.M) int {
flag.StringVar(&testOptions.providerName, "provider", "vagrant", "K8s test cluster provider")
flag.StringVar(&testOptions.providerConfigPath, "provider-cfg-path", "", "Optional config file for provider")
flag.StringVar(&testOptions.logsExportDir, "logs-export-dir", "", "Export directory for test logs")
flag.BoolVar(&testOptions.logsExportOnSuccess, "logs-export-on-success", false, "Export logs even when a test is successful")
flag.BoolVar(&testOptions.enableCoverage, "coverage", false, "Run tests and measure coverage")
flag.StringVar(&testOptions.coverageDir, "coverage-dir", "", "Directory for coverage data files")
flag.StringVar(&testOptions.skipCases, "skip", "", "Key words to skip cases")
flag.Parse()

Expand Down Expand Up @@ -123,7 +104,6 @@ func testMain(m *testing.M) int {
log.Fatalf("Error when getting antrea-config configmap: %v", err)
}
rand.Seed(time.Now().UnixNano())
defer testOptions.setupCoverage()
ret := m.Run()
return ret
}
Expand Down
7 changes: 0 additions & 7 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math/rand"
"os"
"strings"
"time"
)

const nameSuffixLength int = 8
Expand Down Expand Up @@ -61,12 +60,6 @@ func randName(prefix string) string {
return prefix + randSeq(nameSuffixLength)
}

func antctlCoverageArgs(antctlPath string) []string {
const timeFormat = "20060102T150405Z0700"
timeStamp := time.Now().Format(timeFormat)
return []string{antctlPath, "-test.run=TestBincoverRunMain", fmt.Sprintf("-test.coverprofile=antctl-%s.out", timeStamp)}
}

// runAntctl runs antctl commands on antrea Pods, the controller, or agents.
func runAntctl(podName string, cmds []string, data *TestData) (string, string, error) {
var containerName string
Expand Down

0 comments on commit be708e1

Please sign in to comment.