Skip to content

Commit

Permalink
Enable leader election in ws-manager-mk2 (v3) (#18539)
Browse files Browse the repository at this point in the history
* Enable leader election in ws-manager-mk2

* Update go modules

* Move workspace activity to CRD

* Remove workspace activity

* Cleanup

* Update ws-manager-mk2 CRD

* Cleanup

* Restore lastActivity logic

* TEST

* Disable observability

* Start the grpc server after leader election

* Bount the source of subscribers to an informer

* Cleanup

* Avoid deepCopy

* Remove goroutine to execute OnReconcile

* Refactor last activity to be consistent acrtoss the controllers

* Address feedback
  • Loading branch information
aledbf committed Aug 26, 2023
1 parent 4200369 commit 687f337
Show file tree
Hide file tree
Showing 21 changed files with 492 additions and 180 deletions.
2 changes: 2 additions & 0 deletions components/ws-manager-api/go/crd/v1/workspace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ type WorkspaceStatus struct {

// +kubebuilder:validation:Optional
Runtime *WorkspaceRuntimeStatus `json:"runtime,omitempty"`

LastActivity *metav1.Time `json:"lastActivity,omitempty"`
}

func (s *WorkspaceStatus) SetCondition(cond metav1.Condition) {
Expand Down
4 changes: 4 additions & 0 deletions components/ws-manager-api/go/crd/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion components/ws-manager-mk2/cmd/sample-workspace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
"log"
"time"

workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"

workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
)

func main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ spec:
type: string
type: array
type: object
lastActivity:
format: date-time
type: string
ownerToken:
type: string
phase:
Expand Down
2 changes: 0 additions & 2 deletions components/ws-manager-mk2/config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ spec:
containers:
- command:
- /manager
args:
- --leader-elect
image: controller:latest
name: manager
securityContext:
Expand Down
53 changes: 48 additions & 5 deletions components/ws-manager-mk2/controllers/maintenance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"

Expand All @@ -17,7 +18,12 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var (
Expand Down Expand Up @@ -106,9 +112,46 @@ func (r *MaintenanceReconciler) setEnabledUntil(ctx context.Context, enabledUnti
log.FromContext(ctx).Info("maintenance mode state change", "enabledUntil", enabledUntil)
}

func (r *MaintenanceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Named("maintenance").
For(&corev1.ConfigMap{}).
Complete(r)
func (r *MaintenanceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// We need to use an unmanaged controller to avoid issues when the pod is in standby mode.
// In that scenario, the controllers are not started and don't watch changes and only
// observe the maintenance mode during the initialization.
c, err := controller.NewUnmanaged("maintenance-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

go func() {
err = c.Start(ctx)
if err != nil {
log.FromContext(ctx).Error(err, "cannot start maintenance reconciler")
os.Exit(1)
}
}()

return c.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), &handler.EnqueueRequestForObject{}, &filterConfigMap{})
}

type filterConfigMap struct {
predicate.Funcs
}

func (f filterConfigMap) Create(e event.CreateEvent) bool {
return f.filter(e.Object)
}

func (f filterConfigMap) Update(e event.UpdateEvent) bool {
return f.filter(e.ObjectNew)
}

func (f filterConfigMap) Generic(e event.GenericEvent) bool {
return f.filter(e.Object)
}

func (f filterConfigMap) filter(obj client.Object) bool {
if obj == nil {
return false
}

return obj.GetName() == configMapKey.Name && obj.GetNamespace() == configMapKey.Namespace
}
80 changes: 80 additions & 0 deletions components/ws-manager-mk2/controllers/subscriber_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package controllers

import (
"context"
"os"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"

config "github.com/gitpod-io/gitpod/ws-manager/api/config"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
)

func NewSubscriberReconciler(c client.Client, cfg *config.Configuration) (*SubscriberReconciler, error) {
reconciler := &SubscriberReconciler{
Client: c,
Config: cfg,
}

return reconciler, nil
}

type SubscriberReconciler struct {
client.Client

Config *config.Configuration

OnReconcile func(ctx context.Context, ws *workspacev1.Workspace)
}

func (r *SubscriberReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)

var workspace workspacev1.Workspace
if err := r.Get(ctx, req.NamespacedName, &workspace); err != nil {
if !errors.IsNotFound(err) {
log.Error(err, "unable to fetch workspace")
}

return ctrl.Result{}, client.IgnoreNotFound(err)
}

if workspace.Status.Conditions == nil {
workspace.Status.Conditions = []metav1.Condition{}
}

if r.OnReconcile != nil {
r.OnReconcile(ctx, &workspace)
}

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *SubscriberReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
c, err := controller.NewUnmanaged("subscribers-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

go func() {
err = c.Start(ctx)
if err != nil {
log.FromContext(ctx).Error(err, "cannot start Subscriber reconciler")
os.Exit(1)
}
}()

return c.Watch(source.Kind(mgr.GetCache(), &workspacev1.Workspace{}), &handler.EnqueueRequestForObject{})
}
11 changes: 4 additions & 7 deletions components/ws-manager-mk2/controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/gitpod-io/gitpod/common-go/util"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
"github.com/gitpod-io/gitpod/ws-manager/api/config"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
//+kubebuilder:scaffold:imports
Expand All @@ -50,10 +49,9 @@ func TestAPIs(t *testing.T) {
}

var (
ctx context.Context
cancel context.CancelFunc
wsActivity *activity.WorkspaceActivity
wsMetrics *controllerMetrics
ctx context.Context
cancel context.CancelFunc
wsMetrics *controllerMetrics
)

var _ = BeforeSuite(func() {
Expand Down Expand Up @@ -111,8 +109,7 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())
Expect(wsReconciler.SetupWithManager(k8sManager)).To(Succeed())

wsActivity = activity.NewWorkspaceActivity()
timeoutReconciler, err := NewTimeoutReconciler(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("workspace"), conf, wsActivity, maintenance)
timeoutReconciler, err := NewTimeoutReconciler(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("workspace"), conf, maintenance)
Expect(err).ToNot(HaveOccurred())
Expect(timeoutReconciler.SetupWithManager(k8sManager)).To(Succeed())

Expand Down
8 changes: 3 additions & 5 deletions components/ws-manager-mk2/controllers/timeout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/gitpod-io/gitpod/common-go/util"
wsactivity "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/maintenance"
config "github.com/gitpod-io/gitpod/ws-manager/api/config"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
)

func NewTimeoutReconciler(c client.Client, recorder record.EventRecorder, cfg config.Configuration, activity *wsactivity.WorkspaceActivity, maintenance maintenance.Maintenance) (*TimeoutReconciler, error) {
func NewTimeoutReconciler(c client.Client, recorder record.EventRecorder, cfg config.Configuration, maintenance maintenance.Maintenance) (*TimeoutReconciler, error) {
if cfg.HeartbeatInterval == 0 {
return nil, fmt.Errorf("invalid heartbeat interval, must not be 0")
}
Expand All @@ -38,7 +38,6 @@ func NewTimeoutReconciler(c client.Client, recorder record.EventRecorder, cfg co
return &TimeoutReconciler{
Client: c,
Config: cfg,
activity: activity,
reconcileInterval: reconcileInterval,
recorder: recorder,
maintenance: maintenance,
Expand All @@ -53,7 +52,6 @@ type TimeoutReconciler struct {
client.Client

Config config.Configuration
activity *wsactivity.WorkspaceActivity
reconcileInterval time.Duration
recorder record.EventRecorder
maintenance maintenance.Maintenance
Expand Down Expand Up @@ -157,7 +155,7 @@ func (r *TimeoutReconciler) isWorkspaceTimedOut(ws *workspacev1.Workspace) (reas
}

start := ws.ObjectMeta.CreationTimestamp.Time
lastActivity := r.activity.GetLastActivity(ws)
lastActivity := activity.Last(ws)
isClosed := ws.IsConditionTrue(workspacev1.WorkspaceConditionClosed)

switch phase {
Expand Down
56 changes: 13 additions & 43 deletions components/ws-manager-mk2/controllers/timeout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -36,7 +35,7 @@ var _ = Describe("TimeoutController", func() {
// Use a fake client instead of the envtest's k8s client, such that we can add objects
// with custom CreationTimestamps and check timeout logic.
fakeClient = fake.NewClientBuilder().WithStatusSubresource(&workspacev1.Workspace{}).WithScheme(k8sClient.Scheme()).Build()
r, err = NewTimeoutReconciler(fakeClient, record.NewFakeRecorder(100), conf, activity.NewWorkspaceActivity(), &fakeMaintenance{enabled: false})
r, err = NewTimeoutReconciler(fakeClient, record.NewFakeRecorder(100), conf, &fakeMaintenance{enabled: false})
Expect(err).ToNot(HaveOccurred())
})

Expand All @@ -48,20 +47,21 @@ var _ = Describe("TimeoutController", func() {
customMaxLifetime *time.Duration
update func(ws *workspacev1.Workspace)
updateStatus func(ws *workspacev1.Workspace)
controllerRestart time.Time
expectTimeout bool
}
DescribeTable("workspace timeouts",
func(tc testCase) {
By("creating a workspace")
ws := newWorkspace(uuid.NewString(), "default")
ws.CreationTimestamp = metav1.NewTime(now.Add(-tc.age))
Expect(fakeClient.Create(ctx, ws)).To(Succeed())

if tc.lastActivityAgo != nil {
r.activity.Store(ws.Name, now.Add(-*tc.lastActivityAgo))
now := metav1.NewTime(now.Add(-*tc.lastActivityAgo))
ws.Status.LastActivity = &now
}

Expect(fakeClient.Create(ctx, ws)).To(Succeed())

updateObjWithRetries(fakeClient, ws, false, func(ws *workspacev1.Workspace) {
if tc.customTimeout != nil {
ws.Spec.Timeout.Time = &metav1.Duration{Duration: *tc.customTimeout}
Expand All @@ -80,14 +80,6 @@ var _ = Describe("TimeoutController", func() {
}
})

// Set controller (re)start time.
if tc.controllerRestart.IsZero() {
// Bit arbitrary, but default to the controller running for ~2 days.
r.activity.ManagerStartedAt = now.Add(-48 * time.Hour)
} else {
r.activity.ManagerStartedAt = tc.controllerRestart
}

// Run the timeout controller for this workspace.
By("running the TimeoutController reconcile()")
_, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: ws.Name, Namespace: ws.Namespace}})
Expand Down Expand Up @@ -159,32 +151,11 @@ var _ = Describe("TimeoutController", func() {
lastActivityAgo: pointer.Duration(1 * time.Minute),
expectTimeout: true,
}),
Entry("shouldn't timeout after controller restart", testCase{
phase: workspacev1.WorkspacePhaseRunning,
updateStatus: func(ws *workspacev1.Workspace) {
// Add FirstUserActivity condition from 5 hours ago.
// From this condition the controller should deduce that the workspace
// has had user activity, but since lastActivity is nil, it's been cleared on
// a restart. The controller therefore should not timeout the workspace and
// wait for new user activity. Or timeout once user activity doesn't come
// eventually after the controller restart.
ws.Status.Conditions = wsk8s.AddUniqueCondition(ws.Status.Conditions, metav1.Condition{
Type: string(workspacev1.WorkspaceConditionFirstUserActivity),
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now.Add(-5 * time.Hour)),
})
},
age: 5 * time.Hour,
lastActivityAgo: nil, // No last activity recorded yet after controller restart.
controllerRestart: now,
expectTimeout: false,
}),
Entry("should timeout after controller restart if no FirstUserActivity", testCase{
phase: workspacev1.WorkspacePhaseRunning,
age: 5 * time.Hour,
lastActivityAgo: nil, // No last activity recorded yet after controller restart.
controllerRestart: now,
expectTimeout: true,
phase: workspacev1.WorkspacePhaseRunning,
age: 5 * time.Hour,
lastActivityAgo: nil, // No last activity recorded yet after controller restart.
expectTimeout: true,
}),
Entry("should timeout eventually with no user activity after controller restart", testCase{
phase: workspacev1.WorkspacePhaseRunning,
Expand All @@ -195,10 +166,9 @@ var _ = Describe("TimeoutController", func() {
LastTransitionTime: metav1.NewTime(now.Add(-5 * time.Hour)),
})
},
age: 5 * time.Hour,
lastActivityAgo: nil,
controllerRestart: now.Add(-2 * time.Hour),
expectTimeout: true,
age: 5 * time.Hour,
lastActivityAgo: nil,
expectTimeout: true,
}),
)
})
Expand All @@ -207,7 +177,7 @@ var _ = Describe("TimeoutController", func() {
var r *TimeoutReconciler
BeforeEach(func() {
var err error
r, err = NewTimeoutReconciler(k8sClient, record.NewFakeRecorder(100), newTestConfig(), activity.NewWorkspaceActivity(), &fakeMaintenance{enabled: false})
r, err = NewTimeoutReconciler(k8sClient, record.NewFakeRecorder(100), newTestConfig(), &fakeMaintenance{enabled: false})
Expect(err).ToNot(HaveOccurred())
})

Expand Down
Loading

0 comments on commit 687f337

Please sign in to comment.