Skip to content

Commit

Permalink
infer endpoint is local from endpoints "subset.addresses.nodeName" (#560
Browse files Browse the repository at this point in the history
)

Fixes #557
  • Loading branch information
murali-reddy authored Oct 27, 2018
1 parent d9570c5 commit 827bbbc
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 105 deletions.
101 changes: 32 additions & 69 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type ipvsCalls interface {
ipvsDelService(ipvsSvc *ipvs.Service) error
ipvsUpdateService(ipvsSvc *ipvs.Service) error
ipvsGetServices() ([]*ipvs.Service, error)
ipvsAddServer(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination, local bool, podCidr string) error
ipvsAddServer(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error
ipvsNewDestination(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error
ipvsUpdateDestination(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error
ipvsGetDestinations(ipvsSvc *ipvs.Service) ([]*ipvs.Destination, error)
Expand Down Expand Up @@ -241,8 +241,9 @@ type serviceInfoMap map[string]*serviceInfo

// internal representation of endpoints
type endpointsInfo struct {
ip string
port int
ip string
port int
isLocal bool
}

// map of all endpoints, with unique service id(namespace name, service name, port) as key
Expand Down Expand Up @@ -469,29 +470,13 @@ type externalIPService struct {
externalIp string
}

func hasActiveEndpoints(svc *serviceInfo, endpoints []endpointsInfo, nodePodCidrStr string) bool {
if svc.local {
_, nodePodCidr, err := net.ParseCIDR(nodePodCidrStr)
if err != nil {
glog.Errorf("Failed to ParseCIDR %s for hasActiveEndpoints on service %s/%s",
nodePodCidrStr, svc.namespace, svc.name)
return false
func hasActiveEndpoints(svc *serviceInfo, endpoints []endpointsInfo) bool {
for _, endpoint := range endpoints {
if endpoint.isLocal {
return true
}
for _, endpoint := range endpoints {
ip := net.ParseIP(endpoint.ip)
if ip == nil {
glog.Errorf("Failed to ParseCIDR %s for endpoint in hasActiveEndpoints on service %s/%s",
endpoint.ip, svc.namespace, svc.name)
continue
}
if nodePodCidr.Contains(ip) {
return true
}
}
return false
}

return len(endpoints) > 0
return false
}

// sync the ipvs service and server details configured to reflect the desired state of services and endpoint
Expand Down Expand Up @@ -555,7 +540,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf

endpoints := endpointsInfoMap[k]

if !hasActiveEndpoints(svc, endpoints, nsc.podCidr) {
if svc.local && !hasActiveEndpoints(svc, endpoints) {
glog.V(1).Infof("Skipping service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name)
continue
}
Expand Down Expand Up @@ -689,8 +674,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf

activeServiceEndpointMap[externalIpServiceId] = make([]string, 0)
for _, endpoint := range endpoints {
isLocal, _ := isLocalEndpoint(endpoint.ip, nsc.podCidr)
if !svc.local || (svc.local && isLocal) {
if !svc.local || (svc.local && endpoint.isLocal) {
activeServiceEndpointMap[externalIpServiceId] = append(activeServiceEndpointMap[externalIpServiceId], endpoint.ip)
}
}
Expand All @@ -705,37 +689,39 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
Weight: 1,
}

err := nsc.ln.ipvsAddServer(ipvsClusterVipSvc, &dst, svc.local, nsc.podCidr)
if err != nil {
glog.Errorf(err.Error())
}

isLocal, err := isLocalEndpoint(endpoint.ip, nsc.podCidr)
if !svc.local || (svc.local && isLocal) {
activeServiceEndpointMap[clusterServiceId] = append(activeServiceEndpointMap[clusterServiceId], endpoint.ip)
if !svc.local || (svc.local && endpoint.isLocal) {
err := nsc.ln.ipvsAddServer(ipvsClusterVipSvc, &dst)
if err != nil {
glog.Errorf(err.Error())
} else {
activeServiceEndpointMap[clusterServiceId] = append(activeServiceEndpointMap[clusterServiceId], endpoint.ip)
}
}

if svc.nodePort != 0 {
for i := 0; i < len(ipvsNodeportSvcs); i++ {
err := nsc.ln.ipvsAddServer(ipvsNodeportSvcs[i], &dst, svc.local, nsc.podCidr)
if err != nil {
glog.Errorf(err.Error())
}

if !svc.local || (svc.local && isLocal) {
activeServiceEndpointMap[nodeServiceIds[i]] = append(activeServiceEndpointMap[clusterServiceId], endpoint.ip)
if !svc.local || (svc.local && endpoint.isLocal) {
err := nsc.ln.ipvsAddServer(ipvsNodeportSvcs[i], &dst)
if err != nil {
glog.Errorf(err.Error())
} else {
activeServiceEndpointMap[nodeServiceIds[i]] = append(activeServiceEndpointMap[nodeServiceIds[i]], endpoint.ip)
}
}
}
}

for _, externalIpService := range externalIpServices {
if svc.local && !endpoint.isLocal {
continue
}

if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
dst.ConnectionFlags = ipvs.ConnectionFlagTunnel
}

// add server to IPVS service
err := nsc.ln.ipvsAddServer(externalIpService.ipvsSvc, &dst, svc.local, nsc.podCidr)
err := nsc.ln.ipvsAddServer(externalIpService.ipvsSvc, &dst)
if err != nil {
glog.Errorf(err.Error())
}
Expand Down Expand Up @@ -871,17 +857,6 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
return nil
}

func isLocalEndpoint(ip, podCidr string) (bool, error) {
_, ipnet, err := net.ParseCIDR(podCidr)
if err != nil {
return false, err
}
if ipnet.Contains(net.ParseIP(ip)) {
return true, nil
}
return false, nil
}

func (nsc *NetworkServicesController) getPodObjectForEndpoint(endpointIP string) (*api.Pod, error) {
for _, obj := range nsc.podLister.List() {
pod := obj.(*api.Pod)
Expand Down Expand Up @@ -1145,7 +1120,8 @@ func (nsc *NetworkServicesController) buildEndpointsInfo() endpointsInfoMap {
svcId := generateServiceId(ep.Namespace, ep.Name, port.Name)
endpoints := make([]endpointsInfo, 0)
for _, addr := range epSubset.Addresses {
endpoints = append(endpoints, endpointsInfo{ip: addr.IP, port: int(port.Port)})
isLocal := addr.NodeName != nil && *addr.NodeName == nsc.nodeHostName
endpoints = append(endpoints, endpointsInfo{ip: addr.IP, port: int(port.Port), isLocal: isLocal})
}
endpointsMap[svcId] = shuffle(endpoints)
}
Expand Down Expand Up @@ -1581,20 +1557,7 @@ func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint1
return &svc, nil
}

func (ln *linuxNetworking) ipvsAddServer(service *ipvs.Service, dest *ipvs.Destination, local bool, podCidr string) error {
//for service.local enabled svc, only forward traffic to the pod on local node
if local {
_, ipnet, err := net.ParseCIDR(podCidr)
if err != nil {
glog.Infof("Failed to ParseCIDR %s for adding destination %s to the service %s",
podCidr, ipvsDestinationString(dest), ipvsServiceString(service))
return nil
}
if !ipnet.Contains(dest.Address) {
return nil
}
}

func (ln *linuxNetworking) ipvsAddServer(service *ipvs.Service, dest *ipvs.Destination) error {
err := ln.ipvsNewDestination(service, dest)
if err == nil {
glog.V(2).Infof("Successfully added destination %s to the service %s",
Expand Down
Loading

0 comments on commit 827bbbc

Please sign in to comment.