Merge pull request #2321 from mtrmac/chunked-bic
Record (TOC digest → DiffID) mapping in BlobInfoCache
rhatdan authored Jul 30, 2024
2 parents b089d55 + f49cb62 commit 0b130b8
Showing 11 changed files with 488 additions and 134 deletions.
10 changes: 10 additions & 0 deletions copy/compression.go
@@ -11,6 +11,7 @@ import (
"github.com/containers/image/v5/pkg/compression"
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/types"
chunkedToc "github.com/containers/storage/pkg/chunked/toc"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)
@@ -308,6 +309,15 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
// No useful information
case bpcOpCompressUncompressed:
c.blobInfoCache.RecordDigestUncompressedPair(uploadedInfo.Digest, srcInfo.Digest)
if d.uploadedAnnotations != nil {
tocDigest, err := chunkedToc.GetTOCDigest(d.uploadedAnnotations)
if err != nil {
return fmt.Errorf("parsing just-created compression annotations: %w", err)
}
if tocDigest != nil {
c.blobInfoCache.RecordTOCUncompressedPair(*tocDigest, srcInfo.Digest)
}
}
case bpcOpDecompressCompressed:
c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, uploadedInfo.Digest)
case bpcOpRecompressCompressed, bpcOpPreserveCompressed:
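For context, the following standalone sketch (package, function, and variable names here are illustrative, not part of the commit) restates the recording step above in isolation: after a layer has been compressed to zstd:chunked locally, the TOC digest is extracted from the just-created annotations and, if present, recorded against the locally computed uncompressed digest.

package sketch

import (
	"fmt"

	chunkedToc "github.com/containers/storage/pkg/chunked/toc"
	"github.com/opencontainers/go-digest"
)

// tocRecorder is a stand-in for the RecordTOCUncompressedPair method that this commit
// adds to the internal BlobInfoCache2 interface.
type tocRecorder interface {
	RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest)
}

// recordChunkedLayer mirrors the call site above: annotations come from a just-finished
// zstd:chunked compression of a stream whose uncompressed digest was computed locally,
// so the (TOC digest → DiffID) pair is known to be trustworthy.
func recordChunkedLayer(cache tocRecorder, annotations map[string]string, uncompressed digest.Digest) error {
	tocDigest, err := chunkedToc.GetTOCDigest(annotations)
	if err != nil {
		return fmt.Errorf("parsing just-created compression annotations: %w", err)
	}
	if tocDigest == nil {
		return nil // not a chunked layer, nothing to record
	}
	cache.RecordTOCUncompressedPair(*tocDigest, uncompressed)
	return nil
}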
22 changes: 22 additions & 0 deletions copy/single.go
@@ -149,6 +149,28 @@ func (c *copier) copySingleImage(ctx context.Context, unparsedImage *image.Unpar
ic.compressionFormat = c.options.DestinationCtx.CompressionFormat
ic.compressionLevel = c.options.DestinationCtx.CompressionLevel
}
// HACK: Don’t combine zstd:chunked and encryption.
// zstd:chunked can only usefully be consumed using range requests of parts of the layer, which would require the encryption
// to support decrypting arbitrary subsets of the stream. That’s plausible but not supported using the encryption API we have.
// Also, the chunked metadata is exposed in annotations unencrypted, which reveals the TOC digest = layer identity without
// encryption. (That can be determined from the unencrypted config anyway, but, still...)
//
// Ideally this should query a well-defined property of the compression algorithm (and $somehow determine the right fallback) instead of
// hard-coding zstd:chunked / zstd.
if ic.c.options.OciEncryptLayers != nil {
format := ic.compressionFormat
if format == nil {
format = defaultCompressionFormat
}
if format.Name() == compression.ZstdChunked.Name() {
if ic.requireCompressionFormatMatch {
return copySingleImageResult{}, errors.New("explicitly requested to combine zstd:chunked with encryption, which is not beneficial; use plain zstd instead")
}
logrus.Warnf("Compression using zstd:chunked is not beneficial for encrypted layers, using plain zstd instead")
ic.compressionFormat = &compression.Zstd
}
}

// Decide whether we can substitute blobs with semantic equivalents:
// - Don’t do that if we can’t modify the manifest at all
// - Ensure _this_ copy sees exactly the intended data when either processing a signed image or signing it.
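The comment above can be restated as a small standalone sketch; effectiveCompression and defaultFormat are hypothetical names (the real logic is inlined in copySingleImage and reads the copier’s option fields), but the compression package identifiers are the ones used above.

package sketch

import (
	"errors"

	"github.com/containers/image/v5/pkg/compression"
	compressiontypes "github.com/containers/image/v5/pkg/compression/types"
)

// effectiveCompression restates the fallback rule: requested may be nil (meaning “use
// defaultFormat”), encrypting reports whether OciEncryptLayers was set, and
// formatMustMatch corresponds to requireCompressionFormatMatch.
func effectiveCompression(requested, defaultFormat *compressiontypes.Algorithm, encrypting, formatMustMatch bool) (*compressiontypes.Algorithm, error) {
	format := requested
	if format == nil {
		format = defaultFormat
	}
	if encrypting && format.Name() == compression.ZstdChunked.Name() {
		if formatMustMatch {
			return nil, errors.New("explicitly requested to combine zstd:chunked with encryption, which is not beneficial; use plain zstd instead")
		}
		// zstd:chunked is only useful when parts of the layer can be fetched and decoded
		// independently, which layer encryption prevents, so fall back to plain zstd.
		return &compression.Zstd, nil
	}
	return format, nil
}

The asymmetry is deliberate: when zstd:chunked was merely the configured default, silently falling back to plain zstd keeps the copy working; when the caller explicitly required an exact compression format match, failing with an error is the only honest option.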
7 changes: 7 additions & 0 deletions internal/blobinfocache/blobinfocache.go
@@ -27,6 +27,13 @@ func (bic *v1OnlyBlobInfoCache) Open() {
func (bic *v1OnlyBlobInfoCache) Close() {
}

func (bic *v1OnlyBlobInfoCache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
return ""
}

func (bic *v1OnlyBlobInfoCache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
}

func (bic *v1OnlyBlobInfoCache) RecordDigestCompressorName(anyDigest digest.Digest, compressorName string) {
}

9 changes: 9 additions & 0 deletions internal/blobinfocache/types.go
@@ -26,6 +26,15 @@ type BlobInfoCache2 interface {
// Close destroys state created by Open().
Close()

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Returns "" if the uncompressed digest is unknown.
UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest
// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest)

// RecordDigestCompressorName records a compressor for the blob with the specified digest,
// or Uncompressed or UnknownCompression.
// WARNING: Only call this with LOCALLY VERIFIED data; don’t record a compressor for a
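As an illustration of the lookup side (a hypothetical consumer; this commit only adds the recording call site), a destination that pulled layers by their TOC digests could use the cache to fill in the config’s DiffIDs, and fall back to decompressing layers whenever the cache has no answer:

package sketch

import "github.com/opencontainers/go-digest"

// tocLookup is the read-only half of the interface extension above.
type tocLookup interface {
	UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest
}

// diffIDsForChunkedLayers returns the DiffIDs for a set of layers known only by their
// TOC digests, and false if any of them is missing from the cache (in which case the
// caller would have to compute the DiffID some other way, e.g. by decompressing the layer).
func diffIDsForChunkedLayers(cache tocLookup, tocDigests []digest.Digest) ([]digest.Digest, bool) {
	diffIDs := make([]digest.Digest, 0, len(tocDigests))
	for _, tocDigest := range tocDigests {
		d := cache.UncompressedDigestForTOC(tocDigest)
		if d == "" {
			return nil, false
		}
		diffIDs = append(diffIDs, d)
	}
	return diffIDs, true
}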
52 changes: 52 additions & 0 deletions pkg/blobinfocache/boltdb/boltdb.go
@@ -25,6 +25,8 @@ var (

// uncompressedDigestBucket stores a mapping from any digest to an uncompressed digest.
uncompressedDigestBucket = []byte("uncompressedDigest")
// uncompressedDigestByTOCBucket stores a mapping from a TOC digest to an uncompressed digest.
uncompressedDigestByTOCBucket = []byte("uncompressedDigestByTOC")
// digestCompressorBucket stores a mapping from any digest to a compressor, or blobinfocache.Uncompressed (not blobinfocache.UnknownCompression).
// It may not exist in caches created by older versions, even if uncompressedDigestBucket is present.
digestCompressorBucket = []byte("digestCompressor")
@@ -243,6 +245,56 @@ func (bdc *cache) RecordDigestUncompressedPair(anyDigest digest.Digest, uncompre
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Returns "" if the uncompressed digest is unknown.
func (bdc *cache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
var res digest.Digest
if err := bdc.view(func(tx *bolt.Tx) error {
if b := tx.Bucket(uncompressedDigestByTOCBucket); b != nil {
if uncompressedBytes := b.Get([]byte(tocDigest.String())); uncompressedBytes != nil {
d, err := digest.Parse(string(uncompressedBytes))
if err == nil {
res = d
return nil
}
// FIXME? Log err (but throttle the log volume on repeated accesses)?
}
}
res = ""
return nil
}); err != nil { // Including os.IsNotExist(err)
return "" // FIXME? Log err (but throttle the log volume on repeated accesses)?
}
return res
}

// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
func (bdc *cache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
_ = bdc.update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(uncompressedDigestByTOCBucket)
if err != nil {
return err
}
key := []byte(tocDigest.String())
if previousBytes := b.Get(key); previousBytes != nil {
previous, err := digest.Parse(string(previousBytes))
if err != nil {
return err
}
if previous != uncompressed {
logrus.Warnf("Uncompressed digest for blob with TOC %q previously recorded as %q, now %q", tocDigest, previous, uncompressed)
}
}
if err := b.Put(key, []byte(uncompressed.String())); err != nil {
return err
}
return nil
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// RecordDigestCompressorName records that the blob with digest anyDigest was compressed with the specified
// compressor, or is blobinfocache.Uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
24 changes: 24 additions & 0 deletions pkg/blobinfocache/internal/test/test.go
@@ -43,6 +43,8 @@ func GenericCache(t *testing.T, newTestCache func(t *testing.T) blobinfocache.Bl
}{
{"UncompressedDigest", testGenericUncompressedDigest},
{"RecordDigestUncompressedPair", testGenericRecordDigestUncompressedPair},
{"UncompressedDigestForTOC", testGenericUncompressedDigestForTOC},
{"RecordTOCUncompressedPair", testGenericRecordTOCUncompressedPair},
{"RecordKnownLocations", testGenericRecordKnownLocations},
{"CandidateLocations", testGenericCandidateLocations},
{"CandidateLocations2", testGenericCandidateLocations2},
@@ -99,6 +101,28 @@ func testGenericRecordDigestUncompressedPair(t *testing.T, cache blobinfocache.B
}
}

func testGenericUncompressedDigestForTOC(t *testing.T, cache blobinfocache.BlobInfoCache2) {
// Nothing is known.
assert.Equal(t, digest.Digest(""), cache.UncompressedDigestForTOC(digestUnknown))

cache.RecordTOCUncompressedPair(digestCompressedA, digestUncompressed)
cache.RecordTOCUncompressedPair(digestCompressedB, digestUncompressed)
// Known TOC→uncompressed mapping
assert.Equal(t, digestUncompressed, cache.UncompressedDigestForTOC(digestCompressedA))
assert.Equal(t, digestUncompressed, cache.UncompressedDigestForTOC(digestCompressedB))
}

func testGenericRecordTOCUncompressedPair(t *testing.T, cache blobinfocache.BlobInfoCache2) {
for i := 0; i < 2; i++ { // Record the same data twice to ensure redundant writes don’t break things.
// Known TOC→uncompressed mapping
cache.RecordTOCUncompressedPair(digestCompressedA, digestUncompressed)
assert.Equal(t, digestUncompressed, cache.UncompressedDigestForTOC(digestCompressedA))
// Two mappings to the same uncompressed digest
cache.RecordTOCUncompressedPair(digestCompressedB, digestUncompressed)
assert.Equal(t, digestUncompressed, cache.UncompressedDigestForTOC(digestCompressedB))
}
}

func testGenericRecordKnownLocations(t *testing.T, cache blobinfocache.BlobInfoCache2) {
transport := mocks.NameImageTransport("==BlobInfocache transport mock")
for i := 0; i < 2; i++ { // Record the same data twice to ensure redundant writes don’t break things.
42 changes: 34 additions & 8 deletions pkg/blobinfocache/memory/memory.go
@@ -24,10 +24,11 @@ type locationKey struct {
type cache struct {
mutex sync.Mutex
// The following fields can only be accessed with mutex held.
uncompressedDigests map[digest.Digest]digest.Digest
digestsByUncompressed map[digest.Digest]*set.Set[digest.Digest] // stores a set of digests for each uncompressed digest
knownLocations map[locationKey]map[types.BICLocationReference]time.Time // stores last known existence time for each location reference
compressors map[digest.Digest]string // stores a compressor name, or blobinfocache.Uncompressed (not blobinfocache.UnknownCompression), for each digest
uncompressedDigests map[digest.Digest]digest.Digest
uncompressedDigestsByTOC map[digest.Digest]digest.Digest
digestsByUncompressed map[digest.Digest]*set.Set[digest.Digest] // stores a set of digests for each uncompressed digest
knownLocations map[locationKey]map[types.BICLocationReference]time.Time // stores last known existence time for each location reference
compressors map[digest.Digest]string // stores a compressor name, or blobinfocache.Uncompressed (not blobinfocache.UnknownCompression), for each digest
}

// New returns a BlobInfoCache implementation which is in-memory only.
@@ -44,10 +45,11 @@ func New() types.BlobInfoCache {

func new2() *cache {
return &cache{
uncompressedDigests: map[digest.Digest]digest.Digest{},
digestsByUncompressed: map[digest.Digest]*set.Set[digest.Digest]{},
knownLocations: map[locationKey]map[types.BICLocationReference]time.Time{},
compressors: map[digest.Digest]string{},
uncompressedDigests: map[digest.Digest]digest.Digest{},
uncompressedDigestsByTOC: map[digest.Digest]digest.Digest{},
digestsByUncompressed: map[digest.Digest]*set.Set[digest.Digest]{},
knownLocations: map[locationKey]map[types.BICLocationReference]time.Time{},
compressors: map[digest.Digest]string{},
}
}

@@ -104,6 +106,30 @@ func (mem *cache) RecordDigestUncompressedPair(anyDigest digest.Digest, uncompre
anyDigestSet.Add(anyDigest)
}

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Returns "" if the uncompressed digest is unknown.
func (mem *cache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
mem.mutex.Lock()
defer mem.mutex.Unlock()
if d, ok := mem.uncompressedDigestsByTOC[tocDigest]; ok {
return d
}
return ""
}

// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
func (mem *cache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
mem.mutex.Lock()
defer mem.mutex.Unlock()
if previous, ok := mem.uncompressedDigestsByTOC[tocDigest]; ok && previous != uncompressed {
logrus.Warnf("Uncompressed digest for blob with TOC %q previously recorded as %q, now %q", tocDigest, previous, uncompressed)
}
mem.uncompressedDigestsByTOC[tocDigest] = uncompressed
}

// RecordKnownLocation records that a blob with the specified digest exists within the specified (transport, scope) scope,
// and can be reused given the opaque location data.
func (mem *cache) RecordKnownLocation(transport types.ImageTransport, scope types.BICTransportScope, blobDigest digest.Digest, location types.BICLocationReference) {
13 changes: 13 additions & 0 deletions pkg/blobinfocache/none/none.go
@@ -34,6 +34,19 @@ func (noCache) UncompressedDigest(anyDigest digest.Digest) digest.Digest {
func (noCache) RecordDigestUncompressedPair(anyDigest digest.Digest, uncompressed digest.Digest) {
}

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Returns "" if the uncompressed digest is unknown.
func (noCache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
return ""
}

// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
func (noCache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
}

// RecordKnownLocation records that a blob with the specified digest exists within the specified (transport, scope) scope,
// and can be reused given the opaque location data.
func (noCache) RecordKnownLocation(transport types.ImageTransport, scope types.BICTransportScope, blobDigest digest.Digest, location types.BICLocationReference) {
59 changes: 59 additions & 0 deletions pkg/blobinfocache/sqlite/sqlite.go
@@ -295,6 +295,14 @@ func ensureDBHasCurrentSchema(db *sql.DB) error {
`PRIMARY KEY (transport, scope, digest, location)
)`,
},
{
"DigestTOCUncompressedPairs",
`CREATE TABLE IF NOT EXISTS DigestTOCUncompressedPairs(` +
// index implied by PRIMARY KEY
`tocDigest TEXT PRIMARY KEY NOT NULL,` +
`uncompressedDigest TEXT NOT NULL
)`,
},
}

_, err := dbTransaction(db, func(tx *sql.Tx) (void, error) {
@@ -385,6 +393,57 @@ func (sqc *cache) RecordDigestUncompressedPair(anyDigest digest.Digest, uncompre
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Returns "" if the uncompressed digest is unknown.
func (sqc *cache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
res, err := transaction(sqc, func(tx *sql.Tx) (digest.Digest, error) {
uncompressedString, found, err := querySingleValue[string](tx, "SELECT uncompressedDigest FROM DigestTOCUncompressedPairs WHERE tocDigest = ?", tocDigest.String())
if err != nil {
return "", err
}
if found {
d, err := digest.Parse(uncompressedString)
if err != nil {
return "", err
}
return d, nil

}
return "", nil
})
if err != nil {
return "" // FIXME? Log err (but throttle the log volume on repeated accesses)?
}
return res
}

// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
func (sqc *cache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
_, _ = transaction(sqc, func(tx *sql.Tx) (void, error) {
previousString, gotPrevious, err := querySingleValue[string](tx, "SELECT uncompressedDigest FROM DigestTOCUncompressedPairs WHERE tocDigest = ?", tocDigest.String())
if err != nil {
return void{}, fmt.Errorf("looking for uncompressed digest for blob with TOC %q", tocDigest)
}
if gotPrevious {
previous, err := digest.Parse(previousString)
if err != nil {
return void{}, err
}
if previous != uncompressed {
logrus.Warnf("Uncompressed digest for blob with TOC %q previously recorded as %q, now %q", tocDigest, previous, uncompressed)
}
}
if _, err := tx.Exec("INSERT OR REPLACE INTO DigestTOCUncompressedPairs(tocDigest, uncompressedDigest) VALUES (?, ?)",
tocDigest.String(), uncompressed.String()); err != nil {
return void{}, fmt.Errorf("recording uncompressed digest %q for blob with TOC %q: %w", uncompressed, tocDigest, err)
}
return void{}, nil
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// RecordKnownLocation records that a blob with the specified digest exists within the specified (transport, scope) scope,
// and can be reused given the opaque location data.
func (sqc *cache) RecordKnownLocation(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, location types.BICLocationReference) {
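For readers less familiar with the INSERT OR REPLACE pattern used above, the following self-contained sketch (hypothetical database path; the mattn/go-sqlite3 driver is an assumption made only for this sketch) demonstrates the upsert behaviour the new DigestTOCUncompressedPairs table relies on: tocDigest is the primary key, so re-recording a pair replaces the earlier row instead of accumulating duplicates.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumed driver, used here only for the sketch
)

func main() {
	db, err := sql.Open("sqlite3", "/tmp/toc-bic-sketch.sqlite") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Same shape as the DigestTOCUncompressedPairs table created above: the TOC digest
	// is the primary key, so each TOC maps to at most one uncompressed digest.
	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS DigestTOCUncompressedPairs(
		tocDigest TEXT PRIMARY KEY NOT NULL,
		uncompressedDigest TEXT NOT NULL)`); err != nil {
		log.Fatal(err)
	}

	// Recording the same TOC digest twice keeps a single row with the latest value
	// (the real code additionally logs a warning if the value changes).
	for _, diffID := range []string{"sha256:1111", "sha256:2222"} {
		if _, err := db.Exec(`INSERT OR REPLACE INTO DigestTOCUncompressedPairs(tocDigest, uncompressedDigest) VALUES (?, ?)`,
			"sha256:aaaa", diffID); err != nil {
			log.Fatal(err)
		}
	}

	var got string
	if err := db.QueryRow(`SELECT uncompressedDigest FROM DigestTOCUncompressedPairs WHERE tocDigest = ?`, "sha256:aaaa").Scan(&got); err != nil {
		log.Fatal(err)
	}
	fmt.Println(got) // prints sha256:2222; only the latest mapping is kept
}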
