Skip to content

Commit

Permalink
br: calculate the global checkpoint-ts of log-back task. (#38815)
Browse files Browse the repository at this point in the history
close #38776
  • Loading branch information
joccau authored Nov 3, 2022
1 parent 8db7c3b commit adf1adf
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
13 changes: 11 additions & 2 deletions br/pkg/streamhelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,22 @@ func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error) {
initialized := false
checkpoint := t.Info.StartTs
for _, cp := range checkPointMap {
if !initialized || cp.TS < checkpoint {
if cp.Type() == CheckpointTypeGlobal {
return cp.TS, nil
}

if cp.Type() == CheckpointTypeStore && (!initialized || cp.TS < checkpoint) {
initialized = true
checkpoint = cp.TS
}
}

return checkpoint, nil
ts, err := t.GetStorageCheckpoint(ctx)
if err != nil {
return 0, errors.Trace(err)
}

return mathutil.Max(checkpoint, ts), nil
}

// Step forwards the progress (next_backup_ts) of some region.
Expand Down
44 changes: 42 additions & 2 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"

"github.com/pingcap/errors"
backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand Down Expand Up @@ -138,7 +139,8 @@ func TestIntegration(t *testing.T) {
metaCli := streamhelper.MetaDataClient{Client: cli}
t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
t.Run("testGetStorageCheckpoint", func(t *testing.T) { testGetStorageCheckpoint(t, metaCli, etcd) })
t.Run("testGetStorageCheckpoint", func(t *testing.T) { testGetStorageCheckpoint(t, metaCli) })
t.Run("testGetGlobalCheckPointTS", func(t *testing.T) { testGetGlobalCheckPointTS(t, metaCli) })
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
}
Expand Down Expand Up @@ -233,7 +235,7 @@ func testForwardProgress(t *testing.T, metaCli streamhelper.MetaDataClient, etcd
require.Equal(t, store2Checkpoint, uint64(40))
}

func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Etcd) {
func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient) {
var (
taskName = "my_task"
ctx = context.Background()
Expand Down Expand Up @@ -264,6 +266,44 @@ func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient,
ts, err := task.GetStorageCheckpoint(ctx)
require.NoError(t, err)
require.Equal(t, uint64(10002), ts)

ts, err = task.GetGlobalCheckPointTS(ctx)
require.NoError(t, err)
require.Equal(t, uint64(10002), ts)
}

func testGetGlobalCheckPointTS(t *testing.T, metaCli streamhelper.MetaDataClient) {
var (
taskName = "my_task"
ctx = context.Background()
value = make([]byte, 8)
)

cases := []struct {
storeID string
storageCheckPoint uint64
}{
{
"1",
10001,
}, {
"2",
10002,
},
}
for _, c := range cases {
key := path.Join(streamhelper.StorageCheckpointOf(taskName), c.storeID)
binary.BigEndian.PutUint64(value, c.storageCheckPoint)
_, err := metaCli.Put(ctx, key, string(value))
require.NoError(t, err)
}

task := streamhelper.NewTask(&metaCli, backup.StreamBackupTaskInfo{Name: taskName})
task.UploadGlobalCheckpoint(ctx, 1003)

globalTS, err := task.GetGlobalCheckPointTS(ctx)
require.NoError(t, err)
require.Equal(t, globalTS, uint64(1003))
}

func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
Expand Down

0 comments on commit adf1adf

Please sign in to comment.