br: Enable checkpoint advancer to pause tasks lagged too large (#51441) #52105

Merged
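In short, this PR wires a lag guard into the checkpoint advancer: each `importantTick` now calls `isCheckpointLagged`, which compares the wall-clock time recovered from the last checkpoint TS against the current TS. Once the gap exceeds the configured `CheckPointLagLimit`, the advancer asks the env to pause the task, and a new `isPaused` flag (driven by `EventPause`/`EventResume` watch events on the pause prefix) makes later ticks no-ops until the task is resumed. The snippet below, lifted from the new tests in this PR, shows how the limit is configured; the one-minute value is just what the tests use.

```go
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
	// Pause a task once its checkpoint trails the cluster time
	// by more than this limit.
	c.CheckPointLagLimit = 1 * time.Minute
})
```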
4 changes: 3 additions & 1 deletion br/pkg/streamhelper/BUILD.bazel
@@ -68,7 +68,7 @@ go_test(
],
flaky = True,
race = "on",
-shard_count = 22,
+shard_count = 25,
deps = [
":streamhelper",
"//br/pkg/errors",
@@ -89,8 +89,10 @@ go_test(
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
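(The `shard_count` bump from 22 to 25 matches the three tests added below — `TestEnableCheckPointLimit`, `TestCheckPointLagged`, and `TestCheckPointResume` — and the new `assert` and `oracle` deps are what those tests and the advancer env now import.)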
41 changes: 40 additions & 1 deletion br/pkg/streamhelper/advancer.go
@@ -71,6 +71,7 @@
lastCheckpoint *checkpoint
lastCheckpointMu sync.Mutex
inResolvingLock atomic.Bool
isPaused atomic.Bool

checkpoints *spans.ValueSortedFull
checkpointsMu sync.Mutex
@@ -444,6 +445,14 @@
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
case EventPause:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(false, true)
}
case EventResume:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(true, false)
}
case EventErr:
return e.Err
}
@@ -542,13 +551,43 @@
return c.subscriber.PendingErrors()
}

func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, error) {
if c.cfg.CheckPointLagLimit <= 0 {
return false, nil
}

now, err := c.env.FetchCurrentTS(ctx)
if err != nil {
return false, err
}

lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
if lagDuration > c.cfg.CheckPointLagLimit {
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
zap.Stringer("lag", lagDuration))
return true, nil
}
return false, nil
}

func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(ctx, c.checkpoints.Min())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
isLagged, err := c.isCheckpointLagged(ctx)
if err != nil {
return errors.Annotate(err, "failed to check timestamp")
}
if isLagged {
err := c.env.PauseTask(ctx, c.task.Name)
if err != nil {
return errors.Annotate(err, "failed to pause task")
}
return errors.Errorf("checkpoint lagged too large")
}
return nil
}

@@ -589,7 +628,7 @@
func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
-if c.task == nil {
+if c.task == nil || c.isPaused.Load() {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
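For context on `isCheckpointLagged` above: a TSO packs a physical Unix-millisecond timestamp into its high bits, so `oracle.GetTimeFromTS` recovers a wall-clock time and the lag is an ordinary `time.Duration`. A minimal standalone sketch of the same computation (the 90-second offset is an arbitrary example):

```go
package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	// A checkpoint whose physical timestamp is 90s in the past.
	checkpoint := oracle.ComposeTS(time.Now().Add(-90*time.Second).UnixMilli(), 0)
	now := oracle.ComposeTS(time.Now().UnixMilli(), 0)

	// Same comparison isCheckpointLagged performs.
	lag := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(checkpoint))
	fmt.Println(lag, lag > time.Minute) // ~1m30s true -> this task would be paused
}
```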
86 changes: 66 additions & 20 deletions br/pkg/streamhelper/advancer_cliext.go
@@ -29,6 +29,8 @@
EventAdd EventType = iota
EventDel
EventErr
EventPause
EventResume
)

func (t EventType) String() string {
@@ -39,6 +41,10 @@
return "Del"
case EventErr:
return "Err"
case EventPause:
return "Pause"
case EventResume:
return "Resume"
}
return "Unknown"
}
@@ -70,28 +76,47 @@
}

func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (TaskEvent, error) {
-if !bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
-return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "the path isn't a task path (%s)", string(event.Kv.Key))
+te := TaskEvent{}
+var prefix string
+
+if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
+prefix = PrefixOfTask()
+te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
+} else if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfPause())) {
+prefix = PrefixOfPause()
+te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
+} else {
+return TaskEvent{},
+errors.Annotatef(berrors.ErrInvalidArgument, "the path isn't a task/pause path (%s)",
+string(event.Kv.Key))
}

-te := TaskEvent{}
-te.Name = strings.TrimPrefix(string(event.Kv.Key), PrefixOfTask())
-if event.Type == clientv3.EventTypeDelete {
-te.Type = EventDel
-} else if event.Type == clientv3.EventTypePut {
-te.Type = EventAdd
-} else {
-return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "event type is wrong (%s)", event.Type)
+switch {
+case event.Type == clientv3.EventTypePut && prefix == PrefixOfTask():
+te.Type = EventAdd
+case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfTask():
+te.Type = EventDel
+case event.Type == clientv3.EventTypePut && prefix == PrefixOfPause():
+te.Type = EventPause
+case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfPause():
+te.Type = EventResume
+default:
+return TaskEvent{},
+errors.Annotatef(berrors.ErrInvalidArgument,
+"invalid event type or prefix: type=%s, prefix=%s", event.Type, prefix)
}

te.Info = new(backuppb.StreamBackupTaskInfo)
if err := proto.Unmarshal(event.Kv.Value, te.Info); err != nil {
return TaskEvent{}, err
}

var err error
te.Ranges, err = t.MetaDataClient.TaskByInfo(*te.Info).Ranges(ctx)
if err != nil {
return TaskEvent{}, err
}

return te, nil
}

@@ -112,7 +137,10 @@
}

func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) {
-c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
+taskCh := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
+pauseCh := t.Client.Watcher.Watch(ctx, PrefixOfPause(), clientv3.WithPrefix(), clientv3.WithRev(rev))

// handleResponse converts a watch response into task events and forwards them to ch; it returns false once listening should stop.
handleResponse := func(resp clientv3.WatchResponse) bool {
events, err := t.eventFromWatch(ctx, resp)
if err != nil {
Expand All @@ -125,20 +153,26 @@
}
return true
}

// collectRemaining drains the events already buffered in the watch channels before shutting down.
collectRemaining := func() {
log.Info("[log backup advancer] Start collecting remaining events in the channel.", zap.Int("remained", len(c)))
defer log.Info("[log backup advancer] Finish collecting remaining events in the channel.")
log.Info("Start collecting remaining events in the channel.", zap.String("category", "log backup advancer"),
zap.Int("remained", len(taskCh)))
defer log.Info("Finish collecting remaining events in the channel.", zap.String("category", "log backup advancer"))
for {
if taskCh == nil && pauseCh == nil {
return
}

select {
-case resp, ok := <-c:
-if !ok {
-return
-}
-if !handleResponse(resp) {
-return
-}
+case resp, ok := <-taskCh:
+if !ok || !handleResponse(resp) {
+taskCh = nil
+}
+case resp, ok := <-pauseCh:
+if !ok || !handleResponse(resp) {
+pauseCh = nil
+}
default:
return
}
}
}
@@ -147,7 +181,7 @@
defer close(ch)
for {
select {
-case resp, ok := <-c:
+case resp, ok := <-taskCh:
failpoint.Inject("advancer_close_channel", func() {
// We cannot really close the channel, just simulating it.
ok = false
Expand All @@ -159,6 +193,18 @@
if !handleResponse(resp) {
return
}
case resp, ok := <-pauseCh:
failpoint.Inject("advancer_close_pause_channel", func() {
// We cannot really close the channel, just simulating it.
ok = false
})
if !ok {
ch <- errorEvent(io.EOF)
return
}
if !handleResponse(resp) {
return
}
case <-ctx.Done():
collectRemaining()
ch <- errorEvent(ctx.Err())
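The reworked `collectRemaining` relies on a standard Go idiom: a receive on a nil channel never fires in a `select`, so setting a closed channel to nil disables its case, and the loop exits once every channel has been disabled (the real code also keeps a `default: return` so it only drains events that are already buffered). A self-contained illustration with plain channels:

```go
package main

import "fmt"

// drain consumes whatever is buffered on a and b, disabling each
// channel's case once it is closed, and returns when both are done.
func drain(a, b <-chan int) {
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // closed: this case can never fire again
				continue
			}
			fmt.Println("a:", v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			fmt.Println("b:", v)
		}
	}
}

func main() {
	a, b := make(chan int, 1), make(chan int, 1)
	a <- 1
	b <- 2
	close(a)
	close(b)
	drain(a, b)
}
```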
7 changes: 7 additions & 0 deletions br/pkg/streamhelper/advancer_env.go
@@ -10,6 +10,7 @@
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/util/engine"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
@@ -36,6 +37,11 @@
pd.Client
}

// TODO: It should be able to synchronize the current TS with the PD.
func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
}

// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (c PDRegionScanner) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]RegionWithLeader, error) {
@@ -140,6 +146,7 @@
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
PauseTask(ctx context.Context, taskName string) error
}

var _ tikv.RegionLockResolver = &AdvancerLockResolver{}
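`PDRegionScanner.FetchCurrentTS` approximates the cluster TS from the local clock, which is what the TODO above is about. If it were wired to PD, one plausible shape is the sketch below — `GetTS` is the standard PD client call, but treat this as an untested assumption rather than part of this PR:

```go
// Hypothetical replacement for FetchCurrentTS that asks PD for a real TSO.
func (c PDRegionScanner) fetchCurrentTSFromPD(ctx context.Context) (uint64, error) {
	physical, logical, err := c.Client.GetTS(ctx)
	if err != nil {
		return 0, err
	}
	return oracle.ComposeTS(physical, logical), nil
}
```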
78 changes: 78 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
@@ -17,6 +17,7 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
@@ -440,3 +441,80 @@ func TestRemoveTaskAndFlush(t *testing.T) {
return !adv.HasSubscribion()
}, 10*time.Second, 100*time.Millisecond)
}

func TestEnableCheckPointLimit(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(30 * time.Second)
c.advanceCheckpointBy(20 * time.Second)
require.NoError(t, adv.OnTick(ctx))
}
}

func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some time, isPaused will be set and subsequent ticks are skipped
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
}

func TestCheckPointResume(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
// now the checkpoint issue is fixed and the task can be resumed
c.advanceCheckpointBy(1 * time.Minute)
env.ResumeTask(ctx)
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
// as time passes, the checkpoint lag will exceed the limit again
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
}
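A note on the retry pattern in these tests: `require.Eventually` needs a `func() bool`, so the body uses `assert.NoError` (which reports a bool) rather than `require.NoError` (which aborts the test immediately) — which is why the PR adds the `assert` dependency to BUILD.bazel alongside `require`.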