diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel
index 31bc27c2c99a1..dea579e12fec5 100644
--- a/br/pkg/streamhelper/BUILD.bazel
+++ b/br/pkg/streamhelper/BUILD.bazel
@@ -68,7 +68,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 28,
+    shard_count = 32,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go
index 08a0bc2fe883a..c646bde8234c2 100644
--- a/br/pkg/streamhelper/advancer.go
+++ b/br/pkg/streamhelper/advancer.go
@@ -423,8 +423,17 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
         c.task = e.Info
         c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
         c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
-        c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
-        p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
+        globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
+        if err != nil {
+            log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
+            return err
+        }
+        if globalCheckpointTs < c.task.StartTs {
+            globalCheckpointTs = c.task.StartTs
+        }
+        log.Info("get global checkpoint", zap.Uint64("checkpoint", globalCheckpointTs))
+        c.lastCheckpoint = newCheckpointWithTS(globalCheckpointTs)
+        p, err := c.env.BlockGCUntil(ctx, globalCheckpointTs)
         if err != nil {
             log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err))
         }
diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go
index 7092edb3686ea..7557770eec093 100644
--- a/br/pkg/streamhelper/advancer_env.go
+++ b/br/pkg/streamhelper/advancer_env.go
@@ -156,6 +156,8 @@ type StreamMeta interface {
     Begin(ctx context.Context, ch chan<- TaskEvent) error
     // UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
     UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
+    // GetGlobalCheckpointForTask gets the global checkpoint from the meta store.
+    GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)
     // ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
     ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
     PauseTask(ctx context.Context, taskName string) error
diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go
index 27c28f4d51787..955cf6823b295 100644
--- a/br/pkg/streamhelper/advancer_test.go
+++ b/br/pkg/streamhelper/advancer_test.go
@@ -12,6 +12,7 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
+    backup "github.com/pingcap/kvproto/pkg/brpb"
     logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
     "github.com/pingcap/log"
     "github.com/pingcap/tidb/br/pkg/streamhelper"
@@ -20,9 +21,11 @@ import (
     "github.com/pingcap/tidb/pkg/kv"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
+    "github.com/tikv/client-go/v2/oracle"
     "github.com/tikv/client-go/v2/tikv"
     "github.com/tikv/client-go/v2/txnkv/txnlock"
     "go.uber.org/atomic"
+    "go.uber.org/zap"
     "go.uber.org/zap/zapcore"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
@@ -38,7 +41,7 @@ func TestBasic(t *testing.T) {
     c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
     ctx := context.Background()
     minCheckpoint := c.advanceCheckpoints()
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     coll := streamhelper.NewClusterCollector(ctx, env)
     err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll)
@@ -57,7 +60,7 @@ func TestTick(t *testing.T) {
     c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.StartTaskListener(ctx)
     require.NoError(t, adv.OnTick(ctx))
@@ -79,7 +82,7 @@ func TestWithFailure(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.StartTaskListener(ctx)
     require.NoError(t, adv.OnTick(ctx))
@@ -130,7 +133,7 @@ func TestCollectorFailure(t *testing.T) {
     }
     c.splitAndScatter(splitKeys...)
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     coll := streamhelper.NewClusterCollector(ctx, env)
@@ -171,7 +174,7 @@ func TestOneStoreFailure(t *testing.T) {
     c.splitAndScatter(splitKeys...)
     c.flushAll()
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.StartTaskListener(ctx)
     require.NoError(t, adv.OnTick(ctx))
@@ -195,7 +198,7 @@ func TestGCServiceSafePoint(t *testing.T) {
     c := createFakeCluster(t, 4, true)
     ctx := context.Background()
     c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.StartTaskListener(ctx)
@@ -221,7 +224,9 @@ func TestTaskRanges(t *testing.T) {
     c.splitAndScatter("0001", "0002", "0012", "0034", "0048")
     c.advanceCheckpoints()
     c.flushAllExcept("0000", "0049")
-    env := &testEnv{fakeCluster: c, testCtx: t, ranges: []kv.KeyRange{{StartKey: []byte("0002"), EndKey: []byte("0048")}}}
+    env := newTestEnv(c, t)
+    env.ranges = []kv.KeyRange{{StartKey: []byte("0002"), EndKey: []byte("0048")}}
+    env.task.Ranges = env.ranges
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.StartTaskListener(ctx)
@@ -238,7 +243,9 @@ func TestTaskRangesWithSplit(t *testing.T) {
     c.splitAndScatter("0012", "0034", "0048")
     c.advanceCheckpoints()
     c.flushAllExcept("0049")
-    env := &testEnv{fakeCluster: c, testCtx: t, ranges: []kv.KeyRange{{StartKey: []byte("0002"), EndKey: []byte("0048")}}}
+    env := newTestEnv(c, t)
+    env.ranges = []kv.KeyRange{{StartKey: []byte("0002"), EndKey: []byte("0048")}}
+    env.task.Ranges = env.ranges
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.StartTaskListener(ctx)
@@ -282,7 +289,7 @@ func TestClearCache(t *testing.T) {
             // mark one store failed is enough
             break
         }
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.StartTaskListener(ctx)
     var err error
@@ -313,7 +320,7 @@ func TestBlocked(t *testing.T) {
         marked = true
     }
     req.True(marked, "failed to mark the cluster: ")
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.StartTaskListener(ctx)
     adv.UpdateConfigWith(func(c *config.Config) {
@@ -344,7 +351,7 @@ func TestResolveLock(t *testing.T) {
     c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
     ctx := context.Background()
     minCheckpoint := c.advanceCheckpoints()
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     lockRegion := c.findRegionByKey([]byte("01"))
     allLocks := []*txnlock.Lock{
@@ -410,7 +417,7 @@ func TestOwnerDropped(t *testing.T) {
     c := createFakeCluster(t, 4, false)
     c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
     installSubscribeSupport(c)
-    env := &testEnv{testCtx: t, fakeCluster: c}
+    env := newTestEnv(c, t)
     fp := "github.com/pingcap/tidb/br/pkg/streamhelper/get_subscriber"
     defer func() {
         if t.Failed() {
@@ -446,10 +453,7 @@ func TestRemoveTaskAndFlush(t *testing.T) {
     ctx := context.Background()
     c := createFakeCluster(t, 4, true)
     installSubscribeSupport(c)
-    env := &testEnv{
-        fakeCluster: c,
-        testCtx:     t,
-    }
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.StartTaskListener(ctx)
     adv.SpawnSubscriptionHandler(ctx)
@@ -474,11 +478,29 @@ func TestEnableCheckPointLimit(t *testing.T) {
     c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    env := &testEnv{fakeCluster: c, testCtx: t}
+
+    env := newTestEnv(c, t)
+    rngs := env.ranges
+    if len(rngs) == 0 {
+        rngs = []kv.KeyRange{{}}
+    }
+    env.task = streamhelper.TaskEvent{
+        Type: streamhelper.EventAdd,
+        Name: "whole",
+        Info: &backup.StreamBackupTaskInfo{
+            Name:    "whole",
+            StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
+        },
+        Ranges: rngs,
+    }
+    log.Info("Start Time:", zap.Uint64("StartTs", env.task.Info.StartTs))
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.UpdateConfigWith(func(c *config.Config) {
         c.CheckPointLagLimit = 1 * time.Minute
     })
+
+    c.advanceClusterTimeBy(1 * time.Minute)
+    c.advanceCheckpointBy(1 * time.Minute)
     adv.StartTaskListener(ctx)
     for i := 0; i < 5; i++ {
         c.advanceClusterTimeBy(30 * time.Second)
@@ -495,13 +517,27 @@ func TestCheckPointLagged(t *testing.T) {
     c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
+    rngs := env.ranges
+    if len(rngs) == 0 {
+        rngs = []kv.KeyRange{{}}
+    }
+    env.task = streamhelper.TaskEvent{
+        Type: streamhelper.EventAdd,
+        Name: "whole",
+        Info: &backup.StreamBackupTaskInfo{
+            Name:    "whole",
+            StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
+        },
+        Ranges: rngs,
+    }
+
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.UpdateConfigWith(func(c *config.Config) {
         c.CheckPointLagLimit = 1 * time.Minute
     })
     adv.StartTaskListener(ctx)
-    c.advanceClusterTimeBy(1 * time.Minute)
+    c.advanceClusterTimeBy(2 * time.Minute)
     require.NoError(t, adv.OnTick(ctx))
     c.advanceClusterTimeBy(1 * time.Minute)
     require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
@@ -511,6 +547,7 @@
     }, 5*time.Second, 100*time.Millisecond)
 }
 
+// If the paused task is manually resumed, it should run normally.
 func TestCheckPointResume(t *testing.T) {
     c := createFakeCluster(t, 4, false)
     defer func() {
@@ -519,7 +556,7 @@ func TestCheckPointResume(t *testing.T) {
     c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.UpdateConfigWith(func(c *config.Config) {
         c.CheckPointLagLimit = 1 * time.Minute
@@ -551,7 +588,7 @@ func TestUnregisterAfterPause(t *testing.T) {
     c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.UpdateConfigWith(func(c *config.Config) {
         c.CheckPointLagLimit = 1 * time.Minute
@@ -571,12 +608,175 @@
     }, 5*time.Second, 300*time.Millisecond)
 }
 
+// If the start ts is *NOT* lagged, the task should run normally even if both the cluster and PD checkpoints are lagged.
+func TestAddTaskWithLongRunTask0(t *testing.T) {
+    c := createFakeCluster(t, 4, false)
+    defer func() {
+        fmt.Println(c)
+    }()
+    c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    env := newTestEnv(c, t)
+    rngs := env.ranges
+    if len(rngs) == 0 {
+        rngs = []kv.KeyRange{{}}
+    }
+    env.task = streamhelper.TaskEvent{
+        Type: streamhelper.EventAdd,
+        Name: "whole",
+        Info: &backup.StreamBackupTaskInfo{
+            Name:    "whole",
+            StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(2 * time.Minute)),
+        },
+        Ranges: rngs,
+    }
+
+    adv := streamhelper.NewCheckpointAdvancer(env)
+    adv.UpdateConfigWith(func(c *config.Config) {
+        c.CheckPointLagLimit = 1 * time.Minute
+    })
+    c.advanceClusterTimeBy(3 * time.Minute)
+    c.advanceCheckpointBy(1 * time.Minute)
+    env.advanceCheckpointBy(1 * time.Minute)
+    env.mockPDConnectionError()
+    adv.StartTaskListener(ctx)
+    // Try to update the checkpoint
+    require.NoError(t, adv.OnTick(ctx))
+    // Verify no error is raised
+    require.NoError(t, adv.OnTick(ctx))
+}
+
+// If the start ts is lagged, the task should still run normally as long as the cluster checkpoint has advanced.
+func TestAddTaskWithLongRunTask1(t *testing.T) {
+    c := createFakeCluster(t, 4, false)
+    defer func() {
+        fmt.Println(c)
+    }()
+    c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    env := newTestEnv(c, t)
+    rngs := env.ranges
+    if len(rngs) == 0 {
+        rngs = []kv.KeyRange{{}}
+    }
+    env.task = streamhelper.TaskEvent{
+        Type: streamhelper.EventAdd,
+        Name: "whole",
+        Info: &backup.StreamBackupTaskInfo{
+            Name:    "whole",
+            StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
+        },
+        Ranges: rngs,
+    }
+
+    adv := streamhelper.NewCheckpointAdvancer(env)
+    adv.UpdateConfigWith(func(c *config.Config) {
+        c.CheckPointLagLimit = 1 * time.Minute
+    })
+    c.advanceClusterTimeBy(3 * time.Minute)
+    c.advanceCheckpointBy(2 * time.Minute)
+    env.advanceCheckpointBy(1 * time.Minute)
+    adv.StartTaskListener(ctx)
+    // Try to update the checkpoint
+    require.NoError(t, adv.OnTick(ctx))
+    // Verify no error is raised
+    require.NoError(t, adv.OnTick(ctx))
+}
+
+// If the start ts is lagged, the task should still run normally as long as PD has stored an advanced checkpoint.
+// Also, a temporary connection error won't affect the task.
+func TestAddTaskWithLongRunTask2(t *testing.T) {
+    c := createFakeCluster(t, 4, false)
+    defer func() {
+        fmt.Println(c)
+    }()
+    c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    env := newTestEnv(c, t)
+    rngs := env.ranges
+    if len(rngs) == 0 {
+        rngs = []kv.KeyRange{{}}
+    }
+    env.task = streamhelper.TaskEvent{
+        Type: streamhelper.EventAdd,
+        Name: "whole",
+        Info: &backup.StreamBackupTaskInfo{
+            Name:    "whole",
+            StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
+        },
+        Ranges: rngs,
+    }
+
+    adv := streamhelper.NewCheckpointAdvancer(env)
+    adv.UpdateConfigWith(func(c *config.Config) {
+        c.CheckPointLagLimit = 1 * time.Minute
+    })
+    c.advanceClusterTimeBy(3 * time.Minute)
+    c.advanceCheckpointBy(1 * time.Minute)
+    env.advanceCheckpointBy(2 * time.Minute)
+    env.mockPDConnectionError()
+    adv.StartTaskListener(ctx)
+    // Try to update the checkpoint
+    require.NoError(t, adv.OnTick(ctx))
+    // Verify no error is raised
+    require.NoError(t, adv.OnTick(ctx))
+}
+
+// If the start ts, the PD checkpoint, and the cluster checkpoint are all lagged, the task should pause.
+func TestAddTaskWithLongRunTask3(t *testing.T) {
+    c := createFakeCluster(t, 4, false)
+    defer func() {
+        fmt.Println(c)
+    }()
+    c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    env := newTestEnv(c, t)
+    rngs := env.ranges
+    if len(rngs) == 0 {
+        rngs = []kv.KeyRange{{}}
+    }
+    env.task = streamhelper.TaskEvent{
+        Type: streamhelper.EventAdd,
+        Name: "whole",
+        Info: &backup.StreamBackupTaskInfo{
+            Name:    "whole",
+            StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
+        },
+        Ranges: rngs,
+    }
+
+    adv := streamhelper.NewCheckpointAdvancer(env)
+    adv.UpdateConfigWith(func(c *config.Config) {
+        c.CheckPointLagLimit = 1 * time.Minute
+    })
+    c.advanceClusterTimeBy(3 * time.Minute)
+    c.advanceCheckpointBy(1 * time.Minute)
+    env.advanceCheckpointBy(1 * time.Minute)
+    env.mockPDConnectionError()
+    adv.StartTaskListener(ctx)
+    // Try to update the checkpoint
+    require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+    // Verify no error is raised after the task is paused
+    require.Eventually(t, func() bool {
+        err := adv.OnTick(ctx)
+        return err == nil
+    }, 5*time.Second, 300*time.Millisecond)
+}
+
 func TestOwnershipLost(t *testing.T) {
     c := createFakeCluster(t, 4, false)
     c.splitAndScatter(manyRegions(0, 10240)...)
     installSubscribeSupport(c)
     ctx, cancel := context.WithCancel(context.Background())
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.OnStart(ctx)
     adv.OnBecomeOwner(ctx)
@@ -597,7 +797,7 @@ func TestSubscriptionPanic(t *testing.T) {
     c.splitAndScatter(manyRegions(0, 20)...)
     installSubscribeSupport(c)
     ctx, cancel := context.WithCancel(context.Background())
-    env := &testEnv{fakeCluster: c, testCtx: t}
+    env := newTestEnv(c, t)
     adv := streamhelper.NewCheckpointAdvancer(env)
     adv.OnStart(ctx)
     adv.OnBecomeOwner(ctx)
diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go
index 03db8db7a7a10..900b1fb23ee59 100644
--- a/br/pkg/streamhelper/basic_lib_for_test.go
+++ b/br/pkg/streamhelper/basic_lib_for_test.go
@@ -635,10 +635,12 @@ func (f *fakeCluster) String() string {
 type testEnv struct {
     *fakeCluster
-    checkpoint uint64
-    testCtx    *testing.T
-    ranges     []kv.KeyRange
-    taskCh     chan<- streamhelper.TaskEvent
+    checkpoint     uint64
+    pdDisconnected atomic.Bool
+    testCtx        *testing.T
+    ranges         []kv.KeyRange
+    taskCh         chan<- streamhelper.TaskEvent
+    task           streamhelper.TaskEvent
 
     resolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error)
@@ -646,12 +648,16 @@ type testEnv struct {
     pd.Client
 }
 
-func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) error {
-    rngs := t.ranges
+func newTestEnv(c *fakeCluster, t *testing.T) *testEnv {
+    env := &testEnv{
+        fakeCluster: c,
+        testCtx:     t,
+    }
+    rngs := env.ranges
     if len(rngs) == 0 {
         rngs = []kv.KeyRange{{}}
     }
-    tsk := streamhelper.TaskEvent{
+    env.task = streamhelper.TaskEvent{
         Type: streamhelper.EventAdd,
         Name: "whole",
         Info: &backup.StreamBackupTaskInfo{
@@ -659,7 +665,11 @@ func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) e
         },
         Ranges: rngs,
     }
-    ch <- tsk
+    return env
+}
+
+func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) error {
+    ch <- t.task
     t.taskCh = ch
     return nil
 }
@@ -675,6 +685,25 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
     return nil
 }
 
+func (t *testEnv) mockPDConnectionError() {
+    t.pdDisconnected.Store(true)
+}
+
+func (t *testEnv) connectPD() bool {
+    if !t.pdDisconnected.Load() {
+        return true
+    }
+    t.pdDisconnected.Store(false)
+    return false
+}
+
+func (t *testEnv) GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
+    if !t.connectPD() {
+        return 0, status.Error(codes.Unavailable, "pd disconnected")
+    }
+    return t.checkpoint, nil
+}
+
 func (t *testEnv) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error {
     t.mu.Lock()
     defer t.mu.Unlock()
@@ -706,6 +735,13 @@ func (t *testEnv) getCheckpoint() uint64 {
     return t.checkpoint
 }
 
+func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
+    t.mu.Lock()
+    defer t.mu.Unlock()
+
+    t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
+}
+
 func (t *testEnv) unregisterTask() {
     t.taskCh <- streamhelper.TaskEvent{
         Type: streamhelper.EventDel,