From eccd6b57ca12875c31c65b89b2f51c6aeccdd4bf Mon Sep 17 00:00:00 2001 From: Alex Zhang Date: Tue, 12 Jan 2021 15:39:03 +0800 Subject: [PATCH 1/3] feat: controller leader election --- cmd/ingress/ingress.go | 1 + conf/config-default.yaml | 17 +++-- pkg/config/config.go | 5 ++ pkg/config/config_test.go | 2 + pkg/ingress/controller/controller.go | 103 +++++++++++++++++++++++---- 5 files changed, 108 insertions(+), 20 deletions(-) diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go index 66a7748efa..dcaf894afd 100644 --- a/cmd/ingress/ingress.go +++ b/cmd/ingress/ingress.go @@ -126,6 +126,7 @@ the apisix cluster and others are created`, cmd.PersistentFlags().StringVar(&cfg.Kubernetes.Kubeconfig, "kubeconfig", "", "Kubernetes configuration file (by default in-cluster configuration will be used)") cmd.PersistentFlags().DurationVar(&cfg.Kubernetes.ResyncInterval.Duration, "resync-interval", time.Minute, "the controller resync (with Kubernetes) interval, the minimum resync interval is 30s") cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces, "app-namespace", []string{config.NamespaceAll}, "namespaces that controller will watch for resources") + cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for compaign the controller leader") cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api") cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api") diff --git a/conf/config-default.yaml b/conf/config-default.yaml index 5e34911eb8..419338ad99 100644 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -33,13 +33,16 @@ enable_profiling: true # enable profileing via web interfaces # Kubernetes related configurations. kubernetes: - kubeconfig: "" # the Kubernetes configuration file path, default is - # "", so the in-cluster configuration will be used. - resync_interval: "6h" # how long should apisix-ingress-controller - # re-synchronizes with Kubernetes, default is 6h, - # and the minimal resync interval is 30s. - app_namespaces: ["*"] # namespace list that controller will watch for resources, - # by default all namespaces (represented by "*") are watched. + kubeconfig: "" # the Kubernetes configuration file path, default is + # "", so the in-cluster configuration will be used. + resync_interval: "6h" # how long should apisix-ingress-controller + # re-synchronizes with Kubernetes, default is 6h, + # and the minimal resync interval is 30s. + app_namespaces: ["*"] # namespace list that controller will watch for resources, + # by default all namespaces (represented by "*") are watched. + election_id: "ingress-apisix-leader" # the election id for the controller leader compaign, + # only the leader will watch and delivery resource changes, + # other instances (as candidates) stand by. # APISIX related configurations. apisix: diff --git a/pkg/config/config.go b/pkg/config/config.go index 4a00383aaf..76279894b9 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -30,6 +30,9 @@ import ( const ( // NamespaceAll represents all namespaces. NamespaceAll = "*" + // IngressAPISIXLeader is the default election id for the controller + // leader election. 
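+	// The value is also used as the name of the coordination/v1 Lease
+	// object held by the elected leader.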
+ IngressAPISIXLeader = "ingress-apisix-leader" _minimalResyncInterval = 30 * time.Second ) @@ -50,6 +53,7 @@ type KubernetesConfig struct { Kubeconfig string `json:"kubeconfig" yaml:"kubeconfig"` ResyncInterval types.TimeDuration `json:"resync_interval" yaml:"resync_interval"` AppNamespaces []string `json:"app_namespaces" yaml:"app_namespaces"` + ElectionID string `json:"election_id" yaml:"election_id"` } // APISIXConfig contains all APISIX related config items. @@ -71,6 +75,7 @@ func NewDefaultConfig() *Config { Kubeconfig: "", // Use in-cluster configurations. ResyncInterval: types.TimeDuration{Duration: 6 * time.Hour}, AppNamespaces: []string{v1.NamespaceAll}, + ElectionID: IngressAPISIXLeader, }, } } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index d220b385a4..808ed8889e 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -36,6 +36,7 @@ func TestNewConfigFromFile(t *testing.T) { ResyncInterval: types.TimeDuration{time.Hour}, Kubeconfig: "/path/to/foo/baz", AppNamespaces: []string{""}, + ElectionID: "my-election-id", }, APISIX: APISIXConfig{ BaseURL: "http://127.0.0.1:8080/apisix", @@ -72,6 +73,7 @@ enable_profiling: true kubernetes: kubeconfig: /path/to/foo/baz resync_interval: 1h0m0s + election_id: my-election-id apisix: base_url: http://127.0.0.1:8080/apisix admin_key: "123456" diff --git a/pkg/ingress/controller/controller.go b/pkg/ingress/controller/controller.go index 2114ec28cb..7775a595c3 100644 --- a/pkg/ingress/controller/controller.go +++ b/pkg/ingress/controller/controller.go @@ -15,8 +15,17 @@ package controller import ( + "context" "os" "sync" + "time" + + "go.uber.org/zap" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" v1 "k8s.io/api/core/v1" @@ -47,6 +56,9 @@ func recoverException() { // Controller is the ingress apisix controller object. type Controller struct { + name string + namespace string + cfg *config.Config wg sync.WaitGroup watchingNamespace map[string]struct{} apiServer *api.Server @@ -96,6 +108,9 @@ func NewController(cfg *config.Config) (*Controller, error) { } c := &Controller{ + name: podName, + namespace: podNamespace, + cfg: cfg, apiServer: apiSrv, metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace), clientset: kube.GetKubeClient(), @@ -115,30 +130,93 @@ func (c *Controller) goAttach(handler func()) { }() } +// Eventf implements the resourcelock.EventRecorder interface. +func (c *Controller) Eventf(_ runtime.Object, eventType string, reason string, message string, _ ...interface{}) { + log.Infow(reason, zap.String("message", message), zap.String("event_type", eventType)) +} + // Run launches the controller. func (c *Controller) Run(stop chan struct{}) error { - // TODO leader election. 
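+	// Leader election: only the elected leader watches Kubernetes resources
+	// and delivers the resulting changes to APISIX; the other replicas wait
+	// as candidates until the Lease is lost or released.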
+ rootCtx, rootCancel := context.WithCancel(context.Background()) + defer rootCancel() + go func() { + <-stop + rootCancel() + }() + c.metricsCollector.ResetLeader(false) + + go func() { + if err := c.apiServer.Run(rootCtx.Done()); err != nil { + log.Errorf("failed to launch API Server: %s", err) + } + }() + + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Namespace: c.namespace, + Name: c.cfg.Kubernetes.ElectionID, + }, + Client: c.clientset.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: c.name, + EventRecorder: c, + }, + } + cfg := leaderelection.LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 15 * time.Second, + RenewDeadline: 5 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: c.run, + OnNewLeader: func(identity string) { + log.Warnf("found a new leader %s", identity) + if identity != c.name { + log.Infof("controller now is running as a candidate") + } + }, + OnStoppedLeading: func() { + log.Info("controller now is running as a candidate") + c.metricsCollector.ResetLeader(false) + }, + }, + ReleaseOnCancel: true, + Name: "ingress-apisix", + } + + elector, err := leaderelection.NewLeaderElector(cfg) + if err != nil { + log.Errorf("failed to create leader elector: %s", err.Error()) + return err + } + +election: + elector.Run(rootCtx) + select { + case <-rootCtx.Done(): + return nil + default: + goto election + } +} + +func (c *Controller) run(ctx context.Context) { + log.Info("controller now is running as leader") c.metricsCollector.ResetLeader(true) - log.Info("controller run as leader") ac := &Api6Controller{ KubeClientSet: c.clientset, Api6ClientSet: c.crdClientset, SharedInformerFactory: c.crdInformerFactory, CoreSharedInformerFactory: kube.CoreSharedInformerFactory, - Stop: stop, + Stop: ctx.Done(), } epInformer := ac.CoreSharedInformerFactory.Core().V1().Endpoints() kube.EndpointsInformer = epInformer // endpoint ac.Endpoint(c) c.goAttach(func() { - ac.CoreSharedInformerFactory.Start(stop) - }) - c.goAttach(func() { - if err := c.apiServer.Run(stop); err != nil { - log.Errorf("failed to launch API Server: %s", err) - } + ac.CoreSharedInformerFactory.Start(ctx.Done()) }) // ApisixRoute @@ -151,12 +229,11 @@ func (c *Controller) Run(stop chan struct{}) error { ac.ApisixTLS(c) c.goAttach(func() { - ac.SharedInformerFactory.Start(stop) + ac.SharedInformerFactory.Start(ctx.Done()) }) - <-stop + <-ctx.Done() c.wg.Wait() - return nil } // namespaceWatching accepts a resource key, getting the namespace part @@ -182,7 +259,7 @@ type Api6Controller struct { Api6ClientSet clientSet.Interface SharedInformerFactory externalversions.SharedInformerFactory CoreSharedInformerFactory informers.SharedInformerFactory - Stop chan struct{} + Stop <-chan struct{} } func (api6 *Api6Controller) ApisixRoute(controller *Controller) { From 86972eea51fbb4161de73c776ed41fd7d1130ec9 Mon Sep 17 00:00:00 2001 From: Alex Zhang Date: Tue, 12 Jan 2021 19:49:44 +0800 Subject: [PATCH 2/3] test: add e2e cases --- .../ingress-apisix/templates/deployment.yaml | 9 +++ pkg/ingress/controller/controller.go | 36 ++++++------ .../deploy/deployment/ingress-controller.yaml | 9 +++ test/e2e/ingress/sanity.go | 56 +++++++++++++++++++ test/e2e/scaffold/ingress.go | 37 +++++++++++- test/e2e/scaffold/scaffold.go | 14 +++++ 6 files changed, 143 insertions(+), 18 deletions(-) diff --git a/charts/ingress-apisix/templates/deployment.yaml b/charts/ingress-apisix/templates/deployment.yaml index b96e811a7f..b103ebdc0c 100644 
--- a/charts/ingress-apisix/templates/deployment.yaml +++ b/charts/ingress-apisix/templates/deployment.yaml @@ -70,6 +70,15 @@ spec: volumeMounts: - mountPath: /ingress-apisix/conf name: configuration + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name {{- with .Values.ingressController.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/pkg/ingress/controller/controller.go b/pkg/ingress/controller/controller.go index 7775a595c3..14fef47245 100644 --- a/pkg/ingress/controller/controller.go +++ b/pkg/ingress/controller/controller.go @@ -20,26 +20,21 @@ import ( "sync" "time" - "go.uber.org/zap" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - - v1 "k8s.io/api/core/v1" - - "k8s.io/client-go/tools/cache" - - "github.com/api7/ingress-controller/pkg/apisix" - clientSet "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned" crdclientset "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned" "github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions" + "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" "github.com/api7/ingress-controller/pkg/api" + "github.com/api7/ingress-controller/pkg/apisix" "github.com/api7/ingress-controller/pkg/config" "github.com/api7/ingress-controller/pkg/kube" "github.com/api7/ingress-controller/pkg/log" @@ -172,11 +167,17 @@ func (c *Controller) Run(stop chan struct{}) error { OnNewLeader: func(identity string) { log.Warnf("found a new leader %s", identity) if identity != c.name { - log.Infof("controller now is running as a candidate") + log.Infow("controller now is running as a candidate", + zap.String("namespace", c.namespace), + zap.String("pod", c.name), + ) } }, OnStoppedLeading: func() { - log.Info("controller now is running as a candidate") + log.Infow("controller now is running as a candidate", + zap.String("namespace", c.namespace), + zap.String("pod", c.name), + ) c.metricsCollector.ResetLeader(false) }, }, @@ -201,7 +202,10 @@ election: } func (c *Controller) run(ctx context.Context) { - log.Info("controller now is running as leader") + log.Infow("controller now is running as leader", + zap.String("namespace", c.namespace), + zap.String("pod", c.name), + ) c.metricsCollector.ResetLeader(true) ac := &Api6Controller{ diff --git a/samples/deploy/deployment/ingress-controller.yaml b/samples/deploy/deployment/ingress-controller.yaml index 98267f9183..eae43e1d79 100644 --- a/samples/deploy/deployment/ingress-controller.yaml +++ b/samples/deploy/deployment/ingress-controller.yaml @@ -50,6 +50,15 @@ spec: - mountPath: /ingress-apisix/conf/config.yaml name: apisix-ingress-configmap subPath: config.yaml + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name volumes: - configMap: name: apisix-ingress-cm diff --git a/test/e2e/ingress/sanity.go b/test/e2e/ingress/sanity.go index 5b2c5cdeda..40cac87ee2 100644 --- a/test/e2e/ingress/sanity.go +++ b/test/e2e/ingress/sanity.go @@ -17,6 +17,7 @@ package 
ingress import ( "encoding/json" "net/http" + "time" "github.com/api7/ingress-controller/test/e2e/scaffold" "github.com/onsi/ginkgo" @@ -103,3 +104,58 @@ var _ = ginkgo.Describe("double-routes", func() { // We don't care the json data, only make sure it's a normal json string. }) }) + +var _ = ginkgo.Describe("leader election", func() { + s := scaffold.NewScaffold(&scaffold.Options{ + Name: "leaderelection", + Kubeconfig: scaffold.GetKubeconfig(), + APISIXConfigPath: "testdata/apisix-gw-config.yaml", + APISIXDefaultConfigPath: "testdata/apisix-gw-config-default.yaml", + IngressAPISIXReplicas: 2, + }) + ginkgo.It("lease check", func() { + pods, err := s.GetIngressPodDetails() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), pods, 2) + lease, err := s.GetLeaderLease() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Equal(ginkgo.GinkgoT(), *lease.Spec.LeaseDurationSeconds, int32(15)) + if *lease.Spec.HolderIdentity != pods[0].Name && *lease.Spec.HolderIdentity != pods[1].Name { + assert.Fail(ginkgo.GinkgoT(), "bad leader lease holder identity") + } + }) + + ginkgo.It("leader failover", func() { + // Wait the leader election to complete. + time.Sleep(2 * time.Second) + pods, err := s.GetIngressPodDetails() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), pods, 2) + + lease, err := s.GetLeaderLease() + assert.Nil(ginkgo.GinkgoT(), err) + + leaderIdx := 0 + if *lease.Spec.HolderIdentity == pods[1].Name { + leaderIdx = 1 + } + ginkgo.GinkgoT().Logf("lease is %s", *lease.Spec.HolderIdentity) + assert.Nil(ginkgo.GinkgoT(), s.KillPod(pods[leaderIdx].Name)) + time.Sleep(25 * time.Second) + + newLease, err := s.GetLeaderLease() + assert.Nil(ginkgo.GinkgoT(), err) + + newPods, err := s.GetIngressPodDetails() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), pods, 2) + + assert.NotEqual(ginkgo.GinkgoT(), *newLease.Spec.HolderIdentity, *lease.Spec.HolderIdentity) + assert.Greater(ginkgo.GinkgoT(), *newLease.Spec.LeaseTransitions, *lease.Spec.LeaseTransitions) + + if *newLease.Spec.HolderIdentity != newPods[0].Name && *newLease.Spec.HolderIdentity != newPods[1].Name { + assert.Failf(ginkgo.GinkgoT(), "bad leader lease holder identity: %s, should be %s or %s", + *newLease.Spec.HolderIdentity, newPods[0].Name, newPods[1].Name) + } + }) +}) diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go index 9a6de565e8..5cb4d35f4c 100644 --- a/test/e2e/scaffold/ingress.go +++ b/test/e2e/scaffold/ingress.go @@ -15,11 +15,14 @@ package scaffold import ( + "context" "fmt" "github.com/gruntwork-io/terratest/modules/k8s" "github.com/onsi/ginkgo" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -45,7 +48,7 @@ kind: Deployment metadata: name: ingress-apisix-controller-deployment-e2e-test spec: - replicas: 1 + replicas: %d selector: matchLabels: app: ingress-apisix-controller-deployment-e2e-test @@ -77,6 +80,15 @@ spec: tcpSocket: port: 8080 timeoutSeconds: 2 + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name image: "apache/apisix-ingress-controller:dev" imagePullPolicy: Never name: ingress-apisix-controller-deployment-e2e-test @@ -102,7 +114,7 @@ spec: ) func (s *Scaffold) newIngressAPISIXController() error { - ingressAPISIXDeployment := fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.namespace) + ingressAPISIXDeployment := 
fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.opts.IngressAPISIXReplicas, s.namespace) if err := k8s.CreateServiceAccountE(s.t, s.kubectlOptions, _serviceAccount); err != nil { return err } @@ -144,3 +156,24 @@ func (s *Scaffold) waitAllIngressControllerPodsAvailable() error { } return waitExponentialBackoff(condFunc) } + +// GetLeaderLease returns the Lease resource. +func (s *Scaffold) GetLeaderLease() (*coordinationv1.Lease, error) { + cli, err := k8s.GetKubernetesClientE(s.t) + if err != nil { + return nil, err + } + lease, err := cli.CoordinationV1().Leases(s.namespace).Get(context.TODO(), "ingress-apisix-leader", metav1.GetOptions{}) + if err != nil { + return nil, err + } + return lease, nil +} + +// GetIngressPodDetails returns a batch of pod description +// about apisix-ingress-controller. +func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) { + return k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{ + LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test", + }) +} diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index fdd19981d0..858686049b 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -15,6 +15,7 @@ package scaffold import ( + "context" "fmt" "io/ioutil" "net/http" @@ -26,6 +27,8 @@ import ( "text/template" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/gavv/httpexpect/v2" "github.com/gruntwork-io/terratest/modules/k8s" "github.com/gruntwork-io/terratest/modules/testing" @@ -41,6 +44,7 @@ type Options struct { Kubeconfig string APISIXConfigPath string APISIXDefaultConfigPath string + IngressAPISIXReplicas int } type Scaffold struct { @@ -101,10 +105,20 @@ func NewDefaultScaffold() *Scaffold { Kubeconfig: GetKubeconfig(), APISIXConfigPath: "testdata/apisix-gw-config.yaml", APISIXDefaultConfigPath: "testdata/apisix-gw-config-default.yaml", + IngressAPISIXReplicas: 1, } return NewScaffold(opts) } +// KillPod kill the pod which name is podName. +func (s *Scaffold) KillPod(podName string) error { + cli, err := k8s.GetKubernetesClientE(s.t) + if err != nil { + return err + } + return cli.CoreV1().Pods(s.namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) +} + // DefaultHTTPBackend returns the service name and service ports // of the default http backend. 
func (s *Scaffold) DefaultHTTPBackend() (string, []int32) { From 2cb331a71221fa607e12eca98665922bf047603f Mon Sep 17 00:00:00 2001 From: Alex Zhang Date: Wed, 13 Jan 2021 21:13:55 +0800 Subject: [PATCH 3/3] chore: optimize e2e cases --- test/e2e/ingress/namespace.go | 5 +++++ test/e2e/ingress/resourcepushing.go | 14 ++++++++----- test/e2e/ingress/sanity.go | 18 +++++++++++----- test/e2e/scaffold/httpbin.go | 32 +++++++++++++++++++++++++++++ test/e2e/scaffold/ingress.go | 20 ++++++++++++++---- test/e2e/scaffold/scaffold.go | 15 +++++++------- 6 files changed, 83 insertions(+), 21 deletions(-) diff --git a/test/e2e/ingress/namespace.go b/test/e2e/ingress/namespace.go index 23601ffa56..8cc654b0af 100644 --- a/test/e2e/ingress/namespace.go +++ b/test/e2e/ingress/namespace.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "net/http" + "time" "github.com/api7/ingress-controller/test/e2e/scaffold" "github.com/onsi/ginkgo" @@ -49,6 +50,10 @@ spec: assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes") assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams") + // TODO When ingress controller can feedback the lifecycle of CRDs to the + // status field, we can poll it rather than sleeping. + time.Sleep(3 * time.Second) + body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw() var placeholder ip err := json.Unmarshal([]byte(body), &placeholder) diff --git a/test/e2e/ingress/resourcepushing.go b/test/e2e/ingress/resourcepushing.go index 2ffba3f78a..70490ff042 100644 --- a/test/e2e/ingress/resourcepushing.go +++ b/test/e2e/ingress/resourcepushing.go @@ -49,10 +49,11 @@ spec: assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes") err = s.EnsureNumApisixUpstreamsCreated(1) assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams") - scale := 2 - err = s.ScaleHTTPBIN(scale) - assert.Nil(ginkgo.GinkgoT(), err) - time.Sleep(5 * time.Second) // wait for ingress to sync + assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(2), "scaling number of httpbin instancess") + assert.Nil(ginkgo.GinkgoT(), s.WaitAllHTTPBINPoddsAvailable(), "waiting for all httpbin pods ready") + // TODO When ingress controller can feedback the lifecycle of CRDs to the + // status field, we can poll it rather than sleeping. + time.Sleep(5 * time.Second) ups, err := s.ListApisixUpstreams() assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error") assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2, "upstreams nodes not expect") @@ -84,7 +85,10 @@ spec: // remove assert.Nil(ginkgo.GinkgoT(), s.RemoveResourceByString(apisixRoute)) - time.Sleep(10 * time.Second) // wait for ingress to sync + + // TODO When ingress controller can feedback the lifecycle of CRDs to the + // status field, we can poll it rather than sleeping. 
+ time.Sleep(10 * time.Second) ups, err := s.ListApisixUpstreams() assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error") assert.Len(ginkgo.GinkgoT(), ups, 0, "upstreams nodes not expect") diff --git a/test/e2e/ingress/sanity.go b/test/e2e/ingress/sanity.go index 40cac87ee2..fc43fc6950 100644 --- a/test/e2e/ingress/sanity.go +++ b/test/e2e/ingress/sanity.go @@ -52,6 +52,11 @@ var _ = ginkgo.Describe("single-route", func() { assert.Nil(ginkgo.GinkgoT(), err, "checking number of routes") err = s.EnsureNumApisixUpstreamsCreated(1) assert.Nil(ginkgo.GinkgoT(), err, "checking number of upstreams") + + // TODO When ingress controller can feedback the lifecycle of CRDs to the + // status field, we can poll it rather than sleeping. + time.Sleep(3 * time.Second) + body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw() var placeholder ip err = json.Unmarshal([]byte(body), &placeholder) @@ -92,6 +97,9 @@ var _ = ginkgo.Describe("double-routes", func() { assert.Nil(ginkgo.GinkgoT(), err, "checking number of routes") err = s.EnsureNumApisixUpstreamsCreated(1) assert.Nil(ginkgo.GinkgoT(), err, "checking number of upstreams") + // TODO When ingress controller can feedback the lifecycle of CRDs to the + // status field, we can poll it rather than sleeping. + time.Sleep(3 * time.Second) body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw() var placeholder ip err = json.Unmarshal([]byte(body), &placeholder) @@ -117,7 +125,7 @@ var _ = ginkgo.Describe("leader election", func() { pods, err := s.GetIngressPodDetails() assert.Nil(ginkgo.GinkgoT(), err) assert.Len(ginkgo.GinkgoT(), pods, 2) - lease, err := s.GetLeaderLease() + lease, err := s.WaitGetLeaderLease() assert.Nil(ginkgo.GinkgoT(), err) assert.Equal(ginkgo.GinkgoT(), *lease.Spec.LeaseDurationSeconds, int32(15)) if *lease.Spec.HolderIdentity != pods[0].Name && *lease.Spec.HolderIdentity != pods[1].Name { @@ -126,13 +134,11 @@ var _ = ginkgo.Describe("leader election", func() { }) ginkgo.It("leader failover", func() { - // Wait the leader election to complete. - time.Sleep(2 * time.Second) pods, err := s.GetIngressPodDetails() assert.Nil(ginkgo.GinkgoT(), err) assert.Len(ginkgo.GinkgoT(), pods, 2) - lease, err := s.GetLeaderLease() + lease, err := s.WaitGetLeaderLease() assert.Nil(ginkgo.GinkgoT(), err) leaderIdx := 0 @@ -141,9 +147,11 @@ var _ = ginkgo.Describe("leader election", func() { } ginkgo.GinkgoT().Logf("lease is %s", *lease.Spec.HolderIdentity) assert.Nil(ginkgo.GinkgoT(), s.KillPod(pods[leaderIdx].Name)) + + // Wait the old lease expire and new leader was elected. time.Sleep(25 * time.Second) - newLease, err := s.GetLeaderLease() + newLease, err := s.WaitGetLeaderLease() assert.Nil(ginkgo.GinkgoT(), err) newPods, err := s.GetIngressPodDetails() diff --git a/test/e2e/scaffold/httpbin.go b/test/e2e/scaffold/httpbin.go index 33401b30db..f9e1f87652 100644 --- a/test/e2e/scaffold/httpbin.go +++ b/test/e2e/scaffold/httpbin.go @@ -18,6 +18,9 @@ import ( "fmt" "time" + "github.com/onsi/ginkgo" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/gruntwork-io/terratest/modules/k8s" corev1 "k8s.io/api/core/v1" ) @@ -112,3 +115,32 @@ func (s *Scaffold) ScaleHTTPBIN(desired int) error { } return nil } + +// WaitAllHTTPBINPods waits until all httpbin pods ready. 
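+// It polls the httpbin pods with the scaffold's exponential backoff helper
+// until every pod reports the Ready condition.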
+func (s *Scaffold) WaitAllHTTPBINPoddsAvailable() error { + opts := metav1.ListOptions{ + LabelSelector: "app=httpbin-deployment-e2e-test", + } + condFunc := func() (bool, error) { + items, err := k8s.ListPodsE(s.t, s.kubectlOptions, opts) + if err != nil { + return false, err + } + if len(items) == 0 { + ginkgo.GinkgoT().Log("no apisix pods created") + return false, nil + } + for _, item := range items { + for _, cond := range item.Status.Conditions { + if cond.Type != corev1.PodReady { + continue + } + if cond.Status != "True" { + return false, nil + } + } + } + return true, nil + } + return waitExponentialBackoff(condFunc) +} diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go index 5cb4d35f4c..6076eb9c13 100644 --- a/test/e2e/scaffold/ingress.go +++ b/test/e2e/scaffold/ingress.go @@ -23,6 +23,7 @@ import ( coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -157,14 +158,25 @@ func (s *Scaffold) waitAllIngressControllerPodsAvailable() error { return waitExponentialBackoff(condFunc) } -// GetLeaderLease returns the Lease resource. -func (s *Scaffold) GetLeaderLease() (*coordinationv1.Lease, error) { +// WaitGetLeaderLease waits the lease to be created and returns it. +func (s *Scaffold) WaitGetLeaderLease() (*coordinationv1.Lease, error) { cli, err := k8s.GetKubernetesClientE(s.t) if err != nil { return nil, err } - lease, err := cli.CoordinationV1().Leases(s.namespace).Get(context.TODO(), "ingress-apisix-leader", metav1.GetOptions{}) - if err != nil { + var lease *coordinationv1.Lease + condFunc := func() (bool, error) { + l, err := cli.CoordinationV1().Leases(s.namespace).Get(context.TODO(), "ingress-apisix-leader", metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return false, nil + } + return false, err + } + lease = l + return true, nil + } + if err := waitExponentialBackoff(condFunc); err != nil { return nil, err } return lease, nil diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index 858686049b..6e3e851536 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -163,14 +163,12 @@ func (s *Scaffold) beforeEach() { s.etcdService, err = s.newEtcd() assert.Nil(s.t, err, "initializing etcd") - // We don't use k8s.WaitUntilServiceAvailable since it hacks for Minikube. - err = k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions, s.labelSelector("app=etcd-deployment-e2e-test"), 1, 5, 2*time.Second) + err = s.waitAllEtcdPodsAvailable() assert.Nil(s.t, err, "waiting for etcd ready") s.apisixService, err = s.newAPISIX() assert.Nil(s.t, err, "initializing Apache APISIX") - // We don't use k8s.WaitUntilServiceAvailable since it hacks for Minikube. err = s.waitAllAPISIXPodsAvailable() assert.Nil(s.t, err, "waiting for apisix ready") @@ -194,6 +192,10 @@ func (s *Scaffold) afterEach() { for _, f := range s.finializers { f() } + + // Wait for a while to prevent the worker node being overwhelming + // (new cases will be run). + time.Sleep(3 * time.Second) } func (s *Scaffold) addFinializer(f func()) { @@ -216,10 +218,9 @@ func (s *Scaffold) renderConfig(path string) (string, error) { func waitExponentialBackoff(condFunc func() (bool, error)) error { backoff := wait.Backoff{ - Duration: 100 * time.Millisecond, - Factor: 3, - Jitter: 0, - Steps: 6, + Duration: 500 * time.Millisecond, + Factor: 2, + Steps: 8, } return wait.ExponentialBackoff(backoff, condFunc) }
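
A quick manual check of what the new e2e cases assert: the elected leader is
recorded in a coordination/v1 Lease whose name defaults to the election id.
The commands below are a sketch only; they assume the controller runs with
two replicas in a namespace called ingress-apisix and that election_id was
left at its default.

    # Show the current holder; spec.holderIdentity should match one of the
    # controller pod names and spec.leaseDurationSeconds should be 15.
    kubectl -n ingress-apisix get lease ingress-apisix-leader -o yaml

    # Deleting the holder pod forces a failover: after the old lease expires,
    # spec.holderIdentity changes and spec.leaseTransitions increases.
    kubectl -n ingress-apisix delete pod <leader-pod-name>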