From bfe3f414c170907549f0eabd11bc030bb5f1191c Mon Sep 17 00:00:00 2001 From: Anna Khmelnitsky Date: Tue, 8 Feb 2022 01:17:37 +0000 Subject: [PATCH] Support preallocate continuous IPs for StatefulSet In order to provide better user experience, AtreamIPAM will try to preallocate continuous IP range for StatefulSet. If unsuccesful, IPs for the StatfulSet will be allocated on the fly, as before. Signed-off-by: Anna Khmelnitsky --- build/yamls/antrea-aks.yml | 1 + build/yamls/antrea-eks.yml | 1 + build/yamls/antrea-gke.yml | 1 + build/yamls/antrea-ipsec.yml | 1 + build/yamls/antrea-kind.yml | 1 + build/yamls/antrea.yml | 1 + build/yamls/base/controller-rbac.yml | 1 + .../cniserver/ipam/antrea_ipam_controller.go | 13 +- pkg/agent/cniserver/ipam/antrea_ipam_test.go | 23 +- pkg/controller/ipam/antrea_ipam_controller.go | 150 ++++++++++-- .../ipam/antrea_ipam_controller_test.go | 213 ++++++++---------- pkg/{agent/cniserver => }/ipam/annotations.go | 0 pkg/ipam/ipallocator/allocator_test.go | 3 +- pkg/ipam/poolallocator/allocator.go | 16 ++ test/e2e/antreaipam_service_test.go | 4 +- test/e2e/antreaipam_test.go | 8 +- 16 files changed, 267 insertions(+), 170 deletions(-) rename pkg/{agent/cniserver => }/ipam/annotations.go (100%) diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 37158f849eb..cb3bbabbebc 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3867,6 +3867,7 @@ rules: - get - list - watch + - create - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 41f3ee07570..72dc2daf99d 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3867,6 +3867,7 @@ rules: - get - list - watch + - create - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 09ded9d693f..40e29f408a9 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3867,6 +3867,7 @@ rules: - get - list - watch + - create - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 699821483d6..498dd915473 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3867,6 +3867,7 @@ rules: - get - list - watch + - create - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: diff --git a/build/yamls/antrea-kind.yml b/build/yamls/antrea-kind.yml index 5613672862a..bb3f31e0dec 100644 --- a/build/yamls/antrea-kind.yml +++ b/build/yamls/antrea-kind.yml @@ -3867,6 +3867,7 @@ rules: - get - list - watch + - create - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index f0258e4e2f8..1699ac4ee71 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3867,6 +3867,7 @@ rules: - get - list - watch + - create - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: diff --git a/build/yamls/base/controller-rbac.yml b/build/yamls/base/controller-rbac.yml index 0401cb68396..39e889c010e 100644 --- a/build/yamls/base/controller-rbac.yml +++ b/build/yamls/base/controller-rbac.yml @@ -250,6 +250,7 @@ rules: - get - list - watch + - create # Deprecated in v1.0.0. - apiGroups: - clusterinformation.antrea.tanzu.vmware.com diff --git a/pkg/agent/cniserver/ipam/antrea_ipam_controller.go b/pkg/agent/cniserver/ipam/antrea_ipam_controller.go index 62bdf887b79..2dd05c8ed0b 100644 --- a/pkg/agent/cniserver/ipam/antrea_ipam_controller.go +++ b/pkg/agent/cniserver/ipam/antrea_ipam_controller.go @@ -32,6 +32,7 @@ import ( "antrea.io/antrea/pkg/client/informers/externalversions" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2" crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2" + annotation "antrea.io/antrea/pkg/ipam" "antrea.io/antrea/pkg/ipam/poolallocator" "antrea.io/antrea/pkg/util/k8s" ) @@ -149,11 +150,11 @@ func (c *AntreaIPAMController) getIPPoolsByPod(namespace, name string) ([]string return nil, nil, nil, err } // Collect specified IPs if exist - ipStrings, _ := pod.Annotations[AntreaIPAMPodIPAnnotationKey] + ipStrings, _ := pod.Annotations[annotation.AntreaIPAMPodIPAnnotationKey] ipStrings = strings.ReplaceAll(ipStrings, " ", "") var ipErr error if ipStrings != "" { - splittedIPStrings := strings.Split(ipStrings, AntreaIPAMAnnotationDelimiter) + splittedIPStrings := strings.Split(ipStrings, annotation.AntreaIPAMAnnotationDelimiter) for _, ipString := range splittedIPStrings { ip := net.ParseIP(ipString) if ipString != "" && ip == nil { @@ -187,9 +188,9 @@ ownerReferenceLoop: } } - annotations, exists := pod.Annotations[AntreaIPAMAnnotationKey] + annotations, exists := pod.Annotations[annotation.AntreaIPAMAnnotationKey] if exists { - return strings.Split(annotations, AntreaIPAMAnnotationDelimiter), ips, reservedOwner, ipErr + return strings.Split(annotations, annotation.AntreaIPAMAnnotationDelimiter), ips, reservedOwner, ipErr } // Find IPPool by Namespace @@ -197,11 +198,11 @@ ownerReferenceLoop: if err != nil { return nil, nil, nil, nil } - annotations, exists = ns.Annotations[AntreaIPAMAnnotationKey] + annotations, exists = ns.Annotations[annotation.AntreaIPAMAnnotationKey] if !exists { return nil, nil, nil, nil } - return strings.Split(annotations, AntreaIPAMAnnotationDelimiter), ips, reservedOwner, ipErr + return strings.Split(annotations, annotation.AntreaIPAMAnnotationDelimiter), ips, reservedOwner, ipErr } func (c *AntreaIPAMController) getPoolAllocatorByPod(namespace, podName string) (*poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) { diff --git a/pkg/agent/cniserver/ipam/antrea_ipam_test.go b/pkg/agent/cniserver/ipam/antrea_ipam_test.go index 7e6b8acf41f..1b760d5a421 100644 --- a/pkg/agent/cniserver/ipam/antrea_ipam_test.go +++ b/pkg/agent/cniserver/ipam/antrea_ipam_test.go @@ -39,6 +39,7 @@ import ( argtypes "antrea.io/antrea/pkg/agent/cniserver/types" crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + annotations "antrea.io/antrea/pkg/ipam" fakepoolclient "antrea.io/antrea/pkg/ipam/poolallocator/testing" ) @@ -153,13 +154,13 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: testApple, - Annotations: map[string]string{AntreaIPAMAnnotationKey: testApple, "junk": "garbage"}, + Annotations: map[string]string{annotations.AntreaIPAMAnnotationKey: testApple, "junk": "garbage"}, }, }, &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: testOrange, - Annotations: map[string]string{"junk": "garbage", AntreaIPAMAnnotationKey: testOrange}, + Annotations: map[string]string{"junk": "garbage", annotations.AntreaIPAMAnnotationKey: testOrange}, }, }, &corev1.Namespace{ @@ -171,7 +172,7 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: testJunkAnnotation, - Annotations: map[string]string{AntreaIPAMAnnotationKey: testJunkAnnotation}, + Annotations: map[string]string{annotations.AntreaIPAMAnnotationKey: testJunkAnnotation}, }, }, &corev1.Namespace{ @@ -219,7 +220,7 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { ObjectMeta: metav1.ObjectMeta{ Name: "pear1", Namespace: testPear, - Annotations: map[string]string{"junk": "garbage", AntreaIPAMAnnotationKey: testPear}, + Annotations: map[string]string{"junk": "garbage", annotations.AntreaIPAMAnnotationKey: testPear}, }, Spec: corev1.PodSpec{NodeName: "fakeNode"}, }, @@ -227,7 +228,7 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { ObjectMeta: metav1.ObjectMeta{ Name: "pear2", Namespace: testPear, - Annotations: map[string]string{"junk": "garbage", AntreaIPAMAnnotationKey: testPear, AntreaIPAMPodIPAnnotationKey: " "}, + Annotations: map[string]string{"junk": "garbage", annotations.AntreaIPAMAnnotationKey: testPear, annotations.AntreaIPAMPodIPAnnotationKey: " "}, }, Spec: corev1.PodSpec{NodeName: "fakeNode"}, }, @@ -235,7 +236,7 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { ObjectMeta: metav1.ObjectMeta{ Name: "pear3", Namespace: testPear, - Annotations: map[string]string{"junk": "garbage", AntreaIPAMAnnotationKey: testPear, AntreaIPAMPodIPAnnotationKey: "10.2.3.199"}, + Annotations: map[string]string{"junk": "garbage", annotations.AntreaIPAMAnnotationKey: testPear, annotations.AntreaIPAMPodIPAnnotationKey: "10.2.3.199"}, }, Spec: corev1.PodSpec{NodeName: "fakeNode"}, }, @@ -244,7 +245,7 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { // conflict Name: "pear4", Namespace: testPear, - Annotations: map[string]string{"junk": "garbage", AntreaIPAMAnnotationKey: testPear, AntreaIPAMPodIPAnnotationKey: "10.2.3.199"}, + Annotations: map[string]string{"junk": "garbage", annotations.AntreaIPAMAnnotationKey: testPear, annotations.AntreaIPAMPodIPAnnotationKey: "10.2.3.199"}, }, Spec: corev1.PodSpec{NodeName: "fakeNode"}, }, @@ -253,7 +254,7 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { // out of range Name: "pear5", Namespace: testPear, - Annotations: map[string]string{"junk": "garbage", AntreaIPAMAnnotationKey: testPear, AntreaIPAMPodIPAnnotationKey: "10.2.4.199"}, + Annotations: map[string]string{"junk": "garbage", annotations.AntreaIPAMAnnotationKey: testPear, annotations.AntreaIPAMPodIPAnnotationKey: "10.2.4.199"}, }, Spec: corev1.PodSpec{NodeName: "fakeNode"}, }, @@ -262,7 +263,7 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { // invalid IP Name: "pear6", Namespace: testPear, - Annotations: map[string]string{"junk": "garbage", AntreaIPAMAnnotationKey: testPear, AntreaIPAMPodIPAnnotationKey: "junk"}, + Annotations: map[string]string{"junk": "garbage", annotations.AntreaIPAMAnnotationKey: testPear, annotations.AntreaIPAMPodIPAnnotationKey: "junk"}, }, Spec: corev1.PodSpec{NodeName: "fakeNode"}, }, @@ -271,7 +272,7 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { // invalid IPPool Name: "pear7", Namespace: testPear, - Annotations: map[string]string{"junk": "garbage", AntreaIPAMAnnotationKey: testJunkAnnotation}, + Annotations: map[string]string{"junk": "garbage", annotations.AntreaIPAMAnnotationKey: testJunkAnnotation}, }, Spec: corev1.PodSpec{NodeName: "fakeNode"}, }, @@ -279,7 +280,7 @@ func initTestClients() (*fake.Clientset, *fakepoolclient.IPPoolClientset) { ObjectMeta: metav1.ObjectMeta{ Name: "pear-sts-8", Namespace: testPear, - Annotations: map[string]string{AntreaIPAMAnnotationKey: testPear}, + Annotations: map[string]string{annotations.AntreaIPAMAnnotationKey: testPear}, OwnerReferences: []metav1.OwnerReference{{Controller: &bTrue, Kind: "StatefulSet"}}, }, Spec: corev1.PodSpec{NodeName: "fakeNode"}, diff --git a/pkg/controller/ipam/antrea_ipam_controller.go b/pkg/controller/ipam/antrea_ipam_controller.go index 59f38d0f875..1467087c1f2 100644 --- a/pkg/controller/ipam/antrea_ipam_controller.go +++ b/pkg/controller/ipam/antrea_ipam_controller.go @@ -19,6 +19,8 @@ package ipam import ( "fmt" + "strings" + "sync" "time" appsv1 "k8s.io/api/apps/v1" @@ -27,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" appsinformers "k8s.io/client-go/informers/apps/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -36,6 +39,7 @@ import ( "antrea.io/antrea/pkg/client/informers/externalversions" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2" crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2" + annotation "antrea.io/antrea/pkg/ipam" "antrea.io/antrea/pkg/ipam/poolallocator" "antrea.io/antrea/pkg/util/k8s" ) @@ -43,9 +47,8 @@ import ( const ( controllerName = "AntreaIPAMController" - // TODO: currently these constants are duplicated, move to a shared place - AntreaIPAMAnnotationKey = "ipam.antrea.io/ippools" - AntreaIPAMAnnotationDelimiter = "," + addEventIndication = "a" + delEventIndication = "d" // StatefulSet index name for IPPool cache. statefulSetIndex = "statefulSet" @@ -63,8 +66,14 @@ type AntreaIPAMController struct { // crdClient is the clientset for CRD API group. crdClient versioned.Interface - // Pool cleanup events triggered by StatefulSet delete - statefulSetCleanupQueue workqueue.RateLimitingInterface + // Pool cleanup events triggered by StatefulSet add/delete + statefulSetQueue workqueue.RateLimitingInterface + // StatefulSet objects would be stored here until add event is processed + statefulSetMap sync.Map + + // follow changes for Namespace objects + namespaceLister corelisters.NamespaceLister + namespaceListerSynced cache.InformerSynced // follow changes for StatefulSet objects statefulSetInformer appsinformers.StatefulSetInformer @@ -97,11 +106,16 @@ func NewAntreaIPAMController(crdClient versioned.Interface, ipPoolInformer := crdInformerFactory.Crd().V1alpha2().IPPools() ipPoolInformer.Informer().AddIndexers(cache.Indexers{statefulSetIndex: statefulSetIndexFunc}) + namespaceInformer := informerFactory.Core().V1().Namespaces() + statefulSetInformer := informerFactory.Apps().V1().StatefulSets() c := &AntreaIPAMController{ crdClient: crdClient, - statefulSetCleanupQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "statefulSetCleanup"), + statefulSetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "statefulSetPreallocationAndCleanup"), + statefulSetMap: sync.Map{}, + namespaceLister: namespaceInformer.Lister(), + namespaceListerSynced: namespaceInformer.Informer().HasSynced, statefulSetInformer: statefulSetInformer, statefulSetListerSynced: statefulSetInformer.Informer().HasSynced, ipPoolInformer: ipPoolInformer, @@ -113,6 +127,7 @@ func NewAntreaIPAMController(crdClient versioned.Interface, klog.V(2).InfoS("Subscribing for StatefulSet notifications", "controller", controllerName) statefulSetInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueStatefulSetCreateEvent, DeleteFunc: c.enqueueStatefulSetDeleteEvent, }, ) @@ -120,13 +135,24 @@ func NewAntreaIPAMController(crdClient versioned.Interface, return c } +// Enqueue the StatefulSet create notification to be processed by the worker +func (c *AntreaIPAMController) enqueueStatefulSetCreateEvent(obj interface{}) { + ss := obj.(*appsv1.StatefulSet) + klog.V(2).InfoS("Create notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name) + + key := k8s.NamespacedName(ss.Namespace, ss.Name) + + c.statefulSetMap.Store(key, ss) + c.statefulSetQueue.Add(addEventIndication + key) +} + // Enqueue the StatefulSet delete notification to be processed by the worker func (c *AntreaIPAMController) enqueueStatefulSetDeleteEvent(obj interface{}) { ss := obj.(*appsv1.StatefulSet) - klog.V(2).InfoS("Delete notification", "StatefulSet", ss.Name) + klog.V(2).InfoS("Delete notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name) key := k8s.NamespacedName(ss.Namespace, ss.Name) - c.statefulSetCleanupQueue.Add(key) + c.statefulSetQueue.Add(delEventIndication + key) } // Inspect all IPPools for stale IP Address entries. @@ -150,7 +176,7 @@ func (c *AntreaIPAMController) cleanupStaleStatefulSets() { if _, ok := statefulSetMap[key]; !ok { // This entry refers to StatefulSet that no longer exists klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "StatefulSet", key) - c.statefulSetCleanupQueue.Add(key) + c.statefulSetQueue.Add(delEventIndication + key) // Mark this entry in map to ensure cleanup is enqueued only once statefulSetMap[key] = true } @@ -159,9 +185,8 @@ func (c *AntreaIPAMController) cleanupStaleStatefulSets() { } } -// Look for an IP Pool associated with this StatefulSet, either a dedicated one or -// annotated to the namespace. If IPPool is found, this routine will clear all IP -// addresses that might be reserved for the pool. +// Look for an IP Pool associated with this StatefulSet. +// If IPPool is found, this routine will clear all addresses that might be reserved for the pool. func (c *AntreaIPAMController) cleanIPPoolForStatefulSet(namespacedName string) error { klog.InfoS("Processing delete notification", "StatefulSet", namespacedName) ipPools, _ := c.ipPoolInformer.Informer().GetIndexer().ByIndex(statefulSetIndex, namespacedName) @@ -187,40 +212,119 @@ func (c *AntreaIPAMController) cleanIPPoolForStatefulSet(namespacedName string) return nil } +// Find IP Pools annotated to StatefulSet via direct annotation or namespace annotation +func (c *AntreaIPAMController) getIPPoolsForStatefulSet(ss *appsv1.StatefulSet) []string { + + // Inspect pool annotation for the Pods + // In order to avoid extra API call in IPAM driver, IPAM annotations are defined + // on Pods rather than on StatefulSet + annotations, exists := ss.Spec.Template.Annotations[annotation.AntreaIPAMAnnotationKey] + if exists { + // Stateful Set Pod is annotated with dedicated IP pool + return strings.Split(annotations, annotation.AntreaIPAMAnnotationDelimiter) + } + + // Inspect Namespace + namespace, err := c.namespaceLister.Get(ss.Namespace) + if err != nil { + // Should never happen + klog.Errorf("Namespace %s not found for StatefulSet %s", ss.Namespace, ss.Name) + return nil + } + + annotations, exists = namespace.Annotations[annotation.AntreaIPAMAnnotationKey] + if exists { + return strings.Split(annotations, annotation.AntreaIPAMAnnotationDelimiter) + } + + return nil + +} + +// Look for an IP Pool associated with this StatefulSet, either a dedicated one or +// annotated to the namespace. If such IP Pool is found, preallocate IPs for the StatefulSet. +// This function returns error if pool is not found, or allocation fails. +func (c *AntreaIPAMController) preallocateIPPoolForStatefulSet(ss *appsv1.StatefulSet) error { + klog.InfoS("Processing create notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name) + + ipPools := c.getIPPoolsForStatefulSet(ss) + + if ipPools == nil { + // nothing to preallocate + return nil + } + + // Only one pool is supported for now. Dual stack support coming in future. + ipPoolName := ipPools[0] + allocator, err := poolallocator.NewIPPoolAllocator(ipPoolName, c.crdClient, c.ipPoolLister) + if err != nil { + return fmt.Errorf("Failed to find IP Pool %s: %s", ipPoolName, err) + } + + size := int(*ss.Spec.Replicas) + err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size) + if err != nil { + return fmt.Errorf("Failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err) + } + + return nil +} func (c *AntreaIPAMController) statefulSetWorker() { for c.processNextStatefulSetWorkItem() { } } func (c *AntreaIPAMController) processNextStatefulSetWorkItem() bool { - key, quit := c.statefulSetCleanupQueue.Get() + key, quit := c.statefulSetQueue.Get() if quit { return false } - defer c.statefulSetCleanupQueue.Done(key) - err := c.cleanIPPoolForStatefulSet(key.(string)) + defer c.statefulSetQueue.Done(key) - if err != nil { - // Put the item back on the workqueue to handle any transient errors. - c.statefulSetCleanupQueue.AddRateLimited(key) - klog.ErrorS(err, "Failed to clean IP Pool for StatefulSet", "StatefulSet", key) - return true + namespacedName := key.(string)[1:] + + if key.(string)[:1] == delEventIndication { + err := c.cleanIPPoolForStatefulSet(namespacedName) + if err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.statefulSetQueue.AddRateLimited(key) + klog.ErrorS(err, "Failed to clean IP Pool", "StatefulSet", key) + return true + } + } else { + ss, ok := c.statefulSetMap.Load(namespacedName) + if !ok { + // Object not found in map - should never happen + klog.Errorf("Failed to locate StatefulSet %s", namespacedName) + c.statefulSetQueue.Forget(key) + return true + } + err := c.preallocateIPPoolForStatefulSet(ss.(*appsv1.StatefulSet)) + if err != nil { + // Preallocation is best effort - we do not retry even with transient errors, + // since we don't want to implement logic that would delay Pods while waiting for + // preallocation. + klog.ErrorS(err, "No IPs prealllocated") + c.statefulSetMap.Delete(key) + c.statefulSetQueue.Forget(key) + return true + } } - c.statefulSetCleanupQueue.Forget(key) + c.statefulSetQueue.Forget(key) return true } // Run begins watching and syncing of a AntreaIPAMController. func (c *AntreaIPAMController) Run(stopCh <-chan struct{}) { - defer c.statefulSetCleanupQueue.ShutDown() + defer c.statefulSetQueue.ShutDown() klog.InfoS("Starting", "controller", controllerName) defer klog.InfoS("Shutting down", "controller", controllerName) - cacheSyncs := []cache.InformerSynced{c.statefulSetListerSynced, c.ipPoolListerSynced} + cacheSyncs := []cache.InformerSynced{c.namespaceListerSynced, c.statefulSetListerSynced, c.ipPoolListerSynced} if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { return } diff --git a/pkg/controller/ipam/antrea_ipam_controller_test.go b/pkg/controller/ipam/antrea_ipam_controller_test.go index 1d6e993e7ad..a68bf0174fc 100644 --- a/pkg/controller/ipam/antrea_ipam_controller_test.go +++ b/pkg/controller/ipam/antrea_ipam_controller_test.go @@ -18,9 +18,11 @@ package ipam import ( + "context" "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -33,62 +35,66 @@ import ( fakecrd "antrea.io/antrea/pkg/client/clientset/versioned/fake" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" listers "antrea.io/antrea/pkg/client/listers/crd/v1alpha2" + annotation "antrea.io/antrea/pkg/ipam" "antrea.io/antrea/pkg/ipam/poolallocator" ) -var ( - testPool = "test-pool" - testWithPool = "test-pool" - testNoPool = "test-no-pool" - testStale = "test-stale" -) - -// StatefulSet objects are not defined here, since IPAM annotations -// are configured on Pods which belong to the StatefulSet via "app" label. -func initTestClients(pool *crdv1a2.IPPool) (*fake.Clientset, *fakecrd.Clientset) { - crdClient := fakecrd.NewSimpleClientset(pool) +func initTestObjects(annotateNamespace bool, annotateStatefulSet bool, replicas int32) (*corev1.Namespace, *crdv1a2.IPPool, *appsv1.StatefulSet) { + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: uuid.New().String(), + }, + } - k8sClient := fake.NewSimpleClientset( - &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: testWithPool, - Annotations: map[string]string{AntreaIPAMAnnotationKey: testPool}, - }, + subnetRange := crdv1a2.SubnetIPRange{ + IPRange: crdv1a2.IPRange{ + Start: "10.2.2.100", + End: "10.2.2.110", }, - &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: testNoPool, - }, + SubnetInfo: crdv1a2.SubnetInfo{ + Gateway: "10.2.2.1", + PrefixLength: 24, }, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testNoPool, - Namespace: testWithPool, - Labels: map[string]string{"app": testNoPool}, - }, + } + + pool := &crdv1a2.IPPool{ + ObjectMeta: metav1.ObjectMeta{Name: uuid.New().String()}, + Spec: crdv1a2.IPPoolSpec{ + IPRanges: []crdv1a2.SubnetIPRange{subnetRange}, }, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{AntreaIPAMAnnotationKey: testPool}, - Labels: map[string]string{"app": testWithPool}, - Namespace: testNoPool, - }, + } + + if annotateNamespace { + namespace.Annotations = map[string]string{annotation.AntreaIPAMAnnotationKey: pool.Name} + } + + statefulSet := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: uuid.New().String(), + Namespace: namespace.Name, }, - &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: testWithPool, - Namespace: testNoPool, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace.Name, + Annotations: map[string]string{annotation.AntreaIPAMAnnotationKey: pool.Name}, + }, }, + Replicas: &replicas, }, - ) + } - return k8sClient, crdClient + if annotateStatefulSet { + statefulSet.Spec.Template.Annotations = map[string]string{annotation.AntreaIPAMAnnotationKey: pool.Name} + } + + return namespace, pool, statefulSet } -func verifyPoolAllocatedSize(t *testing.T, poolLister listers.IPPoolLister, size int) { +func verifyPoolAllocatedSize(t *testing.T, poolName string, poolLister listers.IPPoolLister, size int) { err := wait.PollImmediate(100*time.Millisecond, 1*time.Second, func() (bool, error) { - pool, err := poolLister.Get(testPool) + pool, err := poolLister.Get(poolName) if err != nil { return false, nil } @@ -102,33 +108,13 @@ func verifyPoolAllocatedSize(t *testing.T, poolLister listers.IPPoolLister, size require.NoError(t, err) } -// This test verifies release of reserved IPs for dedicated IP pool annotation, -// as well as namespace-based IP Pool annotation. -func TestReleaseStatefulSet(t *testing.T) { +func testStatefulSetLifecycle(t *testing.T, dedicatedPool bool, replicas int32) { stopCh := make(chan struct{}) defer close(stopCh) - ipRange := crdv1a2.IPRange{ - Start: "10.2.2.100", - End: "10.2.2.200", - } - - subnetInfo := crdv1a2.SubnetInfo{ - Gateway: "10.2.2.1", - PrefixLength: 24, - } - - subnetRange := crdv1a2.SubnetIPRange{IPRange: ipRange, - SubnetInfo: subnetInfo} - - pool := crdv1a2.IPPool{ - ObjectMeta: metav1.ObjectMeta{Name: testPool}, - Spec: crdv1a2.IPPoolSpec{ - IPRanges: []crdv1a2.SubnetIPRange{subnetRange}, - }, - } - - k8sClient, crdClient := initTestClients(&pool) + namespace, pool, statefulSet := initTestObjects(!dedicatedPool, dedicatedPool, replicas) + crdClient := fakecrd.NewSimpleClientset(pool) + k8sClient := fake.NewSimpleClientset(namespace, statefulSet) informerFactory := informers.NewSharedInformerFactory(k8sClient, 0) @@ -147,45 +133,44 @@ func TestReleaseStatefulSet(t *testing.T) { var err error // Wait until pool propagates to the informer pollErr := wait.PollImmediate(100*time.Millisecond, 1*time.Second, func() (bool, error) { - allocator, err = poolallocator.NewIPPoolAllocator(testPool, crdClient, poolLister) + allocator, err = poolallocator.NewIPPoolAllocator(pool.Name, crdClient, poolLister) if err != nil { return false, nil } return true, nil }) require.NoError(t, pollErr) + defer allocator.ReleaseStatefulSet(statefulSet.Namespace, statefulSet.Name) + + if int(replicas) < allocator.Total() { + // Verify create event was handled by the controller and preallocation was succesfull + verifyPoolAllocatedSize(t, pool.Name, poolLister, int(replicas)) + } else { + // Not enough IPs in the pool - preallocation should fail + verifyPoolAllocatedSize(t, pool.Name, poolLister, 0) + } - verifyPoolAllocatedSize(t, poolLister, 0) - - // Allocate StatefulSet with dedicated IP Pool - err = allocator.AllocateStatefulSet(testNoPool, testWithPool, 5) - require.NoError(t, err, "Failed to reserve IPs for StatefulSet") - verifyPoolAllocatedSize(t, poolLister, 5) - - // Allocate StatefulSet with namespace-based IP Pool annotation - err = allocator.AllocateStatefulSet(testWithPool, testNoPool, 3) - require.NoError(t, err, "Failed to reserve IPs for StatefulSet") - verifyPoolAllocatedSize(t, poolLister, 8) + // Delete StatefulSet + k8sClient.AppsV1().StatefulSets(namespace.Name).Delete(context.TODO(), statefulSet.Name, metav1.DeleteOptions{}) - // Delete StatefulSet with namespace-based IP Pool annotation - controller.enqueueStatefulSetDeleteEvent(&appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: testNoPool, - Namespace: testWithPool, - }}) + // Verify Delete event was processed + verifyPoolAllocatedSize(t, pool.Name, poolLister, 0) +} - // Verify delete event was handled by the controller - verifyPoolAllocatedSize(t, poolLister, 5) +// This test verifies preallocation of IPs for dedicated IP pool annotation. +func TestStatefulSetLifecycle_DedicatedPool(t *testing.T) { + testStatefulSetLifecycle(t, true, 5) +} - // Delete StatefulSet with dedicated IP Pool - controller.enqueueStatefulSetDeleteEvent(&appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: testWithPool, - Namespace: testNoPool, - }}) +// This test verifies preallocation of IPs for IP pool annotation based on StatefulSet Namespace. +func TestStatefulSetLifecycle_NamespacePool(t *testing.T) { + testStatefulSetLifecycle(t, false, 7) +} - // Verify delete event was handled by the controller - verifyPoolAllocatedSize(t, poolLister, 0) +// This test verifies use case when continuous IP range can not be preallocated. +// However we don't expect error since preallocation is best-effort feature. +func TestStatefulSetLifecycle_NoPreallocation(t *testing.T) { + testStatefulSetLifecycle(t, false, 20) } // Test for cleanup on controller startup: stale addresses that belong no StatefulSet objects @@ -194,30 +179,16 @@ func TestReleaseStaleAddresses(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - ipRange := crdv1a2.IPRange{ - CIDR: "10.2.2.0/24", - } - - subnetInfo := crdv1a2.SubnetInfo{ - Gateway: "10.2.2.1", - PrefixLength: 24, - } - - subnetRange := crdv1a2.SubnetIPRange{IPRange: ipRange, - SubnetInfo: subnetInfo} + namespace, pool, statefulSet := initTestObjects(true, false, 7) - // pool includes two entries for non-existent StatefulSet at startup - // as well as one legit entry activeSetOwner := crdv1a2.StatefulSetOwner{ - Name: testWithPool, - Namespace: testNoPool, - Index: 0, + Name: uuid.New().String(), + Namespace: namespace.Name, } staleSetOwner := crdv1a2.StatefulSetOwner{ - Name: testStale, - Namespace: testNoPool, - Index: 0, + Name: statefulSet.Name, + Namespace: namespace.Name, } addresses := []crdv1a2.IPAddressState{ @@ -231,17 +202,13 @@ func TestReleaseStaleAddresses(t *testing.T) { Phase: crdv1a2.IPAddressPhasePreallocated, Owner: crdv1a2.IPAddressOwner{StatefulSet: &staleSetOwner}}, } - pool := crdv1a2.IPPool{ - ObjectMeta: metav1.ObjectMeta{Name: testPool}, - Spec: crdv1a2.IPPoolSpec{ - IPRanges: []crdv1a2.SubnetIPRange{subnetRange}, - }, - Status: crdv1a2.IPPoolStatus{ - IPAddresses: addresses, - }, + + pool.Status = crdv1a2.IPPoolStatus{ + IPAddresses: addresses, } - k8sClient, crdClient := initTestClients(&pool) + crdClient := fakecrd.NewSimpleClientset(pool) + k8sClient := fake.NewSimpleClientset(namespace) informerFactory := informers.NewSharedInformerFactory(k8sClient, 0) @@ -258,7 +225,7 @@ func TestReleaseStaleAddresses(t *testing.T) { // Wait until pool propagates to the informer pollErr := wait.PollImmediate(100*time.Millisecond, 1*time.Second, func() (bool, error) { - _, err := poolallocator.NewIPPoolAllocator(testPool, crdClient, poolLister) + _, err := poolallocator.NewIPPoolAllocator(pool.Name, crdClient, poolLister) if err != nil { return false, nil } @@ -267,5 +234,5 @@ func TestReleaseStaleAddresses(t *testing.T) { require.NoError(t, pollErr) // after cleanup pool should have single entry - verifyPoolAllocatedSize(t, poolLister, 1) + verifyPoolAllocatedSize(t, pool.Name, poolLister, 1) } diff --git a/pkg/agent/cniserver/ipam/annotations.go b/pkg/ipam/annotations.go similarity index 100% rename from pkg/agent/cniserver/ipam/annotations.go rename to pkg/ipam/annotations.go diff --git a/pkg/ipam/ipallocator/allocator_test.go b/pkg/ipam/ipallocator/allocator_test.go index 7d10d673883..7624e9ddf08 100644 --- a/pkg/ipam/ipallocator/allocator_test.go +++ b/pkg/ipam/ipallocator/allocator_test.go @@ -308,7 +308,8 @@ func TestAllocateRange(t *testing.T) { } } -func TestName(t *testing.T) { +func TestNameAndTotal(t *testing.T) { ma := MultiIPAllocator{newIPRangeAllocator("1.1.1.10", "1.1.1.20"), newCIDRAllocator("10.10.10.128/30", nil)} assert.Equal(t, []string{"1.1.1.10-1.1.1.20", "10.10.10.128/30"}, ma.Names()) + assert.Equal(t, 14, ma.Total()) } diff --git a/pkg/ipam/poolallocator/allocator.go b/pkg/ipam/poolallocator/allocator.go index 2ea90c39697..987cfda9caa 100644 --- a/pkg/ipam/poolallocator/allocator.go +++ b/pkg/ipam/poolallocator/allocator.go @@ -409,6 +409,14 @@ func (a *IPPoolAllocator) AllocateStatefulSet(namespace, name string, size int) return err } + // Make sure there is no double allocation for this StatefulSet + for _, ip := range ipPool.Status.IPAddresses { + if ip.Owner.StatefulSet != nil && ip.Owner.StatefulSet.Namespace == namespace && ip.Owner.StatefulSet.Name == name { + return fmt.Errorf("StatefulSet %s/%s is already present in IPPool %s", namespace, name, ipPool.Name) + + } + } + ips, err := allocators.AllocateRange(size) if err != nil { return err @@ -608,3 +616,11 @@ func (a *IPPoolAllocator) getReservedIP(reservedOwner v1alpha2.IPAddressOwner) ( } return nil, nil } + +func (a IPPoolAllocator) Total() int { + _, allocators, err := a.getPoolAndInitIPAllocators() + if err != nil { + return 0 + } + return allocators.Total() +} diff --git a/test/e2e/antreaipam_service_test.go b/test/e2e/antreaipam_service_test.go index a2f225f226b..8dea155e5c9 100644 --- a/test/e2e/antreaipam_service_test.go +++ b/test/e2e/antreaipam_service_test.go @@ -17,7 +17,7 @@ package e2e import ( "testing" - "antrea.io/antrea/pkg/agent/cniserver/ipam" + annotation "antrea.io/antrea/pkg/ipam" ) func TestAntreaIPAMService(t *testing.T) { @@ -36,7 +36,7 @@ func TestAntreaIPAMService(t *testing.T) { } defer deleteIPPoolWrapper(t, data, ippool.Name) annotations := map[string]string{} - annotations[ipam.AntreaIPAMAnnotationKey] = ippool.Name + annotations[annotation.AntreaIPAMAnnotationKey] = ippool.Name err = data.createNamespaceWithAnnotations(testAntreaIPAMNamespace, annotations) if err != nil { t.Fatalf("Creating AntreaIPAM Namespace failed, err=%+v", err) diff --git a/test/e2e/antreaipam_test.go b/test/e2e/antreaipam_test.go index 89515c3a04c..5d2a6021458 100644 --- a/test/e2e/antreaipam_test.go +++ b/test/e2e/antreaipam_test.go @@ -33,8 +33,8 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilnet "k8s.io/utils/net" - "antrea.io/antrea/pkg/agent/cniserver/ipam" crdv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" + annotation "antrea.io/antrea/pkg/ipam" ) var ( @@ -94,7 +94,7 @@ func TestAntreaIPAM(t *testing.T) { } defer deleteIPPoolWrapper(t, data, ippool.Name) annotations := map[string]string{} - annotations[ipam.AntreaIPAMAnnotationKey] = ippool.Name + annotations[annotation.AntreaIPAMAnnotationKey] = ippool.Name err = data.createNamespaceWithAnnotations(testAntreaIPAMNamespace, annotations) if err != nil { t.Fatalf("Creating AntreaIPAM Namespace failed, err=%+v", err) @@ -214,7 +214,7 @@ func testAntreaIPAMStatefulSet(t *testing.T, data *TestData, dedicatedIPPoolKey sts.Spec.Template.Annotations = map[string]string{} } if dedicatedIPPoolKey != nil { - sts.Spec.Template.Annotations[ipam.AntreaIPAMAnnotationKey] = ipPoolName + sts.Spec.Template.Annotations[annotation.AntreaIPAMAnnotationKey] = ipPoolName } } _, cleanup, err := data.createStatefulSet(stsName, testAntreaIPAMNamespace, int32(size), agnhostContainerName, agnhostImage, []string{"sleep", "3600"}, nil, mutateFunc) @@ -243,7 +243,7 @@ func testAntreaIPAMStatefulSet(t *testing.T, data *TestData, dedicatedIPPoolKey pod.Annotations = map[string]string{} } if dedicatedIPPoolKey != nil { - pod.Annotations[ipam.AntreaIPAMAnnotationKey] = ipPoolName + pod.Annotations[annotation.AntreaIPAMAnnotationKey] = ipPoolName } } podName := randName("test-standalone-pod-")