From 6dbaf1a1e1d03f49f4fb0daff44999c63434cdff Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 19 Mar 2024 13:45:58 +0800 Subject: [PATCH] test: fix the stuck issue when use raw kv restore --- br/pkg/restore/import.go | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index 8f909cea1fc63..be549ac13c6a0 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -431,7 +431,7 @@ func NewFileImporter( } func (importer *FileImporter) ShouldBlock() bool { - if importer != nil { + if importer != nil && importer.useTokenBucket { return importer.downloadTokensMap.ShouldBlock() || importer.ingestTokensMap.ShouldBlock() } return false @@ -1113,15 +1113,17 @@ func (importer *FileImporter) downloadSSTV2( for _, p := range regionInfo.Region.GetPeers() { peer := p eg.Go(func() error { - tokenCh := importer.downloadTokensMap.acquireTokenCh(peer.GetStoreId(), importer.concurrencyPerStore) - select { - case <-ectx.Done(): - return ectx.Err() - case <-tokenCh: + if importer.useTokenBucket { + tokenCh := importer.downloadTokensMap.acquireTokenCh(peer.GetStoreId(), importer.concurrencyPerStore) + select { + case <-ectx.Done(): + return ectx.Err() + case <-tokenCh: + } + defer func() { + importer.releaseToken(tokenCh) + }() } - defer func() { - importer.releaseToken(tokenCh) - }() for _, file := range files { req, ok := downloadReqsMap[file.Name] if !ok { @@ -1259,15 +1261,17 @@ func (importer *FileImporter) ingest( info *split.RegionInfo, downloadMetas []*import_sstpb.SSTMeta, ) error { - tokenCh := importer.ingestTokensMap.acquireTokenCh(info.Leader.GetStoreId(), importer.concurrencyPerStore) - select { - case <-ctx.Done(): - return ctx.Err() - case <-tokenCh: - } - defer func() { - importer.releaseToken(tokenCh) - }() + if importer.useTokenBucket { + tokenCh := importer.ingestTokensMap.acquireTokenCh(info.Leader.GetStoreId(), importer.concurrencyPerStore) + select { + case <-ctx.Done(): + return ctx.Err() + case <-tokenCh: + } + defer func() { + importer.releaseToken(tokenCh) + }() + } for { ingestResp, errIngest := importer.ingestSSTs(ctx, downloadMetas, info) if errIngest != nil {