Skip to content

Commit

Permalink
optimize provider network (kubeovn#2099)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed Dec 6, 2022
1 parent 410c8af commit 7ad735b
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 182 deletions.
4 changes: 4 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,10 @@ spec:
type: array
items:
type: string
notReadyNodes:
type: array
items:
type: string
vlans:
type: array
items:
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ type ProviderNetworkStatus struct {
// +optional
ReadyNodes []string `json:"readyNodes,omitempty"`

// +optional
NotReadyNodes []string `json:"notReadyNodes,omitempty"`

// +optional
Vlans []string `json:"vlans,omitempty"`

Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/kubeovn/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 4 additions & 13 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,8 @@ type Controller struct {
vlansLister kubeovnlister.VlanLister
vlanSynced cache.InformerSynced

providerNetworksLister kubeovnlister.ProviderNetworkLister
providerNetworkSynced cache.InformerSynced
updateProviderNetworkQueue workqueue.RateLimitingInterface
providerNetworksLister kubeovnlister.ProviderNetworkLister
providerNetworkSynced cache.InformerSynced

addVlanQueue workqueue.RateLimitingInterface
delVlanQueue workqueue.RateLimitingInterface
Expand Down Expand Up @@ -211,9 +210,8 @@ func NewController(config *Configuration) *Controller {
delVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DelVlan"),
updateVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVlan"),

providerNetworksLister: providerNetworkInformer.Lister(),
providerNetworkSynced: providerNetworkInformer.Informer().HasSynced,
updateProviderNetworkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateProviderNetwork"),
providerNetworksLister: providerNetworkInformer.Lister(),
providerNetworkSynced: providerNetworkInformer.Informer().HasSynced,

podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
Expand Down Expand Up @@ -323,10 +321,6 @@ func NewController(config *Configuration) *Controller {
UpdateFunc: controller.enqueueUpdateVlan,
})

providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: controller.enqueueUpdateProviderNetwork,
})

if config.EnableNP {
npInformer := informerFactory.Networking().V1().NetworkPolicies()
controller.npsLister = npInformer.Lister()
Expand Down Expand Up @@ -457,8 +451,6 @@ func (c *Controller) shutdown() {
c.delVlanQueue.ShutDown()
c.updateVlanQueue.ShutDown()

c.updateProviderNetworkQueue.ShutDown()

c.addOrUpdateVpcQueue.ShutDown()
c.updateVpcStatusQueue.ShutDown()
c.delVpcQueue.ShutDown()
Expand Down Expand Up @@ -543,7 +535,6 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) {

go wait.Until(c.runDelVpcWorker, time.Second, stopCh)
go wait.Until(c.runUpdateVpcStatusWorker, time.Second, stopCh)
go wait.Until(c.runUpdateProviderNetworkWorker, time.Second, stopCh)

if c.config.EnableLb {
// run in a single worker to avoid deleting the last VIP, which would cause OVN to delete the load balancer
Expand Down
149 changes: 48 additions & 101 deletions pkg/controller/provider-network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,126 +4,73 @@ import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"github.com/kubeovn/kube-ovn/pkg/util"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
)

func (c *Controller) enqueueUpdateProviderNetwork(_, obj interface{}) {
if !c.isLeader() {
return
}

key, err := cache.MetaNamespaceKeyFunc(obj)
func (c *Controller) resyncProviderNetworkStatus() {
klog.Infof("start to sync ProviderNetwork status")
pns, err := c.providerNetworksLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(err)
klog.Errorf("failed to list provider network: %v", err)
return
}

klog.V(3).Infof("enqueue update provider network %s", key)
c.updateProviderNetworkQueue.Add(key)
}

func (c *Controller) runUpdateProviderNetworkWorker() {
for c.processNextUpdateProviderNetworkWorkItem() {
}
}

func (c *Controller) processNextUpdateProviderNetworkWorkItem() bool {
obj, shutdown := c.updateProviderNetworkQueue.Get()
if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.updateProviderNetworkQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.updateProviderNetworkQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handleUpdateProviderNetwork(key); err != nil {
c.updateProviderNetworkQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.updateProviderNetworkQueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
}

return true
}

func (c *Controller) handleUpdateProviderNetwork(key string) error {
pn, err := c.providerNetworksLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}

nodes, err := c.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list nodes: %v", err)
return err
return
}

if providerNetworkIsReady(pn, nodes) != pn.Status.Ready {
newPn := pn.DeepCopy()
newPn.Status.Ready = !pn.Status.Ready
_, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
return err
for _, cachedPn := range pns {
pn := cachedPn.DeepCopy()
var readyNodes, notReadyNodes, expectNodes []string
for _, node := range nodes {
if util.ContainsString(pn.Spec.ExcludeNodes, node.Name) {
pn.Status.RemoveNodeConditions(node.Name)
continue
}
if node.Labels[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] == "true" {
pn.Status.SetNodeReady(node.Name, "InitOVSBridgeSucceeded", "")
readyNodes = append(readyNodes, node.Name)
} else {
pods, err := c.config.KubeClient.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
LabelSelector: "app=kube-ovn-cni",
FieldSelector: fmt.Sprintf("spec.nodeName=%s", node.Name),
})
if err != nil {
klog.Errorf("failed to list pod: %v", err)
continue
}

var errMsg string
if len(pods.Items) == 1 && pods.Items[0].Annotations != nil {
errMsg = pods.Items[0].Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, pn.Name)]
}
pn.Status.SetNodeNotReady(node.Name, "InitOVSBridgeFailed", errMsg)
notReadyNodes = append(notReadyNodes, node.Name)
}
}
}

return nil
}

func (c *Controller) resyncProviderNetworkStatus() {
nodeList, err := c.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get nodes %v", err)
return
}
pnList, err := c.providerNetworksLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get provider networks %v", err)
return
}

for _, pn := range pnList {
if providerNetworkIsReady(pn, nodeList) != pn.Status.Ready {
newPn := pn.DeepCopy()
newPn.Status.Ready = !newPn.Status.Ready
_, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
expectNodes = append(readyNodes, notReadyNodes...)
conditionsChange := false
for _, c := range pn.Status.Conditions {
if !util.ContainsString(expectNodes, c.Node) {
pn.Status.RemoveNodeConditions(c.Node)
conditionsChange = true
}
}
}
}

func providerNetworkIsReady(pn *kubeovnv1.ProviderNetwork, nodes []*corev1.Node) bool {
for _, node := range nodes {
if !util.ContainsString(pn.Spec.ExcludeNodes, node.Name) &&
!util.ContainsString(pn.Status.ReadyNodes, node.Name) {
return false
if conditionsChange || len(util.DiffStringSlice(pn.Status.ReadyNodes, readyNodes)) != 0 ||
len(util.DiffStringSlice(pn.Status.NotReadyNodes, notReadyNodes)) != 0 {
pn.Status.ReadyNodes = readyNodes
pn.Status.NotReadyNodes = notReadyNodes
pn.Status.Ready = (len(notReadyNodes) == 0)
if _, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), pn, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
}
}
}
return true
}
Loading

0 comments on commit 7ad735b

Please sign in to comment.