Automated cherry pick of #4261: Enhance ACNP Service related feature #4315

Merged
21 changes: 12 additions & 9 deletions pkg/agent/controller/networkpolicy/reject.go
@@ -18,7 +18,6 @@ import (
"encoding/binary"
"fmt"

"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
"antrea.io/ofnet/ofctrl"

@@ -165,14 +164,8 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
srcMAC = sIface.MAC.String()
dstMAC = dIface.MAC.String()
}
- tunPort := c.tunPort
- if tunPort == 0 {
- // openflow15.P_CONTROLLER is used with noEncap mode when tunnel interface is not found.
- // It won't cause a loop with openflow15.P_CONTROLLER because it is used as the input port but not output port
- // in the packet out message.
- tunPort = uint32(openflow15.P_CONTROLLER)
- }
- inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, tunPort)
+
+ inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort)
mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType)

if proto == protocol.Type_TCP {
@@ -289,9 +282,19 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte
outPort = gwOFPort
case RejectNoAPServiceRemoteToLocal:
inPort = tunOFPort
+ if inPort == 0 {
+ // If tunnel interface is not found, which means we are in noEncap mode, then use
+ // gateway port as inPort.
+ inPort = gwOFPort
+ }
outPort = gwOFPort
case RejectServiceRemoteToExternal:
inPort = tunOFPort
+ if inPort == 0 {
+ // If tunnel interface is not found, which means we are in noEncap mode, then use
+ // gateway port as inPort.
+ inPort = gwOFPort
+ }
}
return inPort, outPort
}
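The hunks above move the noEncap fallback out of rejectRequest and into getRejectOFPorts: instead of substituting openflow15.P_CONTROLLER for a missing tunnel port, the gateway port now serves as the packet-out input port. Below is a minimal runnable sketch of the new selection logic; it is not part of the diff, the surrounding controller types are stubbed out, and the port numbers are illustrative.

package main

import "fmt"

// pickInPort mirrors the fallback added to getRejectOFPorts for the
// RejectNoAPServiceRemoteToLocal and RejectServiceRemoteToExternal cases:
// in noEncap mode the agent has no tunnel interface, so tunOFPort stays 0
// and the gateway port doubles as the input port of the reject packet-out.
func pickInPort(gwOFPort, tunOFPort uint32) uint32 {
	inPort := tunOFPort
	if inPort == 0 {
		// Tunnel interface not found: noEncap mode, fall back to the gateway port.
		inPort = gwOFPort
	}
	return inPort
}

func main() {
	fmt.Println(pickInPort(2, 0)) // noEncap: prints 2 (gateway port)
	fmt.Println(pickInPort(2, 4)) // encap: prints 4 (tunnel port)
}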
1 change: 1 addition & 0 deletions pkg/agent/openflow/client.go
@@ -808,6 +808,7 @@ func (c *client) generatePipelines() {
c.networkConfig,
c.serviceConfig,
c.bridge,
+ c.enableAntreaPolicy,
c.enableProxy,
c.proxyAll,
c.connectUplinkToBridge)
9 changes: 5 additions & 4 deletions pkg/agent/openflow/pipeline.go
@@ -2112,7 +2112,7 @@ func (f *featureNetworkPolicy) ingressClassifierFlows() []binding.Flow {
Action().GotoTable(IngressMetricTable.GetID()).
Done(),
}
- if f.proxyAll {
+ if f.enableAntreaPolicy && f.proxyAll {
// This generates the flow to match the NodePort Service packets and forward them to AntreaPolicyIngressRuleTable.
// Policies applied on NodePort Service will be enforced in AntreaPolicyIngressRuleTable.
flows = append(flows, IngressSecurityClassifierTable.ofTable.BuildFlow(priorityNormal+1).
@@ -2345,9 +2345,10 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
MatchRegMark(EpToSelectRegMark).
Action().LoadRegMark(regMarksToLoad...)
}
- return flowBuilder.
- Action().LoadToRegField(ServiceGroupIDField, uint32(groupID)).
- Action().Group(groupID).Done()
+ if f.enableAntreaPolicy {
+ flowBuilder = flowBuilder.Action().LoadToRegField(ServiceGroupIDField, uint32(groupID))
+ }
+ return flowBuilder.Action().Group(groupID).Done()
}

// endpointDNATFlow generates the flow which transforms the Service Cluster IP to the Endpoint IP according to the Endpoint
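The serviceLBFlow hunk makes the group-ID register load conditional: the value in ServiceGroupIDField is only consumed when Antrea-native policies are enforced, so the extra set_field action is skipped when the feature gate is off. Here is a self-contained sketch of the same conditional-builder pattern; it is not part of the diff, the builder is a stub standing in for Antrea's binding.FlowBuilder, and reg7 as the target register follows the integration-test expectations further down.

package main

import "fmt"

type flowBuilder struct{ actions []string }

func (b *flowBuilder) LoadToRegField(field string, value uint32) *flowBuilder {
	b.actions = append(b.actions, fmt.Sprintf("set_field:0x%x->%s", value, field))
	return b
}

func (b *flowBuilder) Group(groupID uint32) *flowBuilder {
	b.actions = append(b.actions, fmt.Sprintf("group:%d", groupID))
	return b
}

// serviceLBActions mirrors the patched logic: only Antrea-native policy
// tables consume the group ID register, so the load is gated on the feature.
func serviceLBActions(enableAntreaPolicy bool, groupID uint32) []string {
	b := &flowBuilder{}
	if enableAntreaPolicy {
		b = b.LoadToRegField("reg7", groupID)
	}
	return b.Group(groupID).actions
}

func main() {
	fmt.Println(serviceLBActions(true, 2))  // [set_field:0x2->reg7 group:2]
	fmt.Println(serviceLBActions(false, 2)) // [group:2]
}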
3 changes: 3 additions & 0 deletions pkg/agent/openflow/service.go
@@ -44,6 +44,7 @@ type featureService struct {
networkConfig *config.NetworkConfig
gatewayPort uint32

+ enableAntreaPolicy bool
enableProxy bool
proxyAll bool
connectUplinkToBridge bool
@@ -63,6 +64,7 @@ func newFeatureService(
networkConfig *config.NetworkConfig,
serviceConfig *config.ServiceConfig,
bridge binding.Bridge,
+ enableAntreaPolicy,
enableProxy,
proxyAll,
connectUplinkToBridge bool) *featureService {
@@ -113,6 +115,7 @@
gatewayMAC: nodeConfig.GatewayConfig.MAC,
gatewayPort: nodeConfig.GatewayConfig.OFPort,
networkConfig: networkConfig,
+ enableAntreaPolicy: enableAntreaPolicy,
enableProxy: enableProxy,
proxyAll: proxyAll,
connectUplinkToBridge: connectUplinkToBridge,
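The client.go and service.go hunks above are plumbing: they thread the enableAntreaPolicy gate from the client into the featureService struct so that serviceLBFlow can consult it. A compressed sketch of that threading, with every unrelated field and parameter omitted:

package main

import "fmt"

// featureService keeps only the fields touched by this diff.
type featureService struct {
	enableAntreaPolicy bool
	enableProxy        bool
	proxyAll           bool
}

// newFeatureService mirrors the amended constructor: the new gate rides
// along with the existing boolean options.
func newFeatureService(enableAntreaPolicy, enableProxy, proxyAll bool) *featureService {
	return &featureService{
		enableAntreaPolicy: enableAntreaPolicy,
		enableProxy:        enableProxy,
		proxyAll:           proxyAll,
	}
}

func main() {
	fs := newFeatureService(true, true, false)
	fmt.Printf("%+v\n", fs) // &{enableAntreaPolicy:true enableProxy:true proxyAll:false}
}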
109 changes: 57 additions & 52 deletions test/e2e/antreapolicy_test.go
@@ -3679,80 +3679,85 @@ func testACNPICMPSupport(t *testing.T, data *TestData) {
func testACNPNodePortServiceSupport(t *testing.T, data *TestData) {
skipIfProxyAllDisabled(t, data)

- // Create a client on Node 0, one NodePort Service whose Endpoint is on Node 0 and
- // another NodePort Service whose Endpoint is on Node 1. Initiate traffic from this
- // client to these two Services Node 1 NodePort to simulate the traffic from
- // external client to NodePort.
- clientName := "agnhost-client"
- failOnError(data.createAgnhostPodOnNode(clientName, data.testNamespace, nodeName(0), true), t)
- defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace)
- ips, err := data.podWaitForIPs(defaultTimeout, clientName, data.testNamespace)
+ // Create a NodePort Service.
+ ipProtocol := v1.IPv4Protocol
+ var nodePort int32
+ nodePortSvc, err := data.createNginxNodePortService("test-nodeport-svc", false, false, &ipProtocol)
failOnError(err, t)

- var cidr string
- if clusterInfo.podV4NetworkCIDR != "" {
- cidr = ips.ipv4.String()
- } else {
- cidr = ips.ipv6.String()
+ for _, port := range nodePortSvc.Spec.Ports {
+ if port.NodePort != 0 {
+ nodePort = port.NodePort
+ break
+ }
}
cidr += "/32"

- svc1, cleanup1 := data.createAgnhostServiceAndBackendPods(t, "svc1", data.testNamespace, nodeName(0), v1.ServiceTypeNodePort)
- defer cleanup1()
-
- svc2, cleanup2 := data.createAgnhostServiceAndBackendPods(t, "svc2", data.testNamespace, nodeName(1), v1.ServiceTypeNodePort)
- defer cleanup2()
+ backendPodName := "test-nodeport-backend-pod"
+ require.NoError(t, data.createNginxPodOnNode(backendPodName, data.testNamespace, nodeName(0), false))
+ if err := data.podWaitForRunning(defaultTimeout, backendPodName, data.testNamespace); err != nil {
+ t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", backendPodName)
+ }
+ defer deletePodWrapper(t, data, data.testNamespace, backendPodName)

+ // Create another netns to fake an external network on the host network Pod.
+ testNetns := "test-ns"
+ cmd := fmt.Sprintf(`ip netns add %[1]s && \
+ ip link add dev %[1]s-a type veth peer name %[1]s-b && \
+ ip link set dev %[1]s-a netns %[1]s && \
+ ip addr add %[3]s/%[4]d dev %[1]s-b && \
+ ip link set dev %[1]s-b up && \
+ ip netns exec %[1]s ip addr add %[2]s/%[4]d dev %[1]s-a && \
+ ip netns exec %[1]s ip link set dev %[1]s-a up && \
+ ip netns exec %[1]s ip route replace default via %[3]s && \
+ sleep 3600
+ `, testNetns, "1.1.1.1", "1.1.1.254", 24)
+ clientNames := []string{"client0", "client1"}
+ for idx, clientName := range clientNames {
+ if err := NewPodBuilder(clientName, data.testNamespace, agnhostImage).OnNode(nodeName(idx)).WithCommand([]string{"sh", "-c", cmd}).InHostNetwork().Privileged().Create(data); err != nil {
+ t.Fatalf("Failed to create client Pod: %v", err)
+ }
+ defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace)
+ err = data.podWaitForRunning(defaultTimeout, clientName, data.testNamespace)
+ failOnError(err, t)
+ }

cidr := "1.1.1.1/24"
builder := &ClusterNetworkPolicySpecBuilder{}
builder = builder.SetName("test-acnp-nodeport-svc").
SetPriority(1.0).
SetAppliedToGroup([]ACNPAppliedToSpec{
{
Service: &crdv1alpha1.NamespacedName{
- Name: svc1.Name,
- Namespace: svc1.Namespace,
- },
- },
- {
- Service: &crdv1alpha1.NamespacedName{
- Name: svc2.Name,
- Namespace: svc2.Namespace,
+ Name: nodePortSvc.Name,
+ Namespace: nodePortSvc.Namespace,
},
},
})
builder.AddIngress(ProtocolTCP, nil, nil, nil, nil, nil, nil, nil, &cidr, nil, nil,
nil, nil, false, nil, crdv1alpha1.RuleActionReject, "", "", nil)

- testcases := []podToAddrTestStep{
- {
- Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)),
- nodeIP(1),
- svc1.Spec.Ports[0].NodePort,
- Rejected,
- },
- {
- Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)),
- nodeIP(1),
- svc2.Spec.Ports[0].NodePort,
- Rejected,
- },
- }
-
acnp, err := k8sUtils.CreateOrUpdateACNP(builder.Get())
failOnError(err, t)
failOnError(waitForResourceReady(t, timeout, acnp), t)
- for _, tc := range testcases {
- log.Tracef("Probing: %s -> %s:%d", cidr, tc.destAddr, tc.destPort)
- connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP)
- if err != nil {
- t.Errorf("failure -- could not complete probe: %v", err)
+ for idx, clientName := range clientNames {
+ log.Tracef("Probing: 1.1.1.1 -> %s:%d", nodeIP(idx), nodePort)
+ // Connect to NodePort in the fake external network.
+ cmd = fmt.Sprintf("for i in $(seq 1 3); do ip netns exec %s /agnhost connect %s:%d --timeout=1s --protocol=tcp; done;", testNetns, nodeIP(idx), nodePort)
+ stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientName, agnhostContainerName, []string{"sh", "-c", cmd})
+ connectivity := Connected
+ if err != nil || stderr != "" {
+ // log this error as trace since may be an expected failure
+ log.Tracef("1.1.1.1 -> %s:%d: error when running command: err - %v /// stdout - %s /// stderr - %s", nodeIP(idx), nodePort, err, stdout, stderr)
+ // If err != nil and stderr == "", then it means this probe failed because of
+ // the command instead of connectivity. For example, container name doesn't exist.
if stderr == "" {
connectivity = Error
}
connectivity = DecideProbeResult(stderr, 3)
}
- if connectivity != tc.expectedConnectivity {
- t.Errorf("failure -- wrong results for probe: Source %s --> Dest %s:%d connectivity: %v, expected: %v",
- cidr, tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity)
+ if connectivity != Rejected {
+ t.Errorf("failure -- wrong results for probe: Source 1.1.1.1 --> Dest %s:%d connectivity: %v, expected: Rej", nodeIP(idx), nodePort, connectivity)
}
}
// cleanup test resources
failOnError(k8sUtils.DeleteACNP(builder.Name), t)
}
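The rewritten test fakes an external client by parking one end of a veth pair inside a dedicated network namespace and probing the NodePort from there. The netns script is assembled with Go's indexed format verbs (%[1]s and friends), which let a single argument be referenced several times. A standalone sketch of that expansion follows, using a trimmed-down command with the same arguments as the test:

package main

import "fmt"

func main() {
	testNetns, clientIP, gwIP, prefixLen := "test-ns", "1.1.1.1", "1.1.1.254", 24
	// %[1]s, %[2]s, %[3]s, and %[4]d pick arguments by position, so testNetns
	// can appear many times without being repeated in the argument list.
	cmd := fmt.Sprintf("ip netns add %[1]s && ip netns exec %[1]s ip addr add %[2]s/%[4]d dev %[1]s-a && ip netns exec %[1]s ip route replace default via %[3]s",
		testNetns, clientIP, gwIP, prefixLen)
	fmt.Println(cmd)
	// ip netns add test-ns && ip netns exec test-ns ip addr add 1.1.1.1/24 dev test-ns-a && ip netns exec test-ns ip route replace default via 1.1.1.254
}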

87 changes: 82 additions & 5 deletions test/integration/agent/openflow_test.go
@@ -604,7 +604,7 @@ type svcConfig struct {
withSessionAffinity bool
}

- func TestProxyServiceFlows(t *testing.T) {
+ func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) {
// Reset OVS metrics (Prometheus) and reinitialize them to test.
legacyregistry.Reset()
metrics.InitializeOVSMetrics()
@@ -679,7 +679,77 @@

for _, tc := range tcs {
groupID := ofconfig.GroupIDType(tc.gid)
- expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
+ expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, false)
+ installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
+ for _, tableFlow := range expTableFlows {
+ ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows)
+ }
+ ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, true)
+
+ uninstallServiceFlowsFunc(t, tc.gid, tc.svc, tc.endpoints)
+ for _, tableFlow := range expTableFlows {
+ ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, false, tableFlow.flows)
+ }
+ ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, false)
+ }
+ }
+
+ func TestProxyServiceFlowsAntreaPolicyEnabled(t *testing.T) {
+ // Reset OVS metrics (Prometheus) and reinitialize them to test.
+ legacyregistry.Reset()
+ metrics.InitializeOVSMetrics()
+
+ c = ofClient.NewClient(br, bridgeMgmtAddr, true, true, false, false, false, false, false, false, false)
+ err := ofTestUtils.PrepareOVSBridge(br)
+ require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br))
+
+ config := prepareConfiguration(true, false)
+ _, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{})
+ require.Nil(t, err, "Failed to initialize OFClient")
+
+ defer func() {
+ err = c.Disconnect()
+ assert.Nil(t, err, fmt.Sprintf("Error while disconnecting from OVS bridge: %v", err))
+ err = ofTestUtils.DeleteOVSBridge(br)
+ assert.Nil(t, err, fmt.Sprintf("Error while deleting OVS bridge: %v", err))
+ ofClient.CleanOFTableCache()
+ ofClient.ResetOFTable()
+ }()
+
+ endpoints := []k8sproxy.Endpoint{
+ k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
+ Endpoint: net.JoinHostPort("192.168.1.2", "8081"),
+ IsLocal: true,
+ }),
+ k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
+ Endpoint: net.JoinHostPort("10.20.1.11", "8081"),
+ IsLocal: false,
+ }),
+ }
+
+ stickyMaxAgeSeconds := uint16(30)
+
+ tcs := []struct {
+ svc svcConfig
+ gid uint32
+ endpoints []k8sproxy.Endpoint
+ stickyAge uint16
+ }{
+ {
+ svc: svcConfig{
+ protocol: ofconfig.ProtocolTCP,
+ ip: net.ParseIP("10.20.30.41"),
+ port: uint16(8000),
+ },
+ gid: 2,
+ endpoints: endpoints,
+ stickyAge: stickyMaxAgeSeconds,
+ },
+ }
+
+ for _, tc := range tcs {
+ groupID := ofconfig.GroupIDType(tc.gid)
+ expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, true)
installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
for _, tableFlow := range expTableFlows {
ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows)
@@ -716,7 +786,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint
}
}

- func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16) (tableFlows []expectTableFlows, groupBuckets []string) {
+ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16, antreaPolicyEnabled bool) (tableFlows []expectTableFlows, groupBuckets []string) {
nw_proto := 6
learnProtoField := "NXM_OF_TCP_DST[]"
if svc.protocol == ofconfig.ProtocolUDP {
@@ -732,10 +802,17 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
serviceLearnReg = 3
}
cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum)

loadGourpID := ""
ctTable := "EgressRule"
if antreaPolicyEnabled {
loadGourpID = fmt.Sprintf("set_field:0x%x->reg7,", gid)
ctTable = "AntreaPolicyEgressRule"
}
svcFlows := expectTableFlows{tableName: "ServiceLB", flows: []*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x%x->reg7,group:%d", serviceLearnReg<<16, gid, gid),
ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,%sgroup:%d", serviceLearnReg<<16, loadGourpID, gid),
},
{
MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
@@ -754,7 +831,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
unionVal := (0b010 << 16) + uint32(epPort)
epDNATFlows.flows = append(epDNATFlows.flows, &ofTestUtils.ExpectFlow{
MatchStr: fmt.Sprintf("priority=200,%s,reg3=%s,reg4=0x%x/0x7ffff", string(svc.protocol), epIP, unionVal),
ActStr: fmt.Sprintf("ct(commit,table=EgressRule,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ep.IP(), epPort),
ActStr: fmt.Sprintf("ct(commit,table=%s,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ctTable, ep.IP(), epPort),
})

if ep.GetIsLocal() {
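The expected ServiceLB action string now differs only by the group-ID load when Antrea Policy is enabled. A runnable sketch reproducing that expectation; register names and flow syntax are copied from the assertions above, and the group ID value is illustrative:

package main

import "fmt"

// expectedServiceLBActions rebuilds the action string asserted in the test:
// with Antrea Policy enabled, the group ID is additionally copied into reg7
// before the group action so policy tables can reference the Service.
func expectedServiceLBActions(gid uint32, antreaPolicyEnabled bool) string {
	serviceLearnReg := uint32(2)
	loadGroupID := ""
	if antreaPolicyEnabled {
		loadGroupID = fmt.Sprintf("set_field:0x%x->reg7,", gid)
	}
	return fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,%sgroup:%d",
		serviceLearnReg<<16, loadGroupID, gid)
}

func main() {
	fmt.Println(expectedServiceLBActions(2, true))
	// set_field:0x20000/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x2->reg7,group:2
	fmt.Println(expectedServiceLBActions(2, false))
	// set_field:0x20000/0x70000->reg4,set_field:0x200/0x200->reg0,group:2
}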