Skip to content

Commit

Permalink
Workaround for container registry push/pull errors (go-gitea#21862)
Browse files Browse the repository at this point in the history
This PR addresses go-gitea#19586

I added a mutex to the upload version creation which will prevent the
push errors when two requests try to create these database entries. I'm
not sure if this should be the final solution for this problem.

I added a workaround to allow a reupload of missing blobs. Normally a
reupload is skipped because the database knows the blob is already
present. The workaround checks if the blob exists on the file system.
This should not be needed anymore with the above fix so I marked this
code to be removed with Gitea v1.20.

Co-authored-by: Lunny Xiao <[email protected]>
  • Loading branch information
KN4CK3R and lunny committed Dec 8, 2022
1 parent e93a4a0 commit ad0ee26
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 4 deletions.
28 changes: 28 additions & 0 deletions integrations/api_packages_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package integrations

import (
"bytes"
"crypto/sha256"
"encoding/base64"
"fmt"
"net/http"
"strings"
"sync"
"testing"

"code.gitea.io/gitea/models/db"
Expand Down Expand Up @@ -549,6 +551,32 @@ func TestPackageContainer(t *testing.T) {
})
}

// https://github.com/go-gitea/gitea/issues/19586
t.Run("ParallelUpload", func(t *testing.T) {
defer PrintCurrentTest(t)()

url := fmt.Sprintf("%sv2/%s/parallel", setting.AppURL, user.Name)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)

content := []byte{byte(i)}
digest := fmt.Sprintf("sha256:%x", sha256.Sum256(content))

go func() {
defer wg.Done()

req := NewRequestWithBody(t, "POST", fmt.Sprintf("%s/blobs/uploads?digest=%s", url, digest), bytes.NewReader(content))
addTokenAuthHeader(req, userToken)
resp := MakeRequest(t, req, http.StatusCreated)

assert.Equal(t, digest, resp.Header().Get("Docker-Content-Digest"))
}()
}
wg.Wait()
})

t.Run("OwnerNameChange", func(t *testing.T) {
defer PrintCurrentTest(t)()

Expand Down
7 changes: 7 additions & 0 deletions modules/packages/content_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func (s *ContentStore) Get(key BlobHash256Key) (storage.Object, error) {
return s.store.Open(KeyToRelativePath(key))
}

// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
func (s *ContentStore) Has(key BlobHash256Key) error {
_, err := s.store.Stat(KeyToRelativePath(key))
return err
}

// Save stores a package blob
func (s *ContentStore) Save(key BlobHash256Key, r io.Reader, size int64) error {
_, err := s.store.Save(KeyToRelativePath(key), r, size)
Expand Down
31 changes: 30 additions & 1 deletion routers/api/packages/container/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ package container
import (
"context"
"encoding/hex"
"errors"
"fmt"
"os"
"strings"
"sync"

"code.gitea.io/gitea/models/db"
packages_model "code.gitea.io/gitea/models/packages"
Expand All @@ -19,6 +22,8 @@ import (
packages_service "code.gitea.io/gitea/services/packages"
)

var uploadVersionMutex sync.Mutex

// saveAsPackageBlob creates a package blob from an upload
// The uploaded blob gets stored in a special upload version to link them to the package/image
func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_service.PackageInfo) (*packages_model.PackageBlob, error) {
Expand All @@ -28,6 +33,11 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic

contentStore := packages_module.NewContentStore()

var uploadVersion *packages_model.PackageVersion

// FIXME: Replace usage of mutex with database transaction
// https://github.com/go-gitea/gitea/pull/21862
uploadVersionMutex.Lock()
err := db.WithTx(func(ctx context.Context) error {
created := true
p := &packages_model.Package{
Expand Down Expand Up @@ -68,11 +78,30 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic
}
}

uploadVersion = pv

return nil
})
uploadVersionMutex.Unlock()
if err != nil {
return nil, err
}

err = db.WithTx(func(ctx context.Context) error {
pb, exists, err = packages_model.GetOrInsertBlob(ctx, pb)
if err != nil {
log.Error("Error inserting package blob: %v", err)
return err
}
// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
if exists {
err = contentStore.Has(packages_module.BlobHash256Key(pb.HashSHA256))
if err != nil && errors.Is(err, os.ErrNotExist) {
log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256)
exists = false
}
}
if !exists {
if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), hsr, hsr.Size()); err != nil {
log.Error("Error saving package blob in content store: %v", err)
Expand All @@ -83,7 +112,7 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic
filename := strings.ToLower(fmt.Sprintf("sha256_%s", pb.HashSHA256))

pf := &packages_model.PackageFile{
VersionID: pv.ID,
VersionID: uploadVersion.ID,
BlobID: pb.ID,
Name: filename,
LowerName: filename,
Expand Down
27 changes: 24 additions & 3 deletions routers/api/packages/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -193,7 +194,7 @@ func InitiateUploadBlob(ctx *context.Context) {
mount := ctx.FormTrim("mount")
from := ctx.FormTrim("from")
if mount != "" {
blob, _ := container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{
blob, _ := workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{
Image: from,
Digest: mount,
})
Expand Down Expand Up @@ -361,7 +362,7 @@ func getBlobFromContext(ctx *context.Context) (*packages_model.PackageFileDescri
return nil, container_model.ErrContainerBlobNotExist
}

return container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{
return workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{
OwnerID: ctx.Package.Owner.ID,
Image: ctx.Params("image"),
Digest: digest,
Expand Down Expand Up @@ -503,7 +504,7 @@ func getManifestFromContext(ctx *context.Context) (*packages_model.PackageFileDe
return nil, container_model.ErrContainerBlobNotExist
}

return container_model.GetContainerBlob(ctx, opts)
return workaroundGetContainerBlob(ctx, opts)
}

// https://github.com/opencontainers/distribution-spec/blob/main/spec.md#checking-if-content-exists-in-the-registry
Expand Down Expand Up @@ -643,3 +644,23 @@ func GetTagList(ctx *context.Context) {
Tags: tags,
})
}

// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
func workaroundGetContainerBlob(ctx *context.Context, opts *container_model.BlobSearchOptions) (*packages_model.PackageFileDescriptor, error) {
blob, err := container_model.GetContainerBlob(ctx, opts)
if err != nil {
return nil, err
}

err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(blob.Blob.HashSHA256))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
log.Debug("Package registry inconsistent: blob %s does not exist on file system", blob.Blob.HashSHA256)
return nil, container_model.ErrContainerBlobNotExist
}
return nil, err
}

return blob, nil
}
11 changes: 11 additions & 0 deletions routers/api/packages/container/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package container

import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"

"code.gitea.io/gitea/models/db"
Expand Down Expand Up @@ -403,6 +405,15 @@ func createManifestBlob(ctx context.Context, mci *manifestCreationInfo, pv *pack
log.Error("Error inserting package blob: %v", err)
return nil, false, "", err
}
// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
if exists {
err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(pb.HashSHA256))
if err != nil && errors.Is(err, os.ErrNotExist) {
log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256)
exists = false
}
}
if !exists {
contentStore := packages_module.NewContentStore()
if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), buf, buf.Size()); err != nil {
Expand Down

0 comments on commit ad0ee26

Please sign in to comment.