docker: support for requesting chunks without end offset #2391

Merged
7 changes: 7 additions & 0 deletions copy/progress_bars.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"math"
"time"

"github.com/containers/image/v5/internal/private"
@@ -151,12 +152,18 @@ type blobChunkAccessorProxy struct {
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
// If the Length for the last chunk is set to math.MaxUint64, then it
// fully fetches the remaining data from the offset to the end of the blob.
func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
start := time.Now()
rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks)
if err == nil {
total := int64(0)
for _, c := range chunks {
// do not update the progress bar if there is a chunk with unknown length.
if c.Length == math.MaxUint64 {
return rc, errs, err
}
total += int64(c.Length)
}
s.bar.EwmaIncrInt64(total, time.Since(start))
23 changes: 21 additions & 2 deletions docker/docker_image_src.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"
"mime"
"mime/multipart"
"net/http"
@@ -260,9 +261,15 @@ func splitHTTP200ResponseToPartial(streams chan io.ReadCloser, errs chan error,
}
currentOffset += toSkip
}
var reader io.Reader
if c.Length == math.MaxUint64 {
reader = body
Collaborator review comment (non-blocking), suggested change:
- reader = body
+ reader = body // The caller has ensured that this is the last requested chunk
} else {
reader = io.LimitReader(body, int64(c.Length))
}
s := signalCloseReader{
closed: make(chan struct{}),
- stream: io.NopCloser(io.LimitReader(body, int64(c.Length))),
+ stream: io.NopCloser(reader),
consumeStream: true,
}
streams <- s
@@ -343,12 +350,24 @@ func parseMediaType(contentType string) (string, map[string]string, error) {
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
// If the Length for the last chunk is set to math.MaxUint64, then it
// fully fetches the remaining data from the offset to the end of the blob.
func (s *dockerImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
headers := make(map[string][]string)

rangeVals := make([]string, 0, len(chunks))
lastFound := false
for _, c := range chunks {
rangeVals = append(rangeVals, fmt.Sprintf("%d-%d", c.Offset, c.Offset+c.Length-1))
if lastFound {
return nil, nil, fmt.Errorf("internal error: another chunk requested after an until-EOF chunk")
}
// If the Length is set to math.MaxUint64 (i.e. -1 as a signed value), then request anything after the specified offset.
if c.Length == math.MaxUint64 {
lastFound = true
rangeVals = append(rangeVals, fmt.Sprintf("%d-", c.Offset))
} else {
rangeVals = append(rangeVals, fmt.Sprintf("%d-%d", c.Offset, c.Offset+c.Length-1))
}
}

headers["Range"] = []string{fmt.Sprintf("bytes=%s", strings.Join(rangeVals, ","))}
2 changes: 2 additions & 0 deletions internal/imagesource/stubs/get_blob_at.go
@@ -39,6 +39,8 @@ func (stub NoGetBlobAtInitialize) SupportsGetBlobAt() bool {
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
// If the Length for the last chunk is set to math.MaxUint64, then it
// fully fetches the remaining data from the offset to the end of the blob.
func (stub NoGetBlobAtInitialize) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
return nil, nil, fmt.Errorf("internal error: GetBlobAt is not supported by the %q transport", stub.transportName)
}
6 changes: 6 additions & 0 deletions internal/private/private.go
@@ -143,7 +143,11 @@ type ReusedBlob struct {
// ImageSourceChunk is a portion of a blob.
// This API is experimental and can be changed without bumping the major version number.
type ImageSourceChunk struct {
// Offset specifies the starting position of the chunk within the source blob.
Offset uint64

// Length specifies the size of the chunk. If it is set to math.MaxUint64,
// then it refers to all the data from Offset to the end of the blob.
Length uint64
}

@@ -154,6 +158,8 @@ type BlobChunkAccessor interface {
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
// If the Length for the last chunk is set to math.MaxUint64, then it
// fully fetches the remaining data from the offset to the end of the blob.
GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []ImageSourceChunk) (chan io.ReadCloser, chan error, error)
}
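
To make the calling convention concrete, here is a rough consumer-side sketch (not from this PR; consumeBlob, src, and info are hypothetical, and the imports context, io, math, private, and types are assumed) of requesting a bounded chunk plus an until-EOF chunk through this interface:

// Sketch only: src is assumed to implement private.BlobChunkAccessor,
// and info is assumed to describe the blob being fetched.
func consumeBlob(ctx context.Context, src private.BlobChunkAccessor, info types.BlobInfo) error {
	chunks := []private.ImageSourceChunk{
		{Offset: 0, Length: 1024},              // a bounded chunk: bytes 0-1023
		{Offset: 1024, Length: math.MaxUint64}, // open-ended: offset 1024 to the end of the blob
	}
	streams, errs, err := src.GetBlobAt(ctx, info, chunks)
	if err != nil {
		return err
	}
	// Consume each reader fully, in order; the final reader ends at the blob's EOF.
	for stream := range streams {
		_, copyErr := io.Copy(io.Discard, stream)
		stream.Close()
		if copyErr != nil {
			return copyErr
		}
	}
	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}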

2 changes: 2 additions & 0 deletions oci/archive/oci_src.go
@@ -149,6 +149,8 @@ func (s *ociArchiveImageSource) SupportsGetBlobAt() bool {
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
// If the Length for the last chunk is set to math.MaxUint64, then it
// fully fetches the remaining data from the offset to the end of the blob.
func (s *ociArchiveImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
return s.unpackedSrc.GetBlobAt(ctx, info, chunks)
}
11 changes: 10 additions & 1 deletion pkg/blobcache/src.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"sync"

@@ -226,9 +227,15 @@ func streamChunksFromFile(streams chan io.ReadCloser, errs chan error, file io.R
errs <- err
break
}
var stream io.Reader
if c.Length != math.MaxUint64 {
stream = io.LimitReader(file, int64(c.Length))
} else {
stream = file
}
s := signalCloseReader{
closed: make(chan struct{}),
- stream: io.LimitReader(file, int64(c.Length)),
+ stream: stream,
}
streams <- s
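
An aside on why the open-ended case has to bypass io.LimitReader here (my illustration, not part of the diff): converting math.MaxUint64 to int64 yields -1, and a LimitedReader with a non-positive limit returns EOF immediately, so the remainder of the file would be silently skipped. A tiny runnable sketch:

package main

import (
	"fmt"
	"io"
	"math"
	"strings"
)

func main() {
	var length uint64 = math.MaxUint64
	fmt.Println(int64(length)) // -1: the sentinel does not survive the int64 conversion

	r := io.LimitReader(strings.NewReader("remaining blob data"), int64(length))
	n, _ := io.Copy(io.Discard, r)
	fmt.Println(n) // 0: nothing would be read, hence the explicit "use the file directly" branch
}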

@@ -256,6 +263,8 @@ func (s signalCloseReader) Close() error {
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
// If the Length for the last chunk is set to math.MaxUint64, then it
// fully fetches the remaining data from the offset to the end of the blob.
func (s *blobCacheSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
blobPath, _, _, err := s.reference.findBlob(info)
if err != nil {