Skip to content

Commit

Permalink
cherry pick pingcap#825 to release-4.0 (pingcap#826)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
Co-authored-by: Neil Shen <[email protected]>
  • Loading branch information
ti-srebot and overvenus authored Mar 9, 2021
1 parent 2d1d44f commit 46c5999
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 31 deletions.
8 changes: 5 additions & 3 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,11 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
sp.BackupTS = cfg.LastBackupTS
}

log.Info("current backup safePoint job",
zap.Object("safePoint", sp))
utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp)
log.Info("current backup safePoint job", zap.Object("safePoint", sp))
err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp)
if err != nil {
return errors.Trace(err)
}

isIncrementalBackup := cfg.LastBackupTS > 0

Expand Down
5 changes: 4 additions & 1 deletion pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
// restore checksum will check safe point with its start ts, see details at
// https://github.com/pingcap/tidb/blob/180c02127105bed73712050594da6ead4d70a85f/store/tikv/kv.go#L186-L190
// so we should keep the safe point unchanged, to avoid the GC life time being shorter than the transaction duration.
utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp)
err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp)
if err != nil {
return errors.Trace(err)
}

var newTS uint64
if client.IsIncremental() {
Expand Down
54 changes: 29 additions & 25 deletions pkg/utils/safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,11 @@ func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error
return nil
}

// UpdateServiceSafePoint register BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds.
func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error {
log.Debug("update PD safePoint limit with TTL",
zap.Object("safePoint", sp))
// updateServiceSafePoint registers BackupTS to PD, to lock down BackupTS as the safePoint for TTL seconds.
func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error {
log.Debug("update PD safePoint limit with TTL", zap.Object("safePoint", sp))

lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx,
sp.ID, sp.TTL, sp.BackupTS-1)
lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, sp.ID, sp.TTL, sp.BackupTS-1)
if lastSafePoint > sp.BackupTS-1 {
log.Warn("service GC safe point lost, we may fail to back up if GC lifetime isn't long enough",
zap.Uint64("lastSafePoint", lastSafePoint),
Expand All @@ -96,27 +94,23 @@ func StartServiceSafePointKeeper(
ctx context.Context,
pdClient pd.Client,
sp BRServiceSafePoint,
) {
// It would be OK since TTL won't be zero, so gapTime should > `0.
updateGapTime := time.Duration(sp.TTL) * time.Second / preUpdateServiceSafePointFactor
update := func(ctx context.Context) {
if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil {
log.Warn("failed to update service safe point, backup may fail if gc triggered",
zap.Error(err),
)
}
) error {
if sp.ID == "" || sp.TTL <= 0 {
return errors.Annotatef(berrors.ErrInvalidArgument, "invalid service safe point %v", sp)
}
check := func(ctx context.Context) {
if err := CheckGCSafePoint(ctx, pdClient, sp.BackupTS); err != nil {
log.Panic("cannot pass gc safe point check, aborting",
zap.Error(err),
zap.Object("safePoint", sp),
)
}
if err := CheckGCSafePoint(ctx, pdClient, sp.BackupTS); err != nil {
return errors.Trace(err)
}
// Update service safe point immediately to cover the gap between starting
// update goroutine and updating service safe point.
if err := updateServiceSafePoint(ctx, pdClient, sp); err != nil {
return errors.Trace(err)
}

// This is safe: TTL was validated above to be positive, so updateGapTime is > 0.
updateGapTime := time.Duration(sp.TTL) * time.Second / preUpdateServiceSafePointFactor
updateTick := time.NewTicker(updateGapTime)
checkTick := time.NewTicker(checkGCSafePointGapTime)
update(ctx)
go func() {
defer updateTick.Stop()
defer checkTick.Stop()
Expand All @@ -126,10 +120,20 @@ func StartServiceSafePointKeeper(
log.Debug("service safe point keeper exited")
return
case <-updateTick.C:
update(ctx)
if err := updateServiceSafePoint(ctx, pdClient, sp); err != nil {
log.Warn("failed to update service safe point, backup may fail if gc triggered",
zap.Error(err),
)
}
case <-checkTick.C:
check(ctx)
if err := CheckGCSafePoint(ctx, pdClient, sp.BackupTS); err != nil {
log.Panic("cannot pass gc safe point check, aborting",
zap.Error(err),
zap.Object("safePoint", sp),
)
}
}
}
}()
return nil
}
82 changes: 80 additions & 2 deletions pkg/utils/safe_point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,93 @@ func (s *testSafePointSuite) TestCheckGCSafepoint(c *C) {
type mockSafePoint struct {
sync.Mutex
pd.Client
safepoint uint64
safepoint uint64
minServiceSafepoint uint64
}

// UpdateServiceGCSafePoint is the mock's implementation of the PD service
// safe point update. While holding the mock's lock it:
//   - returns the current GC safepoint unchanged when the requested safePoint
//     is below it;
//   - otherwise records safePoint as the minimum service safepoint if none has
//     been recorded yet (zero) or if safePoint is lower than the recorded one,
//     and returns the recorded minimum.
//
// ctx, serviceID and ttl are accepted but not used by the mock. The returned
// error is always nil.
func (m *mockSafePoint) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
	m.Lock()
	defer m.Unlock()

	// The request is behind the GC safepoint: report the GC safepoint and
	// leave the recorded service safepoint untouched.
	if safePoint < m.safepoint {
		return m.safepoint, nil
	}

	unset := m.minServiceSafepoint == 0
	if unset || safePoint < m.minServiceSafepoint {
		m.minServiceSafepoint = safePoint
	}
	return m.minServiceSafepoint, nil
}

func (m *mockSafePoint) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
m.Lock()
defer m.Unlock()

if m.safepoint < safePoint {
if m.safepoint < safePoint && safePoint < m.minServiceSafepoint {
m.safepoint = safePoint
}
return m.safepoint, nil
}

// TestStartServiceSafePointKeeper runs StartServiceSafePointKeeper against a
// mock PD client whose GC safepoint starts at 2333 and checks, case by case,
// whether an error is returned. Each invalid case varies exactly one field
// relative to the valid case so that a single validation is exercised at a time.
func (s *testSafePointSuite) TestStartServiceSafePointKeeper(c *C) {
	pdClient := &mockSafePoint{safepoint: 2333}

	cases := []struct {
		sp utils.BRServiceSafePoint
		ok bool
	}{
		{
			utils.BRServiceSafePoint{
				ID:       "br",
				TTL:      10,
				BackupTS: 2333 + 1,
			},
			true,
		},

		// Invalid TTL.
		{
			utils.BRServiceSafePoint{
				ID:       "br",
				TTL:      0,
				BackupTS: 2333 + 1,
			},
			false,
		},

		// Invalid ID. TTL is kept valid so that only the empty ID can be the
		// reason for rejection.
		{
			utils.BRServiceSafePoint{
				ID:       "",
				TTL:      10,
				BackupTS: 2333 + 1,
			},
			false,
		},

		// BackupTS is too small.
		{
			utils.BRServiceSafePoint{
				ID:       "br",
				TTL:      10,
				BackupTS: 2333,
			},
			false,
		},
		{
			utils.BRServiceSafePoint{
				ID:       "br",
				TTL:      10,
				BackupTS: 2333 - 1,
			},
			false,
		},
	}
	for i, cs := range cases {
		ctx, cancel := context.WithCancel(context.Background())
		err := utils.StartServiceSafePointKeeper(ctx, pdClient, cs.sp)
		// Cancel before asserting: a failed Assert stops the test immediately,
		// which would otherwise skip cancel and leave the keeper goroutine
		// (started on success) and the context alive.
		cancel()

		checker := IsNil
		if !cs.ok {
			checker = NotNil
		}
		c.Assert(err, checker, Commentf("case #%d, %v", i, cs))
	}
}

0 comments on commit 46c5999

Please sign in to comment.