Skip to content

Commit

Permalink
Online image change: route to temporary subcluster to keep online (#131)
Browse files Browse the repository at this point in the history
This is the next set of changes for online image change. It will
route to a temporary subcluster, called a transient, so that client
connections connect to an up node. It will automatically route back
to the original subcluster once the subcluster is back up. It will
also process secondary subclusters so that they are brought back up.

This includes some rework in the onlineimagechange_reconciler to cut
down on the amount of code duplication.

Added the ability to specify a template for the transient cluster.
This will get created when the image change starts and will get
cleaned up when the image change is done.
  • Loading branch information
spilchen authored Dec 21, 2021
1 parent 1afbfd2 commit 2b458b4
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 165 deletions.
17 changes: 9 additions & 8 deletions api/v1beta1/verticadb_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,6 @@ type VerticaDBSpec struct {
// running with a Vertica version that supports read-only subclusters.
ImageChangePolicy ImageChangePolicyType `json:"imageChangePolicy"`

// +kubebuilder:validation:Optional
// +operator-sdk:csv:customresourcedefinitions:type=spec,xDescriptors="urn:alm:descriptor:com.tectonic.ui:hidden"
// When doing an online image change, we utilize a transient subcluster to
// serve traffic while one of the other subclusters restart. This is the
// size of that subcluster. This subcluster is created at the beginning of
// the online image change and is removed when that process cleans up.
TransientSubclusterSize int `json:"transientSubclusterSize,omitempty"`

// +kubebuilder:validation:Optional
// +operator-sdk:csv:customresourcedefinitions:type=spec,xDescriptors={"urn:alm:descriptor:com.tectonic.ui:fieldDependency:initPolicy:Revive","urn:alm:descriptor:com.tectonic.ui:advanced"}
// This specifies the order of nodes when doing a revive. Each entry
Expand Down Expand Up @@ -199,6 +191,15 @@ type VerticaDBSpec struct {
//+operator-sdk:csv:customresourcedefinitions:type=spec
Subclusters []Subcluster `json:"subclusters"`

// +kubebuilder:validation:Optional
// +operator-sdk:csv:customresourcedefinitions:type=spec,xDescriptors="urn:alm:descriptor:com.tectonic.ui:hidden"
// When doing an online image change, we utilize a transient subcluster to
// serve traffic while one of the other subclusters restart. This is the
// template to create that subcluster. This subcluster is created at the
// beginning of the online image change and is removed when that process
// cleans up.
TransientSubclusterTemplate Subcluster `json:"transientSubclusterTemplate,omitempty"`

// +kubebuilder:default:="1"
// +kubebuilder:validation:Optional
// +operator-sdk:csv:customresourcedefinitions:type=spec,xDescriptors={"urn:alm:descriptor:com.tectonic.ui:select:0","urn:alm:descriptor:com.tectonic.ui:select:1","urn:alm:descriptor:com.tectonic.ui:advanced"}
Expand Down
36 changes: 26 additions & 10 deletions pkg/controllers/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
)

const (
SuperuserPasswordPath = "superuser-passwd"
TransientSubclusterName = "transient"
SuperuserPasswordPath = "superuser-passwd"
DefaultTransientSubclusterName = "transient"
)

// buildExtSvc creates desired spec for the external service.
Expand Down Expand Up @@ -586,21 +586,37 @@ func getK8sAffinity(a vapi.Affinity) *corev1.Affinity {

// buildTransientSubcluster creates a temporary read-only subcluster based on an
// existing subcluster
func buildTransientSubcluster(sc *vapi.Subcluster, imageOverride string) *vapi.Subcluster {
func buildTransientSubcluster(vdb *vapi.VerticaDB, sc *vapi.Subcluster, imageOverride string) *vapi.Subcluster {
return &vapi.Subcluster{
Name: TransientSubclusterName,
Size: 1,
Name: transientSubclusterName(vdb),
Size: transientSubclusterSize(vdb),
IsTransient: true,
ImageOverride: imageOverride,
IsPrimary: false,
NodeSelector: sc.NodeSelector,
Affinity: sc.Affinity,
PriorityClassName: sc.PriorityClassName,
Tolerations: sc.Tolerations,
Resources: sc.Resources,
NodeSelector: vdb.Spec.TransientSubclusterTemplate.NodeSelector,
Affinity: vdb.Spec.TransientSubclusterTemplate.Affinity,
PriorityClassName: vdb.Spec.TransientSubclusterTemplate.PriorityClassName,
Tolerations: vdb.Spec.TransientSubclusterTemplate.Tolerations,
Resources: vdb.Spec.TransientSubclusterTemplate.Resources,
ServiceType: sc.ServiceType,
ServiceName: sc.GetServiceName(),
NodePort: sc.NodePort,
ExternalIPs: sc.ExternalIPs,
}
}

// transientSubclusterName returns the name of the transient subcluster. It
// uses the name from the template in the CR, falling back to the default
// when the template does not specify one.
func transientSubclusterName(vdb *vapi.VerticaDB) string {
	if vdb.Spec.TransientSubclusterTemplate.Name == "" {
		return DefaultTransientSubclusterName
	}
	return vdb.Spec.TransientSubclusterTemplate.Name
}

// transientSubclusterSize returns the number of pods for the transient
// subcluster. The size comes from the template in the CR; anything not
// strictly positive falls back to a single pod.
func transientSubclusterSize(vdb *vapi.VerticaDB) int32 {
	size := vdb.Spec.TransientSubclusterTemplate.Size
	if size <= 0 {
		return 1
	}
	return size
}
66 changes: 35 additions & 31 deletions pkg/controllers/imagechange.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ func (i *ImageChangeManager) setImageChangeStatus(ctx context.Context, msg strin
}

// updateImageInStatefulSets will change the image in each of the statefulsets.
// Caller can indicate whether primary or secondary types change.
func (i *ImageChangeManager) updateImageInStatefulSets(ctx context.Context, chgPrimary, chgSecondary bool) (int, ctrl.Result, error) {
// This changes the images in all subclusters except any transient ones.
func (i *ImageChangeManager) updateImageInStatefulSets(ctx context.Context) (int, ctrl.Result, error) {
numStsChanged := 0 // Count to keep track of the nubmer of statefulsets updated

// We use FindExisting for the finder because we only want to work with sts
Expand All @@ -176,12 +176,6 @@ func (i *ImageChangeManager) updateImageInStatefulSets(ctx context.Context, chgP
for inx := range stss.Items {
sts := &stss.Items[inx]

if !chgPrimary && sts.Labels[SubclusterTypeLabel] == vapi.PrimarySubclusterType {
continue
}
if !chgSecondary && sts.Labels[SubclusterTypeLabel] == vapi.SecondarySubclusterType {
continue
}
isTransient, err := strconv.ParseBool(sts.Labels[SubclusterTransientLabel])
if err != nil {
return numStsChanged, ctrl.Result{}, err
Expand All @@ -190,33 +184,44 @@ func (i *ImageChangeManager) updateImageInStatefulSets(ctx context.Context, chgP
continue
}

// Skip the statefulset if it already has the proper image.
if sts.Spec.Template.Spec.Containers[names.ServerContainerIndex].Image != i.Vdb.Spec.Image {
i.Log.Info("Updating image in old statefulset", "name", sts.ObjectMeta.Name)
if stsUpdated, err := i.updateImageInStatefulSet(ctx, sts); err != nil {
return numStsChanged, ctrl.Result{}, err
} else if stsUpdated {
err = i.setImageChangeStatus(ctx, "Rescheduling pods with new image name")
if err != nil {
return numStsChanged, ctrl.Result{}, err
}
sts.Spec.Template.Spec.Containers[names.ServerContainerIndex].Image = i.Vdb.Spec.Image
// We change the update strategy to OnDelete. We don't want the k8s
// sts controller to interphere and do a rolling update after the
// update has completed. We don't explicitly change this back. The
// ObjReconciler will handle it for us.
sts.Spec.UpdateStrategy.Type = appsv1.OnDeleteStatefulSetStrategyType
err = i.VRec.Client.Update(ctx, sts)
if err != nil {
return numStsChanged, ctrl.Result{}, err
}
numStsChanged++
}
}
return numStsChanged, ctrl.Result{}, nil
}

// updateImageInStatefulSet will update the image in the given statefulset. It
// returns true if the image was changed.
func (i *ImageChangeManager) updateImageInStatefulSet(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) {
	curImage := &sts.Spec.Template.Spec.Containers[names.ServerContainerIndex].Image
	// Nothing to do if the statefulset already runs the desired image.
	if *curImage == i.Vdb.Spec.Image {
		return false, nil
	}
	i.Log.Info("Updating image in old statefulset", "name", sts.ObjectMeta.Name)
	*curImage = i.Vdb.Spec.Image
	// Switch the update strategy to OnDelete so the k8s sts controller does
	// not interfere with a rolling update after this update has completed.
	// We don't explicitly change this back; the ObjReconciler handles that
	// for us.
	sts.Spec.UpdateStrategy.Type = appsv1.OnDeleteStatefulSetStrategyType
	if err := i.VRec.Client.Update(ctx, sts); err != nil {
		return false, err
	}
	return true, nil
}

// deletePodsRunningOldImage will delete pods that have the old image. It will return the
// number of pods that were deleted. Callers can control whether to delete pods
// just for the primary or primary/secondary.
func (i *ImageChangeManager) deletePodsRunningOldImage(ctx context.Context, delSecondary bool) (int, ctrl.Result, error) {
// for a specific subcluster or all -- passing an empty string for scName will delete all.
func (i *ImageChangeManager) deletePodsRunningOldImage(ctx context.Context, scName string) (int, error) {
numPodsDeleted := 0 // Tracks the number of pods that were deleted

// We use FindExisting for the finder because we only want to work with pods
Expand All @@ -225,16 +230,15 @@ func (i *ImageChangeManager) deletePodsRunningOldImage(ctx context.Context, delS
// doesn't take affect until after the image change.
pods, err := i.Finder.FindPods(ctx, FindExisting)
if err != nil {
return numPodsDeleted, ctrl.Result{}, err
return numPodsDeleted, err
}
for inx := range pods.Items {
pod := &pods.Items[inx]

// We aren't deleting secondary pods, so we only continue if the pod is
// for a primary
if !delSecondary {
scType, ok := pod.Labels[SubclusterTypeLabel]
if ok && scType != vapi.PrimarySubclusterType {
// If scName was passed in, we only delete for a specific subcluster
if scName != "" {
scNameFromLabel, ok := pod.Labels[SubclusterNameLabel]
if ok && scNameFromLabel != scName {
continue
}
}
Expand All @@ -244,12 +248,12 @@ func (i *ImageChangeManager) deletePodsRunningOldImage(ctx context.Context, delS
i.Log.Info("Deleting pod that had old image", "name", pod.ObjectMeta.Name)
err = i.VRec.Client.Delete(ctx, pod)
if err != nil {
return numPodsDeleted, ctrl.Result{}, err
return numPodsDeleted, err
}
numPodsDeleted++
}
}
return numPodsDeleted, ctrl.Result{}, nil
return numPodsDeleted, nil
}

// onlineImageChangeAllowed returns true if image change must be done online
Expand Down
61 changes: 33 additions & 28 deletions pkg/controllers/imagechange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

var _ = Describe("imagechange", func() {
ctx := context.Background()
const OldImage = "old-image"
const NewImage = "new-image-1"

It("should correctly pick the image change type", func() {
vdb := vapi.MakeVDB()
Expand Down Expand Up @@ -76,10 +78,7 @@ var _ = Describe("imagechange", func() {
Expect(mgr.IsImageChangeNeeded(ctx)).Should(Equal(false))
})

It("should change the image of just the primaries or just secondaries", func() {
const OldImage = "old-image"
const NewImage1 = "new-image-1"
const NewImage2 = "new-image-2"
It("should change the image of both primaries and secondaries", func() {
vdb := vapi.MakeVDB()
vdb.Spec.Image = OldImage
vdb.Spec.Subclusters = []vapi.Subcluster{
Expand All @@ -88,64 +87,70 @@ var _ = Describe("imagechange", func() {
}
createPods(ctx, vdb, AllPodsRunning)
defer deletePods(ctx, vdb)
vdb.Spec.Image = NewImage1
vdb.Spec.Image = NewImage
createVdb(ctx, vdb)
defer deleteVdb(ctx, vdb)

mgr := MakeImageChangeManager(vrec, logger, vdb, vapi.OfflineImageChangeInProgress,
func(vdb *vapi.VerticaDB) bool { return true })
Expect(mgr.IsImageChangeNeeded(ctx)).Should(Equal(true))
stsChange, res, err := mgr.updateImageInStatefulSets(ctx, true, false)
stsChange, res, err := mgr.updateImageInStatefulSets(ctx)
Expect(err).Should(Succeed())
Expect(res).Should(Equal(ctrl.Result{}))
Expect(stsChange).Should(Equal(1))
Expect(stsChange).Should(Equal(2))

sts := &appsv1.StatefulSet{}
Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[0]), sts)).Should(Succeed())
Expect(sts.Spec.Template.Spec.Containers[ServerContainerIndex].Image).Should(Equal(NewImage1))
Expect(sts.Spec.Template.Spec.Containers[ServerContainerIndex].Image).Should(Equal(NewImage))
Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[1]), sts)).Should(Succeed())
Expect(sts.Spec.Template.Spec.Containers[ServerContainerIndex].Image).Should(Equal(OldImage))
Expect(sts.Spec.Template.Spec.Containers[ServerContainerIndex].Image).Should(Equal(NewImage))
})

vdb.Spec.Image = NewImage2
Expect(k8sClient.Update(ctx, vdb)).Should(Succeed())
It("should delete pods of all subclusters", func() {
vdb := vapi.MakeVDB()
vdb.Spec.Subclusters = []vapi.Subcluster{
{Name: "sc1", Size: 1, IsPrimary: true},
{Name: "sc2", Size: 1, IsPrimary: false},
}
createPods(ctx, vdb, AllPodsRunning)
defer deletePods(ctx, vdb)
vdb.Spec.Image = NewImage // Change image to force pod deletion

stsChange, res, err = mgr.updateImageInStatefulSets(ctx, false, true)
mgr := MakeImageChangeManager(vrec, logger, vdb, vapi.OfflineImageChangeInProgress,
func(vdb *vapi.VerticaDB) bool { return true })
numPodsDeleted, err := mgr.deletePodsRunningOldImage(ctx, "") // pods from primaries only
Expect(err).Should(Succeed())
Expect(res).Should(Equal(ctrl.Result{}))
Expect(stsChange).Should(Equal(1))
Expect(numPodsDeleted).Should(Equal(2))

Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[0]), sts)).Should(Succeed())
Expect(sts.Spec.Template.Spec.Containers[ServerContainerIndex].Image).Should(Equal(NewImage1))
Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[1]), sts)).Should(Succeed())
Expect(sts.Spec.Template.Spec.Containers[ServerContainerIndex].Image).Should(Equal(NewImage2))
pod := &corev1.Pod{}
Expect(k8sClient.Get(ctx, names.GenPodName(vdb, &vdb.Spec.Subclusters[0], 0), pod)).ShouldNot(Succeed())
Expect(k8sClient.Get(ctx, names.GenPodName(vdb, &vdb.Spec.Subclusters[1], 0), pod)).ShouldNot(Succeed())
})

It("should delete pods of primaries only", func() {
It("should delete pods of specific subcluster", func() {
vdb := vapi.MakeVDB()
vdb.Spec.Subclusters = []vapi.Subcluster{
{Name: "sc1", Size: 1, IsPrimary: true},
{Name: "sc1", Size: 1, IsPrimary: false},
{Name: "sc2", Size: 1, IsPrimary: false},
}
createPods(ctx, vdb, AllPodsRunning)
defer deletePods(ctx, vdb)
vdb.Spec.Image = "new-image" // Change image to force pod deletion
vdb.Spec.Image = NewImage // Change image to force pod deletion

mgr := MakeImageChangeManager(vrec, logger, vdb, vapi.OfflineImageChangeInProgress,
func(vdb *vapi.VerticaDB) bool { return true })
numPodsDeleted, res, err := mgr.deletePodsRunningOldImage(ctx, false) // pods from primaries only
numPodsDeleted, err := mgr.deletePodsRunningOldImage(ctx, vdb.Spec.Subclusters[1].Name)
Expect(err).Should(Succeed())
Expect(res).Should(Equal(ctrl.Result{}))
Expect(numPodsDeleted).Should(Equal(1))

pod := &corev1.Pod{}
Expect(k8sClient.Get(ctx, names.GenPodName(vdb, &vdb.Spec.Subclusters[0], 0), pod)).ShouldNot(Succeed())
Expect(k8sClient.Get(ctx, names.GenPodName(vdb, &vdb.Spec.Subclusters[1], 0), pod)).Should(Succeed())
Expect(k8sClient.Get(ctx, names.GenPodName(vdb, &vdb.Spec.Subclusters[0], 0), pod)).Should(Succeed())
Expect(k8sClient.Get(ctx, names.GenPodName(vdb, &vdb.Spec.Subclusters[1], 0), pod)).ShouldNot(Succeed())

numPodsDeleted, res, err = mgr.deletePodsRunningOldImage(ctx, true) // pods from secondary and primaries
numPodsDeleted, err = mgr.deletePodsRunningOldImage(ctx, vdb.Spec.Subclusters[0].Name)
Expect(err).Should(Succeed())
Expect(res).Should(Equal(ctrl.Result{}))
Expect(numPodsDeleted).Should(Equal(1))

Expect(k8sClient.Get(ctx, names.GenPodName(vdb, &vdb.Spec.Subclusters[1], 0), pod)).ShouldNot(Succeed())
Expect(k8sClient.Get(ctx, names.GenPodName(vdb, &vdb.Spec.Subclusters[0], 0), pod)).ShouldNot(Succeed())
})
})
21 changes: 15 additions & 6 deletions pkg/controllers/labels_annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,18 @@ const (

// makeSubclusterLabels returns the labels added for the subcluster
func makeSubclusterLabels(sc *vapi.Subcluster) map[string]string {
return map[string]string{
m := map[string]string{
SubclusterNameLabel: sc.Name,
SubclusterTypeLabel: sc.GetType(),
SubclusterSvcNameLabel: sc.GetServiceName(),
SubclusterTransientLabel: strconv.FormatBool(sc.IsTransient),
}
// Transient subclusters never have the service name label set. At various
// parts of the image change, it will accept traffic from all of the
// subclusters.
if !sc.IsTransient {
m[SubclusterSvcNameLabel] = sc.GetServiceName()
}
return m
}

// makeOperatorLabels returns the labels that all objects created by this operator will have
Expand Down Expand Up @@ -109,10 +115,13 @@ func makeSvcSelectorLabels(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string]
VDBInstanceLabel: vdb.Name,
}
if sc != nil {
m[SubclusterSvcNameLabel] = sc.GetServiceName()
// This label is here to ensure service object routes to
// primary/secondary or the transient, but never both
m[SubclusterTransientLabel] = strconv.FormatBool(sc.IsTransient)
if sc.IsTransient {
// This label is here to ensure service object routes to
// primary/secondary or the transient, but never both
m[SubclusterTransientLabel] = strconv.FormatBool(sc.IsTransient)
} else {
m[SubclusterSvcNameLabel] = sc.GetServiceName()
}
}
return m
}
2 changes: 1 addition & 1 deletion pkg/controllers/obj_reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ var _ = Describe("obj_reconcile", func() {
svc1 := &corev1.Service{}
Expect(k8sClient.Get(ctx, nm, svc1)).Should(Succeed())

standby := buildTransientSubcluster(sc, "")
standby := buildTransientSubcluster(vdb, sc, "")
pfacts := MakePodFacts(k8sClient, &cmds.FakePodRunner{})
actor := MakeObjReconciler(vrec, logger, vdb, &pfacts)
objr := actor.(*ObjReconciler)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/offlineimagechange_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (o *OfflineImageChangeReconciler) stopCluster(ctx context.Context) (ctrl.Re
// Since there will be processing after to delete the pods so that they come up
// with the new image.
func (o *OfflineImageChangeReconciler) updateImageInStatefulSets(ctx context.Context) (ctrl.Result, error) {
numStsChanged, res, err := o.Manager.updateImageInStatefulSets(ctx, true, true)
numStsChanged, res, err := o.Manager.updateImageInStatefulSets(ctx)
if numStsChanged > 0 {
o.PFacts.Invalidate()
}
Expand All @@ -151,11 +151,11 @@ func (o *OfflineImageChangeReconciler) updateImageInStatefulSets(ctx context.Con
// the sts is OnDelete. Deleting the pods ensures they get rescheduled with the
// new image.
func (o *OfflineImageChangeReconciler) deletePods(ctx context.Context) (ctrl.Result, error) {
numPodsDeleted, res, err := o.Manager.deletePodsRunningOldImage(ctx, true)
numPodsDeleted, err := o.Manager.deletePodsRunningOldImage(ctx, "")
if numPodsDeleted > 0 {
o.PFacts.Invalidate()
}
return res, err
return ctrl.Result{}, err
}

// checkForNewPods will check to ensure at least one pod exists with the new image.
Expand Down
Loading

0 comments on commit 2b458b4

Please sign in to comment.