Skip to content

Commit

Permalink
Fix FlowAggregator e2e test ANP deployment issue (#42)
Browse files Browse the repository at this point in the history
The original e2e test cases only check IngressRuleTable/EgressTable flow
count, and use the result to decide the if the NP rule is realized. But
these two tables are used to realize K8s NP rules. For a normal ANP
rule, AntreaPolicyIngressRuleTable/AntreaPolicyEgressRuleTable are used,
but never checked in the case. Besides, the original case always uses
number 2 to check the rule flow existence, but the fact is there always
no less than 2 flows in IngressRule/EgressRule table, which are used to
bypass the established and related packets in a valid connection. So the
case actually doesn't check if the rule is realized or not.

Signed-off-by: wenyingd <[email protected]>
Co-authored-by: heanlan <[email protected]>
  • Loading branch information
heanlan authored Jun 10, 2022
1 parent d38144f commit a16e7ed
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 23 deletions.
59 changes: 41 additions & 18 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/antctl"
"antrea.io/antrea/pkg/antctl/runtime"
secv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
Expand Down Expand Up @@ -134,7 +135,11 @@ const (
// Set target bandwidth(bits/sec) of iPerf traffic to a relatively small value
// (default unlimited for TCP), to reduce the variances caused by network performance
// during 12s, and make the throughput test more stable.
iperfBandwidth = "10m"
iperfBandwidth = "10m"
antreaEgressTableInitFlowCount = 3
antreaIngressTableInitFlowCount = 6
ingressTableInitFlowCount = 1
egressTableInitFlowCount = 1
)

var (
Expand Down Expand Up @@ -215,7 +220,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
// applied to destination Pod (one reject rule, one drop rule) and their flow information is exported as IPFIX flow records.
// perftest-a -> perftest-b (Ingress reject), perftest-a -> perftest-d (Ingress drop)
t.Run("IntraNodeDenyConnIngressANP", func(t *testing.T) {
anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", true)
anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true)
defer func() {
if anp1 != nil {
if err = data.deleteAntreaNetworkpolicy(anp1); err != nil {
Expand Down Expand Up @@ -249,7 +254,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
// applied to source Pods (one reject rule, one drop rule) and their flow information is exported as IPFIX flow records.
// perftest-a (Egress reject) -> perftest-b , perftest-a (Egress drop) -> perftest-d
t.Run("IntraNodeDenyConnEgressANP", func(t *testing.T) {
anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", false)
anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false)
defer func() {
if anp1 != nil {
if err = data.deleteAntreaNetworkpolicy(anp1); err != nil {
Expand Down Expand Up @@ -283,7 +288,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
// applied to one destination Pod, one source Pod, respectively and their flow information is exported as IPFIX flow records.
// perftest-a -> perftest-b (Ingress deny), perftest-d (Egress deny) -> perftest-a
t.Run("IntraNodeDenyConnNP", func(t *testing.T) {
np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-b", "perftest-d")
np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName())
defer func() {
if np1 != nil {
if err = data.deleteNetworkpolicy(np1); err != nil {
Expand Down Expand Up @@ -317,7 +322,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
// and their flow information is exported as IPFIX flow records.
// Antrea network policies are being tested here.
t.Run("InterNodeFlows", func(t *testing.T) {
anp1, anp2 := deployAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c")
anp1, anp2 := deployAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", controlPlaneNodeName(), workerNodeName(1))
defer func() {
if anp1 != nil {
data.DeleteANP(testNamespace, anp1.Name)
Expand All @@ -337,7 +342,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
// applied to destination Pod (one reject rule, one drop rule) and their flow information is exported as IPFIX flow records.
// perftest-a -> perftest-c (Ingress reject), perftest-a -> perftest-e (Ingress drop)
t.Run("InterNodeDenyConnIngressANP", func(t *testing.T) {
anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", true)
anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true)
defer func() {
if anp1 != nil {
if err = data.deleteAntreaNetworkpolicy(anp1); err != nil {
Expand Down Expand Up @@ -371,7 +376,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
// applied to source Pod (one reject rule, one drop rule) and their flow information is exported as IPFIX flow records.
// perftest-a (Egress reject) -> perftest-c, perftest-a (Egress drop)-> perftest-e
t.Run("InterNodeDenyConnEgressANP", func(t *testing.T) {
anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", false)
anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false)
defer func() {
if anp1 != nil {
if err = data.deleteAntreaNetworkpolicy(anp1); err != nil {
Expand Down Expand Up @@ -405,7 +410,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
// applied to one destination Pod, one source Pod, respectively and their flow information is exported as IPFIX flow records.
// perftest-a -> perftest-c (Ingress deny), perftest-b (Egress deny) -> perftest-e
t.Run("InterNodeDenyConnNP", func(t *testing.T) {
np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-c", "perftest-b")
np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-c", "perftest-b", workerNodeName(1), controlPlaneNodeName())
defer func() {
if np1 != nil {
if err = data.deleteNetworkpolicy(np1); err != nil {
Expand Down Expand Up @@ -878,14 +883,17 @@ func deployK8sNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod strin
t.Errorf("Error when creating Network Policy: %v", err)
}
// Wait for network policies to be realized.
if err := data.WaitNetworkPolicyRealize(2); err != nil {
t.Errorf("Error when waiting for Network Policy to be realized: %v", err)
if err := data.WaitNetworkPolicyRealize(controlPlaneNodeName(), openflow.IngressRuleTable, ingressTableInitFlowCount+1); err != nil {
t.Errorf("Error when waiting for ingress Network Policy to be realized: %v", err)
}
if err := data.WaitNetworkPolicyRealize(controlPlaneNodeName(), openflow.IngressRuleTable, egressTableInitFlowCount+1); err != nil {
t.Errorf("Error when waiting for egress Network Policy to be realized: %v", err)
}
t.Log("Network Policies are realized.")
return np1, np2
}

func deployAntreaNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string) (anp1 *secv1alpha1.NetworkPolicy, anp2 *secv1alpha1.NetworkPolicy) {
func deployAntreaNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod, srcNode, dstNode string) (anp1 *secv1alpha1.NetworkPolicy, anp2 *secv1alpha1.NetworkPolicy) {
builder1 := &utils.AntreaNetworkPolicySpecBuilder{}
// apply anp to dstPod, allow ingress from srcPod
builder1 = builder1.SetName(testNamespace, ingressAntreaNetworkPolicyName).
Expand Down Expand Up @@ -913,17 +921,23 @@ func deployAntreaNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod st
}

// Wait for network policies to be realized.
if err := data.WaitNetworkPolicyRealize(2); err != nil {
t.Errorf("Error when waiting for Antrea Network Policy to be realized: %v", err)
if err := data.WaitNetworkPolicyRealize(dstNode, openflow.AntreaPolicyIngressRuleTable, antreaIngressTableInitFlowCount+1); err != nil {
t.Errorf("Error when waiting for Antrea ingress Network Policy to be realized: %v", err)
}
if err := data.WaitNetworkPolicyRealize(srcNode, openflow.AntreaPolicyEgressRuleTable, antreaEgressTableInitFlowCount+1); err != nil {
t.Errorf("Error when waiting for Antrea egress Network Policy to be realized: %v", err)
}
t.Log("Antrea Network Policies are realized.")
return anp1, anp2
}

func deployDenyAntreaNetworkPolicies(t *testing.T, data *TestData, srcPod, podReject, podDrop string, isIngress bool) (anp1 *secv1alpha1.NetworkPolicy, anp2 *secv1alpha1.NetworkPolicy) {
func deployDenyAntreaNetworkPolicies(t *testing.T, data *TestData, srcPod, podReject, podDrop string, srcNode, dstNode string, isIngress bool) (anp1 *secv1alpha1.NetworkPolicy, anp2 *secv1alpha1.NetworkPolicy) {
var err error
builder1 := &utils.AntreaNetworkPolicySpecBuilder{}
builder2 := &utils.AntreaNetworkPolicySpecBuilder{}
var table *openflow.Table
var flowCount int
var nodeName string
if isIngress {
// apply reject and drop ingress rule to destination pods
builder1 = builder1.SetName(testNamespace, ingressRejectANPName).
Expand All @@ -936,6 +950,9 @@ func deployDenyAntreaNetworkPolicies(t *testing.T, data *TestData, srcPod, podRe
SetAppliedToGroup([]utils.ANPAppliedToSpec{{PodSelector: map[string]string{"antrea-e2e": podDrop}}})
builder2 = builder2.AddIngress(corev1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"antrea-e2e": srcPod}, map[string]string{},
nil, nil, nil, secv1alpha1.RuleActionDrop, testIngressRuleName)
table = openflow.AntreaPolicyIngressRuleTable
flowCount = antreaIngressTableInitFlowCount + 2
nodeName = dstNode
} else {
// apply reject and drop egress rule to source pod
builder1 = builder1.SetName(testNamespace, egressRejectANPName).
Expand All @@ -948,6 +965,9 @@ func deployDenyAntreaNetworkPolicies(t *testing.T, data *TestData, srcPod, podRe
SetAppliedToGroup([]utils.ANPAppliedToSpec{{PodSelector: map[string]string{"antrea-e2e": srcPod}}})
builder2 = builder2.AddEgress(corev1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"antrea-e2e": podDrop}, map[string]string{},
nil, nil, nil, secv1alpha1.RuleActionDrop, testEgressRuleName)
table = openflow.AntreaPolicyEgressRuleTable
flowCount = antreaEgressTableInitFlowCount + 2
nodeName = srcNode
}
anp1 = builder1.Get()
anp1, err = data.CreateOrUpdateANP(anp1)
Expand All @@ -960,14 +980,14 @@ func deployDenyAntreaNetworkPolicies(t *testing.T, data *TestData, srcPod, podRe
failOnError(fmt.Errorf("Error when creating Antrea Network Policy: %v", err), t, data)
}
// Wait for Antrea NetworkPolicy to be realized.
if err := data.WaitNetworkPolicyRealize(2); err != nil {
if err := data.WaitNetworkPolicyRealize(nodeName, table, flowCount); err != nil {
t.Errorf("Error when waiting for Antrea Network Policy to be realized: %v", err)
}
t.Log("Antrea Network Policies are realized.")
return anp1, anp2
}

func deployDenyNetworkPolicies(t *testing.T, data *TestData, pod1, pod2 string) (np1 *networkingv1.NetworkPolicy, np2 *networkingv1.NetworkPolicy) {
func deployDenyNetworkPolicies(t *testing.T, data *TestData, pod1, pod2 string, node1, node2 string) (np1 *networkingv1.NetworkPolicy, np2 *networkingv1.NetworkPolicy) {
np1, err := data.createNetworkPolicy(ingressDenyNPName, &networkingv1.NetworkPolicySpec{
PodSelector: metav1.LabelSelector{
MatchLabels: map[string]string{
Expand All @@ -993,8 +1013,11 @@ func deployDenyNetworkPolicies(t *testing.T, data *TestData, pod1, pod2 string)
t.Errorf("Error when creating Network Policy: %v", err)
}
// Wait for NetworkPolicy to be realized.
if err := data.WaitNetworkPolicyRealize(2); err != nil {
t.Errorf("Error when waiting for Network Policies to be realized: %v", err)
if err := data.WaitNetworkPolicyRealize(node1, openflow.IngressRuleTable, ingressTableInitFlowCount+1); err != nil {
t.Errorf("Error when waiting for ingress Network Policies to be realized: %v", err)
}
if err := data.WaitNetworkPolicyRealize(node2, openflow.EgressRuleTable, egressTableInitFlowCount+1); err != nil {
t.Errorf("Error when waiting for egress Network Policies to be realized: %v", err)
}
t.Log("Network Policies are realized.")
return np1, np2
Expand Down
10 changes: 5 additions & 5 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,9 +1005,9 @@ func (data *TestData) deleteService(namespace, name string) error {
return nil
}

func (data *TestData) WaitNetworkPolicyRealize(policyRules int) error {
func (data *TestData) WaitNetworkPolicyRealize(nodeName string, table *openflow.Table, policyRules int) error {
return wait.PollImmediate(50*time.Millisecond, realizeTimeout, func() (bool, error) {
return data.checkRealize(policyRules)
return data.checkRealize(nodeName, table, policyRules)
})
}

Expand All @@ -1017,13 +1017,13 @@ func (data *TestData) WaitNetworkPolicyRealize(policyRules int) error {
// IngressRule. checkRealize returns true when the number of flows exceeds the number of CIDR, because each table has a
// default flow entry which is used for default matching.
// Since the check is done over SSH, the time measurement is not completely accurate.
func (data *TestData) checkRealize(policyRules int) (bool, error) {
antreaPodName, err := data.getAntreaPodOnNode(controlPlaneNodeName())
func (data *TestData) checkRealize(nodeName string, table *openflow.Table, policyRules int) (bool, error) {
antreaPodName, err := data.getAntreaPodOnNode(nodeName)
if err != nil {
return false, err
}
// table IngressRule is the ingressRuleTable where the rules in workload network policy is being applied to.
cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName, fmt.Sprintf("table=%s", openflow.IngressRuleTable.GetName())}
cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName, fmt.Sprintf("table=%s", table.GetName())}
stdout, _, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, "antrea-agent", cmd)
if err != nil {
return false, err
Expand Down

0 comments on commit a16e7ed

Please sign in to comment.