Skip to content

Commit

Permalink
chunked: improve function to merge chunks
Browse files Browse the repository at this point in the history
improve the function that combines neighbor chunks.  Instead of using
the number of parts, which also includes local files, use only the
number of chunks that must be retrieved from the network.

In addition, introduce a threshold limit to merge chunks so that we
further reduce the number of requested ranges.

Signed-off-by: Giuseppe Scrivano <[email protected]>
  • Loading branch information
giuseppe committed Mar 1, 2024
1 parent 226cffb commit 43b836e
Showing 1 changed file with 46 additions and 34 deletions.
80 changes: 46 additions & 34 deletions pkg/chunked/storage_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

const (
maxNumberMissingChunks = 1024
autoMergePartsThreshold = 128 // if the gap between two ranges is below this threshold, automatically merge them.
newFileFlags = (unix.O_CREAT | unix.O_TRUNC | unix.O_EXCL | unix.O_WRONLY)
containersOverrideXattr = "user.containers.override_stat"
bigDataKey = "zstd-chunked-manifest"
Expand Down Expand Up @@ -1180,22 +1181,12 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
}

func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
getGap := func(missingParts []missingPart, i int) int {
getGap := func(missingParts []missingPart, i int) uint64 {
prev := missingParts[i-1].SourceChunk.Offset + missingParts[i-1].SourceChunk.Length
return int(missingParts[i].SourceChunk.Offset - prev)
}
getCost := func(missingParts []missingPart, i int) int {
cost := getGap(missingParts, i)
if missingParts[i-1].OriginFile != nil {
cost += int(missingParts[i-1].SourceChunk.Length)
}
if missingParts[i].OriginFile != nil {
cost += int(missingParts[i].SourceChunk.Length)
}
return cost
return missingParts[i].SourceChunk.Offset - prev
}

// simple case: merge chunks from the same file.
// simple case: merge chunks from the same file. Useful to reduce the number of parts to work with later.
newMissingParts := missingParts[0:1]
prevIndex := 0
for i := 1; i < len(missingParts); i++ {
Expand All @@ -1215,28 +1206,50 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
}
missingParts = newMissingParts

if len(missingParts) <= target {
return missingParts
}

// this implementation doesn't account for duplicates, so it could merge
// more than necessary to reach the specified target. Since target itself
// is a heuristic value, it doesn't matter.
costs := make([]int, len(missingParts)-1)
for i := 1; i < len(missingParts); i++ {
costs[i-1] = getCost(missingParts, i)
type gap struct {
from int
to int
cost uint64
}
var requestGaps []gap
lastOffset := int(-1)
numberSourceChunks := 0
for i, c := range missingParts {
if c.OriginFile != nil || c.Hole {
// it does not require a network request
continue
}
numberSourceChunks++
if lastOffset >= 0 {
prevEnd := missingParts[lastOffset].SourceChunk.Offset + missingParts[lastOffset].SourceChunk.Length
cost := c.SourceChunk.Offset - prevEnd
g := gap{
from: lastOffset,
to: i,
cost: cost,
}
requestGaps = append(requestGaps, g)
}
lastOffset = i
}
sort.Ints(costs)

toShrink := len(missingParts) - target
if toShrink >= len(costs) {
toShrink = len(costs) - 1
sort.Slice(requestGaps, func(i, j int) bool {
return requestGaps[i].cost < requestGaps[j].cost
})
toMergeMap := make([]bool, len(missingParts))
remainingToMerge := numberSourceChunks - target
for _, g := range requestGaps {
if remainingToMerge < 0 && g.cost > autoMergePartsThreshold {
continue
}
for i := g.from + 1; i <= g.to; i++ {
toMergeMap[i] = true
}
remainingToMerge--
}
targetValue := costs[toShrink]

newMissingParts = missingParts[0:1]
for i := 1; i < len(missingParts); i++ {
if getCost(missingParts, i) > targetValue {
if !toMergeMap[i] {
newMissingParts = append(newMissingParts, missingParts[i])
} else {
gap := getGap(missingParts, i)
Expand Down Expand Up @@ -1268,6 +1281,7 @@ func (c *chunkedDiffer) retrieveMissingFiles(stream ImageSourceSeekable, dest st
}
}

missingParts = mergeMissingChunks(missingParts, maxNumberMissingChunks)
calculateChunksToRequest()

// There are some missing files. Prepare a multirange request for the missing chunks.
Expand All @@ -1281,14 +1295,13 @@ func (c *chunkedDiffer) retrieveMissingFiles(stream ImageSourceSeekable, dest st
}

if _, ok := err.(ErrBadRequest); ok {
requested := len(missingParts)
// If the server cannot handle at least 64 chunks in a single request, just give up.
if requested < 64 {
if len(chunksToRequest) < 64 {
return err
}

// Merge more chunks to request
missingParts = mergeMissingChunks(missingParts, requested/2)
missingParts = mergeMissingChunks(missingParts, len(chunksToRequest)/2)
calculateChunksToRequest()
continue
}
Expand Down Expand Up @@ -1999,7 +2012,6 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
}
// There are some missing files. Prepare a multirange request for the missing chunks.
if len(missingParts) > 0 {
missingParts = mergeMissingChunks(missingParts, maxNumberMissingChunks)
if err := c.retrieveMissingFiles(stream, dest, dirfd, missingParts, options); err != nil {
return output, err
}
Expand Down

0 comments on commit 43b836e

Please sign in to comment.