Skip to content

Commit

Permalink
[Enhancement] Use StarRocksCluster State to log errors when subContro…
Browse files Browse the repository at this point in the history
…ller apply failed

Signed-off-by: yandongxiao <[email protected]>
  • Loading branch information
yandongxiao committed Dec 15, 2023
1 parent fe2220b commit b92ff24
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 44 deletions.
13 changes: 11 additions & 2 deletions config/crd/bases/starrocks.com_starrocksclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .status.phase
name: Phase
type: string
- jsonPath: .status.starRocksFeStatus.phase
name: FeStatus
type: string
- jsonPath: .status.starRocksBeStatus.phase
name: BeStatus
type: string
- jsonPath: .status.starRocksCnStatus.phase
name: CnStatus
type: string
- jsonPath: .status.starRocksBeStatus.phase
name: BeStatus
- jsonPath: .status.starRocksFeProxyStatus.phase
name: FeProxyStatus
type: string
name: v1
schema:
Expand Down Expand Up @@ -5745,6 +5751,9 @@ spec:
description: 'Represents the state of cluster. the possible value
are: running, failed, pending'
type: string
reason:
description: Reason represents the errors when calling sub-controllers
type: string
starRocksBeStatus:
description: Represents the status of be. the status have running,
failed and creating pods.
Expand Down
12 changes: 10 additions & 2 deletions deploy/starrocks.com_starrocksclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .status.phase
name: Phase
type: string
- jsonPath: .status.starRocksFeStatus.phase
name: FeStatus
type: string
- jsonPath: .status.starRocksBeStatus.phase
name: BeStatus
type: string
- jsonPath: .status.starRocksCnStatus.phase
name: CnStatus
type: string
- jsonPath: .status.starRocksBeStatus.phase
name: BeStatus
- jsonPath: .status.starRocksFeProxyStatus.phase
name: FeProxyStatus
type: string
name: v1
schema:
Expand Down Expand Up @@ -2602,6 +2608,8 @@ spec:
properties:
phase:
type: string
reason:
type: string
starRocksBeStatus:
properties:
creatingInstances:
Expand Down
7 changes: 6 additions & 1 deletion pkg/apis/starrocks/v1/starrockscluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type StarRocksClusterStatus struct {
// Represents the state of cluster. the possible value are: running, failed, pending
Phase Phase `json:"phase"`

// Reason represents the errors when calling sub-controllers
Reason string `json:"reason,omitempty"`

// Represents the status of fe. the status have running, failed and creating pods.
StarRocksFeStatus *StarRocksFeStatus `json:"starRocksFeStatus,omitempty"`

Expand Down Expand Up @@ -242,9 +245,11 @@ type HorizontalScaler struct {
// +kubebuilder:resource:shortName=src
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="FeStatus",type=string,JSONPath=`.status.starRocksFeStatus.phase`
// +kubebuilder:printcolumn:name="CnStatus",type=string,JSONPath=`.status.starRocksCnStatus.phase`
// +kubebuilder:printcolumn:name="BeStatus",type=string,JSONPath=`.status.starRocksBeStatus.phase`
// +kubebuilder:printcolumn:name="CnStatus",type=string,JSONPath=`.status.starRocksCnStatus.phase`
// +kubebuilder:printcolumn:name="FeProxyStatus",type=string,JSONPath=`.status.starRocksFeProxyStatus.phase`
// +kubebuilder:storageversion
// +k8s:openapi-gen=true
// +genclient
Expand Down
73 changes: 35 additions & 38 deletions pkg/controllers/starrockscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package controllers

import (
"context"
"encoding/json"
"fmt"

"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
Expand All @@ -30,9 +31,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

srapi "github.com/StarRocks/starrocks-kubernetes-operator/pkg/apis/starrocks/v1"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/common/hash"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/feproxy"
)

const (
Expand Down Expand Up @@ -81,8 +85,8 @@ func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
src := esrc.DeepCopy()

// record the src updated or not by process.
oldHashValue := r.hashStarRocksCluster(src)
d, _ := json.Marshal(src)
fmt.Printf("old src: %v", string(d))
// reconcile src deleted
if !src.DeletionTimestamp.IsZero() {
logger.Info("deletion timestamp is not zero, clear StarRocksCluster related resources")
Expand All @@ -96,21 +100,23 @@ func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
logger.Info("sub controller sync spec", kvs...)
if err = rc.SyncCluster(ctx, src); err != nil {
logger.Error(err, "sub controller reconciles spec failed", kvs...)
handleSyncClusterError(src, rc, err)
if updateError := r.UpdateStarRocksClusterStatus(ctx, src); updateError != nil {
logger.Error(updateError, "failed to update StarRocksCluster Status")
}
return requeueIfError(err)
}
}

newHashValue := r.hashStarRocksCluster(src)
if oldHashValue != newHashValue {
logger.Info("the hash value of StarRocksCluster CR changed, send patch request to kubernetes")
return ctrl.Result{Requeue: true}, r.PatchStarRocksCluster(ctx, src)
}

for _, rc := range r.Scs {
kvs := []interface{}{"subController", rc.GetControllerName()}
logger.Info("sub controller update status", kvs...)
if err = rc.UpdateClusterStatus(ctx, src); err != nil {
logger.Error(err, "sub controller update status failed", kvs...)
handleSyncClusterError(src, rc, err)
if updateError := r.UpdateStarRocksClusterStatus(ctx, src); updateError != nil {
logger.Error(updateError, "failed to update StarRocksCluster Status")
}
return requeueIfError(err)
}
}
Expand Down Expand Up @@ -139,20 +145,6 @@ func (r *StarRocksClusterReconciler) UpdateStarRocksClusterStatus(ctx context.Co
})
}

// PatchStarRocksCluster patch spec, metadata
func (r *StarRocksClusterReconciler) PatchStarRocksCluster(ctx context.Context, src *srapi.StarRocksCluster) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var esrc srapi.StarRocksCluster
if err := r.Client.Get(ctx, types.NamespacedName{Namespace: src.Namespace, Name: src.Name}, &esrc); err != nil {
return err
}

src.ResourceVersion = esrc.ResourceVersion

return k8sutils.PatchClientObject(ctx, r.Client, src)
})
}

// UpdateStarRocksCluster update the starrockscluster metadata, spec.
func (r *StarRocksClusterReconciler) UpdateStarRocksCluster(ctx context.Context, src *srapi.StarRocksCluster) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
Expand All @@ -168,22 +160,9 @@ func (r *StarRocksClusterReconciler) UpdateStarRocksCluster(ctx context.Context,
})
}

// hash the starrockscluster for check the crd modified or not.
func (r *StarRocksClusterReconciler) hashStarRocksCluster(src *srapi.StarRocksCluster) string {
type hashObject struct {
metav1.ObjectMeta
Spec srapi.StarRocksClusterSpec
}
ho := &hashObject{
ObjectMeta: src.ObjectMeta,
Spec: src.Spec,
}

return hash.HashObject(ho)
}

func (r *StarRocksClusterReconciler) reconcileStatus(_ context.Context, src *srapi.StarRocksCluster) {
src.Status.Phase = srapi.ClusterRunning
src.Status.Reason = ""
phase := GetPhaseFromComponent(&src.Status.StarRocksFeStatus.StarRocksComponentStatus)
if phase != "" {
src.Status.Phase = phase
Expand All @@ -205,6 +184,24 @@ func (r *StarRocksClusterReconciler) reconcileStatus(_ context.Context, src *sra
}
}

// handleSyncClusterError handle errors from sub-controller, and log it in StarRocksCluster Status
func handleSyncClusterError(src *srapi.StarRocksCluster, subController subcontrollers.ClusterSubController, err error) {
reason := err.Error()
switch subController.(type) {
case *fe.FeController:
reason = fmt.Sprintf("error from FE controller: %v", reason)
case *be.BeController:
reason = fmt.Sprintf("error from BE controller: %v", reason)
case *cn.CnController:
reason = fmt.Sprintf("error from CN controller: %v", reason)
case *feproxy.FeProxyController:
reason = fmt.Sprintf("error from fe-proxy controller: %v", reason)
}

src.Status.Phase = srapi.ClusterFailed
src.Status.Reason = reason
}

func requeueIfError(err error) (ctrl.Result, error) {
return ctrl.Result{}, err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/starrockswarehouse_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (r *StarRocksWarehouseReconciler) Reconcile(ctx context.Context, req ctrl.R
logger.Info("sub controller update warehouse status", kvs...)
if err = controller.UpdateWarehouseStatus(ctx, warehouse); err != nil {
logger.Error(err, "update warehouse status failed", kvs...)
warehouse.Status.Phase = srapi.ComponentFailed
warehouse.Status.Reason = err.Error()
if updateError := r.UpdateStarRocksWarehouseStatus(ctx, warehouse); updateError != nil {
logger.Error(err, "failed to update warehouse status")
}
return requeueIfError(err)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/subcontrollers/cn/cn_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (cc *CnController) ClearWarehouse(ctx context.Context, namespace string, na
func (cc *CnController) deployAutoScaler(ctx context.Context, object object.StarRocksObject, cnSpec *srapi.StarRocksCnSpec,
policy srapi.AutoScalingPolicy, target *appv1.StatefulSet) error {
logger := logr.FromContextOrDiscard(ctx)
logger.Info("create or update k8s hpa resource")

labels := rutils.Labels{}
labels.AddLabel(target.Labels)
Expand Down
3 changes: 2 additions & 1 deletion pkg/subcontrollers/subcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type ClusterSubController interface {
// GetControllerName return the controller name, beController, feController,cnController for log.
GetControllerName() string

// UpdateClusterStatus update the component status on src.
// UpdateClusterStatus only update the component status in src.
// todo(yandongxiao): better to rename it, because it does not send update request to k8s
UpdateClusterStatus(ctx context.Context, src *srapi.StarRocksCluster) error
}

Expand Down

0 comments on commit b92ff24

Please sign in to comment.