Skip to content

Commit

Permalink
fix(repo-server): excess git requests, cache lock on revisions
Browse files Browse the repository at this point in the history
Signed-off-by: nromriell <[email protected]>
  • Loading branch information
nromriell committed Feb 7, 2024
1 parent d494d3a commit 18eb32c
Show file tree
Hide file tree
Showing 12 changed files with 896 additions and 108 deletions.
189 changes: 158 additions & 31 deletions reposerver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/argoproj/gitops-engine/pkg/utils/text"
"github.com/go-git/go-git/v5/plumbing"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

Expand All @@ -24,11 +25,13 @@ import (
)

var ErrCacheMiss = cacheutil.ErrCacheMiss
var ErrCacheKeyLocked = cacheutil.ErrCacheKeyLocked

type Cache struct {
cache *cacheutil.Cache
repoCacheExpiration time.Duration
revisionCacheExpiration time.Duration
cache *cacheutil.Cache
repoCacheExpiration time.Duration
revisionCacheExpiration time.Duration
revisionCacheLockTimeout time.Duration
}

// ClusterRuntimeInfo holds cluster runtime information
Expand All @@ -40,7 +43,7 @@ type ClusterRuntimeInfo interface {
}

func NewCache(cache *cacheutil.Cache, repoCacheExpiration time.Duration, revisionCacheExpiration time.Duration) *Cache {
return &Cache{cache, repoCacheExpiration, revisionCacheExpiration}
return &Cache{cache, repoCacheExpiration, revisionCacheExpiration, 10 * time.Second}
}

func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...cacheutil.Options) func() (*Cache, error) {
Expand Down Expand Up @@ -140,12 +143,17 @@ func listApps(repoURL, revision string) string {

func (c *Cache) ListApps(repoUrl, revision string) (map[string]string, error) {
res := make(map[string]string)
err := c.cache.GetItem(listApps(repoUrl, revision), &res)
err := c.cache.GetItem(listApps(repoUrl, revision), &res, nil)
return res, err
}

func (c *Cache) SetApps(repoUrl, revision string, apps map[string]string) error {
return c.cache.SetItem(listApps(repoUrl, revision), apps, c.repoCacheExpiration, apps == nil)
return c.cache.SetItem(
listApps(repoUrl, revision),
apps,
&cacheutil.CacheActionOpts{
Expiration: c.repoCacheExpiration,
Delete: apps == nil})
}

func helmIndexRefsKey(repo string) string {
Expand All @@ -154,12 +162,19 @@ func helmIndexRefsKey(repo string) string {

// SetHelmIndex stores helm repository index.yaml content to cache
func (c *Cache) SetHelmIndex(repo string, indexData []byte) error {
return c.cache.SetItem(helmIndexRefsKey(repo), indexData, c.revisionCacheExpiration, false)
if indexData == nil {
// Logged as warning upstream
return fmt.Errorf("helm index data is nil, skipping cache")
}
return c.cache.SetItem(
helmIndexRefsKey(repo),
indexData,
&cacheutil.CacheActionOpts{Expiration: c.revisionCacheExpiration})
}

// GetHelmIndex retrieves helm repository index.yaml content from cache
func (c *Cache) GetHelmIndex(repo string, indexData *[]byte) error {
return c.cache.GetItem(helmIndexRefsKey(repo), indexData)
return c.cache.GetItem(helmIndexRefsKey(repo), indexData, nil)
}

func gitRefsKey(repo string) string {
Expand All @@ -172,21 +187,108 @@ func (c *Cache) SetGitReferences(repo string, references []*plumbing.Reference)
for i := range references {
input = append(input, references[i].Strings())
}
return c.cache.SetItem(gitRefsKey(repo), input, c.revisionCacheExpiration, false)
return c.cache.SetItem(gitRefsKey(repo), input, &cacheutil.CacheActionOpts{Expiration: c.revisionCacheExpiration})
}

// Converts raw cache items to plumbing.Reference objects
func GitRefCacheItemToReferences(cacheItem [][2]string) *[]*plumbing.Reference {
var res []*plumbing.Reference
for i := range cacheItem {
// Skip empty data
if cacheItem[i][0] != "" || cacheItem[i][1] != "" {
res = append(res, plumbing.NewReferenceFromStrings(cacheItem[i][0], cacheItem[i][1]))
}
}
return &res
}

// GetGitReferences retrieves resolved Git repository references from cache
func (c *Cache) GetGitReferences(repo string, references *[]*plumbing.Reference) error {
// TryLockGitRefCache attempts to lock the key for the Git repository references if the key doesn't exist
func (c *Cache) TryLockGitRefCache(repo string, lockId string) error {
// This try set with DisableOverwrite is important for making sure that only one process is able to claim ownership
// A normal get + set, or just set would cause ownership to go to whoever the last writer was, and during race conditions
// leads to duplicate requests
err := c.cache.SetItem(gitRefsKey(repo), [][2]string{{cacheutil.CacheLockedValue, lockId}}, &cacheutil.CacheActionOpts{
Expiration: c.revisionCacheLockTimeout,
DisableOverwrite: true})
return err
}

func (c *Cache) GetGitReferences(repo string, lockId string) (lockOwner string, references *[]*plumbing.Reference, err error) {
var input [][2]string
if err := c.cache.GetItem(gitRefsKey(repo), &input); err != nil {
return err
err = c.cache.GetItem(gitRefsKey(repo), &input, &cacheutil.CacheActionOpts{Expiration: c.revisionCacheExpiration})
if err == ErrCacheMiss {
// Expected
return "", nil, nil
} else if err == nil && len(input) > 0 && len(input[0]) > 0 {
if input[0][0] != cacheutil.CacheLockedValue {
// Valid value in cache, convert to plumbing.Reference and return
return "", GitRefCacheItemToReferences(input), nil
} else {
// The key lock is being held
return input[0][1], nil, nil
}
}
var res []*plumbing.Reference
for i := range input {
res = append(res, plumbing.NewReferenceFromStrings(input[i][0], input[i][1]))
return "", nil, err
}

// GetOrLockGitReferences retrieves the git references if they exist, otherwise creates a lock and returns so the caller can populate the cache
func (c *Cache) GetOrLockGitReferences(repo string, references *[]*plumbing.Reference) (updateCache bool, lockId string, err error) {
myLockUUID, err := uuid.NewRandom()
if err != nil {
log.Debug("Error generating git references cache lock id: ", err)
return false, "", err
}
*references = res
return nil
// We need to be able to identify that our lock was the successful one, otherwise we'll still have duplicate requests
myLockId := myLockUUID.String()
// Value matches the ttl on the lock in TryLockGitRefCache
waitUntil := time.Now().Add(c.revisionCacheLockTimeout)
// Wait only the maximum amount of time configured for the lock
for time.Now().Before(waitUntil) {
// Attempt to retrieve the key from local cache only
if _, cacheReferences, err := c.GetGitReferences(repo, myLockId); err != nil || cacheReferences != nil {
if cacheReferences != nil {
*references = *cacheReferences
}
return false, myLockId, err
}
// Could not get key locally attempt to get the lock
err = c.TryLockGitRefCache(repo, myLockId)
if err != nil {
// Log but ignore this error since we'll want to retry, failing to obtain the lock should not throw an error
log.Errorf("Error attempting to acquire git references cache lock: %v", err)
}
// Attempt to retrieve the key again to see if we have the lock, or the key was populated
if lockOwner, cacheReferences, err := c.GetGitReferences(repo, myLockId); err != nil || cacheReferences != nil {
if cacheReferences != nil {
// Someone else populated the key
*references = *cacheReferences
}
return false, myLockId, err
} else if lockOwner == myLockId {
// We have the lock, populate the key
return true, myLockId, nil
}
// Wait for lock, valid value, or timeout
time.Sleep(1 * time.Second)
}
// Timeout waiting for lock
log.Debug("Repository cache was unable to acquire lock or valid data within timeout")
return true, myLockId, nil
}

// UnlockGitReferences unlocks the key for the Git repository references if needed
func (c *Cache) UnlockGitReferences(repo string, lockId string) error {
var input [][2]string
var err error
if err = c.cache.GetItem(gitRefsKey(repo), &input, nil); err == nil &&
len(input) > 0 &&
len(input[0]) > 1 &&
input[0][0] == cacheutil.CacheLockedValue &&
input[0][1] == lockId {
// We have the lock, so remove it
return c.cache.SetItem(gitRefsKey(repo), input, &cacheutil.CacheActionOpts{Delete: true})
}
return err
}

// refSourceCommitSHAs is a list of resolved revisions for each ref source. This allows us to invalidate the cache
Expand Down Expand Up @@ -231,7 +333,7 @@ func (c *Cache) SetNewRevisionManifests(newRevision string, revision string, app
}

func (c *Cache) GetManifests(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, clusterInfo ClusterRuntimeInfo, namespace string, trackingMethod string, appLabelKey string, appName string, res *CachedManifestResponse, refSourceCommitSHAs ResolvedRevisions) error {
err := c.cache.GetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), res)
err := c.cache.GetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), res, nil)

if err != nil {
return err
Expand Down Expand Up @@ -274,11 +376,19 @@ func (c *Cache) SetManifests(revision string, appSrc *appv1.ApplicationSource, s
res.CacheEntryHash = hash
}

return c.cache.SetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), res, c.repoCacheExpiration, res == nil)
return c.cache.SetItem(
manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs),
res,
&cacheutil.CacheActionOpts{
Expiration: c.repoCacheExpiration,
Delete: res == nil})
}

func (c *Cache) DeleteManifests(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, clusterInfo ClusterRuntimeInfo, namespace, trackingMethod, appLabelKey, appName string, refSourceCommitSHAs ResolvedRevisions) error {
return c.cache.SetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), "", c.repoCacheExpiration, true)
return c.cache.SetItem(
manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs),
"",
&cacheutil.CacheActionOpts{Delete: true})
}

func appDetailsCacheKey(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, trackingMethod appv1.TrackingMethod, refSourceCommitSHAs ResolvedRevisions) string {
Expand All @@ -289,11 +399,16 @@ func appDetailsCacheKey(revision string, appSrc *appv1.ApplicationSource, srcRef
}

func (c *Cache) GetAppDetails(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, res *apiclient.RepoAppDetailsResponse, trackingMethod appv1.TrackingMethod, refSourceCommitSHAs ResolvedRevisions) error {
return c.cache.GetItem(appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs), res)
return c.cache.GetItem(appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs), res, nil)
}

func (c *Cache) SetAppDetails(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, res *apiclient.RepoAppDetailsResponse, trackingMethod appv1.TrackingMethod, refSourceCommitSHAs ResolvedRevisions) error {
return c.cache.SetItem(appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs), res, c.repoCacheExpiration, res == nil)
return c.cache.SetItem(
appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs),
res,
&cacheutil.CacheActionOpts{
Expiration: c.repoCacheExpiration,
Delete: res == nil})
}

func revisionMetadataKey(repoURL, revision string) string {
Expand All @@ -302,11 +417,14 @@ func revisionMetadataKey(repoURL, revision string) string {

func (c *Cache) GetRevisionMetadata(repoURL, revision string) (*appv1.RevisionMetadata, error) {
item := &appv1.RevisionMetadata{}
return item, c.cache.GetItem(revisionMetadataKey(repoURL, revision), item)
return item, c.cache.GetItem(revisionMetadataKey(repoURL, revision), item, nil)
}

func (c *Cache) SetRevisionMetadata(repoURL, revision string, item *appv1.RevisionMetadata) error {
return c.cache.SetItem(revisionMetadataKey(repoURL, revision), item, c.repoCacheExpiration, false)
return c.cache.SetItem(
revisionMetadataKey(repoURL, revision),
item,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func revisionChartDetailsKey(repoURL, chart, revision string) string {
Expand All @@ -315,37 +433,46 @@ func revisionChartDetailsKey(repoURL, chart, revision string) string {

func (c *Cache) GetRevisionChartDetails(repoURL, chart, revision string) (*appv1.ChartDetails, error) {
item := &appv1.ChartDetails{}
return item, c.cache.GetItem(revisionChartDetailsKey(repoURL, chart, revision), item)
return item, c.cache.GetItem(revisionChartDetailsKey(repoURL, chart, revision), item, nil)
}

func (c *Cache) SetRevisionChartDetails(repoURL, chart, revision string, item *appv1.ChartDetails) error {
return c.cache.SetItem(revisionChartDetailsKey(repoURL, chart, revision), item, c.repoCacheExpiration, false)
return c.cache.SetItem(
revisionChartDetailsKey(repoURL, chart, revision),
item,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func gitFilesKey(repoURL, revision, pattern string) string {
return fmt.Sprintf("gitfiles|%s|%s|%s", repoURL, revision, pattern)
}

func (c *Cache) SetGitFiles(repoURL, revision, pattern string, files map[string][]byte) error {
return c.cache.SetItem(gitFilesKey(repoURL, revision, pattern), &files, c.repoCacheExpiration, false)
return c.cache.SetItem(
gitFilesKey(repoURL, revision, pattern),
&files,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func (c *Cache) GetGitFiles(repoURL, revision, pattern string) (map[string][]byte, error) {
var item map[string][]byte
return item, c.cache.GetItem(gitFilesKey(repoURL, revision, pattern), &item)
return item, c.cache.GetItem(gitFilesKey(repoURL, revision, pattern), &item, nil)
}

func gitDirectoriesKey(repoURL, revision string) string {
return fmt.Sprintf("gitdirs|%s|%s", repoURL, revision)
}

func (c *Cache) SetGitDirectories(repoURL, revision string, directories []string) error {
return c.cache.SetItem(gitDirectoriesKey(repoURL, revision), &directories, c.repoCacheExpiration, false)
return c.cache.SetItem(
gitDirectoriesKey(repoURL, revision),
&directories,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func (c *Cache) GetGitDirectories(repoURL, revision string) ([]string, error) {
var item []string
return item, c.cache.GetItem(gitDirectoriesKey(repoURL, revision), &item)
return item, c.cache.GetItem(gitDirectoriesKey(repoURL, revision), &item, nil)
}

func (cmr *CachedManifestResponse) shallowCopy() *CachedManifestResponse {
Expand Down
Loading

0 comments on commit 18eb32c

Please sign in to comment.