From df5fb933b22a2dc144e089bc04dd4999669aa5df Mon Sep 17 00:00:00 2001
From: graysonwu
Date: Thu, 29 Sep 2022 18:15:15 -0700
Subject: [PATCH] Enhance ACNP Service-related features

1. Only load the Service GroupID into reg when AntreaPolicy is enabled.
   The Service GroupID is only used by the AntreaPolicy "toServices" and
   "AppliedTo NodePort Services" features for now.
2. In IngressSecurityClassifierTable, only forward packets to
   AntreaPolicyIngressRuleTable when both AntreaPolicy and proxyAll are
   enabled. This flow is only used by the AntreaPolicy "AppliedTo NodePort
   Services" feature, to prevent packets from skipping
   AntreaPolicyIngressRuleTable, where the policy is enforced, when the
   Endpoint of the Service is not on the current NodePort Node.
3. In the ACNP appliedTo NodePort Service e2e test, add another netns to
   fake an external network.
4. Use gwOFPort as the inPort of the reject response in some cases.

Signed-off-by: graysonwu
---
 pkg/agent/controller/networkpolicy/reject.go |  21 ++--
 pkg/agent/openflow/client.go                 |   1 +
 pkg/agent/openflow/pipeline.go               |   9 +-
 pkg/agent/openflow/service.go                |   3 +
 test/e2e/antreapolicy_test.go                | 109 ++++++++++---------
 test/integration/agent/openflow_test.go      |  87 ++++++++++++++-
 6 files changed, 160 insertions(+), 70 deletions(-)

diff --git a/pkg/agent/controller/networkpolicy/reject.go b/pkg/agent/controller/networkpolicy/reject.go
index 91e7e80800e..0544c9ec056 100644
--- a/pkg/agent/controller/networkpolicy/reject.go
+++ b/pkg/agent/controller/networkpolicy/reject.go
@@ -18,7 +18,6 @@ import (
 	"encoding/binary"
 	"fmt"
 
-	"antrea.io/libOpenflow/openflow15"
 	"antrea.io/libOpenflow/protocol"
 	"antrea.io/ofnet/ofctrl"
 
@@ -165,14 +164,8 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
 		srcMAC = sIface.MAC.String()
 		dstMAC = dIface.MAC.String()
 	}
-	tunPort := c.tunPort
-	if tunPort == 0 {
-		// openflow15.P_CONTROLLER is used with noEncap mode when tunnel interface is not found.
-		// It won't cause a loop with openflow15.P_CONTROLLER because it is used as the input port but not output port
-		// in the packet out message.
-		tunPort = uint32(openflow15.P_CONTROLLER)
-	}
-	inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, tunPort)
+
+	inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort)
 	mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType)
 
 	if proto == protocol.Type_TCP {
@@ -289,9 +282,19 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte
 		outPort = gwOFPort
 	case RejectNoAPServiceRemoteToLocal:
 		inPort = tunOFPort
+		if inPort == 0 {
+			// If the tunnel interface is not found, which means we are in noEncap mode, then use
+			// the gateway port as inPort.
+			inPort = gwOFPort
+		}
 		outPort = gwOFPort
 	case RejectServiceRemoteToExternal:
 		inPort = tunOFPort
+		if inPort == 0 {
+			// If the tunnel interface is not found, which means we are in noEncap mode, then use
+			// the gateway port as inPort.
+			inPort = gwOFPort
+		}
 	}
 	return inPort, outPort
 }
diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go
index 007f6d447c5..a9ff1f34610 100644
--- a/pkg/agent/openflow/client.go
+++ b/pkg/agent/openflow/client.go
@@ -808,6 +808,7 @@ func (c *client) generatePipelines() {
 			c.networkConfig,
 			c.serviceConfig,
 			c.bridge,
+			c.enableAntreaPolicy,
 			c.enableProxy,
 			c.proxyAll,
 			c.connectUplinkToBridge)
diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go
index 4d722b6781b..5be6f81344e 100644
--- a/pkg/agent/openflow/pipeline.go
+++ b/pkg/agent/openflow/pipeline.go
@@ -2112,7 +2112,7 @@ func (f *featureNetworkPolicy) ingressClassifierFlows() []binding.Flow {
 			Action().GotoTable(IngressMetricTable.GetID()).
 			Done(),
 	}
-	if f.proxyAll {
+	if f.enableAntreaPolicy && f.proxyAll {
 		// This generates the flow to match the NodePort Service packets and forward them to AntreaPolicyIngressRuleTable.
 		// Policies applied on NodePort Service will be enforced in AntreaPolicyIngressRuleTable.
 		flows = append(flows, IngressSecurityClassifierTable.ofTable.BuildFlow(priorityNormal+1).
@@ -2345,9 +2345,10 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
 			MatchRegMark(EpToSelectRegMark).
 			Action().LoadRegMark(regMarksToLoad...)
 	}
-	return flowBuilder.
-		Action().LoadToRegField(ServiceGroupIDField, uint32(groupID)).
-		Action().Group(groupID).Done()
+	if f.enableAntreaPolicy {
+		flowBuilder = flowBuilder.Action().LoadToRegField(ServiceGroupIDField, uint32(groupID))
+	}
+	return flowBuilder.Action().Group(groupID).Done()
 }
 
 // endpointDNATFlow generates the flow which transforms the Service Cluster IP to the Endpoint IP according to the Endpoint
diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go
index d4e11e6a915..5b15661adec 100644
--- a/pkg/agent/openflow/service.go
+++ b/pkg/agent/openflow/service.go
@@ -44,6 +44,7 @@ type featureService struct {
 	networkConfig *config.NetworkConfig
 	gatewayPort   uint32
 
+	enableAntreaPolicy    bool
 	enableProxy           bool
 	proxyAll              bool
 	connectUplinkToBridge bool
@@ -63,6 +64,7 @@ func newFeatureService(
 	networkConfig *config.NetworkConfig,
 	serviceConfig *config.ServiceConfig,
 	bridge binding.Bridge,
+	enableAntreaPolicy,
 	enableProxy,
 	proxyAll,
 	connectUplinkToBridge bool) *featureService {
@@ -113,6 +115,7 @@ func newFeatureService(
 		gatewayMAC:            nodeConfig.GatewayConfig.MAC,
 		gatewayPort:           nodeConfig.GatewayConfig.OFPort,
 		networkConfig:         networkConfig,
+		enableAntreaPolicy:    enableAntreaPolicy,
 		enableProxy:           enableProxy,
 		proxyAll:              proxyAll,
 		connectUplinkToBridge: connectUplinkToBridge,
diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go
index 0f05d780d08..5764224bbaa 100644
--- a/test/e2e/antreapolicy_test.go
+++ b/test/e2e/antreapolicy_test.go
@@ -3679,80 +3679,85 @@ func testACNPICMPSupport(t *testing.T, data *TestData) {
 
 func testACNPNodePortServiceSupport(t *testing.T, data *TestData) {
 	skipIfProxyAllDisabled(t, data)
 
-	// Create a client on Node 0, one NodePort Service whose Endpoint is on Node 0 and
-	// another NodePort Service whose Endpoint is on Node 1. Initiate traffic from this
-	// client to these two Services Node 1 NodePort to simulate the traffic from
-	// external client to NodePort.
-	clientName := "agnhost-client"
-	failOnError(data.createAgnhostPodOnNode(clientName, data.testNamespace, nodeName(0), true), t)
-	defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace)
-	ips, err := data.podWaitForIPs(defaultTimeout, clientName, data.testNamespace)
+	// Create a NodePort Service.
+	ipProtocol := v1.IPv4Protocol
+	var nodePort int32
+	nodePortSvc, err := data.createNginxNodePortService("test-nodeport-svc", false, false, &ipProtocol)
 	failOnError(err, t)
-
-	var cidr string
-	if clusterInfo.podV4NetworkCIDR != "" {
-		cidr = ips.ipv4.String()
-	} else {
-		cidr = ips.ipv6.String()
+	for _, port := range nodePortSvc.Spec.Ports {
+		if port.NodePort != 0 {
+			nodePort = port.NodePort
+			break
+		}
 	}
-	cidr += "/32"
-
-	svc1, cleanup1 := data.createAgnhostServiceAndBackendPods(t, "svc1", data.testNamespace, nodeName(0), v1.ServiceTypeNodePort)
-	defer cleanup1()
-	svc2, cleanup2 := data.createAgnhostServiceAndBackendPods(t, "svc2", data.testNamespace, nodeName(1), v1.ServiceTypeNodePort)
-	defer cleanup2()
+	backendPodName := "test-nodeport-backend-pod"
+	require.NoError(t, data.createNginxPodOnNode(backendPodName, data.testNamespace, nodeName(0), false))
+	if err := data.podWaitForRunning(defaultTimeout, backendPodName, data.testNamespace); err != nil {
+		t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", backendPodName)
+	}
+	defer deletePodWrapper(t, data, data.testNamespace, backendPodName)
+
+	// Create another netns to fake an external network on the host network Pod.
+	testNetns := "test-ns"
+	cmd := fmt.Sprintf(`ip netns add %[1]s && \
+ip link add dev %[1]s-a type veth peer name %[1]s-b && \
+ip link set dev %[1]s-a netns %[1]s && \
+ip addr add %[3]s/%[4]d dev %[1]s-b && \
+ip link set dev %[1]s-b up && \
+ip netns exec %[1]s ip addr add %[2]s/%[4]d dev %[1]s-a && \
+ip netns exec %[1]s ip link set dev %[1]s-a up && \
+ip netns exec %[1]s ip route replace default via %[3]s && \
+sleep 3600
+`, testNetns, "1.1.1.1", "1.1.1.254", 24)
+	clientNames := []string{"client0", "client1"}
+	for idx, clientName := range clientNames {
+		if err := NewPodBuilder(clientName, data.testNamespace, agnhostImage).OnNode(nodeName(idx)).WithCommand([]string{"sh", "-c", cmd}).InHostNetwork().Privileged().Create(data); err != nil {
+			t.Fatalf("Failed to create client Pod: %v", err)
+		}
+		defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace)
+		err = data.podWaitForRunning(defaultTimeout, clientName, data.testNamespace)
+		failOnError(err, t)
+	}
+	cidr := "1.1.1.1/24"
 
 	builder := &ClusterNetworkPolicySpecBuilder{}
 	builder = builder.SetName("test-acnp-nodeport-svc").
 		SetPriority(1.0).
 		SetAppliedToGroup([]ACNPAppliedToSpec{
 			{
 				Service: &crdv1alpha1.NamespacedName{
-					Name:      svc1.Name,
-					Namespace: svc1.Namespace,
-				},
-			},
-			{
-				Service: &crdv1alpha1.NamespacedName{
-					Name:      svc2.Name,
-					Namespace: svc2.Namespace,
+					Name:      nodePortSvc.Name,
+					Namespace: nodePortSvc.Namespace,
 				},
 			},
 		})
 	builder.AddIngress(ProtocolTCP, nil, nil, nil, nil, nil, nil, nil, &cidr, nil, nil, nil, nil,
 		false, nil, crdv1alpha1.RuleActionReject, "", "", nil)
 
-	testcases := []podToAddrTestStep{
-		{
-			Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)),
-			nodeIP(1),
-			svc1.Spec.Ports[0].NodePort,
-			Rejected,
-		},
-		{
-			Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)),
-			nodeIP(1),
-			svc2.Spec.Ports[0].NodePort,
-			Rejected,
-		},
-	}
-
 	acnp, err := k8sUtils.CreateOrUpdateACNP(builder.Get())
 	failOnError(err, t)
 	failOnError(waitForResourceReady(t, timeout, acnp), t)
-	for _, tc := range testcases {
-		log.Tracef("Probing: %s -> %s:%d", cidr, tc.destAddr, tc.destPort)
-		connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP)
-		if err != nil {
-			t.Errorf("failure -- could not complete probe: %v", err)
+	for idx, clientName := range clientNames {
+		log.Tracef("Probing: 1.1.1.1 -> %s:%d", nodeIP(idx), nodePort)
+		// Connect to the NodePort from the fake external network.
+		cmd = fmt.Sprintf("for i in $(seq 1 3); do ip netns exec %s /agnhost connect %s:%d --timeout=1s --protocol=tcp; done;", testNetns, nodeIP(idx), nodePort)
+		stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientName, agnhostContainerName, []string{"sh", "-c", cmd})
+		connectivity := Connected
+		if err != nil || stderr != "" {
+			// Log this error as trace, since it may be an expected failure.
+			log.Tracef("1.1.1.1 -> %s:%d: error when running command: err - %v /// stdout - %s /// stderr - %s", nodeIP(idx), nodePort, err, stdout, stderr)
+			// If err != nil and stderr == "", then it means this probe failed because of
+			// the command instead of connectivity. For example, the container name doesn't exist.
+			if stderr == "" {
+				connectivity = Error
+			}
+			connectivity = DecideProbeResult(stderr, 3)
 		}
-		if connectivity != tc.expectedConnectivity {
-			t.Errorf("failure -- wrong results for probe: Source %s --> Dest %s:%d connectivity: %v, expected: %v",
-				cidr, tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity)
+		if connectivity != Rejected {
+			t.Errorf("failure -- wrong results for probe: Source 1.1.1.1 --> Dest %s:%d connectivity: %v, expected: Rejected", nodeIP(idx), nodePort, connectivity)
 		}
 	}
-	// cleanup test resources
 	failOnError(k8sUtils.DeleteACNP(builder.Name), t)
 }
diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go
index 7a59344e3b2..0803c84837a 100644
--- a/test/integration/agent/openflow_test.go
+++ b/test/integration/agent/openflow_test.go
@@ -604,7 +604,7 @@ type svcConfig struct {
 	withSessionAffinity bool
 }
 
-func TestProxyServiceFlows(t *testing.T) {
+func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) {
 	// Reset OVS metrics (Prometheus) and reinitialize them to test.
 	legacyregistry.Reset()
 	metrics.InitializeOVSMetrics()
 
@@ -679,7 +679,77 @@ func TestProxyServiceFlows(t *testing.T) {
 
 	for _, tc := range tcs {
 		groupID := ofconfig.GroupIDType(tc.gid)
-		expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
+		expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, false)
+		installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
+		for _, tableFlow := range expTableFlows {
+			ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows)
+		}
+		ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, true)
+
+		uninstallServiceFlowsFunc(t, tc.gid, tc.svc, tc.endpoints)
+		for _, tableFlow := range expTableFlows {
+			ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, false, tableFlow.flows)
+		}
+		ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, false)
+	}
+}
+
+func TestProxyServiceFlowsAntreaPolicyEnabled(t *testing.T) {
+	// Reset OVS metrics (Prometheus) and reinitialize them to test.
+	legacyregistry.Reset()
+	metrics.InitializeOVSMetrics()
+
+	c = ofClient.NewClient(br, bridgeMgmtAddr, true, true, false, false, false, false, false, false, false)
+	err := ofTestUtils.PrepareOVSBridge(br)
+	require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br))
+
+	config := prepareConfiguration(true, false)
+	_, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{})
+	require.Nil(t, err, "Failed to initialize OFClient")
+
+	defer func() {
+		err = c.Disconnect()
+		assert.Nil(t, err, fmt.Sprintf("Error while disconnecting from OVS bridge: %v", err))
+		err = ofTestUtils.DeleteOVSBridge(br)
+		assert.Nil(t, err, fmt.Sprintf("Error while deleting OVS bridge: %v", err))
+		ofClient.CleanOFTableCache()
+		ofClient.ResetOFTable()
+	}()
+
+	endpoints := []k8sproxy.Endpoint{
+		k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
+			Endpoint: net.JoinHostPort("192.168.1.2", "8081"),
+			IsLocal:  true,
+		}),
+		k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
+			Endpoint: net.JoinHostPort("10.20.1.11", "8081"),
+			IsLocal:  false,
+		}),
+	}
+
+	stickyMaxAgeSeconds := uint16(30)
+
+	tcs := []struct {
+		svc       svcConfig
+		gid       uint32
+		endpoints []k8sproxy.Endpoint
+		stickyAge uint16
+	}{
+		{
+			svc: svcConfig{
+				protocol: ofconfig.ProtocolTCP,
+				ip:       net.ParseIP("10.20.30.41"),
+				port:     uint16(8000),
+			},
+			gid:       2,
+			endpoints: endpoints,
+			stickyAge: stickyMaxAgeSeconds,
+		},
+	}
+
+	for _, tc := range tcs {
+		groupID := ofconfig.GroupIDType(tc.gid)
+		expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, true)
 		installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
 		for _, tableFlow := range expTableFlows {
 			ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows)
@@ -716,7 +786,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint
 	}
 }
 
-func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16) (tableFlows []expectTableFlows, groupBuckets []string) {
+func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16, antreaPolicyEnabled bool) (tableFlows []expectTableFlows, groupBuckets []string) {
 	nw_proto := 6
 	learnProtoField := "NXM_OF_TCP_DST[]"
 	if svc.protocol == ofconfig.ProtocolUDP {
@@ -732,10 +802,17 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
 		serviceLearnReg = 3
 	}
 	cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum)
+
+	loadGroupID := ""
+	ctTable := "EgressRule"
+	if antreaPolicyEnabled {
+		loadGroupID = fmt.Sprintf("set_field:0x%x->reg7,", gid)
+		ctTable = "AntreaPolicyEgressRule"
+	}
 	svcFlows := expectTableFlows{tableName: "ServiceLB", flows: []*ofTestUtils.ExpectFlow{
 		{
 			MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
-			ActStr:   fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x%x->reg7,group:%d", serviceLearnReg<<16, gid, gid),
+			ActStr:   fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,%sgroup:%d", serviceLearnReg<<16, loadGroupID, gid),
 		},
 		{
 			MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
@@ -754,7 +831,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
 			unionVal := (0b010 << 16) + uint32(epPort)
 			epDNATFlows.flows = append(epDNATFlows.flows, &ofTestUtils.ExpectFlow{
 				MatchStr: fmt.Sprintf("priority=200,%s,reg3=%s,reg4=0x%x/0x7ffff", string(svc.protocol), epIP, unionVal),
-				ActStr:   fmt.Sprintf("ct(commit,table=EgressRule,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ep.IP(), epPort),
+				ActStr:   fmt.Sprintf("ct(commit,table=%s,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ctTable, ep.IP(), epPort),
 			})
 		if ep.GetIsLocal() {