From 3770bca4b59910084ecfbecd5a71e04921a8b331 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Tue, 15 Feb 2022 14:35:51 +0100 Subject: [PATCH 1/9] Move blobCacheSource into a separate source file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ... to make the files a bit shorter. Only moves the code without modification, should not change behavior. Signed-off-by: Miloslav Trmač --- pkg/blobcache/blobcache.go | 163 --------------------------------- pkg/blobcache/src.go | 180 +++++++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 163 deletions(-) create mode 100644 pkg/blobcache/src.go diff --git a/pkg/blobcache/blobcache.go b/pkg/blobcache/blobcache.go index 6818caef42..5373af2ba6 100644 --- a/pkg/blobcache/blobcache.go +++ b/pkg/blobcache/blobcache.go @@ -12,13 +12,11 @@ import ( "github.com/containers/image/v5/docker/reference" "github.com/containers/image/v5/internal/image" "github.com/containers/image/v5/manifest" - "github.com/containers/image/v5/pkg/compression" "github.com/containers/image/v5/transports" "github.com/containers/image/v5/types" "github.com/containers/storage/pkg/archive" "github.com/containers/storage/pkg/ioutils" digest "github.com/opencontainers/go-digest" - v1 "github.com/opencontainers/image-spec/specs-go/v1" perrors "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -47,17 +45,6 @@ type BlobCache struct { compress types.LayerCompression } -type blobCacheSource struct { - reference *BlobCache - source types.ImageSource - sys types.SystemContext - // this mutex synchronizes the counters below - mu sync.Mutex - cacheHits int64 - cacheMisses int64 - cacheErrors int64 -} - type blobCacheDestination struct { reference *BlobCache destination types.ImageDestination @@ -162,15 +149,6 @@ func (b *BlobCache) NewImage(ctx context.Context, sys *types.SystemContext) (typ return image.FromReference(ctx, sys, b) } -func (b *BlobCache) NewImageSource(ctx context.Context, sys *types.SystemContext) (types.ImageSource, error) { - src, err := b.reference.NewImageSource(ctx, sys) - if err != nil { - return nil, perrors.Wrapf(err, "error creating new image source %q", transports.ImageName(b.reference)) - } - logrus.Debugf("starting to read from image %q using blob cache in %q (compression=%v)", transports.ImageName(b.reference), b.directory, b.compress) - return &blobCacheSource{reference: b, source: src, sys: *sys}, nil -} - func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemContext) (types.ImageDestination, error) { dest, err := b.reference.NewImageDestination(ctx, sys) if err != nil { @@ -180,147 +158,6 @@ func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemCo return &blobCacheDestination{reference: b, destination: dest}, nil } -func (s *blobCacheSource) Reference() types.ImageReference { - return s.reference -} - -func (s *blobCacheSource) Close() error { - logrus.Debugf("finished reading from image %q using blob cache: cache had %d hits, %d misses, %d errors", transports.ImageName(s.reference), s.cacheHits, s.cacheMisses, s.cacheErrors) - return s.source.Close() -} - -func (s *blobCacheSource) GetManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) { - if instanceDigest != nil { - filename := filepath.Join(s.reference.directory, makeFilename(*instanceDigest, false)) - manifestBytes, err := os.ReadFile(filename) - if err == nil { - s.cacheHits++ - return manifestBytes, manifest.GuessMIMEType(manifestBytes), nil - } - if !os.IsNotExist(err) { - s.cacheErrors++ - return nil, "", perrors.Wrap(err, "checking for manifest file") - } - } - s.cacheMisses++ - return s.source.GetManifest(ctx, instanceDigest) -} - -func (s *blobCacheSource) HasThreadSafeGetBlob() bool { - return s.source.HasThreadSafeGetBlob() -} - -func (s *blobCacheSource) GetBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) { - present, size, err := s.reference.HasBlob(blobinfo) - if err != nil { - return nil, -1, err - } - if present { - for _, isConfig := range []bool{false, true} { - filename := filepath.Join(s.reference.directory, makeFilename(blobinfo.Digest, isConfig)) - f, err := os.Open(filename) - if err == nil { - s.mu.Lock() - s.cacheHits++ - s.mu.Unlock() - return f, size, nil - } - if !os.IsNotExist(err) { - s.mu.Lock() - s.cacheErrors++ - s.mu.Unlock() - return nil, -1, perrors.Wrap(err, "checking for cache") - } - } - } - s.mu.Lock() - s.cacheMisses++ - s.mu.Unlock() - rc, size, err := s.source.GetBlob(ctx, blobinfo, cache) - if err != nil { - return rc, size, perrors.Wrapf(err, "error reading blob from source image %q", transports.ImageName(s.reference)) - } - return rc, size, nil -} - -func (s *blobCacheSource) GetSignatures(ctx context.Context, instanceDigest *digest.Digest) ([][]byte, error) { - return s.source.GetSignatures(ctx, instanceDigest) -} - -func (s *blobCacheSource) LayerInfosForCopy(ctx context.Context, instanceDigest *digest.Digest) ([]types.BlobInfo, error) { - signatures, err := s.source.GetSignatures(ctx, instanceDigest) - if err != nil { - return nil, perrors.Wrapf(err, "error checking if image %q has signatures", transports.ImageName(s.reference)) - } - canReplaceBlobs := !(len(signatures) > 0 && len(signatures[0]) > 0) - - infos, err := s.source.LayerInfosForCopy(ctx, instanceDigest) - if err != nil { - return nil, perrors.Wrapf(err, "error getting layer infos for copying image %q through cache", transports.ImageName(s.reference)) - } - if infos == nil { - img, err := image.FromUnparsedImage(ctx, &s.sys, image.UnparsedInstance(s.source, instanceDigest)) - if err != nil { - return nil, perrors.Wrapf(err, "error opening image to get layer infos for copying image %q through cache", transports.ImageName(s.reference)) - } - infos = img.LayerInfos() - } - - if canReplaceBlobs && s.reference.compress != types.PreserveOriginal { - replacedInfos := make([]types.BlobInfo, 0, len(infos)) - for _, info := range infos { - var replaceDigest []byte - var err error - blobFile := filepath.Join(s.reference.directory, makeFilename(info.Digest, false)) - alternate := "" - switch s.reference.compress { - case types.Compress: - alternate = blobFile + compressedNote - replaceDigest, err = os.ReadFile(alternate) - case types.Decompress: - alternate = blobFile + decompressedNote - replaceDigest, err = os.ReadFile(alternate) - } - if err == nil && digest.Digest(replaceDigest).Validate() == nil { - alternate = filepath.Join(filepath.Dir(alternate), makeFilename(digest.Digest(replaceDigest), false)) - fileInfo, err := os.Stat(alternate) - if err == nil { - switch info.MediaType { - case v1.MediaTypeImageLayer, v1.MediaTypeImageLayerGzip: - switch s.reference.compress { - case types.Compress: - info.MediaType = v1.MediaTypeImageLayerGzip - info.CompressionAlgorithm = &compression.Gzip - case types.Decompress: - info.MediaType = v1.MediaTypeImageLayer - info.CompressionAlgorithm = nil - } - case manifest.DockerV2SchemaLayerMediaTypeUncompressed, manifest.DockerV2Schema2LayerMediaType: - switch s.reference.compress { - case types.Compress: - info.MediaType = manifest.DockerV2Schema2LayerMediaType - info.CompressionAlgorithm = &compression.Gzip - case types.Decompress: - // nope, not going to suggest anything, it's not allowed by the spec - replacedInfos = append(replacedInfos, info) - continue - } - } - logrus.Debugf("suggesting cached blob with digest %q, type %q, and compression %v in place of blob with digest %q", string(replaceDigest), info.MediaType, s.reference.compress, info.Digest.String()) - info.CompressionOperation = s.reference.compress - info.Digest = digest.Digest(replaceDigest) - info.Size = fileInfo.Size() - logrus.Debugf("info = %#v", info) - } - } - replacedInfos = append(replacedInfos, info) - } - infos = replacedInfos - } - - return infos, nil -} - func (d *blobCacheDestination) Reference() types.ImageReference { return d.reference } diff --git a/pkg/blobcache/src.go b/pkg/blobcache/src.go new file mode 100644 index 0000000000..4fd237956c --- /dev/null +++ b/pkg/blobcache/src.go @@ -0,0 +1,180 @@ +package blobcache + +import ( + "context" + "io" + "os" + "path/filepath" + "sync" + + "github.com/containers/image/v5/internal/image" + "github.com/containers/image/v5/manifest" + "github.com/containers/image/v5/pkg/compression" + "github.com/containers/image/v5/transports" + "github.com/containers/image/v5/types" + digest "github.com/opencontainers/go-digest" + v1 "github.com/opencontainers/image-spec/specs-go/v1" + perrors "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type blobCacheSource struct { + reference *BlobCache + source types.ImageSource + sys types.SystemContext + // this mutex synchronizes the counters below + mu sync.Mutex + cacheHits int64 + cacheMisses int64 + cacheErrors int64 +} + +func (b *BlobCache) NewImageSource(ctx context.Context, sys *types.SystemContext) (types.ImageSource, error) { + src, err := b.reference.NewImageSource(ctx, sys) + if err != nil { + return nil, perrors.Wrapf(err, "error creating new image source %q", transports.ImageName(b.reference)) + } + logrus.Debugf("starting to read from image %q using blob cache in %q (compression=%v)", transports.ImageName(b.reference), b.directory, b.compress) + return &blobCacheSource{reference: b, source: src, sys: *sys}, nil +} + +func (s *blobCacheSource) Reference() types.ImageReference { + return s.reference +} + +func (s *blobCacheSource) Close() error { + logrus.Debugf("finished reading from image %q using blob cache: cache had %d hits, %d misses, %d errors", transports.ImageName(s.reference), s.cacheHits, s.cacheMisses, s.cacheErrors) + return s.source.Close() +} + +func (s *blobCacheSource) GetManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) { + if instanceDigest != nil { + filename := filepath.Join(s.reference.directory, makeFilename(*instanceDigest, false)) + manifestBytes, err := os.ReadFile(filename) + if err == nil { + s.cacheHits++ + return manifestBytes, manifest.GuessMIMEType(manifestBytes), nil + } + if !os.IsNotExist(err) { + s.cacheErrors++ + return nil, "", perrors.Wrap(err, "checking for manifest file") + } + } + s.cacheMisses++ + return s.source.GetManifest(ctx, instanceDigest) +} + +func (s *blobCacheSource) HasThreadSafeGetBlob() bool { + return s.source.HasThreadSafeGetBlob() +} + +func (s *blobCacheSource) GetBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) { + present, size, err := s.reference.HasBlob(blobinfo) + if err != nil { + return nil, -1, err + } + if present { + for _, isConfig := range []bool{false, true} { + filename := filepath.Join(s.reference.directory, makeFilename(blobinfo.Digest, isConfig)) + f, err := os.Open(filename) + if err == nil { + s.mu.Lock() + s.cacheHits++ + s.mu.Unlock() + return f, size, nil + } + if !os.IsNotExist(err) { + s.mu.Lock() + s.cacheErrors++ + s.mu.Unlock() + return nil, -1, perrors.Wrap(err, "checking for cache") + } + } + } + s.mu.Lock() + s.cacheMisses++ + s.mu.Unlock() + rc, size, err := s.source.GetBlob(ctx, blobinfo, cache) + if err != nil { + return rc, size, perrors.Wrapf(err, "error reading blob from source image %q", transports.ImageName(s.reference)) + } + return rc, size, nil +} + +func (s *blobCacheSource) GetSignatures(ctx context.Context, instanceDigest *digest.Digest) ([][]byte, error) { + return s.source.GetSignatures(ctx, instanceDigest) +} + +func (s *blobCacheSource) LayerInfosForCopy(ctx context.Context, instanceDigest *digest.Digest) ([]types.BlobInfo, error) { + signatures, err := s.source.GetSignatures(ctx, instanceDigest) + if err != nil { + return nil, perrors.Wrapf(err, "error checking if image %q has signatures", transports.ImageName(s.reference)) + } + canReplaceBlobs := !(len(signatures) > 0 && len(signatures[0]) > 0) + + infos, err := s.source.LayerInfosForCopy(ctx, instanceDigest) + if err != nil { + return nil, perrors.Wrapf(err, "error getting layer infos for copying image %q through cache", transports.ImageName(s.reference)) + } + if infos == nil { + img, err := image.FromUnparsedImage(ctx, &s.sys, image.UnparsedInstance(s.source, instanceDigest)) + if err != nil { + return nil, perrors.Wrapf(err, "error opening image to get layer infos for copying image %q through cache", transports.ImageName(s.reference)) + } + infos = img.LayerInfos() + } + + if canReplaceBlobs && s.reference.compress != types.PreserveOriginal { + replacedInfos := make([]types.BlobInfo, 0, len(infos)) + for _, info := range infos { + var replaceDigest []byte + var err error + blobFile := filepath.Join(s.reference.directory, makeFilename(info.Digest, false)) + alternate := "" + switch s.reference.compress { + case types.Compress: + alternate = blobFile + compressedNote + replaceDigest, err = os.ReadFile(alternate) + case types.Decompress: + alternate = blobFile + decompressedNote + replaceDigest, err = os.ReadFile(alternate) + } + if err == nil && digest.Digest(replaceDigest).Validate() == nil { + alternate = filepath.Join(filepath.Dir(alternate), makeFilename(digest.Digest(replaceDigest), false)) + fileInfo, err := os.Stat(alternate) + if err == nil { + switch info.MediaType { + case v1.MediaTypeImageLayer, v1.MediaTypeImageLayerGzip: + switch s.reference.compress { + case types.Compress: + info.MediaType = v1.MediaTypeImageLayerGzip + info.CompressionAlgorithm = &compression.Gzip + case types.Decompress: + info.MediaType = v1.MediaTypeImageLayer + info.CompressionAlgorithm = nil + } + case manifest.DockerV2SchemaLayerMediaTypeUncompressed, manifest.DockerV2Schema2LayerMediaType: + switch s.reference.compress { + case types.Compress: + info.MediaType = manifest.DockerV2Schema2LayerMediaType + info.CompressionAlgorithm = &compression.Gzip + case types.Decompress: + // nope, not going to suggest anything, it's not allowed by the spec + replacedInfos = append(replacedInfos, info) + continue + } + } + logrus.Debugf("suggesting cached blob with digest %q, type %q, and compression %v in place of blob with digest %q", string(replaceDigest), info.MediaType, s.reference.compress, info.Digest.String()) + info.CompressionOperation = s.reference.compress + info.Digest = digest.Digest(replaceDigest) + info.Size = fileInfo.Size() + logrus.Debugf("info = %#v", info) + } + } + replacedInfos = append(replacedInfos, info) + } + infos = replacedInfos + } + + return infos, nil +} From 13c5e5bdb04934c50776b5636ffc7f475720a918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Tue, 15 Feb 2022 14:39:59 +0100 Subject: [PATCH 2/9] Move blobCacheDestination into a separate source file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ... to make the files a bit shorter. Only moves the code without modification, should not change behavior. Signed-off-by: Miloslav Trmač --- pkg/blobcache/blobcache.go | 234 ----------------------------------- pkg/blobcache/dest.go | 246 +++++++++++++++++++++++++++++++++++++ 2 files changed, 246 insertions(+), 234 deletions(-) create mode 100644 pkg/blobcache/dest.go diff --git a/pkg/blobcache/blobcache.go b/pkg/blobcache/blobcache.go index 5373af2ba6..08cd35d6e5 100644 --- a/pkg/blobcache/blobcache.go +++ b/pkg/blobcache/blobcache.go @@ -1,24 +1,17 @@ package blobcache import ( - "bytes" "context" "fmt" - "io" "os" "path/filepath" - "sync" "github.com/containers/image/v5/docker/reference" "github.com/containers/image/v5/internal/image" - "github.com/containers/image/v5/manifest" "github.com/containers/image/v5/transports" "github.com/containers/image/v5/types" - "github.com/containers/storage/pkg/archive" - "github.com/containers/storage/pkg/ioutils" digest "github.com/opencontainers/go-digest" perrors "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) var ( @@ -45,11 +38,6 @@ type BlobCache struct { compress types.LayerCompression } -type blobCacheDestination struct { - reference *BlobCache - destination types.ImageDestination -} - func makeFilename(blobSum digest.Digest, isConfig bool) string { if isConfig { return blobSum.String() + ".config" @@ -148,225 +136,3 @@ func (b *BlobCache) ClearCache() error { func (b *BlobCache) NewImage(ctx context.Context, sys *types.SystemContext) (types.ImageCloser, error) { return image.FromReference(ctx, sys, b) } - -func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemContext) (types.ImageDestination, error) { - dest, err := b.reference.NewImageDestination(ctx, sys) - if err != nil { - return nil, perrors.Wrapf(err, "error creating new image destination %q", transports.ImageName(b.reference)) - } - logrus.Debugf("starting to write to image %q using blob cache in %q", transports.ImageName(b.reference), b.directory) - return &blobCacheDestination{reference: b, destination: dest}, nil -} - -func (d *blobCacheDestination) Reference() types.ImageReference { - return d.reference -} - -func (d *blobCacheDestination) Close() error { - logrus.Debugf("finished writing to image %q using blob cache", transports.ImageName(d.reference)) - return d.destination.Close() -} - -func (d *blobCacheDestination) SupportedManifestMIMETypes() []string { - return d.destination.SupportedManifestMIMETypes() -} - -func (d *blobCacheDestination) SupportsSignatures(ctx context.Context) error { - return d.destination.SupportsSignatures(ctx) -} - -func (d *blobCacheDestination) DesiredLayerCompression() types.LayerCompression { - return d.destination.DesiredLayerCompression() -} - -func (d *blobCacheDestination) AcceptsForeignLayerURLs() bool { - return d.destination.AcceptsForeignLayerURLs() -} - -func (d *blobCacheDestination) MustMatchRuntimeOS() bool { - return d.destination.MustMatchRuntimeOS() -} - -func (d *blobCacheDestination) IgnoresEmbeddedDockerReference() bool { - return d.destination.IgnoresEmbeddedDockerReference() -} - -// Decompress and save the contents of the decompressReader stream into the passed-in temporary -// file. If we successfully save all of the data, rename the file to match the digest of the data, -// and make notes about the relationship between the file that holds a copy of the compressed data -// and this new file. -func saveStream(wg *sync.WaitGroup, decompressReader io.ReadCloser, tempFile *os.File, compressedFilename string, compressedDigest digest.Digest, isConfig bool, alternateDigest *digest.Digest) { - defer wg.Done() - // Decompress from and digest the reading end of that pipe. - decompressed, err3 := archive.DecompressStream(decompressReader) - digester := digest.Canonical.Digester() - if err3 == nil { - // Read the decompressed data through the filter over the pipe, blocking until the - // writing end is closed. - _, err3 = io.Copy(io.MultiWriter(tempFile, digester.Hash()), decompressed) - } else { - // Drain the pipe to keep from stalling the PutBlob() thread. - if _, err := io.Copy(io.Discard, decompressReader); err != nil { - logrus.Debugf("error draining the pipe: %v", err) - } - } - decompressReader.Close() - decompressed.Close() - tempFile.Close() - // Determine the name that we should give to the uncompressed copy of the blob. - decompressedFilename := filepath.Join(filepath.Dir(tempFile.Name()), makeFilename(digester.Digest(), isConfig)) - if err3 == nil { - // Rename the temporary file. - if err3 = os.Rename(tempFile.Name(), decompressedFilename); err3 != nil { - logrus.Debugf("error renaming new decompressed copy of blob %q into place at %q: %v", digester.Digest().String(), decompressedFilename, err3) - // Remove the temporary file. - if err3 = os.Remove(tempFile.Name()); err3 != nil { - logrus.Debugf("error cleaning up temporary file %q for decompressed copy of blob %q: %v", tempFile.Name(), compressedDigest.String(), err3) - } - } else { - *alternateDigest = digester.Digest() - // Note the relationship between the two files. - if err3 = ioutils.AtomicWriteFile(decompressedFilename+compressedNote, []byte(compressedDigest.String()), 0600); err3 != nil { - logrus.Debugf("error noting that the compressed version of %q is %q: %v", digester.Digest().String(), compressedDigest.String(), err3) - } - if err3 = ioutils.AtomicWriteFile(compressedFilename+decompressedNote, []byte(digester.Digest().String()), 0600); err3 != nil { - logrus.Debugf("error noting that the decompressed version of %q is %q: %v", compressedDigest.String(), digester.Digest().String(), err3) - } - } - } else { - // Remove the temporary file. - if err3 = os.Remove(tempFile.Name()); err3 != nil { - logrus.Debugf("error cleaning up temporary file %q for decompressed copy of blob %q: %v", tempFile.Name(), compressedDigest.String(), err3) - } - } -} - -func (d *blobCacheDestination) HasThreadSafePutBlob() bool { - return d.destination.HasThreadSafePutBlob() -} - -func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - var tempfile *os.File - var err error - var n int - var alternateDigest digest.Digest - var closer io.Closer - wg := new(sync.WaitGroup) - needToWait := false - compression := archive.Uncompressed - if inputInfo.Digest != "" { - filename := filepath.Join(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) - tempfile, err = os.CreateTemp(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) - if err == nil { - stream = io.TeeReader(stream, tempfile) - defer func() { - if err == nil { - if err = os.Rename(tempfile.Name(), filename); err != nil { - if err2 := os.Remove(tempfile.Name()); err2 != nil { - logrus.Debugf("error cleaning up temporary file %q for blob %q: %v", tempfile.Name(), inputInfo.Digest.String(), err2) - } - err = perrors.Wrapf(err, "error renaming new layer for blob %q into place at %q", inputInfo.Digest.String(), filename) - } - } else { - if err2 := os.Remove(tempfile.Name()); err2 != nil { - logrus.Debugf("error cleaning up temporary file %q for blob %q: %v", tempfile.Name(), inputInfo.Digest.String(), err2) - } - } - tempfile.Close() - }() - } else { - logrus.Debugf("error while creating a temporary file under %q to hold blob %q: %v", d.reference.directory, inputInfo.Digest.String(), err) - } - if !isConfig { - initial := make([]byte, 8) - n, err = stream.Read(initial) - if n > 0 { - // Build a Reader that will still return the bytes that we just - // read, for PutBlob()'s sake. - stream = io.MultiReader(bytes.NewReader(initial[:n]), stream) - if n >= len(initial) { - compression = archive.DetectCompression(initial[:n]) - } - if compression == archive.Gzip { - // The stream is compressed, so create a file which we'll - // use to store a decompressed copy. - decompressedTemp, err2 := os.CreateTemp(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) - if err2 != nil { - logrus.Debugf("error while creating a temporary file under %q to hold decompressed blob %q: %v", d.reference.directory, inputInfo.Digest.String(), err2) - decompressedTemp.Close() - } else { - // Write a copy of the compressed data to a pipe, - // closing the writing end of the pipe after - // PutBlob() returns. - decompressReader, decompressWriter := io.Pipe() - closer = decompressWriter - stream = io.TeeReader(stream, decompressWriter) - // Let saveStream() close the reading end and handle the temporary file. - wg.Add(1) - needToWait = true - go saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, isConfig, &alternateDigest) - } - } - } - } - } - newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, cache, isConfig) - if closer != nil { - closer.Close() - } - if needToWait { - wg.Wait() - } - if err != nil { - return newBlobInfo, perrors.Wrapf(err, "error storing blob to image destination for cache %q", transports.ImageName(d.reference)) - } - if alternateDigest.Validate() == nil { - logrus.Debugf("added blob %q (also %q) to the cache at %q", inputInfo.Digest.String(), alternateDigest.String(), d.reference.directory) - } else { - logrus.Debugf("added blob %q to the cache at %q", inputInfo.Digest.String(), d.reference.directory) - } - return newBlobInfo, nil -} - -func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, cache, canSubstitute) - if err != nil || present { - return present, reusedInfo, err - } - - for _, isConfig := range []bool{false, true} { - filename := filepath.Join(d.reference.directory, makeFilename(info.Digest, isConfig)) - f, err := os.Open(filename) - if err == nil { - defer f.Close() - uploadedInfo, err := d.destination.PutBlob(ctx, f, info, cache, isConfig) - if err != nil { - return false, types.BlobInfo{}, err - } - return true, uploadedInfo, nil - } - } - - return false, types.BlobInfo{}, nil -} - -func (d *blobCacheDestination) PutManifest(ctx context.Context, manifestBytes []byte, instanceDigest *digest.Digest) error { - manifestDigest, err := manifest.Digest(manifestBytes) - if err != nil { - logrus.Warnf("error digesting manifest %q: %v", string(manifestBytes), err) - } else { - filename := filepath.Join(d.reference.directory, makeFilename(manifestDigest, false)) - if err = ioutils.AtomicWriteFile(filename, manifestBytes, 0600); err != nil { - logrus.Warnf("error saving manifest as %q: %v", filename, err) - } - } - return d.destination.PutManifest(ctx, manifestBytes, instanceDigest) -} - -func (d *blobCacheDestination) PutSignatures(ctx context.Context, signatures [][]byte, instanceDigest *digest.Digest) error { - return d.destination.PutSignatures(ctx, signatures, instanceDigest) -} - -func (d *blobCacheDestination) Commit(ctx context.Context, unparsedToplevel types.UnparsedImage) error { - return d.destination.Commit(ctx, unparsedToplevel) -} diff --git a/pkg/blobcache/dest.go b/pkg/blobcache/dest.go new file mode 100644 index 0000000000..c16c56b367 --- /dev/null +++ b/pkg/blobcache/dest.go @@ -0,0 +1,246 @@ +package blobcache + +import ( + "bytes" + "context" + "io" + "os" + "path/filepath" + "sync" + + "github.com/containers/image/v5/manifest" + "github.com/containers/image/v5/transports" + "github.com/containers/image/v5/types" + "github.com/containers/storage/pkg/archive" + "github.com/containers/storage/pkg/ioutils" + digest "github.com/opencontainers/go-digest" + perrors "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type blobCacheDestination struct { + reference *BlobCache + destination types.ImageDestination +} + +func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemContext) (types.ImageDestination, error) { + dest, err := b.reference.NewImageDestination(ctx, sys) + if err != nil { + return nil, perrors.Wrapf(err, "error creating new image destination %q", transports.ImageName(b.reference)) + } + logrus.Debugf("starting to write to image %q using blob cache in %q", transports.ImageName(b.reference), b.directory) + return &blobCacheDestination{reference: b, destination: dest}, nil +} + +func (d *blobCacheDestination) Reference() types.ImageReference { + return d.reference +} + +func (d *blobCacheDestination) Close() error { + logrus.Debugf("finished writing to image %q using blob cache", transports.ImageName(d.reference)) + return d.destination.Close() +} + +func (d *blobCacheDestination) SupportedManifestMIMETypes() []string { + return d.destination.SupportedManifestMIMETypes() +} + +func (d *blobCacheDestination) SupportsSignatures(ctx context.Context) error { + return d.destination.SupportsSignatures(ctx) +} + +func (d *blobCacheDestination) DesiredLayerCompression() types.LayerCompression { + return d.destination.DesiredLayerCompression() +} + +func (d *blobCacheDestination) AcceptsForeignLayerURLs() bool { + return d.destination.AcceptsForeignLayerURLs() +} + +func (d *blobCacheDestination) MustMatchRuntimeOS() bool { + return d.destination.MustMatchRuntimeOS() +} + +func (d *blobCacheDestination) IgnoresEmbeddedDockerReference() bool { + return d.destination.IgnoresEmbeddedDockerReference() +} + +// Decompress and save the contents of the decompressReader stream into the passed-in temporary +// file. If we successfully save all of the data, rename the file to match the digest of the data, +// and make notes about the relationship between the file that holds a copy of the compressed data +// and this new file. +func saveStream(wg *sync.WaitGroup, decompressReader io.ReadCloser, tempFile *os.File, compressedFilename string, compressedDigest digest.Digest, isConfig bool, alternateDigest *digest.Digest) { + defer wg.Done() + // Decompress from and digest the reading end of that pipe. + decompressed, err3 := archive.DecompressStream(decompressReader) + digester := digest.Canonical.Digester() + if err3 == nil { + // Read the decompressed data through the filter over the pipe, blocking until the + // writing end is closed. + _, err3 = io.Copy(io.MultiWriter(tempFile, digester.Hash()), decompressed) + } else { + // Drain the pipe to keep from stalling the PutBlob() thread. + if _, err := io.Copy(io.Discard, decompressReader); err != nil { + logrus.Debugf("error draining the pipe: %v", err) + } + } + decompressReader.Close() + decompressed.Close() + tempFile.Close() + // Determine the name that we should give to the uncompressed copy of the blob. + decompressedFilename := filepath.Join(filepath.Dir(tempFile.Name()), makeFilename(digester.Digest(), isConfig)) + if err3 == nil { + // Rename the temporary file. + if err3 = os.Rename(tempFile.Name(), decompressedFilename); err3 != nil { + logrus.Debugf("error renaming new decompressed copy of blob %q into place at %q: %v", digester.Digest().String(), decompressedFilename, err3) + // Remove the temporary file. + if err3 = os.Remove(tempFile.Name()); err3 != nil { + logrus.Debugf("error cleaning up temporary file %q for decompressed copy of blob %q: %v", tempFile.Name(), compressedDigest.String(), err3) + } + } else { + *alternateDigest = digester.Digest() + // Note the relationship between the two files. + if err3 = ioutils.AtomicWriteFile(decompressedFilename+compressedNote, []byte(compressedDigest.String()), 0600); err3 != nil { + logrus.Debugf("error noting that the compressed version of %q is %q: %v", digester.Digest().String(), compressedDigest.String(), err3) + } + if err3 = ioutils.AtomicWriteFile(compressedFilename+decompressedNote, []byte(digester.Digest().String()), 0600); err3 != nil { + logrus.Debugf("error noting that the decompressed version of %q is %q: %v", compressedDigest.String(), digester.Digest().String(), err3) + } + } + } else { + // Remove the temporary file. + if err3 = os.Remove(tempFile.Name()); err3 != nil { + logrus.Debugf("error cleaning up temporary file %q for decompressed copy of blob %q: %v", tempFile.Name(), compressedDigest.String(), err3) + } + } +} + +func (d *blobCacheDestination) HasThreadSafePutBlob() bool { + return d.destination.HasThreadSafePutBlob() +} + +func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { + var tempfile *os.File + var err error + var n int + var alternateDigest digest.Digest + var closer io.Closer + wg := new(sync.WaitGroup) + needToWait := false + compression := archive.Uncompressed + if inputInfo.Digest != "" { + filename := filepath.Join(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) + tempfile, err = os.CreateTemp(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) + if err == nil { + stream = io.TeeReader(stream, tempfile) + defer func() { + if err == nil { + if err = os.Rename(tempfile.Name(), filename); err != nil { + if err2 := os.Remove(tempfile.Name()); err2 != nil { + logrus.Debugf("error cleaning up temporary file %q for blob %q: %v", tempfile.Name(), inputInfo.Digest.String(), err2) + } + err = perrors.Wrapf(err, "error renaming new layer for blob %q into place at %q", inputInfo.Digest.String(), filename) + } + } else { + if err2 := os.Remove(tempfile.Name()); err2 != nil { + logrus.Debugf("error cleaning up temporary file %q for blob %q: %v", tempfile.Name(), inputInfo.Digest.String(), err2) + } + } + tempfile.Close() + }() + } else { + logrus.Debugf("error while creating a temporary file under %q to hold blob %q: %v", d.reference.directory, inputInfo.Digest.String(), err) + } + if !isConfig { + initial := make([]byte, 8) + n, err = stream.Read(initial) + if n > 0 { + // Build a Reader that will still return the bytes that we just + // read, for PutBlob()'s sake. + stream = io.MultiReader(bytes.NewReader(initial[:n]), stream) + if n >= len(initial) { + compression = archive.DetectCompression(initial[:n]) + } + if compression == archive.Gzip { + // The stream is compressed, so create a file which we'll + // use to store a decompressed copy. + decompressedTemp, err2 := os.CreateTemp(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) + if err2 != nil { + logrus.Debugf("error while creating a temporary file under %q to hold decompressed blob %q: %v", d.reference.directory, inputInfo.Digest.String(), err2) + decompressedTemp.Close() + } else { + // Write a copy of the compressed data to a pipe, + // closing the writing end of the pipe after + // PutBlob() returns. + decompressReader, decompressWriter := io.Pipe() + closer = decompressWriter + stream = io.TeeReader(stream, decompressWriter) + // Let saveStream() close the reading end and handle the temporary file. + wg.Add(1) + needToWait = true + go saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, isConfig, &alternateDigest) + } + } + } + } + } + newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, cache, isConfig) + if closer != nil { + closer.Close() + } + if needToWait { + wg.Wait() + } + if err != nil { + return newBlobInfo, perrors.Wrapf(err, "error storing blob to image destination for cache %q", transports.ImageName(d.reference)) + } + if alternateDigest.Validate() == nil { + logrus.Debugf("added blob %q (also %q) to the cache at %q", inputInfo.Digest.String(), alternateDigest.String(), d.reference.directory) + } else { + logrus.Debugf("added blob %q to the cache at %q", inputInfo.Digest.String(), d.reference.directory) + } + return newBlobInfo, nil +} + +func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, cache, canSubstitute) + if err != nil || present { + return present, reusedInfo, err + } + + for _, isConfig := range []bool{false, true} { + filename := filepath.Join(d.reference.directory, makeFilename(info.Digest, isConfig)) + f, err := os.Open(filename) + if err == nil { + defer f.Close() + uploadedInfo, err := d.destination.PutBlob(ctx, f, info, cache, isConfig) + if err != nil { + return false, types.BlobInfo{}, err + } + return true, uploadedInfo, nil + } + } + + return false, types.BlobInfo{}, nil +} + +func (d *blobCacheDestination) PutManifest(ctx context.Context, manifestBytes []byte, instanceDigest *digest.Digest) error { + manifestDigest, err := manifest.Digest(manifestBytes) + if err != nil { + logrus.Warnf("error digesting manifest %q: %v", string(manifestBytes), err) + } else { + filename := filepath.Join(d.reference.directory, makeFilename(manifestDigest, false)) + if err = ioutils.AtomicWriteFile(filename, manifestBytes, 0600); err != nil { + logrus.Warnf("error saving manifest as %q: %v", filename, err) + } + } + return d.destination.PutManifest(ctx, manifestBytes, instanceDigest) +} + +func (d *blobCacheDestination) PutSignatures(ctx context.Context, signatures [][]byte, instanceDigest *digest.Digest) error { + return d.destination.PutSignatures(ctx, signatures, instanceDigest) +} + +func (d *blobCacheDestination) Commit(ctx context.Context, unparsedToplevel types.UnparsedImage) error { + return d.destination.Commit(ctx, unparsedToplevel) +} From af676b8d31080e29d504c92f6ea7b7300f740af1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 4 Jul 2022 21:25:28 +0200 Subject: [PATCH 3/9] Fix a nil dereference on an error path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miloslav Trmač --- pkg/blobcache/dest.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/blobcache/dest.go b/pkg/blobcache/dest.go index c16c56b367..7c59176f4c 100644 --- a/pkg/blobcache/dest.go +++ b/pkg/blobcache/dest.go @@ -167,7 +167,6 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in decompressedTemp, err2 := os.CreateTemp(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) if err2 != nil { logrus.Debugf("error while creating a temporary file under %q to hold decompressed blob %q: %v", d.reference.directory, inputInfo.Digest.String(), err2) - decompressedTemp.Close() } else { // Write a copy of the compressed data to a pipe, // closing the writing end of the pipe after From 5a7d97f12d3d6088fa7e4115db7d99a45a0306ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Tue, 15 Feb 2022 14:41:27 +0100 Subject: [PATCH 4/9] Move interface conformance assertions to the test suite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ... to make it quite clear they don't have runtime cost. Signed-off-by: Miloslav Trmač --- pkg/blobcache/blobcache.go | 6 ------ pkg/blobcache/blobcache_test.go | 6 ++++++ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/blobcache/blobcache.go b/pkg/blobcache/blobcache.go index 08cd35d6e5..5af283d404 100644 --- a/pkg/blobcache/blobcache.go +++ b/pkg/blobcache/blobcache.go @@ -14,12 +14,6 @@ import ( perrors "github.com/pkg/errors" ) -var ( - _ types.ImageReference = &BlobCache{} - _ types.ImageSource = &blobCacheSource{} - _ types.ImageDestination = &blobCacheDestination{} -) - const ( compressedNote = ".compressed" decompressedNote = ".decompressed" diff --git a/pkg/blobcache/blobcache_test.go b/pkg/blobcache/blobcache_test.go index a171ba19f3..c8a5028d00 100644 --- a/pkg/blobcache/blobcache_test.go +++ b/pkg/blobcache/blobcache_test.go @@ -27,6 +27,12 @@ import ( "github.com/sirupsen/logrus" ) +var ( + _ types.ImageReference = &BlobCache{} + _ types.ImageSource = &blobCacheSource{} + _ types.ImageDestination = &blobCacheDestination{} +) + func TestMain(m *testing.M) { flag.Parse() if testing.Verbose() { From 73577bfcf09463f74cfab68cbd8f28d8a2f89c01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 4 Jul 2022 21:19:23 +0200 Subject: [PATCH 5/9] Name temporary files based on the final file name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ... by cmoputing the relevant components from the final file name instead of copy&pasting the logic. This will make it easier to consolidate the file name logic. Should not change behavior. Signed-off-by: Miloslav Trmač --- pkg/blobcache/dest.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/blobcache/dest.go b/pkg/blobcache/dest.go index 7c59176f4c..723ca54264 100644 --- a/pkg/blobcache/dest.go +++ b/pkg/blobcache/dest.go @@ -130,7 +130,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in compression := archive.Uncompressed if inputInfo.Digest != "" { filename := filepath.Join(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) - tempfile, err = os.CreateTemp(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) + tempfile, err = os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)) if err == nil { stream = io.TeeReader(stream, tempfile) defer func() { @@ -149,7 +149,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in tempfile.Close() }() } else { - logrus.Debugf("error while creating a temporary file under %q to hold blob %q: %v", d.reference.directory, inputInfo.Digest.String(), err) + logrus.Debugf("error while creating a temporary file under %q to hold blob %q: %v", filepath.Dir(filename), inputInfo.Digest.String(), err) } if !isConfig { initial := make([]byte, 8) @@ -164,9 +164,9 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in if compression == archive.Gzip { // The stream is compressed, so create a file which we'll // use to store a decompressed copy. - decompressedTemp, err2 := os.CreateTemp(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) + decompressedTemp, err2 := os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)) if err2 != nil { - logrus.Debugf("error while creating a temporary file under %q to hold decompressed blob %q: %v", d.reference.directory, inputInfo.Digest.String(), err2) + logrus.Debugf("error while creating a temporary file under %q to hold decompressed blob %q: %v", filepath.Dir(filename), inputInfo.Digest.String(), err2) } else { // Write a copy of the compressed data to a pipe, // closing the writing end of the pipe after From 951055f36462677c7271b7eebea73b673fb201ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Tue, 15 Feb 2022 15:53:13 +0100 Subject: [PATCH 6/9] Replace makeFilename with BlobCache.blobPath MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miloslav Trmač --- pkg/blobcache/blobcache.go | 18 ++++++++++-------- pkg/blobcache/dest.go | 12 ++++++------ pkg/blobcache/src.go | 9 ++++----- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/pkg/blobcache/blobcache.go b/pkg/blobcache/blobcache.go index 5af283d404..1d8c36c798 100644 --- a/pkg/blobcache/blobcache.go +++ b/pkg/blobcache/blobcache.go @@ -32,13 +32,6 @@ type BlobCache struct { compress types.LayerCompression } -func makeFilename(blobSum digest.Digest, isConfig bool) string { - if isConfig { - return blobSum.String() + ".config" - } - return blobSum.String() -} - // NewBlobCache creates a new blob cache that wraps an image reference. Any blobs which are // written to the destination image created from the resulting reference will also be stored // as-is to the specified directory or a temporary directory. @@ -85,13 +78,22 @@ func (b *BlobCache) DeleteImage(ctx context.Context, sys *types.SystemContext) e return b.reference.DeleteImage(ctx, sys) } +// blobPath returns the path appropriate for storing a blob with digest. +func (b *BlobCache) blobPath(digest digest.Digest, isConfig bool) string { + baseName := digest.String() + if isConfig { + baseName += ".config" + } + return filepath.Join(b.directory, baseName) +} + func (b *BlobCache) HasBlob(blobinfo types.BlobInfo) (bool, int64, error) { if blobinfo.Digest == "" { return false, -1, nil } for _, isConfig := range []bool{false, true} { - filename := filepath.Join(b.directory, makeFilename(blobinfo.Digest, isConfig)) + filename := b.blobPath(blobinfo.Digest, isConfig) fileInfo, err := os.Stat(filename) if err == nil && (blobinfo.Size == -1 || blobinfo.Size == fileInfo.Size()) { return true, fileInfo.Size(), nil diff --git a/pkg/blobcache/dest.go b/pkg/blobcache/dest.go index 723ca54264..b9d7f62274 100644 --- a/pkg/blobcache/dest.go +++ b/pkg/blobcache/dest.go @@ -69,7 +69,7 @@ func (d *blobCacheDestination) IgnoresEmbeddedDockerReference() bool { // file. If we successfully save all of the data, rename the file to match the digest of the data, // and make notes about the relationship between the file that holds a copy of the compressed data // and this new file. -func saveStream(wg *sync.WaitGroup, decompressReader io.ReadCloser, tempFile *os.File, compressedFilename string, compressedDigest digest.Digest, isConfig bool, alternateDigest *digest.Digest) { +func (d *blobCacheDestination) saveStream(wg *sync.WaitGroup, decompressReader io.ReadCloser, tempFile *os.File, compressedFilename string, compressedDigest digest.Digest, isConfig bool, alternateDigest *digest.Digest) { defer wg.Done() // Decompress from and digest the reading end of that pipe. decompressed, err3 := archive.DecompressStream(decompressReader) @@ -88,7 +88,7 @@ func saveStream(wg *sync.WaitGroup, decompressReader io.ReadCloser, tempFile *os decompressed.Close() tempFile.Close() // Determine the name that we should give to the uncompressed copy of the blob. - decompressedFilename := filepath.Join(filepath.Dir(tempFile.Name()), makeFilename(digester.Digest(), isConfig)) + decompressedFilename := d.reference.blobPath(digester.Digest(), isConfig) if err3 == nil { // Rename the temporary file. if err3 = os.Rename(tempFile.Name(), decompressedFilename); err3 != nil { @@ -129,7 +129,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in needToWait := false compression := archive.Uncompressed if inputInfo.Digest != "" { - filename := filepath.Join(d.reference.directory, makeFilename(inputInfo.Digest, isConfig)) + filename := d.reference.blobPath(inputInfo.Digest, isConfig) tempfile, err = os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)) if err == nil { stream = io.TeeReader(stream, tempfile) @@ -177,7 +177,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in // Let saveStream() close the reading end and handle the temporary file. wg.Add(1) needToWait = true - go saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, isConfig, &alternateDigest) + go d.saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, isConfig, &alternateDigest) } } } @@ -208,7 +208,7 @@ func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.Bl } for _, isConfig := range []bool{false, true} { - filename := filepath.Join(d.reference.directory, makeFilename(info.Digest, isConfig)) + filename := d.reference.blobPath(info.Digest, isConfig) f, err := os.Open(filename) if err == nil { defer f.Close() @@ -228,7 +228,7 @@ func (d *blobCacheDestination) PutManifest(ctx context.Context, manifestBytes [] if err != nil { logrus.Warnf("error digesting manifest %q: %v", string(manifestBytes), err) } else { - filename := filepath.Join(d.reference.directory, makeFilename(manifestDigest, false)) + filename := d.reference.blobPath(manifestDigest, false) if err = ioutils.AtomicWriteFile(filename, manifestBytes, 0600); err != nil { logrus.Warnf("error saving manifest as %q: %v", filename, err) } diff --git a/pkg/blobcache/src.go b/pkg/blobcache/src.go index 4fd237956c..d125411ab7 100644 --- a/pkg/blobcache/src.go +++ b/pkg/blobcache/src.go @@ -4,7 +4,6 @@ import ( "context" "io" "os" - "path/filepath" "sync" "github.com/containers/image/v5/internal/image" @@ -49,7 +48,7 @@ func (s *blobCacheSource) Close() error { func (s *blobCacheSource) GetManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) { if instanceDigest != nil { - filename := filepath.Join(s.reference.directory, makeFilename(*instanceDigest, false)) + filename := s.reference.blobPath(*instanceDigest, false) manifestBytes, err := os.ReadFile(filename) if err == nil { s.cacheHits++ @@ -75,7 +74,7 @@ func (s *blobCacheSource) GetBlob(ctx context.Context, blobinfo types.BlobInfo, } if present { for _, isConfig := range []bool{false, true} { - filename := filepath.Join(s.reference.directory, makeFilename(blobinfo.Digest, isConfig)) + filename := s.reference.blobPath(blobinfo.Digest, isConfig) f, err := os.Open(filename) if err == nil { s.mu.Lock() @@ -129,7 +128,7 @@ func (s *blobCacheSource) LayerInfosForCopy(ctx context.Context, instanceDigest for _, info := range infos { var replaceDigest []byte var err error - blobFile := filepath.Join(s.reference.directory, makeFilename(info.Digest, false)) + blobFile := s.reference.blobPath(info.Digest, false) alternate := "" switch s.reference.compress { case types.Compress: @@ -140,7 +139,7 @@ func (s *blobCacheSource) LayerInfosForCopy(ctx context.Context, instanceDigest replaceDigest, err = os.ReadFile(alternate) } if err == nil && digest.Digest(replaceDigest).Validate() == nil { - alternate = filepath.Join(filepath.Dir(alternate), makeFilename(digest.Digest(replaceDigest), false)) + alternate = s.reference.blobPath(digest.Digest(replaceDigest), false) fileInfo, err := os.Stat(alternate) if err == nil { switch info.MediaType { From b3bf595b8410a14b9b4be589f5e9ad23eab413be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Tue, 15 Feb 2022 16:03:47 +0100 Subject: [PATCH 7/9] Simplify blob lookups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit HasBlob did the isConfig checking loop, only for every consumer to repeat that loop. Instead, farm that loop out into BlobCache.findBlob which directly returns the found path, so that callers can just use that blob instead of reconstructing HasBlob's work. Signed-off-by: Miloslav Trmač --- pkg/blobcache/blobcache.go | 31 +++++++++++++++++++++++-------- pkg/blobcache/dest.go | 9 ++++++--- pkg/blobcache/src.go | 31 ++++++++++++++----------------- 3 files changed, 43 insertions(+), 28 deletions(-) diff --git a/pkg/blobcache/blobcache.go b/pkg/blobcache/blobcache.go index 1d8c36c798..17a4a7c990 100644 --- a/pkg/blobcache/blobcache.go +++ b/pkg/blobcache/blobcache.go @@ -87,22 +87,37 @@ func (b *BlobCache) blobPath(digest digest.Digest, isConfig bool) string { return filepath.Join(b.directory, baseName) } -func (b *BlobCache) HasBlob(blobinfo types.BlobInfo) (bool, int64, error) { - if blobinfo.Digest == "" { - return false, -1, nil +// findBlob checks if we have a blob for info in cache (whether a config or not) +// and if so, returns it path and size, and whether it was stored as a config. +// It returns ("", -1, nil) if the blob is not +func (b *BlobCache) findBlob(info types.BlobInfo) (string, int64, bool, error) { + if info.Digest == "" { + return "", -1, false, nil } for _, isConfig := range []bool{false, true} { - filename := b.blobPath(blobinfo.Digest, isConfig) - fileInfo, err := os.Stat(filename) - if err == nil && (blobinfo.Size == -1 || blobinfo.Size == fileInfo.Size()) { - return true, fileInfo.Size(), nil + path := b.blobPath(info.Digest, isConfig) + fileInfo, err := os.Stat(path) + if err == nil && (info.Size == -1 || info.Size == fileInfo.Size()) { + return path, fileInfo.Size(), isConfig, nil } if !os.IsNotExist(err) { - return false, -1, perrors.Wrap(err, "checking size") + return "", -1, false, perrors.Wrap(err, "checking size") } } + return "", -1, false, nil + +} + +func (b *BlobCache) HasBlob(blobinfo types.BlobInfo) (bool, int64, error) { + path, size, _, err := b.findBlob(blobinfo) + if err != nil { + return false, -1, err + } + if path != "" { + return true, size, nil + } return false, -1, nil } diff --git a/pkg/blobcache/dest.go b/pkg/blobcache/dest.go index b9d7f62274..68e2b6652a 100644 --- a/pkg/blobcache/dest.go +++ b/pkg/blobcache/dest.go @@ -207,9 +207,12 @@ func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.Bl return present, reusedInfo, err } - for _, isConfig := range []bool{false, true} { - filename := d.reference.blobPath(info.Digest, isConfig) - f, err := os.Open(filename) + blobPath, _, isConfig, err := d.reference.findBlob(info) + if err != nil { + return false, types.BlobInfo{}, err + } + if blobPath != "" { + f, err := os.Open(blobPath) if err == nil { defer f.Close() uploadedInfo, err := d.destination.PutBlob(ctx, f, info, cache, isConfig) diff --git a/pkg/blobcache/src.go b/pkg/blobcache/src.go index d125411ab7..9ff44eba42 100644 --- a/pkg/blobcache/src.go +++ b/pkg/blobcache/src.go @@ -68,26 +68,23 @@ func (s *blobCacheSource) HasThreadSafeGetBlob() bool { } func (s *blobCacheSource) GetBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) { - present, size, err := s.reference.HasBlob(blobinfo) + blobPath, size, _, err := s.reference.findBlob(blobinfo) if err != nil { return nil, -1, err } - if present { - for _, isConfig := range []bool{false, true} { - filename := s.reference.blobPath(blobinfo.Digest, isConfig) - f, err := os.Open(filename) - if err == nil { - s.mu.Lock() - s.cacheHits++ - s.mu.Unlock() - return f, size, nil - } - if !os.IsNotExist(err) { - s.mu.Lock() - s.cacheErrors++ - s.mu.Unlock() - return nil, -1, perrors.Wrap(err, "checking for cache") - } + if blobPath != "" { + f, err := os.Open(blobPath) + if err == nil { + s.mu.Lock() + s.cacheHits++ + s.mu.Unlock() + return f, size, nil + } + if !os.IsNotExist(err) { + s.mu.Lock() + s.cacheErrors++ + s.mu.Unlock() + return nil, -1, perrors.Wrap(err, "checking for cache") } } s.mu.Lock() From f7703dd8699e74df166514b03f3bc3ded3567df1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Tue, 15 Feb 2022 15:13:36 +0100 Subject: [PATCH 8/9] Forward all of private.ImageSource by blobCacheSource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Also add support for reading blob chunks from a (complete, not partial) cached blob. Signed-off-by: Miloslav Trmač --- pkg/blobcache/blobcache_test.go | 2 + pkg/blobcache/src.go | 89 ++++++++++++++++++++++++++++++++- pkg/blobcache/src_test.go | 57 +++++++++++++++++++++ 3 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 pkg/blobcache/src_test.go diff --git a/pkg/blobcache/blobcache_test.go b/pkg/blobcache/blobcache_test.go index c8a5028d00..63eb3b53e5 100644 --- a/pkg/blobcache/blobcache_test.go +++ b/pkg/blobcache/blobcache_test.go @@ -17,6 +17,7 @@ import ( cp "github.com/containers/image/v5/copy" "github.com/containers/image/v5/directory" "github.com/containers/image/v5/internal/image" + "github.com/containers/image/v5/internal/private" "github.com/containers/image/v5/pkg/blobinfocache/none" "github.com/containers/image/v5/signature" "github.com/containers/image/v5/types" @@ -30,6 +31,7 @@ import ( var ( _ types.ImageReference = &BlobCache{} _ types.ImageSource = &blobCacheSource{} + _ private.ImageSource = (*blobCacheSource)(nil) _ types.ImageDestination = &blobCacheDestination{} ) diff --git a/pkg/blobcache/src.go b/pkg/blobcache/src.go index 9ff44eba42..c2983e6a5c 100644 --- a/pkg/blobcache/src.go +++ b/pkg/blobcache/src.go @@ -2,11 +2,14 @@ package blobcache import ( "context" + "fmt" "io" "os" "sync" "github.com/containers/image/v5/internal/image" + "github.com/containers/image/v5/internal/imagesource" + "github.com/containers/image/v5/internal/private" "github.com/containers/image/v5/manifest" "github.com/containers/image/v5/pkg/compression" "github.com/containers/image/v5/transports" @@ -19,7 +22,7 @@ import ( type blobCacheSource struct { reference *BlobCache - source types.ImageSource + source private.ImageSource sys types.SystemContext // this mutex synchronizes the counters below mu sync.Mutex @@ -34,7 +37,7 @@ func (b *BlobCache) NewImageSource(ctx context.Context, sys *types.SystemContext return nil, perrors.Wrapf(err, "error creating new image source %q", transports.ImageName(b.reference)) } logrus.Debugf("starting to read from image %q using blob cache in %q (compression=%v)", transports.ImageName(b.reference), b.directory, b.compress) - return &blobCacheSource{reference: b, source: src, sys: *sys}, nil + return &blobCacheSource{reference: b, source: imagesource.FromPublic(src), sys: *sys}, nil } func (s *blobCacheSource) Reference() types.ImageReference { @@ -174,3 +177,85 @@ func (s *blobCacheSource) LayerInfosForCopy(ctx context.Context, instanceDigest return infos, nil } + +// SupportsGetBlobAt() returns true if GetBlobAt (BlobChunkAccessor) is supported. +func (s *blobCacheSource) SupportsGetBlobAt() bool { + return s.source.SupportsGetBlobAt() +} + +// streamChunksFromFile generates the channels returned by GetBlobAt for chunks of seekable file +func streamChunksFromFile(streams chan io.ReadCloser, errs chan error, file io.ReadSeekCloser, + chunks []private.ImageSourceChunk) { + defer close(streams) + defer close(errs) + defer file.Close() + + for _, c := range chunks { + // Always seek to the desired offest; that way we don’t need to care about the consumer + // not reading all of the chunk, or about the position going backwards. + if _, err := file.Seek(int64(c.Offset), io.SeekStart); err != nil { + errs <- err + break + } + s := signalCloseReader{ + closed: make(chan interface{}), + stream: io.LimitReader(file, int64(c.Length)), + } + streams <- s + + // Wait until the stream is closed before going to the next chunk + <-s.closed + } +} + +type signalCloseReader struct { + closed chan interface{} + stream io.Reader +} + +func (s signalCloseReader) Read(p []byte) (int, error) { + return s.stream.Read(p) +} + +func (s signalCloseReader) Close() error { + close(s.closed) + return nil +} + +// GetBlobAt returns a sequential channel of readers that contain data for the requested +// blob chunks, and a channel that might get a single error value. +// The specified chunks must be not overlapping and sorted by their offset. +// The readers must be fully consumed, in the order they are returned, before blocking +// to read the next chunk. +func (s *blobCacheSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { + blobPath, _, _, err := s.reference.findBlob(info) + if err != nil { + return nil, nil, err + } + if blobPath != "" { + f, err := os.Open(blobPath) + if err == nil { + s.mu.Lock() + s.cacheHits++ + s.mu.Unlock() + streams := make(chan io.ReadCloser) + errs := make(chan error) + go streamChunksFromFile(streams, errs, f, chunks) + return streams, errs, nil + } + if !os.IsNotExist(err) { + s.mu.Lock() + s.cacheErrors++ + s.mu.Unlock() + return nil, nil, fmt.Errorf("checking for cache: %w", err) + } + } + s.mu.Lock() + s.cacheMisses++ + s.mu.Unlock() + streams, errs, err := s.source.GetBlobAt(ctx, info, chunks) + if err != nil { + return streams, errs, fmt.Errorf("error reading blob chunks from source image %q: %w", transports.ImageName(s.reference), err) + } + return streams, errs, nil +} diff --git a/pkg/blobcache/src_test.go b/pkg/blobcache/src_test.go new file mode 100644 index 0000000000..542838b36c --- /dev/null +++ b/pkg/blobcache/src_test.go @@ -0,0 +1,57 @@ +package blobcache + +import ( + "bytes" + "io" + "io/ioutil" + "testing" + + "github.com/containers/image/v5/internal/private" + "github.com/stretchr/testify/assert" +) + +func readNextStream(streams chan io.ReadCloser, errs chan error) ([]byte, error) { + select { + case r := <-streams: + if r == nil { + return nil, nil + } + defer r.Close() + return ioutil.ReadAll(r) + case err := <-errs: + return nil, err + } +} + +// readSeekerNopCloser adds a no-op Close() method to a readSeeker +type readSeekerNopCloser struct { + io.ReadSeeker +} + +func (c *readSeekerNopCloser) Close() error { + return nil +} + +func TestStreamChunksFromFile(t *testing.T) { + file := &readSeekerNopCloser{bytes.NewReader([]byte("123456789"))} + streams := make(chan io.ReadCloser) + errs := make(chan error) + chunks := []private.ImageSourceChunk{ + {Offset: 1, Length: 2}, + {Offset: 4, Length: 1}, + } + go streamChunksFromFile(streams, errs, file, chunks) + + for _, c := range []struct { + expectedData []byte + expectedError error + }{ + {[]byte("23"), nil}, + {[]byte("5"), nil}, + {[]byte(nil), nil}, + } { + data, err := readNextStream(streams, errs) + assert.Equal(t, c.expectedData, data) + assert.Equal(t, c.expectedError, err) + } +} From 305e6df6eacc929560b5eae86b98a67cd6ebb8ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Tue, 15 Feb 2022 15:27:32 +0100 Subject: [PATCH 9/9] Implement private.ImageDestination in blobCacheDestination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miloslav Trmač --- pkg/blobcache/blobcache_test.go | 9 ++--- pkg/blobcache/dest.go | 61 +++++++++++++++++++++++++++------ 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/pkg/blobcache/blobcache_test.go b/pkg/blobcache/blobcache_test.go index 63eb3b53e5..7f9d0f9ba0 100644 --- a/pkg/blobcache/blobcache_test.go +++ b/pkg/blobcache/blobcache_test.go @@ -29,10 +29,11 @@ import ( ) var ( - _ types.ImageReference = &BlobCache{} - _ types.ImageSource = &blobCacheSource{} - _ private.ImageSource = (*blobCacheSource)(nil) - _ types.ImageDestination = &blobCacheDestination{} + _ types.ImageReference = &BlobCache{} + _ types.ImageSource = &blobCacheSource{} + _ private.ImageSource = (*blobCacheSource)(nil) + _ types.ImageDestination = &blobCacheDestination{} + _ private.ImageDestination = (*blobCacheDestination)(nil) ) func TestMain(m *testing.M) { diff --git a/pkg/blobcache/dest.go b/pkg/blobcache/dest.go index 68e2b6652a..8bae8a24b9 100644 --- a/pkg/blobcache/dest.go +++ b/pkg/blobcache/dest.go @@ -8,6 +8,10 @@ import ( "path/filepath" "sync" + "github.com/containers/image/v5/internal/blobinfocache" + "github.com/containers/image/v5/internal/imagedestination" + "github.com/containers/image/v5/internal/imagedestination/impl" + "github.com/containers/image/v5/internal/private" "github.com/containers/image/v5/manifest" "github.com/containers/image/v5/transports" "github.com/containers/image/v5/types" @@ -19,8 +23,10 @@ import ( ) type blobCacheDestination struct { + impl.Compat + reference *BlobCache - destination types.ImageDestination + destination private.ImageDestination } func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemContext) (types.ImageDestination, error) { @@ -29,7 +35,9 @@ func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemCo return nil, perrors.Wrapf(err, "error creating new image destination %q", transports.ImageName(b.reference)) } logrus.Debugf("starting to write to image %q using blob cache in %q", transports.ImageName(b.reference), b.directory) - return &blobCacheDestination{reference: b, destination: dest}, nil + d := &blobCacheDestination{reference: b, destination: imagedestination.FromPublic(dest)} + d.Compat = impl.AddCompat(d) + return d, nil } func (d *blobCacheDestination) Reference() types.ImageReference { @@ -119,7 +127,14 @@ func (d *blobCacheDestination) HasThreadSafePutBlob() bool { return d.destination.HasThreadSafePutBlob() } -func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +// PutBlobWithOptions writes contents of stream and returns data representing the result. +// inputInfo.Digest can be optionally provided if known; if provided, and stream is read to the end without error, the digest MUST match the stream contents. +// inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. +// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available +// to any other readers for download using the supplied digest. +// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. +func (d *blobCacheDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) { var tempfile *os.File var err error var n int @@ -129,7 +144,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in needToWait := false compression := archive.Uncompressed if inputInfo.Digest != "" { - filename := d.reference.blobPath(inputInfo.Digest, isConfig) + filename := d.reference.blobPath(inputInfo.Digest, options.IsConfig) tempfile, err = os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)) if err == nil { stream = io.TeeReader(stream, tempfile) @@ -151,7 +166,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in } else { logrus.Debugf("error while creating a temporary file under %q to hold blob %q: %v", filepath.Dir(filename), inputInfo.Digest.String(), err) } - if !isConfig { + if !options.IsConfig { initial := make([]byte, 8) n, err = stream.Read(initial) if n > 0 { @@ -177,13 +192,13 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in // Let saveStream() close the reading end and handle the temporary file. wg.Add(1) needToWait = true - go d.saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, isConfig, &alternateDigest) + go d.saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, options.IsConfig, &alternateDigest) } } } } } - newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, cache, isConfig) + newBlobInfo, err := d.destination.PutBlobWithOptions(ctx, stream, inputInfo, options) if closer != nil { closer.Close() } @@ -201,8 +216,29 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in return newBlobInfo, nil } -func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, cache, canSubstitute) +// SupportsPutBlobPartial returns true if PutBlobPartial is supported. +func (d *blobCacheDestination) SupportsPutBlobPartial() bool { + return d.destination.SupportsPutBlobPartial() +} + +// PutBlobPartial attempts to create a blob using the data that is already present +// at the destination. chunkAccessor is accessed in a non-sequential way to retrieve the missing chunks. +// It is available only if SupportsPutBlobPartial(). +// Even if SupportsPutBlobPartial() returns true, the call can fail, in which case the caller +// should fall back to PutBlobWithOptions. +func (d *blobCacheDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, cache blobinfocache.BlobInfoCache2) (types.BlobInfo, error) { + return d.destination.PutBlobPartial(ctx, chunkAccessor, srcInfo, cache) +} + +// TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination +// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). +// info.Digest must not be empty. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may +// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be +// reflected in the manifest that will be written. +// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. +func (d *blobCacheDestination) TryReusingBlobWithOptions(ctx context.Context, info types.BlobInfo, options private.TryReusingBlobOptions) (bool, types.BlobInfo, error) { + present, reusedInfo, err := d.destination.TryReusingBlobWithOptions(ctx, info, options) if err != nil || present { return present, reusedInfo, err } @@ -215,7 +251,12 @@ func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.Bl f, err := os.Open(blobPath) if err == nil { defer f.Close() - uploadedInfo, err := d.destination.PutBlob(ctx, f, info, cache, isConfig) + uploadedInfo, err := d.destination.PutBlobWithOptions(ctx, f, info, private.PutBlobOptions{ + Cache: options.Cache, + IsConfig: isConfig, + EmptyLayer: options.EmptyLayer, + LayerIndex: options.LayerIndex, + }) if err != nil { return false, types.BlobInfo{}, err }