Skip to content

Commit

Permalink
add pool service controller.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Apr 8, 2024
1 parent c137221 commit de158e6
Show file tree
Hide file tree
Showing 17 changed files with 2,570 additions and 201 deletions.
1 change: 1 addition & 0 deletions cmd/yurt-manager/names/controller_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,6 @@ func YurtManagerControllerAliases() map[string]string {
"gatewaydns": GatewayDNSController,
"nodelifecycle": NodeLifeCycleController,
"nodebucket": NodeBucketController,
"poolservice": PoolServiceController,
}
}
4 changes: 2 additions & 2 deletions pkg/apis/network/v1alpha1/groupversion_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ limitations under the License.

package v1alpha1

// Package v1alpha1 contains API Schema definitions for the net v1alpha1API group
// Package v1alpha1 contains API Schema definitions for the network v1alpha1API group
// +kubebuilder:object:generate=true
// +groupName=net.openyurt.io
// +groupName=network.openyurt.io

import (
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/network/v1alpha1/poolservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type PoolServiceSpec struct {
// PoolServiceStatus defines the observed state of PoolService
type PoolServiceStatus struct {
// LoadBalancer contains the current status of the load-balancer in the current nodepool
LoadBalancer *v1.LoadBalancerStatus `json:"loadBalancer,omitempty"`
LoadBalancer v1.LoadBalancerStatus `json:"loadBalancer,omitempty"`

// Current poolService state
Conditions []metav1.Condition `json:"conditions,omitempty"`
Expand Down
7 changes: 1 addition & 6 deletions pkg/apis/network/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/yurtmanager/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodelifecycle"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodepool"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/platformadmin"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/poolservice"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/dns"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/gatewayinternalservice"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/gatewaypickup"
Expand Down Expand Up @@ -97,6 +98,7 @@ func NewControllerInitializers() map[string]InitFunc {
register(names.GatewayPublicServiceController, gatewaypublicservice.Add)
register(names.NodeLifeCycleController, nodelifecycle.Add)
register(names.NodeBucketController, nodebucket.Add)
register(names.PoolServiceController, poolservice.Add)

return controllers
}
Expand Down
167 changes: 167 additions & 0 deletions pkg/yurtmanager/controller/poolservice/annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the License);
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an AS IS BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package poolservice

import (
"reflect"
"sort"
"strings"

corev1 "k8s.io/api/core/v1"

"github.com/openyurtio/openyurt/pkg/apis/network"
netv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1"
)

func aggregatePoolServicesAnnotations(poolServices []netv1alpha1.PoolService) map[string]string {
aggregatedAnnotations := make(map[string]string)
for _, ps := range poolServices {
aggregatedAnnotations = mergeAnnotations(aggregatedAnnotations, filterIgnoredKeys(ps.Annotations))
}

return aggregatedAnnotations
}

func filterIgnoredKeys(annotations map[string]string) map[string]string {
newAnnotations := make(map[string]string)
for key, value := range annotations {
if key == network.AnnotationNodePoolSelector {
continue
}
if !strings.HasPrefix(key, network.AggregateAnnotationsKeyPrefix) {
continue
}
newAnnotations[key] = value
}
return newAnnotations
}

func mergeAnnotations(m map[string]string, elem map[string]string) map[string]string {
if len(elem) == 0 {
return m
}

if m == nil {
m = make(map[string]string)

Check warning on line 59 in pkg/yurtmanager/controller/poolservice/annotations.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/annotations.go#L59

Added line #L59 was not covered by tests
}

for k, v := range elem {
m[k] = mergeAnnotationValue(m[k], v)
}

return m
}

func mergeAnnotationValue(originalValue, addValue string) string {
if len(originalValue) == 0 {
return addValue
}

if len(addValue) == 0 {
return originalValue

Check warning on line 75 in pkg/yurtmanager/controller/poolservice/annotations.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/annotations.go#L75

Added line #L75 was not covered by tests
}

splitOriginalValues := strings.Split(originalValue, ",")
if valueIsExist(splitOriginalValues, addValue) {
return originalValue
}

return joinNewValue(splitOriginalValues, addValue)
}

func valueIsExist(originalValueList []string, addValue string) bool {
for _, oldValue := range originalValueList {
if addValue == oldValue {
return true
}
}
return false
}

func joinNewValue(originalValueList []string, addValue string) string {
originalValueList = append(originalValueList, addValue)
sort.Strings(originalValueList)

return strings.Join(originalValueList, ",")
}

func compareAndUpdateServiceAnnotations(svc *corev1.Service, aggregatedAnnotations map[string]string) bool {
currentAggregatedServiceAnnotations := filterIgnoredKeys(svc.Annotations)

if reflect.DeepEqual(currentAggregatedServiceAnnotations, aggregatedAnnotations) {
return false
}

update, deletion := diffAnnotations(currentAggregatedServiceAnnotations, aggregatedAnnotations)
updateAnnotations(svc.Annotations, update, deletion)

return true
}

func diffAnnotations(currentAnnotations, desiredAnnotations map[string]string) (update map[string]string, deletion map[string]string) {
if currentAnnotations == nil {
return desiredAnnotations, nil

Check warning on line 117 in pkg/yurtmanager/controller/poolservice/annotations.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/annotations.go#L117

Added line #L117 was not covered by tests
}
if desiredAnnotations == nil {
return nil, currentAnnotations

Check warning on line 120 in pkg/yurtmanager/controller/poolservice/annotations.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/annotations.go#L120

Added line #L120 was not covered by tests
}

update = make(map[string]string)
for key, value := range desiredAnnotations {
if currentAnnotations[key] != value {
update[key] = value
}
}

deletion = make(map[string]string)
for key, value := range currentAnnotations {
if _, exist := desiredAnnotations[key]; !exist {
deletion[key] = value
}
}
return
}

func updateAnnotations(annotations, update, deletion map[string]string) {
if len(update) == 0 && len(deletion) == 0 {
return

Check warning on line 141 in pkg/yurtmanager/controller/poolservice/annotations.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/annotations.go#L141

Added line #L141 was not covered by tests
}
if annotations == nil {
annotations = make(map[string]string)

Check warning on line 144 in pkg/yurtmanager/controller/poolservice/annotations.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/annotations.go#L144

Added line #L144 was not covered by tests
}
for key, value := range update {
annotations[key] = value
}

for key, _ := range deletion {

Check failure on line 150 in pkg/yurtmanager/controller/poolservice/annotations.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofmt`-ed with `-s` (gofmt)
delete(annotations, key)
}
}

func annotationValueIsEqual(oldAnnotations, newAnnotations map[string]string, key string) bool {
var oldValue string
if oldAnnotations != nil {
oldValue = oldAnnotations[key]
}

var newValue string
if newAnnotations != nil {
newValue = newAnnotations[key]
}

return oldValue == newValue
}
162 changes: 162 additions & 0 deletions pkg/yurtmanager/controller/poolservice/event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the License);
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an AS IS BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package poolservice

import (
"context"
"reflect"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/openyurtio/openyurt/cmd/yurt-manager/names"
"github.com/openyurtio/openyurt/pkg/apis/apps/v1beta1"
"github.com/openyurtio/openyurt/pkg/apis/network"
"github.com/openyurtio/openyurt/pkg/apis/network/v1alpha1"
)

func NewPoolServiceEventHandler() handler.EventHandler {
return handler.Funcs{
CreateFunc: func(event event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
handlePoolServiceNormal(event.Object, limitingInterface)
},
UpdateFunc: func(updateEvent event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
handlePoolServiceUpdate(updateEvent.ObjectOld, updateEvent.ObjectNew, limitingInterface)
},
DeleteFunc: func(deleteEvent event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
handlePoolServiceNormal(deleteEvent.Object, limitingInterface)
},
GenericFunc: func(genericEvent event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) {
handlePoolServiceNormal(genericEvent.Object, limitingInterface)
},

Check warning on line 51 in pkg/yurtmanager/controller/poolservice/event_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/event_handler.go#L46-L51

Added lines #L46 - L51 were not covered by tests
}
}

func handlePoolServiceNormal(event client.Object, q workqueue.RateLimitingInterface) {
ps := event.(*v1alpha1.PoolService)
serviceName := getServiceNameFromPoolService(ps)
enqueueService(ps.Namespace, serviceName, q)
}

func getServiceNameFromPoolService(poolService *v1alpha1.PoolService) string {
if poolService.Labels == nil {
return ""

Check warning on line 63 in pkg/yurtmanager/controller/poolservice/event_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/event_handler.go#L63

Added line #L63 was not covered by tests
}

serviceName, ok := poolService.Labels[network.LabelServiceName]
if !ok {
return ""
}
return serviceName
}

func enqueueService(namespace, serviceName string, q workqueue.RateLimitingInterface) {
if len(serviceName) == 0 || len(namespace) == 0 {
return
}

q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: namespace, Name: serviceName},
})
}

func handlePoolServiceUpdate(oldObject, newObject client.Object, q workqueue.RateLimitingInterface) {
oldPs := oldObject.(*v1alpha1.PoolService)
newPs := newObject.(*v1alpha1.PoolService)

oldServiceName := getServiceNameFromPoolService(oldPs)
newServiceName := getServiceNameFromPoolService(newPs)

if oldServiceName != newServiceName {
klog.Warningf("service name of %s/%s is changed from %s to %s", oldPs.Namespace, oldPs.Name, oldServiceName, newServiceName)
enqueueService(oldPs.Namespace, oldServiceName, q)
enqueueService(newPs.Namespace, newServiceName, q)
return
}
enqueueService(newPs.Namespace, newServiceName, q)

return
}

func NewNodePoolEventHandler(c client.Client) handler.EventHandler {
return handler.Funcs{
CreateFunc: func(createEvent event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
allMultiRegionLbServicesEnqueue(c, limitingInterface)
},
UpdateFunc: func(updateEvent event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
handleNodePoolUpdate(c, updateEvent, limitingInterface)
},
DeleteFunc: func(deleteEvent event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
nodePoolRelatedServiceEnqueue(c, deleteEvent.Object, limitingInterface)
},
GenericFunc: func(genericEvent event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) {
nodePoolRelatedServiceEnqueue(c, genericEvent.Object, limitingInterface)
},
}
}

func allMultiRegionLbServicesEnqueue(c client.Client, q workqueue.RateLimitingInterface) {
services := &v1.ServiceList{}
err := c.List(context.Background(), services)
if err != nil {
return

Check warning on line 122 in pkg/yurtmanager/controller/poolservice/event_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/event_handler.go#L122

Added line #L122 was not covered by tests
}

for _, svc := range services.Items {
if !isMultiRegionalLbService(&svc) {
continue
}
enqueueService(svc.Namespace, svc.Name, q)
}
}

func handleNodePoolUpdate(c client.Client, updateEvent event.UpdateEvent, q workqueue.RateLimitingInterface) {
oldNp := updateEvent.ObjectOld.(*v1beta1.NodePool)
newNp := updateEvent.ObjectNew.(*v1beta1.NodePool)

if reflect.DeepEqual(oldNp.Labels, newNp.Labels) {
return
}

allMultiRegionLbServicesEnqueue(c, q)
}

func nodePoolRelatedServiceEnqueue(c client.Client, object client.Object, q workqueue.RateLimitingInterface) {
np := object.(*v1beta1.NodePool)
poolServiceList := &v1alpha1.PoolServiceList{}

listSelector := client.MatchingLabels{
network.LabelNodePoolName: np.Name,
labelManageBy: names.PoolServiceController}

if err := c.List(context.Background(), poolServiceList, listSelector); err != nil {
return

Check warning on line 153 in pkg/yurtmanager/controller/poolservice/event_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/event_handler.go#L153

Added line #L153 was not covered by tests
}

for _, item := range poolServiceList.Items {
if !hasServiceName(&item) {
continue

Check warning on line 158 in pkg/yurtmanager/controller/poolservice/event_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/poolservice/event_handler.go#L158

Added line #L158 was not covered by tests
}
handlePoolServiceNormal(&item, q)
}
}
Loading

0 comments on commit de158e6

Please sign in to comment.