diff --git a/pkg/agent/controller/networkpolicy/reject.go b/pkg/agent/controller/networkpolicy/reject.go index 91e7e80800e..0544c9ec056 100644 --- a/pkg/agent/controller/networkpolicy/reject.go +++ b/pkg/agent/controller/networkpolicy/reject.go @@ -18,7 +18,6 @@ import ( "encoding/binary" "fmt" - "antrea.io/libOpenflow/openflow15" "antrea.io/libOpenflow/protocol" "antrea.io/ofnet/ofctrl" @@ -165,14 +164,8 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { srcMAC = sIface.MAC.String() dstMAC = dIface.MAC.String() } - tunPort := c.tunPort - if tunPort == 0 { - // openflow15.P_CONTROLLER is used with noEncap mode when tunnel interface is not found. - // It won't cause a loop with openflow15.P_CONTROLLER because it is used as the input port but not output port - // in the packet out message. - tunPort = uint32(openflow15.P_CONTROLLER) - } - inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, tunPort) + + inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort) mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType) if proto == protocol.Type_TCP { @@ -289,9 +282,19 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte outPort = gwOFPort case RejectNoAPServiceRemoteToLocal: inPort = tunOFPort + if inPort == 0 { + // If tunnel interface is not found, which means we are in noEncap mode, then use + // gateway port as inPort. + inPort = gwOFPort + } outPort = gwOFPort case RejectServiceRemoteToExternal: inPort = tunOFPort + if inPort == 0 { + // If tunnel interface is not found, which means we are in noEncap mode, then use + // gateway port as inPort. + inPort = gwOFPort + } } return inPort, outPort } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 034f7471bbb..0fcb7650386 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -815,6 +815,7 @@ func (c *client) generatePipelines() { c.networkConfig, c.serviceConfig, c.bridge, + c.enableAntreaPolicy, c.enableProxy, c.proxyAll, c.connectUplinkToBridge) diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 5fa6317d81f..de552ffd654 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2113,7 +2113,7 @@ func (f *featureNetworkPolicy) ingressClassifierFlows() []binding.Flow { Action().GotoTable(IngressMetricTable.GetID()). Done(), } - if f.proxyAll { + if f.enableAntreaPolicy && f.proxyAll { // This generates the flow to match the NodePort Service packets and forward them to AntreaPolicyIngressRuleTable. // Policies applied on NodePort Service will be enforced in AntreaPolicyIngressRuleTable. flows = append(flows, IngressSecurityClassifierTable.ofTable.BuildFlow(priorityNormal+1). @@ -2344,9 +2344,10 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType, MatchRegMark(EpToSelectRegMark). Action().LoadRegMark(regMarksToLoad...) } - return flowBuilder. - Action().LoadToRegField(ServiceGroupIDField, uint32(groupID)). - Action().Group(groupID).Done() + if f.enableAntreaPolicy { + flowBuilder = flowBuilder.Action().LoadToRegField(ServiceGroupIDField, uint32(groupID)) + } + return flowBuilder.Action().Group(groupID).Done() } // endpointDNATFlow generates the flow which transforms the Service Cluster IP to the Endpoint IP according to the Endpoint diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index 10976086f28..5f7e4048972 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -44,6 +44,7 @@ type featureService struct { networkConfig *config.NetworkConfig gatewayPort uint32 + enableAntreaPolicy bool enableProxy bool proxyAll bool connectUplinkToBridge bool @@ -63,6 +64,7 @@ func newFeatureService( networkConfig *config.NetworkConfig, serviceConfig *config.ServiceConfig, bridge binding.Bridge, + enableAntreaPolicy, enableProxy, proxyAll, connectUplinkToBridge bool) *featureService { @@ -113,6 +115,7 @@ func newFeatureService( gatewayMAC: nodeConfig.GatewayConfig.MAC, gatewayPort: nodeConfig.GatewayConfig.OFPort, networkConfig: networkConfig, + enableAntreaPolicy: enableAntreaPolicy, enableProxy: enableProxy, proxyAll: proxyAll, connectUplinkToBridge: connectUplinkToBridge, diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index 60979921d40..6a5e012e3ac 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -3687,80 +3687,85 @@ func testACNPICMPSupport(t *testing.T, data *TestData) { func testACNPNodePortServiceSupport(t *testing.T, data *TestData) { skipIfProxyAllDisabled(t, data) - // Create a client on Node 0, one NodePort Service whose Endpoint is on Node 0 and - // another NodePort Service whose Endpoint is on Node 1. Initiate traffic from this - // client to these two Services Node 1 NodePort to simulate the traffic from - // external client to NodePort. - clientName := "agnhost-client" - failOnError(data.createAgnhostPodOnNode(clientName, data.testNamespace, nodeName(0), true), t) - defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace) - ips, err := data.podWaitForIPs(defaultTimeout, clientName, data.testNamespace) + // Create a NodePort Service. + ipProtocol := v1.IPv4Protocol + var nodePort int32 + nodePortSvc, err := data.createNginxNodePortService("test-nodeport-svc", false, false, &ipProtocol) failOnError(err, t) - - var cidr string - if clusterInfo.podV4NetworkCIDR != "" { - cidr = ips.ipv4.String() - } else { - cidr = ips.ipv6.String() + for _, port := range nodePortSvc.Spec.Ports { + if port.NodePort != 0 { + nodePort = port.NodePort + break + } } - cidr += "/32" - - svc1, cleanup1 := data.createAgnhostServiceAndBackendPods(t, "svc1", data.testNamespace, nodeName(0), v1.ServiceTypeNodePort) - defer cleanup1() - svc2, cleanup2 := data.createAgnhostServiceAndBackendPods(t, "svc2", data.testNamespace, nodeName(1), v1.ServiceTypeNodePort) - defer cleanup2() + backendPodName := "test-nodeport-backend-pod" + require.NoError(t, data.createNginxPodOnNode(backendPodName, data.testNamespace, nodeName(0), false)) + if err := data.podWaitForRunning(defaultTimeout, backendPodName, data.testNamespace); err != nil { + t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", backendPodName) + } + defer deletePodWrapper(t, data, data.testNamespace, backendPodName) + + // Create another netns to fake an external network on the host network Pod. + testNetns := "test-ns" + cmd := fmt.Sprintf(`ip netns add %[1]s && \ +ip link add dev %[1]s-a type veth peer name %[1]s-b && \ +ip link set dev %[1]s-a netns %[1]s && \ +ip addr add %[3]s/%[4]d dev %[1]s-b && \ +ip link set dev %[1]s-b up && \ +ip netns exec %[1]s ip addr add %[2]s/%[4]d dev %[1]s-a && \ +ip netns exec %[1]s ip link set dev %[1]s-a up && \ +ip netns exec %[1]s ip route replace default via %[3]s && \ +sleep 3600 +`, testNetns, "1.1.1.1", "1.1.1.254", 24) + clientNames := []string{"client0", "client1"} + for idx, clientName := range clientNames { + if err := NewPodBuilder(clientName, data.testNamespace, agnhostImage).OnNode(nodeName(idx)).WithCommand([]string{"sh", "-c", cmd}).InHostNetwork().Privileged().Create(data); err != nil { + t.Fatalf("Failed to create client Pod: %v", err) + } + defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace) + err = data.podWaitForRunning(defaultTimeout, clientName, data.testNamespace) + failOnError(err, t) + } + cidr := "1.1.1.1/24" builder := &ClusterNetworkPolicySpecBuilder{} builder = builder.SetName("test-acnp-nodeport-svc"). SetPriority(1.0). SetAppliedToGroup([]ACNPAppliedToSpec{ { Service: &crdv1alpha1.NamespacedName{ - Name: svc1.Name, - Namespace: svc1.Namespace, - }, - }, - { - Service: &crdv1alpha1.NamespacedName{ - Name: svc2.Name, - Namespace: svc2.Namespace, + Name: nodePortSvc.Name, + Namespace: nodePortSvc.Namespace, }, }, }) builder.AddIngress(ProtocolTCP, nil, nil, nil, nil, nil, nil, nil, &cidr, nil, nil, nil, nil, false, nil, crdv1alpha1.RuleActionReject, "", "", nil) - testcases := []podToAddrTestStep{ - { - Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)), - nodeIP(1), - svc1.Spec.Ports[0].NodePort, - Rejected, - }, - { - Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)), - nodeIP(1), - svc2.Spec.Ports[0].NodePort, - Rejected, - }, - } - acnp, err := k8sUtils.CreateOrUpdateACNP(builder.Get()) failOnError(err, t) failOnError(waitForResourceReady(t, timeout, acnp), t) - for _, tc := range testcases { - log.Tracef("Probing: %s -> %s:%d", cidr, tc.destAddr, tc.destPort) - connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP) - if err != nil { - t.Errorf("failure -- could not complete probe: %v", err) + for idx, clientName := range clientNames { + log.Tracef("Probing: 1.1.1.1 -> %s:%d", nodeIP(idx), nodePort) + // Connect to NodePort in the fake external network. + cmd = fmt.Sprintf("for i in $(seq 1 3); do ip netns exec %s /agnhost connect %s:%d --timeout=1s --protocol=tcp; done;", testNetns, nodeIP(idx), nodePort) + stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientName, agnhostContainerName, []string{"sh", "-c", cmd}) + connectivity := Connected + if err != nil || stderr != "" { + // log this error as trace since may be an expected failure + log.Tracef("1.1.1.1 -> %s:%d: error when running command: err - %v /// stdout - %s /// stderr - %s", nodeIP(idx), nodePort, err, stdout, stderr) + // If err != nil and stderr == "", then it means this probe failed because of + // the command instead of connectivity. For example, container name doesn't exist. + if stderr == "" { + connectivity = Error + } + connectivity = DecideProbeResult(stderr, 3) } - if connectivity != tc.expectedConnectivity { - t.Errorf("failure -- wrong results for probe: Source %s --> Dest %s:%d connectivity: %v, expected: %v", - cidr, tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity) + if connectivity != Rejected { + t.Errorf("failure -- wrong results for probe: Source 1.1.1.1 --> Dest %s:%d connectivity: %v, expected: Rej", nodeIP(idx), nodePort, connectivity) } } - // cleanup test resources failOnError(k8sUtils.DeleteACNP(builder.Name), t) } diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 7a59344e3b2..0803c84837a 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -604,7 +604,7 @@ type svcConfig struct { withSessionAffinity bool } -func TestProxyServiceFlows(t *testing.T) { +func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) { // Reset OVS metrics (Prometheus) and reinitialize them to test. legacyregistry.Reset() metrics.InitializeOVSMetrics() @@ -679,7 +679,77 @@ func TestProxyServiceFlows(t *testing.T) { for _, tc := range tcs { groupID := ofconfig.GroupIDType(tc.gid) - expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge) + expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, false) + installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge) + for _, tableFlow := range expTableFlows { + ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows) + } + ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, true) + + uninstallServiceFlowsFunc(t, tc.gid, tc.svc, tc.endpoints) + for _, tableFlow := range expTableFlows { + ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, false, tableFlow.flows) + } + ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, false) + } +} + +func TestProxyServiceFlowsAntreaPoilcyEnabled(t *testing.T) { + // Reset OVS metrics (Prometheus) and reinitialize them to test. + legacyregistry.Reset() + metrics.InitializeOVSMetrics() + + c = ofClient.NewClient(br, bridgeMgmtAddr, true, true, false, false, false, false, false, false, false) + err := ofTestUtils.PrepareOVSBridge(br) + require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) + + config := prepareConfiguration(true, false) + _, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{}) + require.Nil(t, err, "Failed to initialize OFClient") + + defer func() { + err = c.Disconnect() + assert.Nil(t, err, fmt.Sprintf("Error while disconnecting from OVS bridge: %v", err)) + err = ofTestUtils.DeleteOVSBridge(br) + assert.Nil(t, err, fmt.Sprintf("Error while deleting OVS bridge: %v", err)) + ofClient.CleanOFTableCache() + ofClient.ResetOFTable() + }() + + endpoints := []k8sproxy.Endpoint{ + k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{ + Endpoint: net.JoinHostPort("192.168.1.2", "8081"), + IsLocal: true, + }), + k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{ + Endpoint: net.JoinHostPort("10.20.1.11", "8081"), + IsLocal: false, + }), + } + + stickyMaxAgeSeconds := uint16(30) + + tcs := []struct { + svc svcConfig + gid uint32 + endpoints []k8sproxy.Endpoint + stickyAge uint16 + }{ + { + svc: svcConfig{ + protocol: ofconfig.ProtocolTCP, + ip: net.ParseIP("10.20.30.41"), + port: uint16(8000), + }, + gid: 2, + endpoints: endpoints, + stickyAge: stickyMaxAgeSeconds, + }, + } + + for _, tc := range tcs { + groupID := ofconfig.GroupIDType(tc.gid) + expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, true) installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge) for _, tableFlow := range expTableFlows { ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows) @@ -716,7 +786,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint } } -func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16) (tableFlows []expectTableFlows, groupBuckets []string) { +func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16, antreaPolicyEnabled bool) (tableFlows []expectTableFlows, groupBuckets []string) { nw_proto := 6 learnProtoField := "NXM_OF_TCP_DST[]" if svc.protocol == ofconfig.ProtocolUDP { @@ -732,10 +802,17 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [ serviceLearnReg = 3 } cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum) + + loadGourpID := "" + ctTable := "EgressRule" + if antreaPolicyEnabled { + loadGourpID = fmt.Sprintf("set_field:0x%x->reg7,", gid) + ctTable = "AntreaPolicyEgressRule" + } svcFlows := expectTableFlows{tableName: "ServiceLB", flows: []*ofTestUtils.ExpectFlow{ { MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port), - ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x%x->reg7,group:%d", serviceLearnReg<<16, gid, gid), + ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,%sgroup:%d", serviceLearnReg<<16, loadGourpID, gid), }, { MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port), @@ -754,7 +831,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [ unionVal := (0b010 << 16) + uint32(epPort) epDNATFlows.flows = append(epDNATFlows.flows, &ofTestUtils.ExpectFlow{ MatchStr: fmt.Sprintf("priority=200,%s,reg3=%s,reg4=0x%x/0x7ffff", string(svc.protocol), epIP, unionVal), - ActStr: fmt.Sprintf("ct(commit,table=EgressRule,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ep.IP(), epPort), + ActStr: fmt.Sprintf("ct(commit,table=%s,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ctTable, ep.IP(), epPort), }) if ep.GetIsLocal() {