Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

suspend cluster #382

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/nebulacluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (nc *NebulaCluster) GenerateOwnerReferences() []metav1.OwnerReference {
}
}

// IsSuspendEnabled reports the value of Spec.Suspend, treating an unset
// (nil) field as false.
func (nc *NebulaCluster) IsSuspendEnabled() bool {
	suspend := nc.Spec.Suspend
	return pointer.BoolDeref(suspend, false)
}

// IsPVReclaimEnabled reports the value of Spec.EnablePVReclaim, treating an
// unset (nil) field as false.
func (nc *NebulaCluster) IsPVReclaimEnabled() bool {
	enabled := nc.Spec.EnablePVReclaim
	return pointer.BoolDeref(enabled, false)
}
Expand Down Expand Up @@ -155,6 +159,7 @@ func (nc *NebulaCluster) IsReady() bool {
// IsStoragedAvailable reports whether the storaged component is ready and its
// status records no balanced spaces, no removed spaces and no last balance job.
func (nc *NebulaCluster) IsStoragedAvailable() bool {
	if !nc.StoragedComponent().IsReady() {
		return false
	}
	if nc.Status.Storaged.BalancedSpaces != nil {
		return false
	}
	if nc.Status.Storaged.RemovedSpaces != nil {
		return false
	}
	return nc.Status.Storaged.LastBalanceJob == nil
}

Expand Down
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/nebulacluster_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func getClientCertsVolume(sslCerts *SSLCertsSpec) []corev1.Volume {
}
}

func rollingUpdateDone(workloadStatus WorkloadStatus) bool {
func rollingUpdateDone(workloadStatus *WorkloadStatus) bool {
return workloadStatus.UpdatedReplicas == workloadStatus.Replicas &&
workloadStatus.ReadyReplicas == workloadStatus.Replicas &&
workloadStatus.CurrentReplicas == workloadStatus.UpdatedReplicas &&
Expand Down
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/nebulacluster_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ type NebulaClusterComponent interface {

IsReady() bool
GetUpdateRevision() string
SetPhase(phase ComponentPhase)
GetPhase() ComponentPhase
IsSuspending() bool
IsSuspended() bool
SetWorkloadStatus(status *WorkloadStatus)
UpdateComponentStatus(status *ComponentStatus)
}

Expand Down
32 changes: 32 additions & 0 deletions apis/apps/v1alpha1/nebulacluster_graphd.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type graphdComponent struct {
}

// GetUpdateRevision returns the update revision recorded in the graphd
// workload status, or the empty string when no graphd workload status is set.
func (c *graphdComponent) GetUpdateRevision() string {
	// Guard the graphd workload itself: checking another component's workload
	// here would leave the dereference below unprotected.
	workload := c.nc.Status.Graphd.Workload
	if workload == nil {
		return ""
	}
	return workload.UpdateRevision
}

Expand Down Expand Up @@ -129,6 +132,9 @@ func (c *graphdComponent) GetEndpoints(portName string) []string {
}

// IsReady reports whether the graphd workload status exists, its ready
// replica count equals the desired Spec.Graphd.Replicas, and
// rollingUpdateDone holds for that workload status.
func (c *graphdComponent) IsReady() bool {
	workload := c.nc.Status.Graphd.Workload
	if workload == nil {
		return false
	}
	if workload.ReadyReplicas != *c.nc.Spec.Graphd.Replicas {
		return false
	}
	return rollingUpdateDone(workload)
}
Expand Down Expand Up @@ -314,3 +320,29 @@ func (c *graphdComponent) GenerateConfigMap() *corev1.ConfigMap {
// UpdateComponentStatus overwrites the cluster's graphd status with a copy of
// the value status points to.
func (c *graphdComponent) UpdateComponentStatus(status *ComponentStatus) {
	updated := *status
	c.nc.Status.Graphd = updated
}

// SetWorkloadStatus stores status as the graphd workload status; nil clears it.
func (c *graphdComponent) SetWorkloadStatus(status *WorkloadStatus) {
	graphd := &c.nc.Status.Graphd
	graphd.Workload = status
}

// GetPhase returns the phase recorded in the cluster's graphd status.
func (c *graphdComponent) GetPhase() ComponentPhase {
	graphd := &c.nc.Status.Graphd
	return graphd.Phase
}

// SetPhase records phase in the cluster's graphd status.
func (c *graphdComponent) SetPhase(phase ComponentPhase) {
	graphd := &c.nc.Status.Graphd
	graphd.Phase = phase
}

// IsSuspending reports whether the graphd status phase is SuspendPhase.
func (c *graphdComponent) IsSuspending() bool {
	return c.GetPhase() == SuspendPhase
}

// IsSuspended reports whether graphd is in the suspend phase and, when
// suspend is enabled on the cluster, its workload status has been cleared.
func (c *graphdComponent) IsSuspended() bool {
	if !c.IsSuspending() {
		return false
	}
	workloadPresent := c.nc.IsSuspendEnabled() && c.nc.Status.Graphd.Workload != nil
	return !workloadPresent
}
32 changes: 32 additions & 0 deletions apis/apps/v1alpha1/nebulacluster_metad.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type metadComponent struct {
}

// GetUpdateRevision returns the update revision recorded in the metad
// workload status, or the empty string when no workload status is set.
func (c *metadComponent) GetUpdateRevision() string {
	if workload := c.nc.Status.Metad.Workload; workload != nil {
		return workload.UpdateRevision
	}
	return ""
}

Expand Down Expand Up @@ -146,6 +149,9 @@ func (c *metadComponent) GetEndpoints(portName string) []string {
}

// IsReady reports whether the metad workload status exists, its ready replica
// count equals the desired Spec.Metad.Replicas, and rollingUpdateDone holds
// for that workload status.
func (c *metadComponent) IsReady() bool {
	workload := c.nc.Status.Metad.Workload
	if workload == nil {
		return false
	}
	if workload.ReadyReplicas != *c.nc.Spec.Metad.Replicas {
		return false
	}
	return rollingUpdateDone(workload)
}
Expand Down Expand Up @@ -392,3 +398,29 @@ func (c *metadComponent) GenerateConfigMap() *corev1.ConfigMap {
// UpdateComponentStatus overwrites the cluster's metad status with a copy of
// the value status points to.
func (c *metadComponent) UpdateComponentStatus(status *ComponentStatus) {
	updated := *status
	c.nc.Status.Metad = updated
}

// SetWorkloadStatus stores status as the metad workload status; nil clears it.
func (c *metadComponent) SetWorkloadStatus(status *WorkloadStatus) {
	metad := &c.nc.Status.Metad
	metad.Workload = status
}

// GetPhase returns the phase recorded in the cluster's metad status.
func (c *metadComponent) GetPhase() ComponentPhase {
	metad := &c.nc.Status.Metad
	return metad.Phase
}

// SetPhase records phase in the cluster's metad status.
func (c *metadComponent) SetPhase(phase ComponentPhase) {
	metad := &c.nc.Status.Metad
	metad.Phase = phase
}

// IsSuspending reports whether the metad status phase is SuspendPhase.
func (c *metadComponent) IsSuspending() bool {
	return c.GetPhase() == SuspendPhase
}

// IsSuspended reports whether metad is in the suspend phase and, when suspend
// is enabled on the cluster, its workload status has been cleared.
func (c *metadComponent) IsSuspended() bool {
	if !c.IsSuspending() {
		return false
	}
	workloadPresent := c.nc.IsSuspendEnabled() && c.nc.Status.Metad.Workload != nil
	return !workloadPresent
}
32 changes: 32 additions & 0 deletions apis/apps/v1alpha1/nebulacluster_storaged.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type storagedComponent struct {
}

// GetUpdateRevision returns the update revision recorded in the storaged
// workload status, or the empty string when no workload status is set.
func (c *storagedComponent) GetUpdateRevision() string {
	if workload := c.nc.Status.Storaged.Workload; workload != nil {
		return workload.UpdateRevision
	}
	return ""
}

Expand Down Expand Up @@ -146,6 +149,9 @@ func (c *storagedComponent) GetEndpoints(portName string) []string {
}

// IsReady reports whether the storaged workload status exists, its ready
// replica count equals the desired Spec.Storaged.Replicas, and
// rollingUpdateDone holds for that workload status.
func (c *storagedComponent) IsReady() bool {
	workload := c.nc.Status.Storaged.Workload
	if workload == nil {
		return false
	}
	if workload.ReadyReplicas != *c.nc.Spec.Storaged.Replicas {
		return false
	}
	return rollingUpdateDone(workload)
}
Expand Down Expand Up @@ -372,6 +378,32 @@ func (c *storagedComponent) UpdateComponentStatus(status *ComponentStatus) {
c.nc.Status.Storaged.ComponentStatus = *status
}

// SetWorkloadStatus stores status as the storaged workload status; nil
// clears it.
func (c *storagedComponent) SetWorkloadStatus(status *WorkloadStatus) {
	c.nc.Status.Storaged.ComponentStatus.Workload = status
}

// GetPhase returns the phase recorded in the cluster's storaged status.
func (c *storagedComponent) GetPhase() ComponentPhase {
	return c.nc.Status.Storaged.ComponentStatus.Phase
}

// SetPhase records phase in the cluster's storaged status.
func (c *storagedComponent) SetPhase(phase ComponentPhase) {
	c.nc.Status.Storaged.ComponentStatus.Phase = phase
}

// IsSuspending reports whether the storaged status phase is SuspendPhase.
func (c *storagedComponent) IsSuspending() bool {
	return c.GetPhase() == SuspendPhase
}

// IsSuspended reports whether storaged is in the suspend phase and, when
// suspend is enabled on the cluster, its workload status has been cleared.
func (c *storagedComponent) IsSuspended() bool {
	if !c.IsSuspending() {
		return false
	}
	workloadPresent := c.nc.IsSuspendEnabled() && c.nc.Status.Storaged.Workload != nil
	return !workloadPresent
}

func storageDataVolumeClaims(storageClaims []StorageClaim, componentType string) ([]corev1.PersistentVolumeClaim, error) {
var pvcs []corev1.PersistentVolumeClaim
for i := range storageClaims {
Expand Down
8 changes: 4 additions & 4 deletions apis/apps/v1alpha1/nebulacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type NebulaClusterSpec struct {
Reference WorkloadReference `json:"reference,omitempty"`

// +optional
Suspend bool `json:"suspend,omitempty"`
Suspend *bool `json:"suspend,omitempty"`

// +kubebuilder:default=default-scheduler
// +optional
Expand Down Expand Up @@ -127,9 +127,9 @@ type NebulaClusterStatus struct {

// ComponentStatus is the status and version of a nebula component.
type ComponentStatus struct {
Version string `json:"version,omitempty"`
Phase ComponentPhase `json:"phase,omitempty"`
Workload WorkloadStatus `json:"workload,omitempty"`
Version string `json:"version,omitempty"`
Phase ComponentPhase `json:"phase,omitempty"`
Workload *WorkloadStatus `json:"workload,omitempty"`
}

// StoragedStatus describes the status and version of nebula storaged.
Expand Down
11 changes: 10 additions & 1 deletion apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/caarlos0/env/v8 v8.0.0
github.com/go-playground/validator/v10 v10.15.5
github.com/google/go-cmp v0.5.9
github.com/hashicorp/go-multierror v1.1.1
github.com/joho/godotenv v1.5.1
github.com/onsi/gomega v1.27.7
github.com/openkruise/kruise-api v1.3.0
Expand Down Expand Up @@ -86,7 +87,6 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/component/graphd_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
notExist := apierrors.IsNotFound(err)
oldWorkload := oldWorkloadTemp.DeepCopy()

needSuspend, err := suspendComponent(c.clientSet.Workload(), nc.GraphdComponent(), oldWorkload)
if err != nil {
return fmt.Errorf("suspend graphd cluster %s failed: %v", componentName, err)
}
if needSuspend {
klog.Infof("graphd cluster %s is suspended, skip reconciling", componentName)
return nil
}

cm, cmHash, err := c.syncGraphdConfigMap(nc.DeepCopy())
if err != nil {
return err
Expand All @@ -101,7 +110,7 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
}

if err := c.syncNebulaClusterStatus(nc, newWorkload, oldWorkload); err != nil {
return fmt.Errorf("sync graphd cluster status status failed: %v", err)
return fmt.Errorf("sync graphd cluster status failed: %v", err)
}

if notExist {
Expand All @@ -116,7 +125,7 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
if err := c.clientSet.Workload().CreateWorkload(newWorkload); err != nil {
return err
}
nc.Status.Graphd.Workload = v1alpha1.WorkloadStatus{}
nc.Status.Graphd.Workload = &v1alpha1.WorkloadStatus{}
return utilerrors.ReconcileErrorf("waiting for graphd cluster %s running", newWorkload.GetName())
}

Expand Down
59 changes: 58 additions & 1 deletion pkg/controller/component/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ import (
"github.com/vesoft-inc/nebula-operator/pkg/util/maputil"
)

// suspendOrder lists component types in the order canSuspendComponent walks
// them: a component may be suspended only once every component listed before
// it reports IsSuspended (graphd first, then storaged, then metad).
var suspendOrder = []v1alpha1.ComponentType{
v1alpha1.GraphdComponentType,
v1alpha1.StoragedComponentType,
v1alpha1.MetadComponentType,
}

const (
// InPlaceGracePeriodSeconds is fixed at 60.
// NOTE(review): presumably a grace period, in seconds, applied to in-place
// updates — its use is not visible here; confirm against callers.
InPlaceGracePeriodSeconds = 60
)
Expand Down Expand Up @@ -72,7 +78,7 @@ func syncComponentStatus(
}

func setWorkloadStatus(obj *unstructured.Unstructured, status *v1alpha1.ComponentStatus) error {
workload := v1alpha1.WorkloadStatus{}
workload := &v1alpha1.WorkloadStatus{}
data, err := json.Marshal(extender.GetStatus(obj))
if err != nil {
return err
Expand Down Expand Up @@ -524,3 +530,54 @@ func syncPVC(
}
return nil
}

// suspendComponent drives one component through suspend or resume.
//
// It returns true when the caller should skip the rest of reconciliation for
// this component, and false when normal reconciliation should continue:
//   - suspend disabled, component in suspend phase: phase is set back to
//     Running and true is returned;
//   - suspend disabled otherwise: nothing changes and false is returned;
//   - suspend enabled, component not yet in suspend phase: if
//     canSuspendComponent refuses, a warning is logged and false is returned;
//     otherwise the phase is set to Suspend and true is returned;
//   - suspend enabled, component already in suspend phase: the workload (when
//     non-nil) is deleted, the workload status is cleared, and true is returned.
//
// A non-nil error is returned only when deleting the workload fails.
func suspendComponent(
	workloadClient kube.Workload,
	component v1alpha1.NebulaClusterComponent,
	workload *unstructured.Unstructured) (bool, error) {
	cluster := component.GetNebulaCluster()
	inSuspendPhase := component.IsSuspending()
	suspendWanted := cluster.IsSuspendEnabled()

	switch {
	case !suspendWanted && inSuspendPhase:
		// Resume: leave the suspend phase; the workload is recreated later.
		component.SetPhase(v1alpha1.RunningPhase)
		return true, nil
	case !suspendWanted:
		klog.Infof("component %s is not needed to be suspended", component.GetName())
		return false, nil
	case !inSuspendPhase:
		allowed, reason := canSuspendComponent(component)
		if !allowed {
			klog.Warningf("component %s can not be suspended: %s", component.GetName(), reason)
			return false, nil
		}
		component.SetPhase(v1alpha1.SuspendPhase)
		return true, nil
	}

	// Suspend is wanted and the phase is already Suspend: tear down the workload.
	if workload != nil {
		if err := workloadClient.DeleteWorkload(workload); err != nil {
			return false, err
		}
	}
	component.SetWorkloadStatus(nil)
	return true, nil
}

// canSuspendComponent reports whether component may enter the suspend phase
// now, and a human-readable reason when it may not.
//
// The component must be in the Running or Suspend phase, and every component
// type listed before it in suspendOrder must already report IsSuspended.
func canSuspendComponent(component v1alpha1.NebulaClusterComponent) (bool, string) {
	cluster := component.GetNebulaCluster()
	phase := component.GetPhase()
	if !(phase == v1alpha1.RunningPhase || phase == v1alpha1.SuspendPhase) {
		return false, "component phase is not Running or Suspend"
	}
	for i := 0; i < len(suspendOrder); i++ {
		predecessor := suspendOrder[i]
		if predecessor == component.ComponentType() {
			// Reached the component itself; everything before it is suspended.
			return true, ""
		}
		other, err := cluster.ComponentByType(predecessor)
		if err != nil {
			return false, err.Error()
		}
		if !other.IsSuspended() {
			return false, fmt.Sprintf("waiting for component %s to be suspended", predecessor)
		}
	}
	return true, ""
}
11 changes: 10 additions & 1 deletion pkg/controller/component/metad_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error {
notExist := apierrors.IsNotFound(err)
oldWorkload := oldWorkloadTemp.DeepCopy()

needSuspend, err := suspendComponent(c.clientSet.Workload(), nc.MetadComponent(), oldWorkload)
if err != nil {
return fmt.Errorf("suspend metad cluster %s failed: %v", componentName, err)
}
if needSuspend {
klog.Infof("metad cluster %s is suspended, skip reconciling", componentName)
return nil
}

cm, cmHash, err := c.syncMetadConfigMap(nc.DeepCopy())
if err != nil {
return err
Expand Down Expand Up @@ -120,7 +129,7 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error {
if err := c.clientSet.Workload().CreateWorkload(newWorkload); err != nil {
return err
}
nc.Status.Metad.Workload = v1alpha1.WorkloadStatus{}
nc.Status.Metad.Workload = &v1alpha1.WorkloadStatus{}
return utilerrors.ReconcileErrorf("waiting for metad cluster %s running", newWorkload.GetName())
}

Expand Down
Loading