Skip to content

Commit

Permalink
Enhance ACNP Service related feature
Browse files Browse the repository at this point in the history
1. Only load Service GroupID into reg when AntreaPolicy is enabled.

   Service GroupID is only used by AntreaPolicy "toServices"
   and "AppliedTo NodePort Serivces" features for now.

2. In IngressSecurityClassifierTable, only forward packet to
AntreaPolicyIngressRuleTable when AntreaPolicy is enabled and
proxyAll is enabled.

   This forward flow is only used by AntreaPolicy "AppliedTo NodePort
   Services" feature to avoid packets skip
   AntreaPolicyIngressRuleTable, where policy will be enforced, when
   the endpoint of this Service is not on current NodePort Node.

3. In ACNP appliedTo NodePort Service e2e test, change to add another
netNS to fake external network.

4. Change to use gwOFPort as inPort of reject response for some cases.

Signed-off-by: graysonwu <[email protected]>
  • Loading branch information
GraysonWu committed Oct 14, 2022
1 parent 1e31a78 commit 8f3a6c9
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 70 deletions.
21 changes: 12 additions & 9 deletions pkg/agent/controller/networkpolicy/reject.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"encoding/binary"
"fmt"

"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
"antrea.io/ofnet/ofctrl"

Expand Down Expand Up @@ -165,14 +164,8 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
srcMAC = sIface.MAC.String()
dstMAC = dIface.MAC.String()
}
tunPort := c.tunPort
if tunPort == 0 {
// openflow15.P_CONTROLLER is used with noEncap mode when tunnel interface is not found.
// It won't cause a loop with openflow15.P_CONTROLLER because it is used as the input port but not output port
// in the packet out message.
tunPort = uint32(openflow15.P_CONTROLLER)
}
inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, tunPort)

inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort)
mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType)

if proto == protocol.Type_TCP {
Expand Down Expand Up @@ -289,9 +282,19 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte
outPort = gwOFPort
case RejectNoAPServiceRemoteToLocal:
inPort = tunOFPort
if inPort == 0 {
// If tunnel interface is not found, which means we are in noEncap mode, then use
// gateway port as inPort.
inPort = gwOFPort
}
outPort = gwOFPort
case RejectServiceRemoteToExternal:
inPort = tunOFPort
if inPort == 0 {
// If tunnel interface is not found, which means we are in noEncap mode, then use
// gateway port as inPort.
inPort = gwOFPort
}
}
return inPort, outPort
}
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ func (c *client) generatePipelines() {
c.networkConfig,
c.serviceConfig,
c.bridge,
c.enableAntreaPolicy,
c.enableProxy,
c.proxyAll,
c.connectUplinkToBridge)
Expand Down
9 changes: 5 additions & 4 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2113,7 +2113,7 @@ func (f *featureNetworkPolicy) ingressClassifierFlows() []binding.Flow {
Action().GotoTable(IngressMetricTable.GetID()).
Done(),
}
if f.proxyAll {
if f.enableAntreaPolicy && f.proxyAll {
// This generates the flow to match the NodePort Service packets and forward them to AntreaPolicyIngressRuleTable.
// Policies applied on NodePort Service will be enforced in AntreaPolicyIngressRuleTable.
flows = append(flows, IngressSecurityClassifierTable.ofTable.BuildFlow(priorityNormal+1).
Expand Down Expand Up @@ -2344,9 +2344,10 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
MatchRegMark(EpToSelectRegMark).
Action().LoadRegMark(regMarksToLoad...)
}
return flowBuilder.
Action().LoadToRegField(ServiceGroupIDField, uint32(groupID)).
Action().Group(groupID).Done()
if f.enableAntreaPolicy {
flowBuilder = flowBuilder.Action().LoadToRegField(ServiceGroupIDField, uint32(groupID))
}
return flowBuilder.Action().Group(groupID).Done()
}

// endpointDNATFlow generates the flow which transforms the Service Cluster IP to the Endpoint IP according to the Endpoint
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/openflow/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type featureService struct {
networkConfig *config.NetworkConfig
gatewayPort uint32

enableAntreaPolicy bool
enableProxy bool
proxyAll bool
connectUplinkToBridge bool
Expand All @@ -63,6 +64,7 @@ func newFeatureService(
networkConfig *config.NetworkConfig,
serviceConfig *config.ServiceConfig,
bridge binding.Bridge,
enableAntreaPolicy,
enableProxy,
proxyAll,
connectUplinkToBridge bool) *featureService {
Expand Down Expand Up @@ -113,6 +115,7 @@ func newFeatureService(
gatewayMAC: nodeConfig.GatewayConfig.MAC,
gatewayPort: nodeConfig.GatewayConfig.OFPort,
networkConfig: networkConfig,
enableAntreaPolicy: enableAntreaPolicy,
enableProxy: enableProxy,
proxyAll: proxyAll,
connectUplinkToBridge: connectUplinkToBridge,
Expand Down
109 changes: 57 additions & 52 deletions test/e2e/antreapolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3687,80 +3687,85 @@ func testACNPICMPSupport(t *testing.T, data *TestData) {
func testACNPNodePortServiceSupport(t *testing.T, data *TestData) {
skipIfProxyAllDisabled(t, data)

// Create a client on Node 0, one NodePort Service whose Endpoint is on Node 0 and
// another NodePort Service whose Endpoint is on Node 1. Initiate traffic from this
// client to these two Services Node 1 NodePort to simulate the traffic from
// external client to NodePort.
clientName := "agnhost-client"
failOnError(data.createAgnhostPodOnNode(clientName, data.testNamespace, nodeName(0), true), t)
defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace)
ips, err := data.podWaitForIPs(defaultTimeout, clientName, data.testNamespace)
// Create a NodePort Service.
ipProtocol := v1.IPv4Protocol
var nodePort int32
nodePortSvc, err := data.createNginxNodePortService("test-nodeport-svc", false, false, &ipProtocol)
failOnError(err, t)

var cidr string
if clusterInfo.podV4NetworkCIDR != "" {
cidr = ips.ipv4.String()
} else {
cidr = ips.ipv6.String()
for _, port := range nodePortSvc.Spec.Ports {
if port.NodePort != 0 {
nodePort = port.NodePort
break
}
}
cidr += "/32"

svc1, cleanup1 := data.createAgnhostServiceAndBackendPods(t, "svc1", data.testNamespace, nodeName(0), v1.ServiceTypeNodePort)
defer cleanup1()

svc2, cleanup2 := data.createAgnhostServiceAndBackendPods(t, "svc2", data.testNamespace, nodeName(1), v1.ServiceTypeNodePort)
defer cleanup2()
backendPodName := "test-nodeport-backend-pod"
require.NoError(t, data.createNginxPodOnNode(backendPodName, data.testNamespace, nodeName(0), false))
if err := data.podWaitForRunning(defaultTimeout, backendPodName, data.testNamespace); err != nil {
t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", backendPodName)
}
defer deletePodWrapper(t, data, data.testNamespace, backendPodName)

// Create another netns to fake an external network on the host network Pod.
testNetns := "test-ns"
cmd := fmt.Sprintf(`ip netns add %[1]s && \
ip link add dev %[1]s-a type veth peer name %[1]s-b && \
ip link set dev %[1]s-a netns %[1]s && \
ip addr add %[3]s/%[4]d dev %[1]s-b && \
ip link set dev %[1]s-b up && \
ip netns exec %[1]s ip addr add %[2]s/%[4]d dev %[1]s-a && \
ip netns exec %[1]s ip link set dev %[1]s-a up && \
ip netns exec %[1]s ip route replace default via %[3]s && \
sleep 3600
`, testNetns, "1.1.1.1", "1.1.1.254", 24)
clientNames := []string{"client0", "client1"}
for idx, clientName := range clientNames {
if err := NewPodBuilder(clientName, data.testNamespace, agnhostImage).OnNode(nodeName(idx)).WithCommand([]string{"sh", "-c", cmd}).InHostNetwork().Privileged().Create(data); err != nil {
t.Fatalf("Failed to create client Pod: %v", err)
}
defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace)
err = data.podWaitForRunning(defaultTimeout, clientName, data.testNamespace)
failOnError(err, t)
}

cidr := "1.1.1.1/24"
builder := &ClusterNetworkPolicySpecBuilder{}
builder = builder.SetName("test-acnp-nodeport-svc").
SetPriority(1.0).
SetAppliedToGroup([]ACNPAppliedToSpec{
{
Service: &crdv1alpha1.NamespacedName{
Name: svc1.Name,
Namespace: svc1.Namespace,
},
},
{
Service: &crdv1alpha1.NamespacedName{
Name: svc2.Name,
Namespace: svc2.Namespace,
Name: nodePortSvc.Name,
Namespace: nodePortSvc.Namespace,
},
},
})
builder.AddIngress(ProtocolTCP, nil, nil, nil, nil, nil, nil, nil, &cidr, nil, nil,
nil, nil, false, nil, crdv1alpha1.RuleActionReject, "", "", nil)

testcases := []podToAddrTestStep{
{
Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)),
nodeIP(1),
svc1.Spec.Ports[0].NodePort,
Rejected,
},
{
Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)),
nodeIP(1),
svc2.Spec.Ports[0].NodePort,
Rejected,
},
}

acnp, err := k8sUtils.CreateOrUpdateACNP(builder.Get())
failOnError(err, t)
failOnError(waitForResourceReady(t, timeout, acnp), t)
for _, tc := range testcases {
log.Tracef("Probing: %s -> %s:%d", cidr, tc.destAddr, tc.destPort)
connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP)
if err != nil {
t.Errorf("failure -- could not complete probe: %v", err)
for idx, clientName := range clientNames {
log.Tracef("Probing: 1.1.1.1 -> %s:%d", nodeIP(idx), nodePort)
// Connect to NodePort in the fake external network.
cmd = fmt.Sprintf("for i in $(seq 1 3); do ip netns exec %s /agnhost connect %s:%d --timeout=1s --protocol=tcp; done;", testNetns, nodeIP(idx), nodePort)
stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientName, agnhostContainerName, []string{"sh", "-c", cmd})
connectivity := Connected
if err != nil || stderr != "" {
// log this error as trace since may be an expected failure
log.Tracef("1.1.1.1 -> %s:%d: error when running command: err - %v /// stdout - %s /// stderr - %s", nodeIP(idx), nodePort, err, stdout, stderr)
// If err != nil and stderr == "", then it means this probe failed because of
// the command instead of connectivity. For example, container name doesn't exist.
if stderr == "" {
connectivity = Error
}
connectivity = DecideProbeResult(stderr, 3)
}
if connectivity != tc.expectedConnectivity {
t.Errorf("failure -- wrong results for probe: Source %s --> Dest %s:%d connectivity: %v, expected: %v",
cidr, tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity)
if connectivity != Rejected {
t.Errorf("failure -- wrong results for probe: Source 1.1.1.1 --> Dest %s:%d connectivity: %v, expected: Rej", nodeIP(idx), nodePort, connectivity)
}
}
// cleanup test resources
failOnError(k8sUtils.DeleteACNP(builder.Name), t)
}

Expand Down
87 changes: 82 additions & 5 deletions test/integration/agent/openflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ type svcConfig struct {
withSessionAffinity bool
}

func TestProxyServiceFlows(t *testing.T) {
func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) {
// Reset OVS metrics (Prometheus) and reinitialize them to test.
legacyregistry.Reset()
metrics.InitializeOVSMetrics()
Expand Down Expand Up @@ -679,7 +679,77 @@ func TestProxyServiceFlows(t *testing.T) {

for _, tc := range tcs {
groupID := ofconfig.GroupIDType(tc.gid)
expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, false)
installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
for _, tableFlow := range expTableFlows {
ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows)
}
ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, true)

uninstallServiceFlowsFunc(t, tc.gid, tc.svc, tc.endpoints)
for _, tableFlow := range expTableFlows {
ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, false, tableFlow.flows)
}
ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, false)
}
}

func TestProxyServiceFlowsAntreaPoilcyEnabled(t *testing.T) {
// Reset OVS metrics (Prometheus) and reinitialize them to test.
legacyregistry.Reset()
metrics.InitializeOVSMetrics()

c = ofClient.NewClient(br, bridgeMgmtAddr, true, true, false, false, false, false, false, false, false)
err := ofTestUtils.PrepareOVSBridge(br)
require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br))

config := prepareConfiguration(true, false)
_, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{})
require.Nil(t, err, "Failed to initialize OFClient")

defer func() {
err = c.Disconnect()
assert.Nil(t, err, fmt.Sprintf("Error while disconnecting from OVS bridge: %v", err))
err = ofTestUtils.DeleteOVSBridge(br)
assert.Nil(t, err, fmt.Sprintf("Error while deleting OVS bridge: %v", err))
ofClient.CleanOFTableCache()
ofClient.ResetOFTable()
}()

endpoints := []k8sproxy.Endpoint{
k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("192.168.1.2", "8081"),
IsLocal: true,
}),
k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("10.20.1.11", "8081"),
IsLocal: false,
}),
}

stickyMaxAgeSeconds := uint16(30)

tcs := []struct {
svc svcConfig
gid uint32
endpoints []k8sproxy.Endpoint
stickyAge uint16
}{
{
svc: svcConfig{
protocol: ofconfig.ProtocolTCP,
ip: net.ParseIP("10.20.30.41"),
port: uint16(8000),
},
gid: 2,
endpoints: endpoints,
stickyAge: stickyMaxAgeSeconds,
},
}

for _, tc := range tcs {
groupID := ofconfig.GroupIDType(tc.gid)
expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, true)
installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
for _, tableFlow := range expTableFlows {
ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows)
Expand Down Expand Up @@ -716,7 +786,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint
}
}

func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16) (tableFlows []expectTableFlows, groupBuckets []string) {
func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16, antreaPolicyEnabled bool) (tableFlows []expectTableFlows, groupBuckets []string) {
nw_proto := 6
learnProtoField := "NXM_OF_TCP_DST[]"
if svc.protocol == ofconfig.ProtocolUDP {
Expand All @@ -732,10 +802,17 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
serviceLearnReg = 3
}
cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum)

loadGourpID := ""
ctTable := "EgressRule"
if antreaPolicyEnabled {
loadGourpID = fmt.Sprintf("set_field:0x%x->reg7,", gid)
ctTable = "AntreaPolicyEgressRule"
}
svcFlows := expectTableFlows{tableName: "ServiceLB", flows: []*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x%x->reg7,group:%d", serviceLearnReg<<16, gid, gid),
ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,%sgroup:%d", serviceLearnReg<<16, loadGourpID, gid),
},
{
MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
Expand All @@ -754,7 +831,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
unionVal := (0b010 << 16) + uint32(epPort)
epDNATFlows.flows = append(epDNATFlows.flows, &ofTestUtils.ExpectFlow{
MatchStr: fmt.Sprintf("priority=200,%s,reg3=%s,reg4=0x%x/0x7ffff", string(svc.protocol), epIP, unionVal),
ActStr: fmt.Sprintf("ct(commit,table=EgressRule,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ep.IP(), epPort),
ActStr: fmt.Sprintf("ct(commit,table=%s,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ctTable, ep.IP(), epPort),
})

if ep.GetIsLocal() {
Expand Down

0 comments on commit 8f3a6c9

Please sign in to comment.