Skip to content

Commit

Permalink
chunk/store: limit max retry for async upload as well (#1673)
Browse files Browse the repository at this point in the history
  • Loading branch information
SandyXSD authored Mar 29, 2022
1 parent bc7f6ac commit b4f2d4a
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,6 @@ func (c *wChunk) asyncUpload(key string, block *Page, stagingPath string) {
return
}
blockSize := len(block.Data)
defer c.store.bcache.uploaded(key, blockSize)
defer func() {
<-c.store.currentUpload
}()
Expand All @@ -495,19 +494,24 @@ func (c *wChunk) asyncUpload(key string, block *Page, stagingPath string) {
block.Release()

try := 0
for c.uploadError == nil {
for try <= 3 { // for async, c.uploadError is always nil
err = c.put(key, buf)
if err == nil {
break
}
logger.Warnf("upload %s: %s (tried %d)", key, err, try)
try++
time.Sleep(time.Second * time.Duration(try))
logger.Warnf("upload %s: %s (tried %d)", key, err, try)
time.Sleep(time.Second * time.Duration(try*try))
}
buf.Release()
if err = os.Remove(stagingPath); err == nil {
stageBlocks.Sub(1)
stageBlockBytes.Sub(float64(blockSize))
if err == nil {
c.store.bcache.uploaded(key, blockSize)
if os.Remove(stagingPath) == nil {
stageBlocks.Sub(1)
stageBlockBytes.Sub(float64(blockSize))
}
} else { // add to delay list and wait for later scanning
c.store.addDelayedStaging(key, stagingPath, time.Now().Add(time.Second*30), false)
}
}

Expand Down Expand Up @@ -878,7 +882,7 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
}
compressed := buf.Data[:n]
try := 0
for {
for try <= 3 {
if store.upLimit != nil {
store.upLimit.Wait(int64(len(compressed)))
}
Expand All @@ -896,15 +900,17 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
} else {
objectReqErrors.Add(1)
}
logger.Warnf("upload %s: %s (try %d)", key, err, try)
try++
logger.Warnf("upload %s: %s (try %d)", key, err, try)
time.Sleep(time.Second * time.Duration(try*try))
}
store.bcache.uploaded(key, blockSize)
store.removeStaging(key)
if err = os.Remove(stagingPath); err == nil {
stageBlocks.Sub(1)
stageBlockBytes.Sub(float64(blockSize))
if err == nil {
store.bcache.uploaded(key, blockSize)
store.removeStaging(key)
if os.Remove(stagingPath) == nil {
stageBlocks.Sub(1)
stageBlockBytes.Sub(float64(blockSize))
}
}
}

Expand Down

0 comments on commit b4f2d4a

Please sign in to comment.