Skip to content

Commit

Permalink
Fix: Send BGP Withdrawals for Service VIPs Upon Service Deletion (#756)
Browse files Browse the repository at this point in the history
* Refactor: seperate fetching service VIPs from advertise/withdrawal decision

* Refactor: simplify advertise/withdrawal logic

* Pass svcDeleted param to getVIPsForService

* Don't advertise VIPs from deleted services

* Test for withdrawing VIPs from deleted service

* Refactor: use explicit handleServiceDelete functions
  • Loading branch information
tompntn authored and murali-reddy committed Sep 19, 2019
1 parent 3aacd48 commit d6f9f31
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 22 deletions.
75 changes: 55 additions & 20 deletions pkg/controllers/routing/ecmp_vip.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"strconv"
"time"

"strings"

"github.com/golang/glog"
"github.com/osrg/gobgp/packet/bgp"
"github.com/osrg/gobgp/table"
v1core "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"strings"
)

// bgpAdvertiseVIP advertises the service vip (cluster ip or load balancer ip or external IP) the configured peers
Expand Down Expand Up @@ -103,13 +104,36 @@ func (nrc *NetworkRoutingController) handleServiceUpdate(svc *v1core.Service) {
nrc.withdrawVIPs(toWithdraw)
}

func (nrc *NetworkRoutingController) handleServiceDelete(svc *v1core.Service) {

if !nrc.bgpServerStarted {
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
return
}

err := nrc.AddPolicies()
if err != nil {
glog.Errorf("Error adding BGP policies: %s", err.Error())
}

nrc.withdrawVIPs(nrc.getAllVIPsForService(svc))

}

func (nrc *NetworkRoutingController) tryHandleServiceUpdate(obj interface{}, logMsgFormat string) {
if svc := getServiceObject(obj); svc != nil {
glog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name)
nrc.handleServiceUpdate(svc)
}
}

func (nrc *NetworkRoutingController) tryHandleServiceDelete(obj interface{}, logMsgFormat string) {
if svc := getServiceObject(obj); svc != nil {
glog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name)
nrc.handleServiceDelete(svc)
}
}

// OnServiceCreate handles new service create event from the kubernetes API server
func (nrc *NetworkRoutingController) OnServiceCreate(obj interface{}) {
nrc.tryHandleServiceUpdate(obj, "Received new service: %s/%s from watch API")
Expand Down Expand Up @@ -141,7 +165,7 @@ func getMissingPrevGen(old, new []string) (withdrawIPs []string) {

// OnServiceDelete handles the service delete updates from the kubernetes API server
func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) {
nrc.tryHandleServiceUpdate(obj, "Received event to delete service: %s/%s from watch API")
nrc.tryHandleServiceDelete(obj, "Received event to delete service: %s/%s from watch API")
}

func (nrc *NetworkRoutingController) newEndpointsEventHandler() cache.ResourceEventHandler {
Expand Down Expand Up @@ -306,24 +330,38 @@ func (nrc *NetworkRoutingController) shouldAdvertiseService(svc *v1core.Service,
}

func (nrc *NetworkRoutingController) getVIPsForService(svc *v1core.Service, onlyActiveEndpoints bool) ([]string, []string, error) {
ipList := make([]string, 0)
var err error

nodeHasEndpoints := true
if onlyActiveEndpoints {
_, isLocal := svc.Annotations[svcLocalAnnotation]
if isLocal || svc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal {
nodeHasEndpoints, err = nrc.nodeHasEndpointsForService(svc)
if err != nil {
return nil, nil, err
}

advertise := true

_, hasLocalAnnotation := svc.Annotations[svcLocalAnnotation]
hasLocalTrafficPolicy := svc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal
isLocal := hasLocalAnnotation || hasLocalTrafficPolicy

if onlyActiveEndpoints && isLocal {
var err error
advertise, err = nrc.nodeHasEndpointsForService(svc)
if err != nil {
return nil, nil, err
}
}

ipList := nrc.getAllVIPsForService(svc)

if !advertise {
return nil, ipList, nil
}

return ipList, nil, nil
}

func (nrc *NetworkRoutingController) getAllVIPsForService(svc *v1core.Service) []string {

ipList := make([]string, 0)

if nrc.shouldAdvertiseService(svc, svcAdvertiseClusterAnnotation, nrc.advertiseClusterIP) {
clusterIp := nrc.getClusterIp(svc)
if clusterIp != "" {
ipList = append(ipList, clusterIp)
clusterIP := nrc.getClusterIp(svc)
if clusterIP != "" {
ipList = append(ipList, clusterIP)
}
}

Expand All @@ -338,11 +376,8 @@ func (nrc *NetworkRoutingController) getVIPsForService(svc *v1core.Service, only
ipList = append(ipList, nrc.getLoadBalancerIps(svc)...)
}

if !nodeHasEndpoints {
return nil, ipList, nil
}
return ipList

return ipList, nil, nil
}

func isEndpointsForLeaderElection(ep *v1core.Endpoints) bool {
Expand Down
38 changes: 36 additions & 2 deletions pkg/controllers/routing/ecmp_vip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func Equal(a, b []string) bool {
type ServiceAdvertisedIPs struct {
service *v1core.Service
advertisedIPs []string
withdrawnIPs []string
annotations map[string]string
}

Expand Down Expand Up @@ -97,21 +98,25 @@ func Test_getVIPsForService(t *testing.T) {
{
services["cluster"],
[]string{"10.0.0.1"},
[]string{},
nil,
},
{
services["external"],
[]string{"10.0.0.1", "1.1.1.1"},
[]string{},
nil,
},
{
services["nodeport"],
[]string{"10.0.0.1", "1.1.1.1"},
[]string{},
nil,
},
{
services["loadbalancer"],
[]string{"10.0.0.1", "10.0.255.1", "10.0.255.2"},
[]string{},
nil,
},
},
Expand All @@ -123,21 +128,25 @@ func Test_getVIPsForService(t *testing.T) {
{
services["cluster"],
[]string{},
[]string{},
nil,
},
{
services["external"],
[]string{},
[]string{},
nil,
},
{
services["nodeport"],
[]string{},
[]string{},
nil,
},
{
services["loadbalancer"],
[]string{},
[]string{},
nil,
},
},
Expand All @@ -149,21 +158,25 @@ func Test_getVIPsForService(t *testing.T) {
{
services["cluster"],
[]string{"10.0.0.1"},
[]string{},
nil,
},
{
services["external"],
[]string{"10.0.0.1"},
[]string{},
nil,
},
{
services["nodeport"],
[]string{"10.0.0.1"},
[]string{},
nil,
},
{
services["loadbalancer"],
[]string{"10.0.0.1"},
[]string{},
nil,
},
},
Expand All @@ -175,21 +188,25 @@ func Test_getVIPsForService(t *testing.T) {
{
services["cluster"],
[]string{},
[]string{},
nil,
},
{
services["external"],
[]string{"1.1.1.1"},
[]string{},
nil,
},
{
services["nodeport"],
[]string{"1.1.1.1"},
[]string{},
nil,
},
{
services["loadbalancer"],
[]string{},
[]string{},
nil,
},
},
Expand All @@ -201,21 +218,25 @@ func Test_getVIPsForService(t *testing.T) {
{
services["cluster"],
[]string{},
[]string{},
nil,
},
{
services["external"],
[]string{},
[]string{},
nil,
},
{
services["nodeport"],
[]string{},
[]string{},
nil,
},
{
services["loadbalancer"],
[]string{"10.0.255.1", "10.0.255.2"},
[]string{},
nil,
},
},
Expand All @@ -227,6 +248,7 @@ func Test_getVIPsForService(t *testing.T) {
{
services["cluster"],
[]string{"10.0.0.1"},
[]string{},
map[string]string{
svcAdvertiseClusterAnnotation: "true",
svcAdvertiseExternalAnnotation: "true",
Expand All @@ -236,6 +258,7 @@ func Test_getVIPsForService(t *testing.T) {
{
services["external"],
[]string{"10.0.0.1", "1.1.1.1"},
[]string{},
map[string]string{
svcAdvertiseClusterAnnotation: "true",
svcAdvertiseExternalAnnotation: "true",
Expand All @@ -245,6 +268,7 @@ func Test_getVIPsForService(t *testing.T) {
{
services["nodeport"],
[]string{"10.0.0.1", "1.1.1.1"},
[]string{},
map[string]string{
svcAdvertiseClusterAnnotation: "true",
svcAdvertiseExternalAnnotation: "true",
Expand All @@ -254,6 +278,7 @@ func Test_getVIPsForService(t *testing.T) {
{
services["loadbalancer"],
[]string{"10.0.0.1", "10.0.255.1", "10.0.255.2"},
[]string{},
map[string]string{
svcAdvertiseClusterAnnotation: "true",
svcAdvertiseExternalAnnotation: "true",
Expand All @@ -264,6 +289,7 @@ func Test_getVIPsForService(t *testing.T) {
// Special case to test svcAdvertiseLoadBalancerAnnotation vs legacy svcSkipLbIpsAnnotation
services["loadbalancer"],
[]string{"10.0.0.1"},
[]string{},
map[string]string{
svcAdvertiseClusterAnnotation: "true",
svcAdvertiseExternalAnnotation: "true",
Expand All @@ -280,6 +306,7 @@ func Test_getVIPsForService(t *testing.T) {
{
services["cluster"],
[]string{},
[]string{},
map[string]string{
svcAdvertiseClusterAnnotation: "false",
svcAdvertiseExternalAnnotation: "false",
Expand All @@ -289,6 +316,7 @@ func Test_getVIPsForService(t *testing.T) {
{
services["external"],
[]string{},
[]string{},
map[string]string{
svcAdvertiseClusterAnnotation: "false",
svcAdvertiseExternalAnnotation: "false",
Expand All @@ -298,6 +326,7 @@ func Test_getVIPsForService(t *testing.T) {
{
services["nodeport"],
[]string{},
[]string{},
map[string]string{
svcAdvertiseClusterAnnotation: "false",
svcAdvertiseExternalAnnotation: "false",
Expand All @@ -307,6 +336,7 @@ func Test_getVIPsForService(t *testing.T) {
{
services["loadbalancer"],
[]string{},
[]string{},
map[string]string{
svcAdvertiseClusterAnnotation: "false",
svcAdvertiseExternalAnnotation: "false",
Expand All @@ -331,10 +361,14 @@ func Test_getVIPsForService(t *testing.T) {
serviceAdvertisedIP.service.ObjectMeta.Annotations = serviceAdvertisedIP.annotations
}
svc, _ := clientset.CoreV1().Services("default").Create(serviceAdvertisedIP.service)
advertisedIPs, _, _ := nrc.getVIPsForService(svc, false)
advertisedIPs, withdrawnIPs, _ := nrc.getVIPsForService(svc, false)
t.Logf("AdvertisedIPs: %v\n", advertisedIPs)
t.Logf("WithdrawnIPs: %v\n", withdrawnIPs)
if !Equal(serviceAdvertisedIP.advertisedIPs, advertisedIPs) {
t.Errorf("Advertised IPs are incorrect, got: %v, want: %v.", serviceAdvertisedIP.advertisedIPs, advertisedIPs)
t.Errorf("Advertised IPs are incorrect, got: %v, want: %v.", advertisedIPs, serviceAdvertisedIP.advertisedIPs)
}
if !Equal(serviceAdvertisedIP.withdrawnIPs, withdrawnIPs) {
t.Errorf("Withdrawn IPs are incorrect, got: %v, want: %v.", withdrawnIPs, serviceAdvertisedIP.withdrawnIPs)
}
}
})
Expand Down

0 comments on commit d6f9f31

Please sign in to comment.