diff --git a/deploy/cluster/cluster_role.yaml b/deploy/cluster/cluster_role.yaml index 8aa1a3cca..0796f7f74 100644 --- a/deploy/cluster/cluster_role.yaml +++ b/deploy/cluster/cluster_role.yaml @@ -19,6 +19,7 @@ rules: - "" resources: - configmaps + - pods/exec - secrets - services - events diff --git a/deploy/namespace/role.yaml b/deploy/namespace/role.yaml index b92bca55d..6248880de 100644 --- a/deploy/namespace/role.yaml +++ b/deploy/namespace/role.yaml @@ -19,6 +19,7 @@ rules: - "" resources: - configmaps + - pods/exec - secrets - services - events diff --git a/go.sum b/go.sum index 7bdd898aa..9e41eb1a0 100644 --- a/go.sum +++ b/go.sum @@ -203,6 +203,7 @@ github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/docker/libnetwork v0.0.0-20180830151422-a9cd636e3789/go.mod h1:93m0aTqz6z+g32wla4l4WxTrdtvBRmVzYRkYvasA5Z8= github.com/docker/libtrust v0.0.0-20150526203908-9cbd2a1374f4/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= +github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c h1:ZfSZ3P3BedhKGUhzj7BQlPSU4OvT6tfOKe3DVHzOA7s= github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -211,7 +212,9 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= +github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f h1:8GDPb0tCY8LQ+OJ3dbHb5sA6YZWXFORQYZx5sdsTlMs= github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= +github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f h1:AUj1VoZUfhPhOPHULCQQDnGhRelpFWHMLhQVWDsS0v4= github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.3+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= diff --git a/hack/e2e.sh b/hack/e2e.sh index 4dda36096..6eb1349b1 100644 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -24,6 +24,8 @@ if [[ -z $TEST_TIMEOUT ]]; then fi echo "run e2e tests..." -echo "cd ${GOPATH}/src/${REPO_PATH} && ginkgo -v --skip=${GINKGO_SKIP} --timeout=${TEST_TIMEOUT} test/e2e/... -- --rename-command-path=${GOPATH}/src/${REPO_PATH}/test/e2e --rename-command-file=rename.conf" -cd ${GOPATH}/src/${REPO_PATH} && ginkgo -v --skip=${GINKGO_SKIP} --timeout=${TEST_TIMEOUT} test/e2e/... -- --rename-command-path=${GOPATH}/src/${REPO_PATH}/test/e2e --rename-command-file=rename.conf +e2ecmd="cd ${GOPATH}/src/${REPO_PATH} && ginkgo -v --mod=vendor --failFast --skip=${GINKGO_SKIP} --timeout=${TEST_TIMEOUT} test/e2e/... 
-- --rename-command-path=${GOPATH}/src/${REPO_PATH}/test/e2e --rename-command-file=rename.conf" +echo "${e2ecmd}" +eval "${e2ecmd}" + diff --git a/pkg/apis/redis/v1alpha1/constants.go b/pkg/apis/redis/v1alpha1/constants.go index 410ab52e8..6be886f7d 100644 --- a/pkg/apis/redis/v1alpha1/constants.go +++ b/pkg/apis/redis/v1alpha1/constants.go @@ -45,6 +45,8 @@ const ( ClusterStatusRebalancing ClusterStatus = "Rebalancing" // ClusterStatusRollingUpdate ClusterStatus RollingUpdate ClusterStatusRollingUpdate ClusterStatus = "RollingUpdate" + // ClusterStatusResetPassword ClusterStatus ResetPassword + ClusterStatusResetPassword ClusterStatus = "ResetPassword" ) // NodesPlacementInfo Redis Nodes placement mode information diff --git a/pkg/apis/redis/v1alpha1/distributedrediscluster_types.go b/pkg/apis/redis/v1alpha1/distributedrediscluster_types.go index 7403846f2..4f8472fee 100644 --- a/pkg/apis/redis/v1alpha1/distributedrediscluster_types.go +++ b/pkg/apis/redis/v1alpha1/distributedrediscluster_types.go @@ -15,22 +15,25 @@ type DistributedRedisClusterSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html - Image string `json:"image,omitempty"` - Command []string `json:"command,omitempty"` - MasterSize int32 `json:"masterSize,omitempty"` - ClusterReplicas int32 `json:"clusterReplicas,omitempty"` - ServiceName string `json:"serviceName,omitempty"` - Config map[string]string `json:"config,omitempty"` - Affinity *corev1.Affinity `json:"affinity,omitempty"` - NodeSelector map[string]string `json:"nodeSelector,omitempty"` - ToleRations []corev1.Toleration `json:"toleRations,omitempty"` - SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` - Storage *RedisStorage `json:"storage,omitempty"` - Resources *corev1.ResourceRequirements `json:"resources,omitempty"` - PasswordSecret *corev1.LocalObjectReference `json:"passwordSecret,omitempty"` - Monitor *AgentSpec `json:"monitor,omitempty"` - Init *InitSpec `json:"init,omitempty"` + Image string `json:"image,omitempty"` + ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"` + ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + Command []string `json:"command,omitempty"` + Env []corev1.EnvVar `json:"env,omitempty"` + MasterSize int32 `json:"masterSize,omitempty"` + ClusterReplicas int32 `json:"clusterReplicas,omitempty"` + ServiceName string `json:"serviceName,omitempty"` + Config map[string]string `json:"config,omitempty"` + Affinity *corev1.Affinity `json:"affinity,omitempty"` + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + ToleRations []corev1.Toleration `json:"toleRations,omitempty"` + SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + Storage *RedisStorage `json:"storage,omitempty"` + Resources *corev1.ResourceRequirements `json:"resources,omitempty"` + PasswordSecret *corev1.LocalObjectReference `json:"passwordSecret,omitempty"` + Monitor *AgentSpec `json:"monitor,omitempty"` + Init *InitSpec `json:"init,omitempty"` } type AgentSpec struct { diff --git a/pkg/apis/redis/v1alpha1/redisclusterbackup_types.go b/pkg/apis/redis/v1alpha1/redisclusterbackup_types.go index 188a274aa..bc9fac315 
100644 --- a/pkg/apis/redis/v1alpha1/redisclusterbackup_types.go +++ b/pkg/apis/redis/v1alpha1/redisclusterbackup_types.go @@ -16,11 +16,12 @@ const ( // RedisClusterBackupSpec defines the desired state of RedisClusterBackup // +k8s:openapi-gen=true type RedisClusterBackupSpec struct { - Image string `json:"image,omitempty"` - RedisClusterName string `json:"redisClusterName"` - Storage *RedisStorage `json:"storage,omitempty"` - store.Backend `json:",inline"` - PodSpec *PodSpec `json:"podSpec,omitempty"` + Image string `json:"image,omitempty"` + RedisClusterName string `json:"redisClusterName"` + Storage *RedisStorage `json:"storage,omitempty"` + store.Backend `json:",inline"` + PodSpec *PodSpec `json:"podSpec,omitempty"` + ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` } type PodSpec struct { diff --git a/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go b/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go index 6a41e518d..cdac41db7 100644 --- a/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go +++ b/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go @@ -8,7 +8,11 @@ import ( "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + runtimeschema "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -22,8 +26,10 @@ import ( "github.com/ucloud/redis-cluster-operator/pkg/config" "github.com/ucloud/redis-cluster-operator/pkg/controller/heal" clustermanger "github.com/ucloud/redis-cluster-operator/pkg/controller/manager" + "github.com/ucloud/redis-cluster-operator/pkg/exec" "github.com/ucloud/redis-cluster-operator/pkg/k8sutil" "github.com/ucloud/redis-cluster-operator/pkg/redisutil" + "github.com/ucloud/redis-cluster-operator/pkg/resources/statefulsets" "github.com/ucloud/redis-cluster-operator/pkg/utils" ) @@ -55,11 +61,22 @@ func FlagSet() *pflag.FlagSet { // Add creates a new DistributedRedisCluster Controller and adds it to the Manager. The Manager will set fields on the Controller // and Start it when the Manager is Started. 
func Add(mgr manager.Manager) error { - return add(mgr, newReconciler(mgr)) + gvk := runtimeschema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Pod", + } + restClient, err := apiutil.RESTClientForGVK(gvk, mgr.GetConfig(), serializer.NewCodecFactory(scheme.Scheme)) + if err != nil { + return err + } + execer := exec.NewRemoteExec(restClient, mgr.GetConfig(), log) + + return add(mgr, newReconciler(mgr, execer)) } // newReconciler returns a new reconcile.Reconciler -func newReconciler(mgr manager.Manager) reconcile.Reconciler { +func newReconciler(mgr manager.Manager, execer exec.IExec) reconcile.Reconciler { reconiler := &ReconcileDistributedRedisCluster{client: mgr.GetClient(), scheme: mgr.GetScheme()} reconiler.statefulSetController = k8sutil.NewStatefulSetController(reconiler.client) reconiler.serviceController = k8sutil.NewServiceController(reconiler.client) @@ -68,6 +85,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { reconiler.crController = k8sutil.NewCRControl(reconiler.client) reconiler.ensurer = clustermanger.NewEnsureResource(reconiler.client, log) reconiler.checker = clustermanger.NewCheck(reconiler.client) + reconiler.execer = execer return reconiler } @@ -136,6 +154,7 @@ type ReconcileDistributedRedisCluster struct { scheme *runtime.Scheme ensurer clustermanger.IEnsureResource checker clustermanger.ICheck + execer exec.IExec statefulSetController k8sutil.IStatefulSetControl serviceController k8sutil.IServiceControl pdbController k8sutil.IPodDisruptionBudgetControl @@ -208,7 +227,7 @@ func (r *ReconcileDistributedRedisCluster) Reconcile(request reconcile.Request) return reconcile.Result{RequeueAfter: requeueAfter}, nil } - password, err := getClusterPassword(r.client, instance) + password, err := statefulsets.GetClusterPassword(r.client, instance) if err != nil { return reconcile.Result{}, Kubernetes.Wrap(err, "getClusterPassword") } diff --git a/pkg/controller/distributedrediscluster/helper.go b/pkg/controller/distributedrediscluster/helper.go index 34790b492..6df04bba7 100644 --- a/pkg/controller/distributedrediscluster/helper.go +++ b/pkg/controller/distributedrediscluster/helper.go @@ -1,15 +1,12 @@ package distributedrediscluster import ( - "context" "fmt" "net" "time" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1" "github.com/ucloud/redis-cluster-operator/pkg/config" @@ -24,8 +21,6 @@ var ( } ) -const passwordKey = "password" - func getLabels(cluster *redisv1alpha1.DistributedRedisCluster) map[string]string { dynLabels := map[string]string{ redisv1alpha1.LabelClusterName: cluster.Name, @@ -33,21 +28,6 @@ func getLabels(cluster *redisv1alpha1.DistributedRedisCluster) map[string]string return utils.MergeLabels(defaultLabels, dynLabels, cluster.Labels) } -func getClusterPassword(client client.Client, cluster *redisv1alpha1.DistributedRedisCluster) (string, error) { - if cluster.Spec.PasswordSecret == nil { - return "", nil - } - secret := &corev1.Secret{} - err := client.Get(context.TODO(), types.NamespacedName{ - Name: cluster.Spec.PasswordSecret.Name, - Namespace: cluster.Namespace, - }, secret) - if err != nil { - return "", err - } - return string(secret.Data[passwordKey]), nil -} - // newRedisAdmin builds and returns new redis.Admin from the list of pods func newRedisAdmin(pods []*corev1.Pod, password string, cfg *config.Redis, reqLogger logr.Logger) (redisutil.IAdmin, error) 
{ nodesAddrs := []string{} diff --git a/pkg/controller/distributedrediscluster/status.go b/pkg/controller/distributedrediscluster/status.go index 44df692c8..ff83c461f 100644 --- a/pkg/controller/distributedrediscluster/status.go +++ b/pkg/controller/distributedrediscluster/status.go @@ -38,6 +38,11 @@ func SetClusterUpdating(status *redisv1alpha1.DistributedRedisClusterStatus, rea status.Reason = reason } +func SetClusterResetPassword(status *redisv1alpha1.DistributedRedisClusterStatus, reason string) { + status.Status = redisv1alpha1.ClusterStatusResetPassword + status.Reason = reason +} + func buildClusterStatus(clusterInfos *redisutil.ClusterInfos, pods []*corev1.Pod, cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) *redisv1alpha1.DistributedRedisClusterStatus { oldStatus := cluster.Status diff --git a/pkg/controller/distributedrediscluster/sync_handler.go b/pkg/controller/distributedrediscluster/sync_handler.go index 8ae48cdb2..d2968c6ae 100644 --- a/pkg/controller/distributedrediscluster/sync_handler.go +++ b/pkg/controller/distributedrediscluster/sync_handler.go @@ -8,6 +8,7 @@ import ( corev1 "k8s.io/api/core/v1" redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1" + "github.com/ucloud/redis-cluster-operator/pkg/config" "github.com/ucloud/redis-cluster-operator/pkg/controller/clustering" "github.com/ucloud/redis-cluster-operator/pkg/controller/manager" "github.com/ucloud/redis-cluster-operator/pkg/k8sutil" @@ -45,6 +46,11 @@ func (r *ReconcileDistributedRedisCluster) ensureCluster(ctx *syncContext) error if err := r.ensurer.EnsureRedisConfigMap(cluster, labels); err != nil { return Kubernetes.Wrap(err, "EnsureRedisConfigMap") } + + if err := r.resetClusterPassword(ctx); err != nil { + return Cluster.Wrap(err, "ResetPassword") + } + if updated, err := r.ensurer.EnsureRedisStatefulsets(cluster, labels); err != nil { ctx.reqLogger.Error(err, "EnsureRedisStatefulSets") return Kubernetes.Wrap(err, "EnsureRedisStatefulSets") @@ -306,3 +312,57 @@ func (r *ReconcileDistributedRedisCluster) scalingDown(ctx *syncContext, current } return nil } + +func (r *ReconcileDistributedRedisCluster) resetClusterPassword(ctx *syncContext) error { + if err := r.checker.CheckRedisNodeNum(ctx.cluster); err == nil { + namespace := ctx.cluster.Namespace + name := ctx.cluster.Name + sts, err := r.statefulSetController.GetStatefulSet(namespace, statefulsets.ClusterStatefulSetName(name, 0)) + if err != nil { + return err + } + + if !statefulsets.IsPasswordChanged(ctx.cluster, sts) { + return nil + } + + SetClusterResetPassword(&ctx.cluster.Status, "updating cluster's password") + r.crController.UpdateCRStatus(ctx.cluster) + + matchLabels := getLabels(ctx.cluster) + redisClusterPods, err := r.statefulSetController.GetStatefulSetPodsByLabels(namespace, matchLabels) + if err != nil { + return err + } + + oldPassword, err := statefulsets.GetOldRedisClusterPassword(r.client, sts) + if err != nil { + return err + } + + newPassword, err := statefulsets.GetClusterPassword(r.client, ctx.cluster) + if err != nil { + return err + } + + podSet := clusterPods(redisClusterPods.Items) + admin, err := newRedisAdmin(podSet, oldPassword, config.RedisConf(), ctx.reqLogger) + if err != nil { + return err + } + defer admin.Close() + + // Update the password recorded in the file /etc/redis_password, redis pod preStop hook + // need /etc/redis_password do CLUSTER FAILOVER + cmd := fmt.Sprintf("echo %s > /etc/redis_password", newPassword) + if err := r.execer.ExecCommandInPodSet(podSet, 
"/bin/sh", "-c", cmd); err != nil { + return err + } + + // Reset all redis pod's password. + if err := admin.ResetPassword(newPassword); err != nil { + return err + } + } + return nil +} diff --git a/pkg/controller/manager/checker.go b/pkg/controller/manager/checker.go index 88cc0216f..391cb6964 100644 --- a/pkg/controller/manager/checker.go +++ b/pkg/controller/manager/checker.go @@ -49,6 +49,9 @@ func (c *realCheck) checkRedisNodeNum(expectNodeNum int32, ss *appsv1.StatefulSe if expectNodeNum != ss.Status.ReadyReplicas { return fmt.Errorf("redis pods are not all ready") } + if expectNodeNum != ss.Status.CurrentReplicas { + return fmt.Errorf("redis pods need to be updated") + } return nil } diff --git a/pkg/controller/manager/ensurer.go b/pkg/controller/manager/ensurer.go index 390e20f3c..2070e6620 100644 --- a/pkg/controller/manager/ensurer.go +++ b/pkg/controller/manager/ensurer.go @@ -5,7 +5,6 @@ import ( "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" @@ -101,15 +100,9 @@ func shouldUpdateRedis(cluster *redisv1alpha1.DistributedRedisCluster, sts *apps if cluster.Spec.Image != sts.Spec.Template.Spec.Containers[0].Image { return true } - if cluster.Spec.PasswordSecret != nil { - envSet := sts.Spec.Template.Spec.Containers[0].Env - secretName := getSecretKeyRefByKey(redisv1alpha1.PasswordENV, envSet) - if secretName == "" { - return true - } - if secretName != cluster.Spec.PasswordSecret.Name { - return true - } + + if statefulsets.IsPasswordChanged(cluster, sts) { + return true } expectResource := cluster.Spec.Resources @@ -129,17 +122,6 @@ func shouldUpdateRedis(cluster *redisv1alpha1.DistributedRedisCluster, sts *apps return false } -func getSecretKeyRefByKey(key string, envSet []corev1.EnvVar) string { - for _, value := range envSet { - if key == value.Name { - if value.ValueFrom != nil && value.ValueFrom.SecretKeyRef != nil { - return value.ValueFrom.SecretKeyRef.Name - } - } - } - return "" -} - func (r *realEnsureResource) ensureRedisPDB(cluster *redisv1alpha1.DistributedRedisCluster, name string, labels map[string]string) error { _, err := r.pdbClient.GetPodDisruptionBudget(cluster.Namespace, name) if err != nil && errors.IsNotFound(err) { diff --git a/pkg/controller/redisclusterbackup/sync_handler.go b/pkg/controller/redisclusterbackup/sync_handler.go index 864e9c490..6d97e70ef 100644 --- a/pkg/controller/redisclusterbackup/sync_handler.go +++ b/pkg/controller/redisclusterbackup/sync_handler.go @@ -243,6 +243,7 @@ func (r *ReconcileRedisClusterBackup) getBackupJob(reqLogger logr.Logger, backup }, }, Spec: batchv1.JobSpec{ + ActiveDeadlineSeconds: backup.Spec.ActiveDeadlineSeconds, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: containers, diff --git a/pkg/exec/exec.go b/pkg/exec/exec.go new file mode 100644 index 000000000..643465c9a --- /dev/null +++ b/pkg/exec/exec.go @@ -0,0 +1,129 @@ +package exec + +import ( + "bytes" + "io" + "net/url" + "strings" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +// IExec is an injectable interface for running remote exec commands. +type IExec interface { + // ExecCommandInPodSet exec cmd in pod set. 
+ ExecCommandInPodSet(podSet []*corev1.Pod, cmd ...string) error +} + +type remoteExec struct { + restGVKClient rest.Interface + logger logr.Logger + config *rest.Config +} + +// NewRemoteExec returns a new IExec which will exec remote cmd. +func NewRemoteExec(restGVKClient rest.Interface, config *rest.Config, logger logr.Logger) IExec { + return &remoteExec{ + restGVKClient: restGVKClient, + logger: logger, + config: config, + } +} + +// ExecOptions passed to ExecWithOptions. +type ExecOptions struct { + Command []string + + Namespace string + PodName string + ContainerName string + + Stdin io.Reader + CaptureStdout bool + CaptureStderr bool + // If false, whitespace in std{err,out} will be removed. + PreserveWhitespace bool +} + +// ExecCommandInPodSet implements IExec interface. +func (e *remoteExec) ExecCommandInPodSet(podSet []*corev1.Pod, cmd ...string) error { + for _, pod := range podSet { + if _, err := e.ExecCommandInContainer(pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, cmd...); err != nil { + return err + } + } + return nil +} + +// ExecCommandInContainer executes a command in the specified container. +func (e *remoteExec) ExecCommandInContainer(namespace, podName, containerName string, cmd ...string) (string, error) { + stdout, stderr, err := e.ExecCommandInContainerWithFullOutput(namespace, podName, containerName, cmd...) + if stderr != "" { + e.logger.Info("ExecCommand", "command", cmd, "stderr", stderr) + } + return stdout, err +} + +// ExecCommandInContainerWithFullOutput executes a command in the +// specified container and return stdout, stderr and error +func (e *remoteExec) ExecCommandInContainerWithFullOutput(namespace, podName, containerName string, cmd ...string) (string, string, error) { + return e.ExecWithOptions(ExecOptions{ + Command: cmd, + Namespace: namespace, + PodName: podName, + ContainerName: containerName, + + Stdin: nil, + CaptureStdout: true, + CaptureStderr: true, + PreserveWhitespace: false, + }) +} + +// ExecWithOptions executes a command in the specified container, +// returning stdout, stderr and error. `options` allowed for +// additional parameters to be passed. +func (e *remoteExec) ExecWithOptions(options ExecOptions) (string, string, error) { + const tty = false + + req := e.restGVKClient.Post(). + Resource("pods"). + Name(options.PodName). + Namespace(options.Namespace). + SubResource("exec"). 
+ Param("container", options.ContainerName) + + req.VersionedParams(&corev1.PodExecOptions{ + Container: options.ContainerName, + Command: options.Command, + Stdin: options.Stdin != nil, + Stdout: options.CaptureStdout, + Stderr: options.CaptureStderr, + TTY: tty, + }, scheme.ParameterCodec) + + var stdout, stderr bytes.Buffer + err := execute("POST", req.URL(), e.config, options.Stdin, &stdout, &stderr, tty) + + if options.PreserveWhitespace { + return stdout.String(), stderr.String(), err + } + return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), err +} + +func execute(method string, url *url.URL, config *rest.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + exec, err := remotecommand.NewSPDYExecutor(config, method, url) + if err != nil { + return err + } + return exec.Stream(remotecommand.StreamOptions{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + }) +} diff --git a/pkg/redisutil/admin.go b/pkg/redisutil/admin.go index dad0a8c3b..0756d1910 100644 --- a/pkg/redisutil/admin.go +++ b/pkg/redisutil/admin.go @@ -71,6 +71,8 @@ type IAdmin interface { FlushAndReset(addr string, mode string) error // GetHashMaxSlot get the max slot value GetHashMaxSlot() Slot + // ResetPassword reset redis node masterauth and requirepass. + ResetPassword(newPassword string) error } // AdminOptions optional options for redis admin @@ -521,3 +523,23 @@ func (a *Admin) FlushAndReset(addr string, mode string) error { return nil } + +// ResetPassword reset redis node masterauth and requirepass. +func (a *Admin) ResetPassword(newPassword string) error { + all := a.Connections().GetAll() + if len(all) == 0 { + return fmt.Errorf("no connection for other redis-node found") + } + for addr, c := range a.Connections().GetAll() { + a.log.Info("reset password", "addr", addr) + setMasterauth := c.Cmd("CONFIG", "SET", "masterauth", newPassword) + if err := a.Connections().ValidateResp(setMasterauth, addr, "cannot set new masterauth"); err != nil { + return err + } + setPasswdResp := c.Cmd("CONFIG", "SET", "requirepass", newPassword) + if err := a.Connections().ValidateResp(setPasswdResp, addr, "cannot set new requirepass"); err != nil { + return err + } + } + return nil +} diff --git a/pkg/resources/configmaps/configmap.go b/pkg/resources/configmaps/configmap.go index c02165fa5..07b05bef7 100644 --- a/pkg/resources/configmaps/configmap.go +++ b/pkg/resources/configmaps/configmap.go @@ -24,18 +24,23 @@ func NewConfigMapForCR(cluster *redisv1alpha1.DistributedRedisCluster, labels ma shutdownContent := `#!/bin/sh CLUSTER_CONFIG="/data/nodes.conf" failover() { - echo "Do CLUSTER FAILOVER" - masterID=$(cat ${CLUSTER_CONFIG} | grep "myself" | awk '{print $1}') - echo "Master: ${masterID}" - slave=$(cat ${CLUSTER_CONFIG} | grep ${masterID} | grep "slave" | awk 'NR==1{print $2}' | sed 's/:6379@16379//') - echo "Slave: ${slave}" - redis-cli -h ${slave} -a "${REDIS_PASSWORD}" CLUSTER FAILOVER + echo "Do CLUSTER FAILOVER" + masterID=$(cat ${CLUSTER_CONFIG} | grep "myself" | awk '{print $1}') + echo "Master: ${masterID}" + slave=$(cat ${CLUSTER_CONFIG} | grep ${masterID} | grep "slave" | awk 'NR==1{print $2}' | sed 's/:6379@16379//') + echo "Slave: ${slave}" + password=$(cat /etc/redis_password) + if [[ -z "${password}" ]]; then + redis-cli -h ${slave} CLUSTER FAILOVER + else + redis-cli -h ${slave} -a "${password}" CLUSTER FAILOVER + fi echo "Wait for MASTER <-> SLAVE syncFinished" sleep 20 } if [ -f ${CLUSTER_CONFIG} ]; then - cat ${CLUSTER_CONFIG} | grep "myself" | 
grep "master" && \ - failover + cat ${CLUSTER_CONFIG} | grep "myself" | grep "master" && \ + failover fi` // Fixed Nodes.conf does not update IP address of a node when IP changes after restart, diff --git a/pkg/resources/statefulsets/helper.go b/pkg/resources/statefulsets/helper.go new file mode 100644 index 000000000..d96f37f1d --- /dev/null +++ b/pkg/resources/statefulsets/helper.go @@ -0,0 +1,74 @@ +package statefulsets + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1" +) + +const passwordKey = "password" + +// IsPasswordChanged determine whether the password is changed. +func IsPasswordChanged(cluster *redisv1alpha1.DistributedRedisCluster, sts *appsv1.StatefulSet) bool { + if cluster.Spec.PasswordSecret != nil { + envSet := sts.Spec.Template.Spec.Containers[0].Env + secretName := getSecretKeyRefByKey(redisv1alpha1.PasswordENV, envSet) + if secretName == "" { + return true + } + if secretName != cluster.Spec.PasswordSecret.Name { + return true + } + } + return false +} + +func getSecretKeyRefByKey(key string, envSet []corev1.EnvVar) string { + for _, value := range envSet { + if key == value.Name { + if value.ValueFrom != nil && value.ValueFrom.SecretKeyRef != nil { + return value.ValueFrom.SecretKeyRef.Name + } + } + } + return "" +} + +// GetOldRedisClusterPassword return old redis cluster's password. +func GetOldRedisClusterPassword(client client.Client, sts *appsv1.StatefulSet) (string, error) { + envSet := sts.Spec.Template.Spec.Containers[0].Env + secretName := getSecretKeyRefByKey(redisv1alpha1.PasswordENV, envSet) + if secretName == "" { + return "", nil + } + secret := &corev1.Secret{} + err := client.Get(context.TODO(), types.NamespacedName{ + Name: secretName, + Namespace: sts.Namespace, + }, secret) + if err != nil { + return "", err + } + return string(secret.Data[passwordKey]), nil +} + +// GetClusterPassword return current redis cluster's password. 
+func GetClusterPassword(client client.Client, cluster *redisv1alpha1.DistributedRedisCluster) (string, error) { + if cluster.Spec.PasswordSecret == nil { + return "", nil + } + secret := &corev1.Secret{} + err := client.Get(context.TODO(), types.NamespacedName{ + Name: cluster.Spec.PasswordSecret.Name, + Namespace: cluster.Namespace, + }, secret) + if err != nil { + return "", err + } + return string(secret.Data[passwordKey]), nil +} diff --git a/pkg/resources/statefulsets/statefulset.go b/pkg/resources/statefulsets/statefulset.go index fdbece093..2591f1889 100644 --- a/pkg/resources/statefulsets/statefulset.go +++ b/pkg/resources/statefulsets/statefulset.go @@ -60,10 +60,11 @@ func NewStatefulSetForCR(cluster *redisv1alpha1.DistributedRedisCluster, ssName, Annotations: cluster.Spec.Annotations, }, Spec: corev1.PodSpec{ - Affinity: getAffinity(spec.Affinity, labels), - Tolerations: spec.ToleRations, - SecurityContext: spec.SecurityContext, - NodeSelector: cluster.Spec.NodeSelector, + ImagePullSecrets: cluster.Spec.ImagePullSecrets, + Affinity: getAffinity(spec.Affinity, labels), + Tolerations: spec.ToleRations, + SecurityContext: spec.SecurityContext, + NodeSelector: cluster.Spec.NodeSelector, Containers: []corev1.Container{ redisServerContainer(cluster, password), }, @@ -201,8 +202,9 @@ func redisServerContainer(cluster *redisv1alpha1.DistributedRedisCluster, passwo probeArg := "redis-cli -h $(hostname) ping" container := corev1.Container{ - Name: redisServerName, - Image: cluster.Spec.Image, + Name: redisServerName, + Image: cluster.Spec.Image, + ImagePullPolicy: cluster.Spec.ImagePullPolicy, Ports: []corev1.ContainerPort{ { Name: "client", @@ -256,6 +258,11 @@ func redisServerContainer(cluster *redisv1alpha1.DistributedRedisCluster, passwo Resources: *cluster.Spec.Resources, // TODO store redis data when pod stop Lifecycle: &corev1.Lifecycle{ + PostStart: &corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "echo ${REDIS_PASSWORD} > /etc/redis_password"}, + }, + }, PreStop: &corev1.Handler{ Exec: &corev1.ExecAction{ Command: []string{"/bin/sh", "/conf/shutdown.sh"}, @@ -268,6 +275,8 @@ func redisServerContainer(cluster *redisv1alpha1.DistributedRedisCluster, passwo container.Env = append(container.Env, *password) } + container.Env = customContainerEnv(container.Env, cluster.Spec.Env) + return container } @@ -294,6 +303,9 @@ func redisExporterContainer(cluster *redisv1alpha1.DistributedRedisCluster, pass if password != nil { container.Env = append(container.Env, *password) } + + container.Env = customContainerEnv(container.Env, cluster.Spec.Env) + return container } @@ -375,9 +387,16 @@ func redisInitContainer(cluster *redisv1alpha1.DistributedRedisCluster, password container.Lifecycle = backup.Spec.PodSpec.Lifecycle } + container.Env = customContainerEnv(container.Env, cluster.Spec.Env) + return container, nil } +func customContainerEnv(env []corev1.EnvVar, customEnv []corev1.EnvVar) []corev1.EnvVar { + env = append(env, customEnv...) 
+ return env +} + func volumeMounts() []corev1.VolumeMount { return []corev1.VolumeMount{ { diff --git a/pkg/resources/statefulsets/statefulset_test.go b/pkg/resources/statefulsets/statefulset_test.go index d6b60415b..5fc45b734 100644 --- a/pkg/resources/statefulsets/statefulset_test.go +++ b/pkg/resources/statefulsets/statefulset_test.go @@ -3,6 +3,8 @@ package statefulsets import ( "reflect" "testing" + + corev1 "k8s.io/api/core/v1" ) func Test_mergeRenameCmds(t *testing.T) { @@ -89,3 +91,87 @@ func Test_mergeRenameCmds(t *testing.T) { }) } } + +func Test_customContainerEnv(t *testing.T) { + type args struct { + env []corev1.EnvVar + customEnv []corev1.EnvVar + } + tests := []struct { + name string + args args + want []corev1.EnvVar + }{ + { + name: "nil all", + args: args{ + env: nil, + customEnv: nil, + }, + want: nil, + }, + { + name: "nil env", + args: args{ + env: nil, + customEnv: []corev1.EnvVar{{ + Name: "foo", + Value: "", + ValueFrom: nil, + }}, + }, + want: []corev1.EnvVar{{ + Name: "foo", + Value: "", + ValueFrom: nil, + }}, + }, + { + name: "nil custom env", + args: args{ + customEnv: nil, + env: []corev1.EnvVar{{ + Name: "foo", + Value: "", + ValueFrom: nil, + }}, + }, + want: []corev1.EnvVar{{ + Name: "foo", + Value: "", + ValueFrom: nil, + }}, + }, + { + name: "env for bar", + args: args{ + env: []corev1.EnvVar{{ + Name: "foo", + Value: "", + ValueFrom: nil, + }}, + customEnv: []corev1.EnvVar{{ + Name: "bar", + Value: "", + ValueFrom: nil, + }}, + }, + want: []corev1.EnvVar{{ + Name: "foo", + Value: "", + ValueFrom: nil, + }, { + Name: "bar", + Value: "", + ValueFrom: nil, + }}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := customContainerEnv(tt.args.env, tt.args.customEnv); !reflect.DeepEqual(got, tt.want) { + t.Errorf("customContainerEnv() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/test/e2e/drc/drc_test.go b/test/e2e/drc/drc_test.go index 5c57f4dae..6b335adea 100644 --- a/test/e2e/drc/drc_test.go +++ b/test/e2e/drc/drc_test.go @@ -1,6 +1,8 @@ package drc_test import ( + "time" + . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -18,7 +20,7 @@ var _ = Describe("DistributedRedisCluster CRUD", func() { name := e2e.RandString(8) password := e2e.RandString(8) drc = e2e.NewDistributedRedisCluster(name, f.Namespace(), e2e.Redis5_0_4, f.PasswordName(), 3, 1) - Ω(f.CreateRedisClusterPassword(password)).Should(Succeed()) + Ω(f.CreateRedisClusterPassword(f.PasswordName(), password)).Should(Succeed()) Ω(f.CreateRedisCluster(drc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) goredis = e2e.NewGoRedisClient(name, f.Namespace(), password) @@ -58,6 +60,16 @@ var _ = Describe("DistributedRedisCluster CRUD", func() { Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) }) + It("should reset the DistributedRedisCluster password", func() { + newPassword := e2e.RandString(8) + Ω(f.CreateRedisClusterPassword(f.NewPasswordName(), newPassword)).Should(Succeed()) + e2e.ResetPassword(drc, f.NewPasswordName()) + Ω(f.UpdateRedisCluster(drc)).Should(Succeed()) + time.Sleep(5 * time.Second) + Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) + goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), newPassword) + Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) + }) It("should update the DistributedRedisCluster minor version", func() { e2e.RollingUpdateDRC(drc) Ω(f.UpdateRedisCluster(drc)).Should(Succeed()) diff --git a/test/e2e/drcb/drcb_test.go b/test/e2e/drcb/drcb_test.go index bb5a38e18..c04d785da 100644 --- a/test/e2e/drcb/drcb_test.go +++ b/test/e2e/drcb/drcb_test.go @@ -2,9 +2,11 @@ package drcb_test import ( "os" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/ucloud/redis-cluster-operator/test/e2e" ) @@ -19,7 +21,7 @@ var _ = Describe("Restore DistributedRedisCluster From RedisClusterBackup", func name := e2e.RandString(8) password := e2e.RandString(8) drc = e2e.NewDistributedRedisCluster(name, f.Namespace(), e2e.Redis5_0_4, f.PasswordName(), 3, 1) - Ω(f.CreateRedisClusterPassword(password)).Should(Succeed()) + Ω(f.CreateRedisClusterPassword(f.PasswordName(), password)).Should(Succeed()) Ω(f.CreateRedisCluster(drc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) goredis = e2e.NewGoRedisClient(name, f.Namespace(), password) @@ -81,6 +83,16 @@ var _ = Describe("Restore DistributedRedisCluster From RedisClusterBackup", func Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) }) + It("should reset the DistributedRedisCluster password", func() { + newPassword := e2e.RandString(8) + Ω(f.CreateRedisClusterPassword(f.NewPasswordName(), newPassword)).Should(Succeed()) + e2e.ResetPassword(rdrc, f.NewPasswordName()) + Ω(f.UpdateRedisCluster(rdrc)).Should(Succeed()) + time.Sleep(5 * time.Second) + Eventually(e2e.IsDistributedRedisClusterProperly(f, rdrc), "10m", "10s").ShouldNot(HaveOccurred()) + goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), newPassword) + Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) + }) It("should update the DistributedRedisCluster minor version", func() { e2e.RollingUpdateDRC(rdrc) Ω(f.UpdateRedisCluster(rdrc)).Should(Succeed()) diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 034175edf..e640c97d4 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -114,8 +114,7 @@ func (f *Framework) CreateRedisClusterBackup(instance *redisv1alpha1.RedisCluste } // 
CreateRedisClusterPassword creates a password for DistributedRedisCluster -func (f *Framework) CreateRedisClusterPassword(password string) error { - name := f.PasswordName() +func (f *Framework) CreateRedisClusterPassword(name, password string) error { f.Logf("Creating DistributedRedisCluster secret %s", name) secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -276,6 +275,10 @@ func (f *Framework) PasswordName() string { return fmt.Sprintf("redis-admin-passwd") } +func (f *Framework) NewPasswordName() string { + return fmt.Sprintf("redis-admin-newpasswd") +} + func (f *Framework) S3SecretName() string { return fmt.Sprintf("s3-secret") } diff --git a/test/e2e/operator_util.go b/test/e2e/operator_util.go index f86a7ba66..55630e9c4 100644 --- a/test/e2e/operator_util.go +++ b/test/e2e/operator_util.go @@ -137,6 +137,10 @@ func IsDistributedRedisClusterProperly(f *Framework, drc *redisv1alpha1.Distribu return LogAndReturnErrorf("DistributedRedisCluster %s wrong ready replicas, want: %d, got: %d", drc.Name, drc.Spec.ClusterReplicas+1, sts.Status.ReadyReplicas) } + if sts.Status.CurrentReplicas != (drc.Spec.ClusterReplicas + 1) { + return LogAndReturnErrorf("DistributedRedisCluster %s wrong current replicas, want: %d, got: %d", + drc.Name, drc.Spec.ClusterReplicas+1, sts.Status.CurrentReplicas) + } } password, err := getClusterPassword(f.Client, drc) @@ -247,6 +251,10 @@ func ScaleUPDown(drc *redisv1alpha1.DistributedRedisCluster) { drc.Spec.MasterSize = 3 } +func ResetPassword(drc *redisv1alpha1.DistributedRedisCluster, passwordSecret string) { + drc.Spec.PasswordSecret = &corev1.LocalObjectReference{Name: passwordSecret} +} + func RollingUpdateDRC(drc *redisv1alpha1.DistributedRedisCluster) { drc.Spec.Image = Redis5_0_6 }
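
For readers tracing the password-rotation flow this patch adds, the sketch below condenses the sequence that resetClusterPassword runs once IsPasswordChanged reports that the CR references a different secret. It uses only interfaces introduced or extended here (exec.IExec.ExecCommandInPodSet and redisutil.IAdmin.ResetPassword); the helper name rotateClusterPassword and the package name example are illustrative and not part of the patch, and the status update (SetClusterResetPassword), the old-password lookup via GetOldRedisClusterPassword, and error wrapping are omitted for brevity.

package example

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"github.com/ucloud/redis-cluster-operator/pkg/exec"
	"github.com/ucloud/redis-cluster-operator/pkg/redisutil"
)

// rotateClusterPassword is a minimal sketch of the rotation sequence, assuming
// admin was built with the old password so its connections stay valid while
// the new value is rolled out.
func rotateClusterPassword(pods []*corev1.Pod, execer exec.IExec, admin redisutil.IAdmin, newPassword string) error {
	// Refresh /etc/redis_password first: the preStop hook's shutdown.sh reads
	// the password from this file when it issues CLUSTER FAILOVER.
	cmd := fmt.Sprintf("echo %s > /etc/redis_password", newPassword)
	if err := execer.ExecCommandInPodSet(pods, "/bin/sh", "-c", cmd); err != nil {
		return err
	}
	// Then set masterauth and requirepass on every node over the connections
	// that were opened with the old password.
	return admin.ResetPassword(newPassword)
}

The ordering matters: the file on disk is updated before CONFIG SET, so a failover triggered during the rollout still finds a usable password, and the postStart hook added to the StatefulSet covers freshly created pods by writing ${REDIS_PASSWORD} to the same file.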