Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup/Restore: support configuring TiKV GC life time #1835

Merged
merged 15 commits into from
Mar 4, 2020
Merged
13 changes: 4 additions & 9 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,12 @@ import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
)

// Options contains the input arguments to the backup command
type Options struct {
Namespace string
BackupName string
// BackupOpts contains the input arguments to the backup command
LinuxGit marked this conversation as resolved.
Show resolved Hide resolved
// BackupOpts contains the input arguments to the backup command.
type BackupOpts struct {
// GenericBackupOptions is embedded so its fields are promoted onto
// BackupOpts; the command-line flags bind bo.Namespace, bo.BackupName,
// bo.Host, bo.Port, bo.Password and bo.User through this embed.
util.GenericBackupOptions
}

// String renders the options as "<namespace>/<backupName>", the form used to
// identify the backup in log and error messages.
func (bo *Options) String() string {
	ns, name := bo.Namespace, bo.BackupName
	return fmt.Sprintf("%s/%s", ns, name)
}

func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {
func (bo *BackupOpts) backupData(backup *v1alpha1.Backup) (string, error) {
args, path, err := constructOptions(backup)
if err != nil {
return "", err
Expand Down
118 changes: 111 additions & 7 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,33 @@
package backup

import (
"database/sql"
"fmt"
"time"

"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
)

// Manager is mainly used to manage backup-related work.
type Manager struct {
// backupLister is the lister for Backup objects.
backupLister listers.BackupLister
// StatusUpdater is used to record backup conditions (for example
// BackupFailed with a reason and message) on the Backup object.
StatusUpdater controller.BackupConditionUpdaterInterface
// NOTE(review): both embeds below are visible because this listing is a
// rendered diff; presumably Options is the pre-change embed and BackupOpts
// its replacement — confirm against the real file.
Options
BackupOpts
}

// NewManager returns a Manager
func NewManager(
backupLister listers.BackupLister,
statusUpdater controller.BackupConditionUpdaterInterface,
backupOpts Options) *Manager {
backupOpts BackupOpts) *Manager {
return &Manager{
backupLister,
statusUpdater,
Expand All @@ -60,10 +64,32 @@ func (bm *Manager) ProcessBackup() error {
if backup.Spec.BR == nil {
return fmt.Errorf("no br config in %s", bm)
}
return bm.performBackup(backup.DeepCopy())

var db *sql.DB
err = wait.PollImmediate(constants.PollInterval, constants.CheckTimeout, func() (done bool, err error) {
db, err = util.OpenDB(bm.GetDSN(constants.TidbMetaDB))
if err != nil {
klog.Warningf("can't connect to tidb cluster %s, err: %s", bm, err)
return false, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return nil for the error case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

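See https://godoc.org/k8s.io/apimachinery/pkg/util/wait#ConditionFunc: ConditionFunc returns true if the condition is satisfied, or an error if the loop should be aborted. Returning (false, nil) therefore keeps the poll retrying the connection until the timeout, whereas returning the error would abort the loop on the first failed attempt.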

}
return true, nil
})

if err != nil {
klog.Errorf("cluster %s connect failed, err: %s", bm, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ConnectTidbFailed",
Message: err.Error(),
})
}

defer db.Close()
return bm.performBackup(backup.DeepCopy(), db)
}

func (bm *Manager) performBackup(backup *v1alpha1.Backup) error {
func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
started := time.Now()

err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Expand All @@ -74,16 +100,94 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup) error {
return err
}

backupFullPath, err := bm.backupData(backup)
oldTikvGCTime, err := bm.GetTikvGCLifeTime(db)
if err != nil {
klog.Errorf("backup cluster %s data failed, err: %s", bm, err)
klog.Errorf("cluster %s get %s failed, err: %s", bm, constants.TikvGCVariable, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "BackupDataToRemoteFailed",
Reason: "GetTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
klog.Infof("cluster %s %s is %s", bm, constants.TikvGCVariable, oldTikvGCTime)

oldTikvGCTimeDuration, err := time.ParseDuration(oldTikvGCTime)
if err != nil {
klog.Errorf("cluster %s parse old %s failed, err: %s", bm, constants.TikvGCVariable, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ParseOldTikvGCLifeTimeFailed",
Message: err.Error(),
})
}

var tikvGCTimeDuration time.Duration
var tikvGCLifeTime string
if backup.Spec.TikvGCLifeTime != nil {
tikvGCLifeTime = *backup.Spec.TikvGCLifeTime
tikvGCTimeDuration, err = time.ParseDuration(tikvGCLifeTime)
if err != nil {
klog.Errorf("cluster %s parse configured %s failed, err: %s", bm, constants.TikvGCVariable, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ParseConfiguredTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
} else {
tikvGCLifeTime = constants.TikvGCLifeTime
tikvGCTimeDuration, err = time.ParseDuration(tikvGCLifeTime)
if err != nil {
klog.Errorf("cluster %s parse default %s failed, err: %s", bm, constants.TikvGCVariable, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ParseDefaultTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
}

if oldTikvGCTimeDuration < tikvGCTimeDuration {
err = bm.SetTikvGCLifeTime(db, tikvGCLifeTime)
if err != nil {
klog.Errorf("cluster %s set tikv GC life time to %s failed, err: %s", bm, tikvGCLifeTime, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "SetTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
klog.Infof("set cluster %s %s to %s success", bm, constants.TikvGCVariable, tikvGCLifeTime)
}

backupFullPath, backupErr := bm.backupData(backup)
if oldTikvGCTimeDuration < tikvGCTimeDuration {
err = bm.SetTikvGCLifeTime(db, oldTikvGCTime)
if err != nil {
klog.Errorf("cluster %s reset tikv GC life time to %s failed, err: %s", bm, oldTikvGCTime, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ResetTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
klog.Infof("reset cluster %s %s to %s success", bm, constants.TikvGCVariable, oldTikvGCTime)
}
if backupErr != nil {
klog.Errorf("backup cluster %s data failed, err: %s", bm, backupErr)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "BackupDataToRemoteFailed",
Message: backupErr.Error(),
})
}
klog.Infof("backup cluster %s data to %s success", bm, backupFullPath)

// Note: The size get from remote may be incorrect because the blobs
Expand Down
10 changes: 8 additions & 2 deletions cmd/backup-manager/app/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/backup"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
bkconstants "github.com/pingcap/tidb-operator/pkg/backup/constants"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/spf13/cobra"
Expand All @@ -29,7 +30,7 @@ import (

// NewBackupCommand implements the backup command
func NewBackupCommand() *cobra.Command {
bo := backup.Options{}
bo := backup.BackupOpts{}

cmd := &cobra.Command{
Use: "backup",
Expand All @@ -42,10 +43,15 @@ func NewBackupCommand() *cobra.Command {

cmd.Flags().StringVar(&bo.Namespace, "namespace", "", "Backup CR's namespace")
cmd.Flags().StringVar(&bo.BackupName, "backupName", "", "Backup CRD object name")
cmd.Flags().StringVar(&bo.Host, "host", "", "Tidb cluster access address")
cmd.Flags().Int32Var(&bo.Port, "port", bkconstants.DefaultTidbPort, "Port number to use for connecting tidb cluster")
cmd.Flags().StringVar(&bo.Password, bkconstants.TidbPasswordKey, "", "Password to use when connecting to tidb cluster")
cmd.Flags().StringVar(&bo.User, "user", "", "User for login tidb cluster")
util.SetFlagsFromEnv(cmd.Flags(), bkconstants.BackupManagerEnvVarPrefix)
onlymellb marked this conversation as resolved.
Show resolved Hide resolved
return cmd
}

func runBackup(backupOpts backup.Options, kubecfg string) error {
func runBackup(backupOpts backup.BackupOpts, kubecfg string) error {
kubeCli, cli, err := util.NewKubeAndCRCli(kubecfg)
cmdutil.CheckErr(err)
options := []informers.SharedInformerOption{
Expand Down
36 changes: 1 addition & 35 deletions cmd/backup-manager/app/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package export

import (
"database/sql"
"fmt"
"io/ioutil"
"os/exec"
Expand All @@ -32,20 +31,11 @@ import (

// BackupOpts contains the input arguments to the backup command
type BackupOpts struct {
// NOTE(review): this listing is a rendered diff, so the explicit
// Namespace/BackupName/Host/Port/Password/User fields and the embedded
// util.GenericBackupOptions appear side by side; presumably the embed
// replaces the explicit fields — confirm against the real file.

// Namespace and BackupName identify the backup; String() prints them as
// "<Namespace>/<BackupName>".
Namespace string
BackupName string
util.GenericBackupOptions
Bucket string
// Host, Port, Password and User are the connection parameters that getDSN
// formats into the data source name.
Host string
Port int32
Password string
User string
// StorageType is used as the URI scheme by getDestBucketURI.
StorageType string
}

// String renders the options as "<namespace>/<backupName>", the form used to
// identify the backup in log and error messages.
func (bo *BackupOpts) String() string {
	ns, name := bo.Namespace, bo.BackupName
	return fmt.Sprintf("%s/%s", ns, name)
}

// getBackupFullPath returns the backup's relative path (as produced by
// getBackupRelativePath) joined onto constants.BackupRootPath.
func (bo *BackupOpts) getBackupFullPath() string {
	relPath := bo.getBackupRelativePath()
	return filepath.Join(constants.BackupRootPath, relPath)
}
Expand All @@ -59,26 +49,6 @@ func (bo *BackupOpts) getDestBucketURI(remotePath string) string {
return fmt.Sprintf("%s://%s", bo.StorageType, remotePath)
}

// getTikvGCLifeTime reads the value stored for constants.TikvGCVariable in
// the constants.TidbMetaTable table through db.
//
// It returns the scanned value, or an error that carries the SQL text and
// the underlying failure when the row cannot be read.
func (bo *BackupOpts) getTikvGCLifeTime(db *sql.DB) (string, error) {
	// The table name comes from a constant; the variable name is bound as a
	// query parameter.
	query := fmt.Sprintf("select variable_value from %s where variable_name= ?", constants.TidbMetaTable)

	var gcLifeTime string
	if scanErr := db.QueryRow(query, constants.TikvGCVariable).Scan(&gcLifeTime); scanErr != nil {
		return gcLifeTime, fmt.Errorf("query cluster %s %s failed, sql: %s, err: %v", bo, constants.TikvGCVariable, query, scanErr)
	}
	return gcLifeTime, nil
}

// setTikvGCLifeTime updates the row for constants.TikvGCVariable in the
// constants.TidbMetaTable table to gcTime through db.
//
// It returns nil on success, or an error that carries the SQL text and the
// underlying failure when the update cannot be executed.
func (bo *BackupOpts) setTikvGCLifeTime(db *sql.DB, gcTime string) error {
	// The table name comes from a constant; the new value and the variable
	// name are bound as query parameters.
	stmt := fmt.Sprintf("update %s set variable_value = ? where variable_name = ?", constants.TidbMetaTable)

	if _, execErr := db.Exec(stmt, gcTime, constants.TikvGCVariable); execErr != nil {
		return fmt.Errorf("set cluster %s %s failed, sql: %s, err: %v", bo, constants.TikvGCVariable, stmt, execErr)
	}
	return nil
}

func (bo *BackupOpts) dumpTidbClusterData() (string, error) {
bfPath := bo.getBackupFullPath()
err := util.EnsureDirectoryExist(bfPath)
Expand Down Expand Up @@ -125,10 +95,6 @@ func (bo *BackupOpts) backupDataToRemote(source, bucketURI string) error {
return nil
}

// getDSN builds the data source name for the database db from the User,
// Password, Host and Port stored on bo, in the form
// "user:password@(host:port)/db?charset=utf8".
func (bo *BackupOpts) getDSN(db string) string {
	const dsnFormat = "%s:%s@(%s:%d)/%s?charset=utf8"
	return fmt.Sprintf(dsnFormat, bo.User, bo.Password, bo.Host, bo.Port, db)
}

/*
getCommitTsFromMetadata get commitTs from mydumper's metadata file

Expand Down
Loading