Skip to content

Commit

Permalink
Adding support for weighted k8s service (#2293)
Browse files Browse the repository at this point in the history
* Adding support for weighted k8s service

* Adding changelog

* if per-app weight is 0 then pull the weight to 1

* Addressing review comments

* Addressing review comments

* Addressing review comments

* Comment update

* Comment update

* Parameterized table test

* Parameterized table test

* fixing linting issue

* fixing linting issue

---------

Co-authored-by: srahul3 <[email protected]>
  • Loading branch information
srahul3 and srahul3 committed Jun 19, 2023
1 parent c4617fc commit 47d4063
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/2293.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
sync-catalog: add ability to support weighted loadbalancing by service annotation `consul.hashicorp.com/service-weight: <number>`
```
6 changes: 6 additions & 0 deletions control-plane/catalog/to-consul/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ const (
// annotationServiceMetaPrefix is the prefix for setting meta key/value
// for a service. The remainder of the key is the meta key.
annotationServiceMetaPrefix = "consul.hashicorp.com/service-meta-"

// annotationServiceWeight is the key of the annotation that determines
// the traffic weight of the service which is spanned over multiple k8s cluster.
// e.g. Service `backend` in k8s cluster `A` receives 25% of the traffic
// compared to same `backend` service in k8s cluster `B`.
annotationServiceWeight = "consul.hashicorp.com/service-weight"
)
41 changes: 41 additions & 0 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,19 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, ip)
r.Service.Address = ip
// Adding information about service weight.
// Overrides the existing weight if present.
if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" {
weightI, err := getServiceWeight(weight)
if err == nil {
r.Service.Weights = consulapi.AgentWeights{
Passing: weightI,
}
} else {
t.Log.Debug("[generateRegistrations] service weight err: ", err)
}
}

t.consulMap[key] = append(t.consulMap[key], &r)
}

Expand Down Expand Up @@ -547,6 +560,19 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr

// Adding information about service weight.
// Overrides the existing weight if present.
if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" {
weightI, err := getServiceWeight(weight)
if err == nil {
r.Service.Weights = consulapi.AgentWeights{
Passing: weightI,
}
} else {
t.Log.Debug("[generateRegistrations] service weight err: ", err)
}
}

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
Expand Down Expand Up @@ -999,3 +1025,18 @@ func (t *ServiceResource) isIngressService(key string) bool {
func consulHealthCheckID(k8sNS string, serviceID string) string {
return fmt.Sprintf("%s/%s", k8sNS, serviceID)
}

// Calculates the passing service weight.
func getServiceWeight(weight string) (int, error) {
// error validation if the input param is a number.
weightI, err := strconv.Atoi(weight)
if err != nil {
return -1, err
}

if weightI <= 1 {
return -1, fmt.Errorf("expecting the service annotation %s value to be greater than 1", annotationServiceWeight)
}

return weightI, nil
}
133 changes: 133 additions & 0 deletions control-plane/catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,139 @@ func TestServiceResource_createDelete(t *testing.T) {
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_ingress(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "3.3.3.3"},
)

svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "4.4.4.4"},
)

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 3)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "3.3.3.3", actual[1].Service.Address)
require.Equal(r, 22, actual[1].Service.Weights.Passing)
require.Equal(r, "foo", actual[2].Service.Service)
require.Equal(r, "4.4.4.4", actual[2].Service.Address)
require.Equal(r, 22, actual[2].Service.Weights.Passing)
require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID)
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_externalIP(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Spec.ExternalIPs = []string{"3.3.3.3", "4.4.4.4"}

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 2)
require.Equal(r, "foo", actual[0].Service.Service)
require.Equal(r, "3.3.3.3", actual[0].Service.Address)
require.Equal(r, 22, actual[0].Service.Weights.Passing)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "4.4.4.4", actual[1].Service.Address)
require.Equal(r, 22, actual[1].Service.Weights.Passing)
require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID)
})
}

// Test service weight.
func TestServiceWeight(t *testing.T) {
t.Parallel()
cases := map[string]struct {
Weight string
ExpectError bool
ExtectedWeight int
}{
"external-IP": {
Weight: "22",
ExpectError: false,
ExtectedWeight: 22,
},
"non-int-weight": {
Weight: "non-int",
ExpectError: true,
ExtectedWeight: 0,
},
"one-weight": {
Weight: "1",
ExpectError: true,
ExtectedWeight: 0,
},
"zero-weight": {
Weight: "0",
ExpectError: true,
ExtectedWeight: 0,
},
"negative-weight": {
Weight: "-2",
ExpectError: true,
ExtectedWeight: 0,
},
"greater-than-100-is-allowed": {
Weight: "1000",
ExpectError: false,
ExtectedWeight: 1000,
},
}

for name, c := range cases {
t.Run(name, func(tt *testing.T) {
weightI, err := getServiceWeight(c.Weight)
// Verify what we got
retry.Run(tt, func(r *retry.R) {
if c.ExpectError {
require.Error(r, err)
} else {
require.Equal(r, c.ExtectedWeight, weightI)
}
})
})
}
}

// Test that we're default enabled.
func TestServiceResource_defaultEnable(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit 47d4063

Please sign in to comment.