tikv scaling takes precedence over upgrading
cofyc committed Jul 8, 2020
1 parent 2560d8d commit 3d9990d
Showing 4 changed files with 98 additions and 37 deletions.
27 changes: 18 additions & 9 deletions pkg/manager/member/tikv_member_manager.go
@@ -235,16 +235,18 @@ func (tkmm *tikvMemberManager) syncStatefulSetForTidbCluster(tc *v1alpha1.TidbCl
return err
}

if !templateEqual(newSet, oldSet) || tc.Status.TiKV.Phase == v1alpha1.UpgradePhase {
if err := tkmm.tikvUpgrader.Upgrade(tc, oldSet, newSet); err != nil {
return err
}
}

// Scaling takes precedence over upgrading because:
// - if a store fails during the upgrade, users may want to delete it or add
// new replicas
// - it's OK to scale in the middle of an upgrade (the statefulset controller
// gives scaling precedence over upgrading too)
if err := tkmm.tikvScaler.Scale(tc, oldSet, newSet); err != nil {
return err
}

// Perform failover logic if necessary. Note that this will only update
// TidbCluster status. The actual scaling is performed in the next sync
// loop (if a new replica needs to be added).
if tkmm.autoFailover && tc.Spec.TiKV.MaxFailoverCount != nil {
if tc.TiKVAllPodsStarted() && !tc.TiKVAllStoresReady() {
if err := tkmm.tikvFailover.Failover(tc); err != nil {
@@ -253,6 +255,12 @@ func (tkmm *tikvMemberManager) syncStatefulSetForTidbCl
}
}

if !templateEqual(newSet, oldSet) || tc.Status.TiKV.Phase == v1alpha1.UpgradePhase {
if err := tkmm.tikvUpgrader.Upgrade(tc, oldSet, newSet); err != nil {
return err
}
}

return updateStatefulSet(tkmm.setControl, tc, newSet, oldSet)
}

@@ -607,10 +615,11 @@ func (tkmm *tikvMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, s
if err != nil {
return err
}
if upgrading && tc.Status.PD.Phase != v1alpha1.UpgradePhase {
tc.Status.TiKV.Phase = v1alpha1.UpgradePhase
} else if tc.TiKVStsDesiredReplicas() != *set.Spec.Replicas {
// Scaling takes precedence over upgrading.
if tc.TiKVStsDesiredReplicas() != *set.Spec.Replicas {
tc.Status.TiKV.Phase = v1alpha1.ScalePhase
} else if upgrading && tc.Status.PD.Phase != v1alpha1.UpgradePhase {
tc.Status.TiKV.Phase = v1alpha1.UpgradePhase
} else {
tc.Status.TiKV.Phase = v1alpha1.NormalPhase
}
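The phase computation above is the heart of the change: when the desired replica count differs from the StatefulSet's, the cluster is reported as scaling even if an upgrade is also pending. A minimal sketch of that precedence, using simplified hypothetical types rather than the operator's actual API:

package main

import "fmt"

// Simplified, hypothetical types -- not the operator's actual API.
type Phase string

const (
	NormalPhase  Phase = "Normal"
	ScalePhase   Phase = "Scale"
	UpgradePhase Phase = "Upgrade"
)

// decidePhase mirrors the precedence introduced above: a pending replica
// change is reported before an in-flight upgrade.
func decidePhase(desiredReplicas, stsReplicas int32, tikvUpgrading, pdUpgrading bool) Phase {
	// Scaling takes precedence over upgrading.
	if desiredReplicas != stsReplicas {
		return ScalePhase
	}
	if tikvUpgrading && !pdUpgrading {
		return UpgradePhase
	}
	return NormalPhase
}

func main() {
	// An upgrade is in flight while a scale-out is pending: the cluster is
	// reported as scaling, so the scaler gets to act first.
	fmt.Println(decidePhase(4, 3, true, false))  // Scale
	fmt.Println(decidePhase(3, 3, true, false))  // Upgrade
	fmt.Println(decidePhase(3, 3, false, false)) // Normal
}

With the previous ordering, the first case would have been reported as UpgradePhase, and the scaler guards removed in the next file would have kept the replica change blocked until the upgrade finished.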
14 changes: 1 addition & 13 deletions pkg/manager/member/tikv_scaler.go
@@ -57,11 +57,6 @@ func (tsd *tikvScaler) Scale(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet,
func (tsd *tikvScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
_, ordinal, replicas, deleteSlots := scaleOne(oldSet, newSet)
resetReplicas(newSet, oldSet)
if tc.TiKVUpgrading() {
klog.Infof("TidbCluster: [%s/%s]'s tikv is upgrading, can not scale out until the upgrade completed",
tc.Namespace, tc.Name)
return nil
}

klog.Infof("scaling out tikv statefulset %s/%s, ordinal: %d (replicas: %d, delete slots: %v)", oldSet.Namespace, oldSet.Name, ordinal, replicas, deleteSlots.List())
_, err := tsd.deleteDeferDeletingPVC(tc, oldSet.GetName(), v1alpha1.TiKVMemberType, ordinal)
@@ -81,13 +76,6 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
resetReplicas(newSet, oldSet)
setName := oldSet.GetName()

// tikv can not scale in when it is upgrading
if tc.TiKVUpgrading() {
klog.Infof("TidbCluster: [%s/%s]'s tikv is upgrading, can not scale in until upgrade completed",
ns, tcName)
return nil
}

klog.Infof("scaling in tikv statefulset %s/%s, ordinal: %d (replicas: %d, delete slots: %v)", oldSet.Namespace, oldSet.Name, ordinal, replicas, deleteSlots.List())
// We need to remove the member from the cluster before reducing the statefulset replicas
podName := ordinalPodName(v1alpha1.TiKVMemberType, tcName, ordinal)
@@ -115,7 +103,7 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
}
klog.Infof("tikv scale in: delete store %d for tikv %s/%s successfully", id, ns, podName)
}
return controller.RequeueErrorf("TiKV %s/%s store %d still in cluster, state: %s", ns, podName, id, state)
return controller.RequeueErrorf("TiKV %s/%s store %d is still in cluster, state: %s", ns, podName, id, state)
}
}
for id, store := range tc.Status.TiKV.TombstoneStores {
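With the upgrade guards gone, ScaleIn relies entirely on its existing safety mechanism: ask PD to delete the store, return a requeue error while the store is still in the cluster, and only allow the StatefulSet replicas to shrink once PD has tombstoned it. A rough sketch of that flow under simplified, hypothetical types (the real code goes through the PD control client and controller.RequeueErrorf):

package main

import (
	"errors"
	"fmt"
)

// errRequeue stands in for the requeue error the operator's sync loop treats
// as "try again later" rather than as a failure.
var errRequeue = errors.New("requeue")

// store is a simplified, hypothetical view of a TiKV store in PD.
type store struct {
	ID    uint64
	State string // "Up", "Offline", or "Tombstone"
}

// scaleInStep sketches the two-phase scale-in: request deletion from PD,
// requeue while the store is still in the cluster, and report success only
// once the store is tombstoned so the replica count may be reduced.
func scaleInStep(s *store, deleteStore func(id uint64) error) error {
	if s.State == "Tombstone" {
		return nil // safe to reduce StatefulSet replicas now
	}
	if s.State == "Up" {
		if err := deleteStore(s.ID); err != nil {
			return err
		}
	}
	return fmt.Errorf("store %d still in cluster, state %s: %w", s.ID, s.State, errRequeue)
}

func main() {
	s := &store{ID: 4, State: "Up"}
	err := scaleInStep(s, func(id uint64) error { s.State = "Offline"; return nil })
	fmt.Println(errors.Is(err, errRequeue)) // true: wait for PD to tombstone the store

	s.State = "Tombstone"
	fmt.Println(scaleInStep(s, nil)) // <nil>: replicas can shrink
}

Because the scaler is now invoked even while TiKV is upgrading, this requeue loop is what keeps a scale-in safe when pods are also being rolled.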
26 changes: 14 additions & 12 deletions pkg/manager/member/tikv_scaler_test.go
@@ -45,8 +45,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
changed bool
}

testFn := func(test *testcase, t *testing.T) {
t.Log(test.name)
testFn := func(test testcase, t *testing.T) {
tc := newTidbClusterForPD()

if test.tikvUpgrading {
@@ -105,7 +104,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
annoIsNil: true,
pvcDeleteErr: false,
errExpectFn: errExpectNil,
changed: false,
changed: true,
},
{
name: "cache don't have pvc",
@@ -138,8 +137,10 @@
},
}

for i := range tests {
testFn(&tests[i], t)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testFn(tt, t)
})
}
}

@@ -161,8 +162,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {

controller.ResyncDuration = 0

testFn := func(test *testcase, t *testing.T) {
t.Log(test.name)
testFn := func(test testcase, t *testing.T) {
tc := newTidbClusterForPD()
test.storeFun(tc)

@@ -253,17 +253,17 @@
changed: false,
},
{
name: "tikv is upgrading",
name: "able to scale in while upgrading",
tikvUpgrading: true,
storeFun: normalStoreFun,
storeFun: tombstoneStoreFun,
delStoreErr: false,
hasPVC: true,
storeIDSynced: true,
isPodReady: true,
hasSynced: true,
pvcUpdateErr: false,
errExpectFn: errExpectNil,
changed: false,
changed: true,
},
{
name: "status.TiKV.Stores is empty",
@@ -423,8 +423,10 @@
},
}

for i := range tests {
testFn(&tests[i], t)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testFn(tt, t)
})
}
}

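The test refactor above swaps the indexed loop for value-copied cases run through t.Run, so each case reports as a named subtest and can be selected with go test -run. A tiny standalone example of the same pattern (the testcase fields here are made up for illustration):

package scaler_test

import "testing"

func TestPrecedencePattern(t *testing.T) {
	type testcase struct {
		name     string
		replicas int32
		desired  int32
		scaling  bool
	}

	testFn := func(test testcase, t *testing.T) {
		if got := test.desired != test.replicas; got != test.scaling {
			t.Errorf("scaling = %v, want %v", got, test.scaling)
		}
	}

	tests := []testcase{
		{name: "scale out pending", replicas: 3, desired: 4, scaling: true},
		{name: "steady state", replicas: 3, desired: 3, scaling: false},
	}

	// Same shape as the refactored loops in tikv_scaler_test.go: each case
	// runs as a named subtest, so `go test -run TestPrecedencePattern/steady_state`
	// selects it on its own.
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			testFn(tt, t)
		})
	}
}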
68 changes: 65 additions & 3 deletions tests/e2e/tidbcluster/stability.go
@@ -857,6 +857,69 @@ var _ = ginkgo.Describe("[tidb-operator][Stability]", func() {
framework.ExpectNoError(err)
})

// https://github.com/pingcap/tidb-operator/issues/2739
ginkgo.It("[Feature: AutoFailover] Failover can work if a store fails to upgrade", func() {
clusterName := "scale"
tc := fixture.GetTidbCluster(ns, clusterName, utilimage.TiDBV4Version)
tc.Spec.PD.Replicas = 1
tc.Spec.PD.Config.Schedule = &v1alpha1.PDScheduleConfig{
// By default, PD sets the state of a disconnected store to Down
// after 30 minutes. Use a short time in testing.
MaxStoreDownTime: pointer.StringPtr("1m"),
}
tc.Spec.TiKV.Replicas = 3
tc.Spec.TiDB.Replicas = 1
err := genericCli.Create(context.TODO(), tc)
framework.ExpectNoError(err)
err = oa.WaitForTidbClusterReady(tc, 30*time.Minute, 15*time.Second)
framework.ExpectNoError(err)

ginkgo.By("Fail a TiKV store")
podName := controller.TiKVMemberName(clusterName) + "-1"
f.ExecCommandInContainer(podName, "tikv", "sh", "-c", "rm -rf /var/lib/tikv/*")

ginkgo.By("Waiting for the store to be in Down state")
err = utiltidbcluster.WaitForTidbClusterCondition(cli, tc.Namespace, tc.Name, time.Minute*5, func(tc *v1alpha1.TidbCluster) (bool, error) {
for _, store := range tc.Status.TiKV.Stores {
if store.PodName == podName && store.State == v1alpha1.TiKVStateDown {
return true, nil
}
}
return false, nil
})
framework.ExpectNoError(err)

ginkgo.By("Upgrade TiKV configuration")
updateStrategy := v1alpha1.ConfigUpdateStrategyRollingUpdate
err = controller.GuaranteedUpdate(genericCli, tc, func() error {
tc.Spec.TiKV.Config.LogLevel = pointer.StringPtr("info")
tc.Spec.TiKV.ConfigUpdateStrategy = &updateStrategy
return nil
})
framework.ExpectNoError(err)

ginkgo.By("Waiting for the store to be put into failsure stores")
err = utiltidbcluster.WaitForTidbClusterCondition(cli, tc.Namespace, tc.Name, time.Minute*5, func(tc *v1alpha1.TidbCluster) (bool, error) {
for _, failureStore := range tc.Status.TiKV.FailureStores {
if failureStore.PodName == podName {
return true, nil
}
}
return false, nil
})
framework.ExpectNoError(err)

ginkgo.By("Waiting for the new pod to be created")
newPodName := controller.TiKVMemberName(clusterName) + "-3"
err = wait.PollImmediate(time.Second*10, 1*time.Minute, func() (bool, error) {
_, err := c.CoreV1().Pods(ns).Get(newPodName, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, nil
}
return !apierrors.IsNotFound(err), nil
})
framework.ExpectNoError(err)
})
})

ginkgo.Context("[Feature: AdvancedStatefulSet][Feature: AutoFailover] operator with advanced statefulset and short auto-failover periods", func() {
@@ -923,13 +986,12 @@ var _ = ginkgo.Describe("[tidb-operator][Stability]", func() {

ginkgo.By("Waiting for the store to be put into failsure stores")
err = utiltidbcluster.WaitForTidbClusterCondition(cli, tc.Namespace, tc.Name, time.Minute*5, func(tc *v1alpha1.TidbCluster) (bool, error) {
exist := false
for _, failureStore := range tc.Status.TiKV.FailureStores {
if failureStore.PodName == podName {
exist = true
return true, nil
}
}
return exist, nil
return false, nil
})
framework.ExpectNoError(err)

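The new e2e case leans on the failover path: once the downed store has been recorded in FailureStores, the member manager pads the desired replica count so the StatefulSet creates a replacement pod (tikv-3 for a failed tikv-1). A rough sketch of that bookkeeping with simplified, hypothetical types — the operator's actual failover logic lives in its tikv failover and member manager code:

package main

import (
	"fmt"
	"time"
)

// Simplified, hypothetical view of TiKV store status.
type tikvStore struct {
	PodName       string
	State         string // e.g. "Up" or "Down"
	LastHeartbeat time.Time
}

type tikvStatus struct {
	Stores        map[string]tikvStore
	FailureStores map[string]tikvStore
}

// recordFailures sketches the idea the e2e test exercises: stores that have
// been Down longer than the deadline are recorded as failure stores, and the
// effective replica count is padded by the number of failures so the
// StatefulSet brings up a replacement pod.
func recordFailures(st *tikvStatus, specReplicas int32, deadline time.Duration, now time.Time) int32 {
	for id, s := range st.Stores {
		if s.State == "Down" && now.Sub(s.LastHeartbeat) > deadline {
			if _, ok := st.FailureStores[id]; !ok {
				st.FailureStores[id] = s
			}
		}
	}
	return specReplicas + int32(len(st.FailureStores))
}

func main() {
	st := &tikvStatus{
		Stores: map[string]tikvStore{
			"1": {PodName: "scale-tikv-1", State: "Down", LastHeartbeat: time.Now().Add(-2 * time.Minute)},
		},
		FailureStores: map[string]tikvStore{},
	}
	// With a 1m deadline (cf. MaxStoreDownTime in the test), the downed store
	// is recorded and the effective replica count grows from 3 to 4.
	fmt.Println(recordFailures(st, 3, time.Minute, time.Now()))
}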
