Skip to content

Commit

Permalink
Fix issue of the maximum supporting Endpoints in AntreaProxy
Browse files Browse the repository at this point in the history
Fix #2092

Due to the OpenFlow message size and the implementation of Service in AntreaProxy,
the maximum number of Endpoints that AntreaProxy can support per Service is now 800. If a
Service has more than 800 Endpoints, the excess Endpoints will be ignored.

In AntreaProxy, the OVS group is the key part of the Service implementation. For
now, Antrea uses OpenFlow 1.3 to communicate with OVS. In the previous
design, every bucket of an OVS group had five actions. The two actions that load
the Endpoint IP and port into registers, plus the resubmit action, must stay in the bucket. The
other two actions, which load values into registers, can be moved to flows (in
the current patch, they are moved to table 41), so that one message can hold
more buckets. As a result, the maximum number of Endpoints has increased from 511
to 800. Unfortunately, to ensure that AntreaProxy runs correctly, any Endpoints beyond
that limit are ignored.
  • Loading branch information
hongliangl committed Apr 21, 2021
1 parent c67106c commit 54597d9
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 13 deletions.
4 changes: 3 additions & 1 deletion build/yamls/base/conf/antrea-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
featureGates:
# Enable AntreaProxy which provides ServiceLB for in-cluster Services in antrea-agent.
# It should be enabled on Windows, otherwise NetworkPolicy will not take effect on
# Service traffic.
# Service traffic. Note that, due to the implementation of Service in AntreaProxy, the maximum
# number of Endpoints that AntreaProxy can support per Service is currently 800. If a Service has
# more than 800 Endpoints, the excess Endpoints will be ignored.
# AntreaProxy: true

# Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice
Expand Down
14 changes: 12 additions & 2 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ import (
"github.com/vmware-tanzu/antrea/third_party/proxy"
)

const maxRetryForOFSwitch = 5
const (
maxRetryForOFSwitch = 5
// maxEndpoints is the maximum number of Endpoints that are installed into the OVS group of a
// single Service. The limit comes from the OpenFlow 1.3 message size combined with the way
// Antrea implements Services. InstallServiceGroup truncates the Endpoint list to this length,
// so any Endpoints beyond the first 800 are ignored.
maxEndpoints = 800
)

// Client is the interface to program OVS flows for entity connectivity of Antrea.
type Client interface {
Expand Down Expand Up @@ -434,6 +439,11 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

if len(endpoints) > maxEndpoints {
endpoints = endpoints[:maxEndpoints]
}

group := c.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group: %w", err)
Expand Down Expand Up @@ -497,7 +507,7 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP,
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol))
flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0))
if affinityTimeout != 0 {
flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout))
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -1903,12 +1903,21 @@ func (c *client) serviceLearnFlow(groupID binding.GroupIDType, svcIP net.IP, svc

// serviceLBFlow generates the flow which uses the specific group to do Endpoint
// selection.
func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol) binding.Flow {
func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, withSessionAffinity bool) binding.Flow {
var lbResultMark uint32
if withSessionAffinity {
lbResultMark = marksRegServiceNeedLearn
} else {
lbResultMark = marksRegServiceSelected
}

return c.pipeline[serviceLBTable].BuildFlow(priorityNormal).
MatchProtocol(protocol).
MatchDstPort(svcPort, nil).
MatchDstIP(svcIP).
MatchRegRange(int(serviceLearnReg), marksRegServiceNeedLB, serviceLearnRegRange).
Action().LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
Action().Group(groupID).
Cookie(c.cookieAllocator.Request(cookie.Service).Raw()).
Done()
Expand Down Expand Up @@ -1974,13 +1983,10 @@ func (c *client) hairpinSNATFlow(endpointIP net.IP) binding.Flow {
func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
group := c.bridge.CreateGroup(groupID).ResetBuckets()
var resubmitTableID binding.TableIDType
var lbResultMark uint32
if withSessionAffinity {
resubmitTableID = serviceLBTable
lbResultMark = marksRegServiceNeedLearn
} else {
resubmitTableID = endpointDNATTable
lbResultMark = marksRegServiceSelected
}

for _, endpoint := range endpoints {
Expand All @@ -1993,17 +1999,13 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf
group = group.Bucket().Weight(100).
LoadReg(int(endpointIPReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
Done()
} else if ipProtocol == binding.ProtocolIPv6 {
ipVal := []byte(endpointIP)
group = group.Bucket().Weight(100).
LoadXXReg(int(endpointIPv6XXReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
Done()
}
Expand Down
9 changes: 7 additions & 2 deletions test/integration/agent/openflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,16 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
nw_proto = 132
learnProtoField = "OXM_OF_SCTP_DST[]"
}

serviceLearnReg := 2
if stickyAge != 0 {
serviceLearnReg = 3
}
cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum)
svcFlows := expectTableFlows{tableID: 41, flows: []*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
ActStr: fmt.Sprintf("group:%d", gid),
ActStr: fmt.Sprintf("load:0x%x->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],group:%d", serviceLearnReg, gid),
},
{
MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
Expand All @@ -617,7 +622,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
for _, ep := range endpointList {
epIP := ipToHexString(net.ParseIP(ep.IP()))
epPort, _ := ep.Port()
bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],resubmit(,42)", epIP, epPort)
bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],resubmit(,42)", epIP, epPort)
groupBuckets = append(groupBuckets, bucket)

unionVal := (0b010 << 16) + uint32(epPort)
Expand Down

0 comments on commit 54597d9

Please sign in to comment.