Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extends the Endpoints support from 500 to 800, extra ones will be dropped in AntreaProxy #2101

Merged
merged 3 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/feature-gates.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ example, to enable `AntreaProxy` on Linux, edit the Agent configuration in the
`AntreaProxy` implements Service load-balancing for ClusterIP Services as part
of the OVS pipeline, as opposed to relying on kube-proxy. This only applies to
traffic originating from Pods, and destined to ClusterIP Services. In
particular, it does not apply to NodePort Services.
particular, it does not apply to NodePort Services. Please note that due to
some restrictions on the implementation of Services in Antrea, the maximum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:
Please note that in the current AntreaProxy implementation there is a restriction that the maximum number...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless you feel strongly about it, I suggest merging as it is. We have been postponing v1.0.1 for a while now, and this is pretty much the last change we are waiting for.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That works for me.

number of Endpoints that Antrea can support for a given Service at the moment is
800. If the number of Endpoints for a given Service exceeds 800, the extra
Endpoints will be dropped.

Note that this feature must be enabled for Windows. The Antrea Windows YAML
manifest provided as part of releases enables this feature by default. If you
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

group := c.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group: %w", err)
Expand Down Expand Up @@ -497,7 +498,7 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP,
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol))
flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0))
if affinityTimeout != 0 {
flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout))
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -1903,12 +1903,21 @@ func (c *client) serviceLearnFlow(groupID binding.GroupIDType, svcIP net.IP, svc

// serviceLBFlow generates the flow which uses the specific group to do Endpoint
// selection.
func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol) binding.Flow {
func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, withSessionAffinity bool) binding.Flow {
var lbResultMark uint32
if withSessionAffinity {
lbResultMark = marksRegServiceNeedLearn
} else {
lbResultMark = marksRegServiceSelected
}

return c.pipeline[serviceLBTable].BuildFlow(priorityNormal).
MatchProtocol(protocol).
MatchDstPort(svcPort, nil).
MatchDstIP(svcIP).
MatchRegRange(int(serviceLearnReg), marksRegServiceNeedLB, serviceLearnRegRange).
Action().LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
Action().Group(groupID).
Cookie(c.cookieAllocator.Request(cookie.Service).Raw()).
Done()
Expand Down Expand Up @@ -1974,13 +1983,10 @@ func (c *client) hairpinSNATFlow(endpointIP net.IP) binding.Flow {
func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
group := c.bridge.CreateGroup(groupID).ResetBuckets()
hongliangl marked this conversation as resolved.
Show resolved Hide resolved
var resubmitTableID binding.TableIDType
var lbResultMark uint32
if withSessionAffinity {
resubmitTableID = serviceLBTable
lbResultMark = marksRegServiceNeedLearn
} else {
resubmitTableID = endpointDNATTable
lbResultMark = marksRegServiceSelected
}

for _, endpoint := range endpoints {
Expand All @@ -1993,17 +1999,13 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf
group = group.Bucket().Weight(100).
LoadReg(int(endpointIPReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
Done()
} else if ipProtocol == binding.ProtocolIPv6 {
ipVal := []byte(endpointIP)
group = group.Bucket().Weight(100).
LoadXXReg(int(endpointIPv6XXReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
Done()
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/agent/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (t *endpointsChangesTracker) OnEndpointUpdate(previous, current *corev1.End
return len(t.changes) > 0
}

// EndpointSliceUpdate updates the given service's endpoints change map based on the <previous, current> endpoints pair.
// OnEndpointSliceUpdate updates the given service's endpoints change map based on the given EndpointSlice.
// It returns true if items changed, otherwise it returns false. Will add/update/delete items of endpointsChange Map.
// If removeSlice is true, slice will be removed, otherwise it will be added or updated.
func (t *endpointsChangesTracker) OnEndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
Expand Down Expand Up @@ -220,3 +220,16 @@ func (t *endpointsChangesTracker) Update(em types.EndpointsMap) {
}
}
}

// byEndpoint is a slice of Endpoints with Len, Less and Swap methods, so that
// it can be handed to sort.Sort and ordered by each Endpoint's String() value.
type byEndpoint []k8sproxy.Endpoint

// Len reports how many Endpoints the slice holds.
func (eps byEndpoint) Len() int { return len(eps) }

// Less reports whether the Endpoint at index i orders before the one at
// index j, comparing their String() values.
func (eps byEndpoint) Less(i, j int) bool {
	left, right := eps[i].String(), eps[j].String()
	return left < right
}

// Swap exchanges the Endpoints at indices i and j.
func (eps byEndpoint) Swap(i, j int) {
	tmp := eps[i]
	eps[i] = eps[j]
	eps[j] = tmp
}
48 changes: 44 additions & 4 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package proxy
import (
"fmt"
"net"
"sort"
"strings"
"sync"
"time"
Expand All @@ -25,6 +26,7 @@ import (
"k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
k8sapitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
Expand All @@ -42,6 +44,10 @@ import (
const (
resyncPeriod = time.Minute
componentName = "antrea-agent-proxy"
// Due to the maximum message size in OpenFlow 1.3 and the implementation of Services in Antrea, the maximum number
// of Endpoints that Antrea can support for a given Service at the moment is 800. If the number of Endpoints for a
// given Service exceeds 800, extra Endpoints will be dropped.
maxEndpoints = 800
)

// Proxier wraps proxy.Provider and adds extra methods. It is introduced for
Expand Down Expand Up @@ -90,6 +96,8 @@ type proxier struct {
serviceStringMap map[string]k8sproxy.ServicePortName
// serviceStringMapMutex protects serviceStringMap object.
serviceStringMapMutex sync.Mutex
// oversizeServiceSet records the Services that have more than 800 Endpoints.
oversizeServiceSet sets.String

runner *k8sproxy.BoundedFrequencyRunner
stopChan <-chan struct{}
Expand Down Expand Up @@ -117,6 +125,9 @@ func (p *proxier) removeStaleServices() {
}
svcInfo := svcPort.(*types.ServiceInfo)
klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String())
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
klog.Errorf("Failed to remove flows of Service %v: %v", svcPortName, err)
continue
Expand Down Expand Up @@ -258,12 +269,40 @@ func (p *proxier) installServices() {
}

var endpointUpdateList []k8sproxy.Endpoint
for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore.
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
if len(endpoints) > maxEndpoints {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personally I would have liked to see a unit test for this (with maxEndpoints set to small value). However, this is strictly better than what we had before, and there is little risk in merging this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. I prefer to add a unit test. However, the maxEndpoints is a constant variable, IMO, I didn't come up with any good idea to test this. BTW, thanks for merging this.

if !p.oversizeServiceSet.Has(svcPortName.String()) {
klog.Warningf("Since Endpoints of Service %s exceeds %d, extra Endpoints will be dropped", svcPortName.String(), maxEndpoints)
p.oversizeServiceSet.Insert(svcPortName.String())
}
// If there are more than maxEndpoints Endpoints, the extra ones must be dropped. However, endpoints is a map, and
// Go randomizes map iteration order, so truncating while iterating over the map would retain a different subset
// of Endpoints each time. Therefore, copy every Endpoint into the slice endpointList, sort it, and only then
// truncate it, so that the retained subset is the same for the same set of Endpoints.
var endpointList []k8sproxy.Endpoint
for _, endpoint := range endpoints {
endpointList = append(endpointList, endpoint)
}
sort.Sort(byEndpoint(endpointList))
endpointList = endpointList[:maxEndpoints]

for _, endpoint := range endpointList { // Check if there is any installed Endpoint which is not expected anymore.
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
}
endpointUpdateList = append(endpointUpdateList, endpoint)
}
} else {
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore.
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
}
endpointUpdateList = append(endpointUpdateList, endpoint)
}
endpointUpdateList = append(endpointUpdateList, endpoint)
}

if len(endpoints) < len(endpointsInstalled) { // There are Endpoints which expired.
klog.V(2).Infof("Some Endpoints of Service %s removed, updating Endpoints", svcInfo.String())
needUpdateEndpoints = true
Expand Down Expand Up @@ -605,6 +644,7 @@ func NewProxier(
endpointsMap: types.EndpointsMap{},
endpointReferenceCounter: map[string]int{},
serviceStringMap: map[string]k8sproxy.ServicePortName{},
oversizeServiceSet: sets.NewString(),
groupCounter: types.NewGroupCounter(),
ofClient: ofClient,
isIPv6: isIPv6,
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ func testProxyServiceLifeCycle(ipFamily *corev1.IPFamily, ingressIPs []string, d

var groupKeyword string
if *ipFamily == corev1.IPv6Protocol {
groupKeyword = fmt.Sprintf("set_field:0x%s->xxreg3,load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv6.To16()), "0"), 80)
groupKeyword = fmt.Sprintf("set_field:0x%s->xxreg3,load:0x%x->NXM_NX_REG4[0..15]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv6.To16()), "0"), 80)
} else {
groupKeyword = fmt.Sprintf("load:0x%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"), 80)
groupKeyword = fmt.Sprintf("load:0x%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"), 80)
}
groupOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-groups", defaultBridgeName})
require.NoError(t, err)
Expand Down
9 changes: 7 additions & 2 deletions test/integration/agent/openflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,16 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
nw_proto = 132
learnProtoField = "OXM_OF_SCTP_DST[]"
}

serviceLearnReg := 2
if stickyAge != 0 {
serviceLearnReg = 3
}
cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum)
svcFlows := expectTableFlows{tableID: 41, flows: []*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
ActStr: fmt.Sprintf("group:%d", gid),
ActStr: fmt.Sprintf("load:0x%x->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],group:%d", serviceLearnReg, gid),
},
{
MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
Expand All @@ -617,7 +622,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
for _, ep := range endpointList {
epIP := ipToHexString(net.ParseIP(ep.IP()))
epPort, _ := ep.Port()
bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],resubmit(,42)", epIP, epPort)
bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],resubmit(,42)", epIP, epPort)
groupBuckets = append(groupBuckets, bucket)

unionVal := (0b010 << 16) + uint32(epPort)
Expand Down