prepare_snap: establish connection to all stores before pausing admin (
ti-chi-bot authored Mar 25, 2024
1 parent 4fd9e85 commit b366852
Showing 5 changed files with 143 additions and 22 deletions.
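In essence, the commit moves the "pause admin commands" side effect out of the per-store connection path: every live store is dialed first, and only when all dials have succeeded are the pause streams created. Previously each store was dialed and paused one by one inside streamOf, so a dial failure on a later store could leave earlier stores already rejecting admin commands. The following is a rough preview of the ordering introduced below; it paraphrases the PrepareConnections function added to prepare.go and is not a verbatim excerpt.

```go
// Sketch only: assumes the Env / PrepareClient types from this package and a
// Preparer p with the createAndCacheStream helper added in this commit.
func prepareConnectionsSketch(ctx context.Context, p *Preparer, stores []*metapb.Store) error {
	// Phase 1: dial every live store; fail fast while nothing is paused yet.
	clients := map[uint64]PrepareClient{}
	for _, store := range stores {
		cli, err := p.env.ConnectToStore(ctx, store.Id)
		if err != nil {
			return err // no store has had its admin commands paused yet
		}
		clients[store.Id] = cli
	}
	// Phase 2: only after every dial succeeded, open the streams that pause
	// admin commands on each store.
	for id, cli := range clients {
		if err := p.createAndCacheStream(ctx, cli, id); err != nil {
			return err
		}
	}
	return nil
}
```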
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/BUILD.bazel
@@ -35,7 +35,7 @@ go_test(
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 7,
shard_count = 9,
deps = [
":prepare_snap",
"//br/pkg/utils",
56 changes: 42 additions & 14 deletions br/pkg/backup/prepare_snap/prepare.go
@@ -91,6 +91,9 @@ type Preparer struct {
RetryBackoff time.Duration
RetryLimit int
LeaseDuration time.Duration

/* Observers. Initialize them before starting.*/
AfterConnectionsEstablished func()
}

func New(env Env) *Preparer {
@@ -155,10 +158,13 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error {
zap.Int("retry_limit", p.RetryLimit),
zap.Duration("lease_duration", p.LeaseDuration))
p.retryTime = 0
if err := p.prepareConnections(ctx); err != nil {
if err := p.PrepareConnections(ctx); err != nil {
log.Error("failed to prepare connections", logutil.ShortError(err))
return errors.Annotate(err, "failed to prepare connections")
}
if p.AfterConnectionsEstablished != nil {
p.AfterConnectionsEstablished()
}
if err := p.AdvanceState(ctx); err != nil {
log.Error("failed to check the progress of our work", logutil.ShortError(err))
return errors.Annotate(err, "failed to begin step")
@@ -386,23 +392,31 @@ func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) error {
}

func (p *Preparer) streamOf(ctx context.Context, storeID uint64) (*prepareStream, error) {
s, ok := p.clients[storeID]
_, ok := p.clients[storeID]
if !ok {
log.Warn("stream of store found a store not established connection", zap.Uint64("store", storeID))
cli, err := p.env.ConnectToStore(ctx, storeID)
if err != nil {
return nil, errors.Annotatef(err, "failed to dial store %d", storeID)
}
s = new(prepareStream)
s.storeID = storeID
s.output = p.eventChan
s.leaseDuration = p.LeaseDuration
err = s.InitConn(ctx, cli)
if err != nil {
return nil, err
if err := p.createAndCacheStream(ctx, cli, storeID); err != nil {
return nil, errors.Annotatef(err, "failed to create and cache stream for store %d", storeID)
}
p.clients[storeID] = s
}
return s, nil
return p.clients[storeID], nil
}

func (p *Preparer) createAndCacheStream(ctx context.Context, cli PrepareClient, storeID uint64) error {
s := new(prepareStream)
s.storeID = storeID
s.output = p.eventChan
s.leaseDuration = p.LeaseDuration
err := s.InitConn(ctx, cli)
if err != nil {
return err
}
p.clients[storeID] = s
return nil
}

func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
@@ -415,17 +429,31 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
p.inflightReqs[region.GetMeta().Id] = *region.GetMeta()
}

func (p *Preparer) prepareConnections(ctx context.Context) error {
// PrepareConnections prepares the connections for each store.
// This will pause the admin commands for each store.
func (p *Preparer) PrepareConnections(ctx context.Context) error {
log.Info("Preparing connections to stores.")
stores, err := p.env.GetAllLiveStores(ctx)
if err != nil {
return errors.Annotate(err, "failed to get all live stores")
}

log.Info("Start to initialize the connections.", zap.Int("stores", len(stores)))
clients := map[uint64]PrepareClient{}
for _, store := range stores {
_, err := p.streamOf(ctx, store.Id)
cli, err := p.env.ConnectToStore(ctx, store.Id)
if err != nil {
return errors.Annotatef(err, "failed to prepare connection to store %d", store.Id)
return errors.Annotatef(err, "failed to dial the store %d", store.Id)
}
clients[store.Id] = cli
}

for id, cli := range clients {
log.Info("Start to pause the admin commands.", zap.Uint64("store", id))
if err := p.createAndCacheStream(ctx, cli, id); err != nil {
return errors.Annotatef(err, "failed to create and cache stream for store %d", id)
}
}

return nil
}
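A minimal caller-side sketch of the new hook follows (hypothetical wiring; the production use is in br/pkg/task/operator/cmd.go further down). DriveLoopAndWaitPrepare calls PrepareConnections first, fires AfterConnectionsEstablished, and only then begins pausing admin commands and waiting for apply. The function name and the 30-second lease value are illustrative only.

```go
// Hypothetical wiring around the Preparer API touched by this commit.
func runPrepareSketch(ctx context.Context, env preparesnap.Env) error {
	prep := preparesnap.New(env)
	prep.LeaseDuration = 30 * time.Second // arbitrary value for illustration
	ready := make(chan struct{})
	prep.AfterConnectionsEstablished = func() { close(ready) }

	errCh := make(chan error, 1)
	go func() { errCh <- prep.DriveLoopAndWaitPrepare(ctx) }()

	<-ready // every store is connected; other pause work may start now
	// ... do work that requires all stores to be reachable ...

	if err := <-errCh; err != nil {
		return err
	}
	return prep.Finalize(ctx) // lift the admin-command pause
}
```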
90 changes: 86 additions & 4 deletions br/pkg/backup/prepare_snap/prepare_test.go
@@ -21,6 +21,7 @@ import (
"io"
"sort"
"sync"
"sync/atomic"
"testing"
"time"

@@ -110,15 +111,24 @@ type mockStores struct {
mu sync.Mutex
stores map[uint64]*mockStore
onCreateStore func(*mockStore)
connectDelay func(uint64) <-chan struct{}
onConnectToStore func(uint64) error

pdc *tikv.RegionCache
}

func newTestEnv(pdc pd.Client) *mockStores {
r := tikv.NewRegionCache(pdc)
stores, err := pdc.GetAllStores(context.Background())
if err != nil {
panic(err)
}
ss := map[uint64]*mockStore{}
for _, store := range stores {
ss[store.Id] = nil
}
ms := &mockStores{
stores: map[uint64]*mockStore{},
stores: ss,
pdc: r,
onCreateStore: func(ms *mockStore) {},
}
@@ -138,7 +148,14 @@ func (m *mockStores) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {

func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
m.mu.Lock()
defer m.mu.Unlock()
defer func() {
m.mu.Unlock()
if m.connectDelay != nil {
if ch := m.connectDelay(storeID); ch != nil {
<-ch
}
}
}()

if m.onConnectToStore != nil {
err := m.onConnectToStore(storeID)
@@ -147,8 +164,8 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
}
}

_, ok := m.stores[storeID]
if !ok {
s, ok := m.stores[storeID]
if !ok || s == nil {
m.stores[storeID] = &mockStore{
output: make(chan brpb.PrepareSnapshotBackupResponse, 16),
successRegions: []metapb.Region{},
@@ -456,3 +473,68 @@ func TestSplitEnv(t *testing.T) {
require.Equal(t, cc.PrepareClient.(*counterClient).send, 1)
require.ElementsMatch(t, cc.PrepareClient.(*counterClient).regions, tinyRequest.Regions)
}

func TestConnectionDelay(t *testing.T) {
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
ms := newTestEnv(pdc)
called := 0
delayConn := make(chan struct{})
blocked := make(chan struct{}, 64)
ms.connectDelay = func(i uint64) <-chan struct{} {
called += 1
if called == 2 {
blocked <- struct{}{}
return delayConn
}
return nil
}
ctx := context.Background()
prep := New(ms)
connectionPrepareResult := make(chan error)
go func() {
connectionPrepareResult <- prep.PrepareConnections(ctx)
}()
<-blocked
ms.mu.Lock()
nonNilStore := 0
for id, store := range ms.stores {
// At this point no lease may have been created yet (i.e. no TiKV should be rejecting admin commands).
if store != nil {
req.True(store.leaseUntil.Before(time.Now()), "%d->%s", id, store.leaseUntil)
nonNilStore += 1
}
}
req.GreaterOrEqual(nonNilStore, 2)
ms.mu.Unlock()
delayConn <- struct{}{}
req.NoError(<-connectionPrepareResult)
}

func TestHooks(t *testing.T) {
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
pauseWaitApply := make(chan struct{})
ms := newTestEnv(pdc)
ms.onCreateStore = func(ms *mockStore) {
ms.onWaitApply = func(r *metapb.Region) error {
<-pauseWaitApply
return nil
}
}
adv := New(ms)
connectionsEstablished := new(atomic.Bool)
adv.AfterConnectionsEstablished = func() {
connectionsEstablished.Store(true)
}
errCh := make(chan error, 1)
go func() {
errCh <- adv.DriveLoopAndWaitPrepare(context.Background())
}()
req.Eventually(connectionsEstablished.Load, 1*time.Second, 100*time.Millisecond)
close(pauseWaitApply)
req.NoError(<-errCh)
ms.AssertSafeForBackup(t)
req.NoError(adv.Finalize(context.Background()))
ms.AssertIsNormalMode(t)
}
1 change: 1 addition & 0 deletions br/pkg/backup/prepare_snap/stream.go
@@ -70,6 +70,7 @@ func (p *prepareStream) InitConn(ctx context.Context, cli PrepareClient) error {
p.cli = cli
p.clientLoopHandle, ctx = errgroup.WithContext(ctx)
ctx, p.stopBgTasks = context.WithCancel(ctx)
log.Info("initializing", zap.Uint64("store", p.storeID))
return p.GoLeaseLoop(ctx, p.leaseDuration)
}

16 changes: 13 additions & 3 deletions br/pkg/task/operator/cmd.go
@@ -136,9 +136,15 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
}
defer cx.Close()

initChan := make(chan struct{})
cx.run(func() error { return pauseGCKeeper(cx) })
cx.run(func() error { return pauseSchedulerKeeper(cx) })
cx.run(func() error { return pauseAdminAndWaitApply(cx) })
cx.run(func() error {
log.Info("Pause scheduler: waiting for all connections to be established.")
<-initChan
log.Info("Pause scheduler noticed connections established.")
return pauseSchedulerKeeper(cx)
})
cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) })
go func() {
cx.rdGrp.Wait()
if cfg.OnAllReady != nil {
@@ -156,7 +162,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
return eg.Wait()
}

func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error {
func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectionsEstablished chan<- struct{}) error {
env := preparesnap.CliEnv{
Cache: tikv.NewRegionCache(cx.pdMgr.GetPDClient()),
Mgr: cx.kvMgr,
@@ -166,6 +172,10 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error {
begin := time.Now()
prep := preparesnap.New(retryEnv)
prep.LeaseDuration = cx.cfg.TTL
prep.AfterConnectionsEstablished = func() {
log.Info("All connections are established.")
close(afterConnectionsEstablished)
}

defer cx.cleanUpWith(func(ctx context.Context) {
if err := prep.Finalize(ctx); err != nil {
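To make the resulting ordering easier to see in one place, this is roughly what the cmd.go wiring above amounts to: the scheduler pause is gated on a channel that the admin-pause preparer closes once every store connection exists. This is a condensed sketch of the two hunks above, not a verbatim excerpt.

```go
// Rough shape of the coordination added in AdaptEnvForSnapshotBackup.
initChan := make(chan struct{})

cx.run(func() error { return pauseGCKeeper(cx) })
cx.run(func() error {
	<-initChan // released when every store connection has been established
	return pauseSchedulerKeeper(cx)
})
cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) })

// ...and inside pauseAdminAndWaitApply, the preparer closes the gate:
prep.AfterConnectionsEstablished = func() { close(initChan) }
```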
