Skip to content

Commit

Permalink
test: fix the stuck issue when use raw kv restore (#51885) (#51919)
Browse files Browse the repository at this point in the history
close #51920
  • Loading branch information
ti-chi-bot authored Mar 20, 2024
1 parent ec16b75 commit ceea10c
Showing 1 changed file with 22 additions and 18 deletions.
40 changes: 22 additions & 18 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func NewFileImporter(
}

func (importer *FileImporter) ShouldBlock() bool {
if importer != nil {
if importer != nil && importer.useTokenBucket {
return importer.downloadTokensMap.ShouldBlock() || importer.ingestTokensMap.ShouldBlock()
}
return false
Expand Down Expand Up @@ -1113,15 +1113,17 @@ func (importer *FileImporter) downloadSSTV2(
for _, p := range regionInfo.Region.GetPeers() {
peer := p
eg.Go(func() error {
tokenCh := importer.downloadTokensMap.acquireTokenCh(peer.GetStoreId(), importer.concurrencyPerStore)
select {
case <-ectx.Done():
return ectx.Err()
case <-tokenCh:
if importer.useTokenBucket {
tokenCh := importer.downloadTokensMap.acquireTokenCh(peer.GetStoreId(), importer.concurrencyPerStore)
select {
case <-ectx.Done():
return ectx.Err()
case <-tokenCh:
}
defer func() {
importer.releaseToken(tokenCh)
}()
}
defer func() {
importer.releaseToken(tokenCh)
}()
for _, file := range files {
req, ok := downloadReqsMap[file.Name]
if !ok {
Expand Down Expand Up @@ -1259,15 +1261,17 @@ func (importer *FileImporter) ingest(
info *split.RegionInfo,
downloadMetas []*import_sstpb.SSTMeta,
) error {
tokenCh := importer.ingestTokensMap.acquireTokenCh(info.Leader.GetStoreId(), importer.concurrencyPerStore)
select {
case <-ctx.Done():
return ctx.Err()
case <-tokenCh:
}
defer func() {
importer.releaseToken(tokenCh)
}()
if importer.useTokenBucket {
tokenCh := importer.ingestTokensMap.acquireTokenCh(info.Leader.GetStoreId(), importer.concurrencyPerStore)
select {
case <-ctx.Done():
return ctx.Err()
case <-tokenCh:
}
defer func() {
importer.releaseToken(tokenCh)
}()
}
for {
ingestResp, errIngest := importer.ingestSSTs(ctx, downloadMetas, info)
if errIngest != nil {
Expand Down

0 comments on commit ceea10c

Please sign in to comment.