Skip to content

Commit

Permalink
storage: take lock files into consideration while garbage collecting
Browse files Browse the repository at this point in the history
Signed-off-by: Sanskar Jaiswal <[email protected]>
  • Loading branch information
aryan9600 committed Dec 19, 2022
1 parent 9d77641 commit 49982d4
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 21 deletions.
2 changes: 1 addition & 1 deletion controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc
}
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)/2))
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/gitrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc
}
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)/2))
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/helmchart_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1.
}
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)/2))
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/helmrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour
}
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)/2))
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/ocirepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ func (r *OCIRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc
}
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
fmt.Sprintf("garbage collected %d artifacts", len(delFiles)/2))
return nil
}
}
Expand Down
42 changes: 27 additions & 15 deletions controllers/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,26 @@ func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) ([]string, err

// getGarbageFiles returns all files that need to be garbage collected for the given artifact.
// Garbage files are determined based on the below flow:
// 1. collect all files with an expired ttl
// 1. collect all artifact files with an expired ttl
// 2. if we satisfy maxItemsToBeRetained, then return
// 3. else, remove all files till the latest n files remain, where n=maxItemsToBeRetained
func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) {
// 3. else, collect all artifact files till the latest n files remain, where n=maxItemsToBeRetained
func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) (garbageFiles []string, _ error) {
localPath := s.LocalPath(artifact)
dir := filepath.Dir(localPath)
garbageFiles := []string{}
filesWithCreatedTs := make(map[time.Time]string)
artifactFilesWithCreatedTs := make(map[time.Time]string)
// sortedPaths contain all files sorted according to their created ts.
sortedPaths := []string{}
now := time.Now().UTC()
totalFiles := 0
totalArtifactFiles := 0
var errors []string
creationTimestamps := []time.Time{}
_ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
errors = append(errors, err.Error())
return nil
}
if totalFiles >= totalCountLimit {
return fmt.Errorf("reached file walking limit, already walked over: %d", totalFiles)
if totalArtifactFiles >= totalCountLimit {
return fmt.Errorf("reached file walking limit, already walked over: %d", totalArtifactFiles)
}
info, err := d.Info()
if err != nil {
Expand All @@ -189,14 +188,16 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m
createdAt := info.ModTime().UTC()
diff := now.Sub(createdAt)
// Compare the time difference between now and the time at which the file was created
// with the provided TTL. Delete if the difference is greater than the TTL.
// with the provided TTL. Delete if the difference is greater than the TTL. Since the
// below logic just deals with determining if an artifact needs to be garbage collected,
// we avoid all lock files, adding them at the end to the list of garbage files.
expired := diff > ttl
if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink {
if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink && filepath.Ext(path) != ".lock" {
if path != localPath && expired {
garbageFiles = append(garbageFiles, path)
}
totalFiles += 1
filesWithCreatedTs[createdAt] = path
totalArtifactFiles += 1
artifactFilesWithCreatedTs[createdAt] = path
creationTimestamps = append(creationTimestamps, createdAt)
}
return nil
Expand All @@ -208,14 +209,14 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m

// We already collected enough garbage files to satisfy the no. of max
// items that are supposed to be retained, so exit early.
if totalFiles-len(garbageFiles) < maxItemsToBeRetained {
if totalArtifactFiles-len(garbageFiles) < maxItemsToBeRetained {
return garbageFiles, nil
}

// sort all timestamps in an ascending order.
sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) })
for _, ts := range creationTimestamps {
path, ok := filesWithCreatedTs[ts]
path, ok := artifactFilesWithCreatedTs[ts]
if !ok {
return garbageFiles, fmt.Errorf("failed to fetch file for created ts: %v", ts)
}
Expand All @@ -225,7 +226,7 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m
var collected int
noOfGarbageFiles := len(garbageFiles)
for _, path := range sortedPaths {
if path != localPath && !stringInSlice(path, garbageFiles) {
if path != localPath && filepath.Ext(path) != ".lock" && !stringInSlice(path, garbageFiles) {
// If we previously collected a few garbage files with an expired ttl, then take that into account
// when checking whether we need to remove more files to satisfy the max no. of items allowed
// in the filesystem, along with the no. of files already removed in this loop.
Expand Down Expand Up @@ -271,6 +272,17 @@ func (s *Storage) GarbageCollect(ctx context.Context, artifact sourcev1.Artifact
} else {
deleted = append(deleted, file)
}
// If a lock file exists for this garbage artifact, remove that too.
lockFile := file + ".lock"
if _, err = os.Lstat(lockFile); err == nil {
err = os.Remove(lockFile)
if err != nil {
errors = append(errors, err)
} else {
deleted = append(deleted, lockFile)
}

}
}
}
if len(errors) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func main() {
flag.StringSliceVar(&git.HostKeyAlgos, "ssh-hostkey-algos", []string{},
"The list of hostkey algorithms to use for ssh connections, arranged from most preferred to the least.")
flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 60*time.Second,
"The duration of time that artifacts will be kept in storage before being garbage collected.")
"The duration of time that artifacts from previous reconcilations will be kept in storage before being garbage collected.")
flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2,
"The maximum number of artifacts to be kept in storage after a garbage collection.")

Expand Down

0 comments on commit 49982d4

Please sign in to comment.