Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extends the Endpoints support from 500 to 800, extra ones will be dropped in AntreaProxy #2101

Merged
merged 3 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/feature-gates.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ example, to enable `AntreaProxy` on Linux, edit the Agent configuration in the
`AntreaProxy` implements Service load-balancing for ClusterIP Services as part
of the OVS pipeline, as opposed to relying on kube-proxy. This only applies to
traffic originating from Pods, and destined to ClusterIP Services. In
particular, it does not apply to NodePort Services.
particular, it does not apply to NodePort Services. Please note that due to
some restrictions on the implementation of Services in Antrea, the maximum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:
Please note that in the current AntreaProxy implementation there is a restriction that the maximum number...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless you feel strongly about it, I suggest merging as it is. We have been postponing v1.0.1 for a while now, and this is pretty much the last change we are waiting for.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That works for me.

number of Endpoints that Antrea can support at the moment is 800. If the
number of Endpoints for a given Service exceeds 800, extra Endpoints will
be dropped.

Note that this feature must be enabled for Windows. The Antrea Windows YAML
manifest provided as part of releases enables this feature by default. If you
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

group := c.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group: %w", err)
Expand Down Expand Up @@ -497,7 +498,7 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP,
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol))
flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0))
if affinityTimeout != 0 {
flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout))
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -1903,12 +1903,21 @@ func (c *client) serviceLearnFlow(groupID binding.GroupIDType, svcIP net.IP, svc

// serviceLBFlow generates the flow which uses the specific group to do Endpoint
// selection.
func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol) binding.Flow {
func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, withSessionAffinity bool) binding.Flow {
var lbResultMark uint32
if withSessionAffinity {
lbResultMark = marksRegServiceNeedLearn
} else {
lbResultMark = marksRegServiceSelected
}

return c.pipeline[serviceLBTable].BuildFlow(priorityNormal).
MatchProtocol(protocol).
MatchDstPort(svcPort, nil).
MatchDstIP(svcIP).
MatchRegRange(int(serviceLearnReg), marksRegServiceNeedLB, serviceLearnRegRange).
Action().LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
Action().Group(groupID).
Cookie(c.cookieAllocator.Request(cookie.Service).Raw()).
Done()
Expand Down Expand Up @@ -1974,13 +1983,10 @@ func (c *client) hairpinSNATFlow(endpointIP net.IP) binding.Flow {
func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
group := c.bridge.CreateGroup(groupID).ResetBuckets()
hongliangl marked this conversation as resolved.
Show resolved Hide resolved
var resubmitTableID binding.TableIDType
var lbResultMark uint32
if withSessionAffinity {
resubmitTableID = serviceLBTable
lbResultMark = marksRegServiceNeedLearn
} else {
resubmitTableID = endpointDNATTable
lbResultMark = marksRegServiceSelected
}

for _, endpoint := range endpoints {
Expand All @@ -1993,17 +1999,13 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf
group = group.Bucket().Weight(100).
LoadReg(int(endpointIPReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
Done()
} else if ipProtocol == binding.ProtocolIPv6 {
ipVal := []byte(endpointIP)
group = group.Bucket().Weight(100).
LoadXXReg(int(endpointIPv6XXReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
Done()
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/agent/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,16 @@ func (t *endpointsChangesTracker) Update(em types.EndpointsMap) {
}
}
}

// byEndpoint helps sort Endpoint
type byEndpoint []k8sproxy.Endpoint

func (p byEndpoint) Len() int {
return len(p)
}
func (p byEndpoint) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
func (p byEndpoint) Less(i, j int) bool {
return p[i].String() < p[j].String()
}
40 changes: 40 additions & 0 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package proxy
import (
"fmt"
"net"
"sort"
"strings"
"sync"
"time"
Expand All @@ -25,6 +26,7 @@ import (
"k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
k8sapitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
Expand All @@ -42,6 +44,10 @@ import (
const (
resyncPeriod = time.Minute
componentName = "antrea-agent-proxy"
// Due to the maximum message size in Openflow 1.3 and the implementation of Services in Antrea, the maximum number
// of Endpoints that Antrea can support at the moment is 800. If the number of Endpoints for a given Service exceeds
// 800, extra Endpoints will be dropped.
maxEndpoints = 800
)

// Proxier wraps proxy.Provider and adds extra methods. It is introduced for
Expand Down Expand Up @@ -90,6 +96,8 @@ type proxier struct {
serviceStringMap map[string]k8sproxy.ServicePortName
// serviceStringMapMutex protects serviceStringMap object.
serviceStringMapMutex sync.Mutex
// oversizeServiceSet records the Services that have more than 800 Endpoints.
oversizeServiceSet sets.String

runner *k8sproxy.BoundedFrequencyRunner
stopChan <-chan struct{}
Expand Down Expand Up @@ -117,6 +125,9 @@ func (p *proxier) removeStaleServices() {
}
svcInfo := svcPort.(*types.ServiceInfo)
klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String())
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
klog.Errorf("Failed to remove flows of Service %v: %v", svcPortName, err)
continue
Expand Down Expand Up @@ -264,6 +275,34 @@ func (p *proxier) installServices() {
}
endpointUpdateList = append(endpointUpdateList, endpoint)
}

// If the length of endpointUpdateList > maxEndpoints, endpointUpdateList should be cut. However, the iteration
// of map in Golang is random, so endpointUpdateList are not always in the same order. If endpointUpdateList is
// cut directly without any sorting, some Endpoints may not be installed on the cut endpointUpdateList(Last sync,
// these Endpoints may be cut off from endpointUpdateList, they are of course not installed.) So cutting
// endpointUpdateList after sorting can avoid this situation in some degree.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it hasn't addressed the concern: #2101 (comment). The extra endpoints won't be in "endpointsInstalled" and "needUpdateEndpoints" will always be true regardless of how many times it has been reconciled, unless I misunderstand the code. Could you test it with one oversized service and one normal size service and check whether updating the latter one will cause the first one resynced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to add some test code to verify this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mean to add test code to verify it as it will have to check logs and really create a service with 800+ endpoints. Could you just change maxEndpoints to 10 and verify it manually if you haven't done it before? From the code, I think it has the issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'll verify this manually.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed maxEndpoints to 4, then built and run. The detail test is below:

  • Applies a deployment of 6 pods and applies a Service selecting these 6 pods. E.g, this Endpoint IPs are (10.10.1.1-6)
  • Endpoints whose IPs are 10.10.1.1-4 are selected(According to sort function of Endpoints, 10.10.1.1 will be in the first place after sorting, and 10.10.1.6 will be in the last place after sorting.). Related warning log appears and Service name will be recorded.
  • When deleting the pod whose IP is 10.10.1.5-6, Service Group will not be updated as they are extra Endpoints and the Endpoints from new creating pods(assuming these two IPs are 10.10.1.7, 10.10.1.8) are also extra after sorting. Related warning log doesn't appears as Service name is recorded.
  • When deleting the pod whose IP is 10.10.1.1-4, Service Group will be updated. Related warning log doesn't appears as Service name is recorded.

if len(endpointUpdateList) > maxEndpoints {
if !p.oversizeServiceSet.Has(svcPortName.String()) {
klog.Warningf("Since Endpoints of Service %s exceeds %d, extra Endpoints will be dropped", svcPortName.String(), maxEndpoints)
p.oversizeServiceSet.Insert(svcPortName.String())
}

needUpdateEndpoints = false
sort.Sort(byEndpoint(endpointUpdateList))
endpointUpdateList = endpointUpdateList[:maxEndpoints]
// Check if the cut endpointUpdateList are all installed.
for _, endpoint := range endpointUpdateList {
if _, ok := endpointsInstalled[endpoint.String()]; !ok {
needUpdateEndpoints = true
break
}
}
Copy link
Contributor Author

@hongliangl hongliangl Apr 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tnqn

The extra endpoints won't be in "endpointsInstalled" and "needUpdateEndpoints" will always be true regardless of how many times it has been reconciled.

Sorting endpointUpdateList is to arrange all Endpoints in order and then cut them. If the reserved
Endpoints are all installed, needUpdateEndpoints will not true.

More detailed, needUpdateEndpoints is recalculated with the cut endpointUpdateList and endpointsInstalled. needUpdateEndpoints will not true if the reserved Endpoints in the cut endpointUpdateList are all installed. This part causes extra overhead, but only when the service is oversize.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think L274 will set needUpdateEndpoints to true.

Copy link
Contributor Author

@hongliangl hongliangl Apr 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a Service is oversize, L274 will always set needUpdateEndpoints to true as extra Endpoints are not installed.
L290 resets needUpdateEndpoints to false as endpointUpdateList is cut and it's unknown that if all reserved Endpoints in endpointUpdateList are installed, so L294~L299 checks that if there are any Endpoints in endpointUpdateList not install.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed L290. However, it still seems wrong to reset needUpdateEndpoints as there are other conditions that could set it to true, for example, needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType(). And it's redundant to "Check if the cut endpointUpdateList are all installed" twice when it exceeds the size. Could we cut the endpoints in the below place? https://github.com/vmware-tanzu/antrea/blob/eadf0921f88712b373df9f00c778ef81642def2f/pkg/agent/proxy/proxier.go#L244-L246

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it still seems wrong to reset needUpdateEndpoints as there are other conditions that could set it to true, for example, needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType().

needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType() should be considered.

Could we cut the endpoints in the below place?

Variable endpoints is a map, IMO, we can't cut the endpoints below the code above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to convert it to a list sooner or later, no harm to do it earlier? It can save the repeated calculation of needUpdateEndpoints in your current code L273-L278 and L294-303, and reduce the code complexity in some way I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

} else {
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
}

if len(endpoints) < len(endpointsInstalled) { // There are Endpoints which expired.
klog.V(2).Infof("Some Endpoints of Service %s removed, updating Endpoints", svcInfo.String())
needUpdateEndpoints = true
Expand Down Expand Up @@ -605,6 +644,7 @@ func NewProxier(
endpointsMap: types.EndpointsMap{},
endpointReferenceCounter: map[string]int{},
serviceStringMap: map[string]k8sproxy.ServicePortName{},
oversizeServiceSet: sets.NewString(),
groupCounter: types.NewGroupCounter(),
ofClient: ofClient,
isIPv6: isIPv6,
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ func testProxyServiceLifeCycle(ipFamily *corev1.IPFamily, ingressIPs []string, d

var groupKeyword string
if *ipFamily == corev1.IPv6Protocol {
groupKeyword = fmt.Sprintf("set_field:0x%s->xxreg3,load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv6.To16()), "0"), 80)
groupKeyword = fmt.Sprintf("set_field:0x%s->xxreg3,load:0x%x->NXM_NX_REG4[0..15]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv6.To16()), "0"), 80)
} else {
groupKeyword = fmt.Sprintf("load:0x%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"), 80)
groupKeyword = fmt.Sprintf("load:0x%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"), 80)
}
groupOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-groups", defaultBridgeName})
require.NoError(t, err)
Expand Down
9 changes: 7 additions & 2 deletions test/integration/agent/openflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,16 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
nw_proto = 132
learnProtoField = "OXM_OF_SCTP_DST[]"
}

serviceLearnReg := 2
if stickyAge != 0 {
serviceLearnReg = 3
}
cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum)
svcFlows := expectTableFlows{tableID: 41, flows: []*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
ActStr: fmt.Sprintf("group:%d", gid),
ActStr: fmt.Sprintf("load:0x%x->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],group:%d", serviceLearnReg, gid),
},
{
MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
Expand All @@ -617,7 +622,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
for _, ep := range endpointList {
epIP := ipToHexString(net.ParseIP(ep.IP()))
epPort, _ := ep.Port()
bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],resubmit(,42)", epIP, epPort)
bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],resubmit(,42)", epIP, epPort)
groupBuckets = append(groupBuckets, bucket)

unionVal := (0b010 << 16) + uint32(epPort)
Expand Down