Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable leader election in ws-manager-mk2 (v3) #18539

Merged
merged 17 commits into from
Aug 26, 2023
Merged
2 changes: 2 additions & 0 deletions components/ws-manager-api/go/crd/v1/workspace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ type WorkspaceStatus struct {

// +kubebuilder:validation:Optional
Runtime *WorkspaceRuntimeStatus `json:"runtime,omitempty"`

LastActivity *metav1.Time `json:"lastActivity,omitempty"`
}

func (s *WorkspaceStatus) SetCondition(cond metav1.Condition) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion components/ws-manager-mk2/cmd/sample-workspace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
"log"
"time"

workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"

workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
)

func main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ spec:
type: string
type: array
type: object
lastActivity:
format: date-time
type: string
ownerToken:
type: string
phase:
Expand Down
2 changes: 0 additions & 2 deletions components/ws-manager-mk2/config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ spec:
containers:
- command:
- /manager
args:
- --leader-elect
image: controller:latest
name: manager
securityContext:
Expand Down
53 changes: 48 additions & 5 deletions components/ws-manager-mk2/controllers/maintenance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"

Expand All @@ -17,7 +18,12 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var (
Expand Down Expand Up @@ -106,9 +112,46 @@ func (r *MaintenanceReconciler) setEnabledUntil(ctx context.Context, enabledUnti
log.FromContext(ctx).Info("maintenance mode state change", "enabledUntil", enabledUntil)
}

func (r *MaintenanceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Named("maintenance").
For(&corev1.ConfigMap{}).
Complete(r)
func (r *MaintenanceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// We need to use an unmanaged controller to avoid issues when the pod is in standby mode.
// In that scenario, the controllers are not started and don't watch changes and only
// observe the maintenance mode during the initialization.
c, err := controller.NewUnmanaged("maintenance-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

go func() {
err = c.Start(ctx)
if err != nil {
log.FromContext(ctx).Error(err, "cannot start maintenance reconciler")
os.Exit(1)
}
}()

return c.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), &handler.EnqueueRequestForObject{}, &filterConfigMap{})
}

type filterConfigMap struct {
predicate.Funcs
}

func (f filterConfigMap) Create(e event.CreateEvent) bool {
return f.filter(e.Object)
}

func (f filterConfigMap) Update(e event.UpdateEvent) bool {
return f.filter(e.ObjectNew)
}

func (f filterConfigMap) Generic(e event.GenericEvent) bool {
return f.filter(e.Object)
}

func (f filterConfigMap) filter(obj client.Object) bool {
if obj == nil {
return false
}

return obj.GetName() == configMapKey.Name && obj.GetNamespace() == configMapKey.Namespace
}
80 changes: 80 additions & 0 deletions components/ws-manager-mk2/controllers/subscriber_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package controllers

import (
"context"
"os"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"

config "github.com/gitpod-io/gitpod/ws-manager/api/config"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
)

func NewSubscriberReconciler(c client.Client, cfg *config.Configuration) (*SubscriberReconciler, error) {
aledbf marked this conversation as resolved.
Show resolved Hide resolved
reconciler := &SubscriberReconciler{
Client: c,
Config: cfg,
}

return reconciler, nil
}

type SubscriberReconciler struct {
client.Client

Config *config.Configuration

OnReconcile func(ctx context.Context, ws *workspacev1.Workspace)
}

func (r *SubscriberReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)

var workspace workspacev1.Workspace
if err := r.Get(ctx, req.NamespacedName, &workspace); err != nil {
if !errors.IsNotFound(err) {
log.Error(err, "unable to fetch workspace")
}

return ctrl.Result{}, client.IgnoreNotFound(err)
}

if workspace.Status.Conditions == nil {
workspace.Status.Conditions = []metav1.Condition{}
}

if r.OnReconcile != nil {
r.OnReconcile(ctx, &workspace)
aledbf marked this conversation as resolved.
Show resolved Hide resolved
}
aledbf marked this conversation as resolved.
Show resolved Hide resolved

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *SubscriberReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
c, err := controller.NewUnmanaged("subscribers-controller", mgr, controller.Options{Reconciler: r})
aledbf marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

go func() {
err = c.Start(ctx)
if err != nil {
log.FromContext(ctx).Error(err, "cannot start Subscriber reconciler")
os.Exit(1)
}
}()

return c.Watch(source.Kind(mgr.GetCache(), &workspacev1.Workspace{}), &handler.EnqueueRequestForObject{})
}
11 changes: 4 additions & 7 deletions components/ws-manager-mk2/controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/gitpod-io/gitpod/common-go/util"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
"github.com/gitpod-io/gitpod/ws-manager/api/config"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
//+kubebuilder:scaffold:imports
Expand All @@ -50,10 +49,9 @@ func TestAPIs(t *testing.T) {
}

var (
ctx context.Context
cancel context.CancelFunc
wsActivity *activity.WorkspaceActivity
wsMetrics *controllerMetrics
ctx context.Context
cancel context.CancelFunc
wsMetrics *controllerMetrics
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do the ws-manager metrics behave when two replicas are enabled? Do both report metrics, or only one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metrics will be scrapped from all available replicas

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will break some of the dashboards then (and maybe alerts too). For instance we sum all workspaces by cluster, this will then double the amount of reported workspaces

Copy link
Member Author

@aledbf aledbf Aug 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check this gist (integration tests are running there)

Only the leader will have metrics related to the controller. If the current leader is not elected anymore, the pod is restarted.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the gists. Both replicas report workspace metrics, e.g.:

gitpod_ws_manager_mk2_workspace_phase_total{class="g1-standard",phase="Running",type="Regular"} 1

is in both. Summing them as done in dashboards/alerts would give 2 replicas, while there's only 1

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gist updated after running the integration test

)

var _ = BeforeSuite(func() {
Expand Down Expand Up @@ -111,8 +109,7 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())
Expect(wsReconciler.SetupWithManager(k8sManager)).To(Succeed())

wsActivity = activity.NewWorkspaceActivity()
timeoutReconciler, err := NewTimeoutReconciler(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("workspace"), conf, wsActivity, maintenance)
timeoutReconciler, err := NewTimeoutReconciler(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("workspace"), conf, maintenance)
Expect(err).ToNot(HaveOccurred())
Expect(timeoutReconciler.SetupWithManager(k8sManager)).To(Succeed())

Expand Down
8 changes: 3 additions & 5 deletions components/ws-manager-mk2/controllers/timeout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/gitpod-io/gitpod/common-go/util"
wsactivity "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/maintenance"
config "github.com/gitpod-io/gitpod/ws-manager/api/config"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
)

func NewTimeoutReconciler(c client.Client, recorder record.EventRecorder, cfg config.Configuration, activity *wsactivity.WorkspaceActivity, maintenance maintenance.Maintenance) (*TimeoutReconciler, error) {
func NewTimeoutReconciler(c client.Client, recorder record.EventRecorder, cfg config.Configuration, maintenance maintenance.Maintenance) (*TimeoutReconciler, error) {
if cfg.HeartbeatInterval == 0 {
return nil, fmt.Errorf("invalid heartbeat interval, must not be 0")
}
Expand All @@ -38,7 +38,6 @@ func NewTimeoutReconciler(c client.Client, recorder record.EventRecorder, cfg co
return &TimeoutReconciler{
Client: c,
Config: cfg,
activity: activity,
reconcileInterval: reconcileInterval,
recorder: recorder,
maintenance: maintenance,
Expand All @@ -53,7 +52,6 @@ type TimeoutReconciler struct {
client.Client

Config config.Configuration
activity *wsactivity.WorkspaceActivity
reconcileInterval time.Duration
recorder record.EventRecorder
maintenance maintenance.Maintenance
Expand Down Expand Up @@ -157,7 +155,7 @@ func (r *TimeoutReconciler) isWorkspaceTimedOut(ws *workspacev1.Workspace) (reas
}

start := ws.ObjectMeta.CreationTimestamp.Time
lastActivity := r.activity.GetLastActivity(ws)
lastActivity := activity.Last(ws)
isClosed := ws.IsConditionTrue(workspacev1.WorkspaceConditionClosed)

switch phase {
Expand Down
56 changes: 13 additions & 43 deletions components/ws-manager-mk2/controllers/timeout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -36,7 +35,7 @@ var _ = Describe("TimeoutController", func() {
// Use a fake client instead of the envtest's k8s client, such that we can add objects
// with custom CreationTimestamps and check timeout logic.
fakeClient = fake.NewClientBuilder().WithStatusSubresource(&workspacev1.Workspace{}).WithScheme(k8sClient.Scheme()).Build()
r, err = NewTimeoutReconciler(fakeClient, record.NewFakeRecorder(100), conf, activity.NewWorkspaceActivity(), &fakeMaintenance{enabled: false})
r, err = NewTimeoutReconciler(fakeClient, record.NewFakeRecorder(100), conf, &fakeMaintenance{enabled: false})
Expect(err).ToNot(HaveOccurred())
})

Expand All @@ -48,20 +47,21 @@ var _ = Describe("TimeoutController", func() {
customMaxLifetime *time.Duration
update func(ws *workspacev1.Workspace)
updateStatus func(ws *workspacev1.Workspace)
controllerRestart time.Time
expectTimeout bool
}
DescribeTable("workspace timeouts",
func(tc testCase) {
By("creating a workspace")
ws := newWorkspace(uuid.NewString(), "default")
ws.CreationTimestamp = metav1.NewTime(now.Add(-tc.age))
Expect(fakeClient.Create(ctx, ws)).To(Succeed())

if tc.lastActivityAgo != nil {
r.activity.Store(ws.Name, now.Add(-*tc.lastActivityAgo))
now := metav1.NewTime(now.Add(-*tc.lastActivityAgo))
ws.Status.LastActivity = &now
}

Expect(fakeClient.Create(ctx, ws)).To(Succeed())

updateObjWithRetries(fakeClient, ws, false, func(ws *workspacev1.Workspace) {
if tc.customTimeout != nil {
ws.Spec.Timeout.Time = &metav1.Duration{Duration: *tc.customTimeout}
Expand All @@ -80,14 +80,6 @@ var _ = Describe("TimeoutController", func() {
}
})

// Set controller (re)start time.
if tc.controllerRestart.IsZero() {
// Bit arbitrary, but default to the controller running for ~2 days.
r.activity.ManagerStartedAt = now.Add(-48 * time.Hour)
} else {
r.activity.ManagerStartedAt = tc.controllerRestart
}

// Run the timeout controller for this workspace.
By("running the TimeoutController reconcile()")
_, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: ws.Name, Namespace: ws.Namespace}})
Expand Down Expand Up @@ -159,32 +151,11 @@ var _ = Describe("TimeoutController", func() {
lastActivityAgo: pointer.Duration(1 * time.Minute),
expectTimeout: true,
}),
Entry("shouldn't timeout after controller restart", testCase{
phase: workspacev1.WorkspacePhaseRunning,
updateStatus: func(ws *workspacev1.Workspace) {
// Add FirstUserActivity condition from 5 hours ago.
// From this condition the controller should deduce that the workspace
// has had user activity, but since lastActivity is nil, it's been cleared on
// a restart. The controller therefore should not timeout the workspace and
// wait for new user activity. Or timeout once user activity doesn't come
// eventually after the controller restart.
ws.Status.Conditions = wsk8s.AddUniqueCondition(ws.Status.Conditions, metav1.Condition{
Type: string(workspacev1.WorkspaceConditionFirstUserActivity),
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now.Add(-5 * time.Hour)),
})
},
age: 5 * time.Hour,
lastActivityAgo: nil, // No last activity recorded yet after controller restart.
controllerRestart: now,
expectTimeout: false,
}),
Entry("should timeout after controller restart if no FirstUserActivity", testCase{
phase: workspacev1.WorkspacePhaseRunning,
age: 5 * time.Hour,
lastActivityAgo: nil, // No last activity recorded yet after controller restart.
controllerRestart: now,
expectTimeout: true,
phase: workspacev1.WorkspacePhaseRunning,
age: 5 * time.Hour,
lastActivityAgo: nil, // No last activity recorded yet after controller restart.
expectTimeout: true,
}),
Entry("should timeout eventually with no user activity after controller restart", testCase{
phase: workspacev1.WorkspacePhaseRunning,
Expand All @@ -195,10 +166,9 @@ var _ = Describe("TimeoutController", func() {
LastTransitionTime: metav1.NewTime(now.Add(-5 * time.Hour)),
})
},
age: 5 * time.Hour,
lastActivityAgo: nil,
controllerRestart: now.Add(-2 * time.Hour),
expectTimeout: true,
age: 5 * time.Hour,
lastActivityAgo: nil,
expectTimeout: true,
}),
)
})
Expand All @@ -207,7 +177,7 @@ var _ = Describe("TimeoutController", func() {
var r *TimeoutReconciler
BeforeEach(func() {
var err error
r, err = NewTimeoutReconciler(k8sClient, record.NewFakeRecorder(100), newTestConfig(), activity.NewWorkspaceActivity(), &fakeMaintenance{enabled: false})
r, err = NewTimeoutReconciler(k8sClient, record.NewFakeRecorder(100), newTestConfig(), &fakeMaintenance{enabled: false})
Expect(err).ToNot(HaveOccurred())
})

Expand Down
Loading
Loading