From 39bffa54d90b616dd435f15f9f58cdc2d751c79e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Thu, 10 May 2018 08:48:38 +0000 Subject: [PATCH] image-pruner: prune images in their own jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of pruning in phases: all streams -> all layers -> all blobs -> manifests -> images Prune individual images in parallel jobs: all streams -> parallel [ image1's layers -> image1's blobs -> ... -> image1, image2's layers -> image2's blobs -> ... -> image2, ... ] A failure in streams prune phase is not fatal anymore. Signed-off-by: Michal Minář --- pkg/cmd/server/bootstrappolicy/policy.go | 2 +- pkg/oc/admin/prune/imageprune/helper.go | 40 +- pkg/oc/admin/prune/imageprune/prune.go | 806 +++++++++++----- pkg/oc/admin/prune/imageprune/prune_test.go | 893 ++++++++++++++++-- .../admin/prune/imageprune/testutil/util.go | 11 +- pkg/oc/admin/prune/imageprune/worker.go | 359 +++++++ pkg/oc/admin/prune/images.go | 289 ++++-- pkg/oc/graph/imagegraph/nodes/nodes.go | 5 + pkg/oc/graph/imagegraph/nodes/types.go | 5 +- .../bootstrap_cluster_roles.yaml | 1 + .../bootstrap_policy_file.yaml | 1 + 11 files changed, 2016 insertions(+), 396 deletions(-) create mode 100644 pkg/oc/admin/prune/imageprune/worker.go diff --git a/pkg/cmd/server/bootstrappolicy/policy.go b/pkg/cmd/server/bootstrappolicy/policy.go index 086698ff3167..2e37bc83f7b7 100644 --- a/pkg/cmd/server/bootstrappolicy/policy.go +++ b/pkg/cmd/server/bootstrappolicy/policy.go @@ -505,7 +505,7 @@ func GetOpenshiftBootstrapClusterRoles() []rbac.ClusterRole { rbac.NewRule("get", "list").Groups(appsGroup, extensionsGroup).Resources("replicasets").RuleOrDie(), rbac.NewRule("delete").Groups(imageGroup, legacyImageGroup).Resources("images").RuleOrDie(), - rbac.NewRule("get", "list").Groups(imageGroup, legacyImageGroup).Resources("images", "imagestreams").RuleOrDie(), + rbac.NewRule("get", "list", "watch").Groups(imageGroup, legacyImageGroup).Resources("images", "imagestreams").RuleOrDie(), rbac.NewRule("update").Groups(imageGroup, legacyImageGroup).Resources("imagestreams/status").RuleOrDie(), }, }, diff --git a/pkg/oc/admin/prune/imageprune/helper.go b/pkg/oc/admin/prune/imageprune/helper.go index 2a707446cdcb..2b59b0dae19e 100644 --- a/pkg/oc/admin/prune/imageprune/helper.go +++ b/pkg/oc/admin/prune/imageprune/helper.go @@ -7,12 +7,15 @@ import ( "sort" "strings" - kapi "k8s.io/kubernetes/pkg/apis/core" - "github.com/docker/distribution/registry/api/errcode" "github.com/golang/glog" + kmeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/kubernetes/pkg/api/legacyscheme" + kapiref "k8s.io/kubernetes/pkg/api/ref" + kapi "k8s.io/kubernetes/pkg/apis/core" imageapi "github.com/openshift/origin/pkg/image/apis/image" "github.com/openshift/origin/pkg/util/netutils" @@ -265,3 +268,36 @@ func (e *ErrBadReference) String() string { } return fmt.Sprintf("%s[%s]: invalid %s reference %q: %s", e.kind, name, targetKind, e.reference, e.reason) } + +func getName(obj runtime.Object) string { + accessor, err := kmeta.Accessor(obj) + if err != nil { + glog.V(4).Infof("Error getting accessor for %#v", obj) + return "" + } + ns := accessor.GetNamespace() + if len(ns) == 0 { + return accessor.GetName() + } + return fmt.Sprintf("%s/%s", ns, accessor.GetName()) +} + +func getKindName(obj *kapi.ObjectReference) string { + if obj == nil { + return "unknown object" + } + name := obj.Name + if len(obj.Namespace) > 0 { + name = obj.Namespace + "/" + name + } + return fmt.Sprintf("%s[%s]", obj.Kind, name) +} + +func getRef(obj runtime.Object) *kapi.ObjectReference { + ref, err := kapiref.GetReference(legacyscheme.Scheme, obj) + if err != nil { + glog.Errorf("failed to get reference to object %T: %v", obj, err) + return nil + } + return ref +} diff --git a/pkg/oc/admin/prune/imageprune/prune.go b/pkg/oc/admin/prune/imageprune/prune.go index 302696c599be..3724a43a9c50 100644 --- a/pkg/oc/admin/prune/imageprune/prune.go +++ b/pkg/oc/admin/prune/imageprune/prune.go @@ -2,10 +2,12 @@ package imageprune import ( "encoding/json" + "errors" "fmt" "net/http" "net/url" "reflect" + "sort" "strings" "time" @@ -15,15 +17,13 @@ import ( gonum "github.com/gonum/graph" kerrapi "k8s.io/apimachinery/pkg/api/errors" - kmeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/retry" - "k8s.io/kubernetes/pkg/api/legacyscheme" - kapiref "k8s.io/kubernetes/pkg/api/ref" kapi "k8s.io/kubernetes/pkg/apis/core" kapisext "k8s.io/kubernetes/pkg/apis/extensions" @@ -56,8 +56,25 @@ const ( // ReferencedImageLayerEdgeKind defines an edge from an ImageStreamNode or an // ImageNode to an ImageComponentNode. ReferencedImageLayerEdgeKind = "ReferencedImageLayer" + + // ReferencedImageManifestEdgeKind defines an edge from an ImageStreamNode or an + // ImageNode to an ImageComponentNode. + ReferencedImageManifestEdgeKind = "ReferencedImageManifest" + + defaultPruneImageWorkerCount = 5 ) +// RegistryClientFactoryFunc is a factory function returning a registry client for use in a worker. +type RegistryClientFactoryFunc func() (*http.Client, error) + +//ImagePrunerFactoryFunc is a factory function returning an image deleter for use in a worker. +type ImagePrunerFactoryFunc func() (ImageDeleter, error) + +// FakeRegistryClientFactory is a registry client factory creating no client at all. Useful for dry run. +func FakeRegistryClientFactory() (*http.Client, error) { + return nil, nil +} + // pruneAlgorithm contains the various settings to use when evaluating images // and layers for pruning. type pruneAlgorithm struct { @@ -130,9 +147,13 @@ type PrunerOptions struct { // Images is the entire list of images in OpenShift. An image must be in this // list to be a candidate for pruning. Images *imageapi.ImageList + // ImageWatcher watches for image changes. + ImageWatcher watch.Interface // Streams is the entire list of image streams across all namespaces in the // cluster. Streams *imageapi.ImageStreamList + // StreamWatcher watches for stream changes. + StreamWatcher watch.Interface // Pods is the entire list of pods across all namespaces in the cluster. Pods *kapi.PodList // RCs is the entire list of replication controllers across all namespaces in @@ -157,29 +178,46 @@ type PrunerOptions struct { // will be removed. DryRun bool // RegistryClient is the http.Client to use when contacting the registry. - RegistryClient *http.Client + RegistryClientFactory RegistryClientFactoryFunc // RegistryURL is the URL of the integrated Docker registry. RegistryURL *url.URL // IgnoreInvalidRefs indicates that all invalid references should be ignored. IgnoreInvalidRefs bool + // NumWorkers is a desired number of workers concurrently handling image prune jobs. If less than 1, the + // default number of workers will be spawned. + NumWorkers int } -// Pruner knows how to prune istags, images, layers and image configs. +// Pruner knows how to prune istags, images, manifest, layers, image configs and blobs. type Pruner interface { // Prune uses imagePruner, streamPruner, layerLinkPruner, blobPruner, and // manifestPruner to remove images that have been identified as candidates // for pruning based on the Pruner's internal pruning algorithm. // Please see NewPruner for details on the algorithm. - Prune(imagePruner ImageDeleter, streamPruner ImageStreamDeleter, layerLinkPruner LayerLinkDeleter, blobPruner BlobDeleter, manifestPruner ManifestDeleter) error + Prune( + imagePrunerFactory ImagePrunerFactoryFunc, + streamPruner ImageStreamDeleter, + layerLinkPruner LayerLinkDeleter, + blobPruner BlobDeleter, + manifestPruner ManifestDeleter, + ) (deletions []Deletion, failures []Failure) } // pruner is an object that knows how to prune a data set type pruner struct { - g genericgraph.Graph - algorithm pruneAlgorithm - registryClient *http.Client - registryURL *url.URL - ignoreInvalidRefs bool + g genericgraph.Graph + algorithm pruneAlgorithm + ignoreInvalidRefs bool + registryClientFactory RegistryClientFactoryFunc + registryURL *url.URL + imageWatcher watch.Interface + imageStreamWatcher watch.Interface + imageStreamLimits map[string][]*kapi.LimitRange + // sorted queue of images to prune; nil stands for empty queue + queue *nodeItem + // contains prunable images removed from queue that are currently being processed + processedImages map[*imagegraph.ImageNode]*Job + numWorkers int } var _ Pruner = &pruner{} @@ -256,10 +294,19 @@ func NewPruner(options PrunerOptions) (Pruner, kerrors.Aggregate) { algorithm.namespace = options.Namespace p := &pruner{ - algorithm: algorithm, - registryClient: options.RegistryClient, - registryURL: options.RegistryURL, - ignoreInvalidRefs: options.IgnoreInvalidRefs, + algorithm: algorithm, + ignoreInvalidRefs: options.IgnoreInvalidRefs, + registryClientFactory: options.RegistryClientFactory, + registryURL: options.RegistryURL, + processedImages: make(map[*imagegraph.ImageNode]*Job), + imageWatcher: options.ImageWatcher, + imageStreamWatcher: options.StreamWatcher, + imageStreamLimits: options.LimitRanges, + numWorkers: options.NumWorkers, + } + + if p.numWorkers < 1 { + p.numWorkers = defaultPruneImageWorkerCount } if err := p.buildGraph(options); err != nil { @@ -296,10 +343,7 @@ func getValue(option interface{}) string { return "" } -// addImagesToGraph adds all images to the graph that belong to one of the -// registries in the algorithm and are at least as old as the minimum age -// threshold as specified by the algorithm. It also adds all the images' layers -// to the graph. +// addImagesToGraph adds all images, their manifests and their layers to the graph. func (p *pruner) addImagesToGraph(images *imageapi.ImageList) []error { for i := range images.Items { image := &images.Items[i] @@ -319,6 +363,10 @@ func (p *pruner) addImagesToGraph(images *imageapi.ImageList) []error { layerNode := imagegraph.EnsureImageComponentLayerNode(p.g, layer.Name) p.g.AddEdge(imageNode, layerNode, ReferencedImageLayerEdgeKind) } + + glog.V(4).Infof("Adding image manifest %q to graph", image.Name) + manifestNode := imagegraph.EnsureImageComponentManifestNode(p.g, image.Name) + p.g.AddEdge(imageNode, manifestNode, ReferencedImageManifestEdgeKind) } return nil @@ -402,10 +450,15 @@ func (p *pruner) addImageStreamsToGraph(streams *imageapi.ImageStreamList, limit } glog.V(4).Infof("Adding reference from stream %s to %s", getName(stream), cn.Describe()) - if cn.Type == imagegraph.ImageComponentTypeConfig { + switch cn.Type { + case imagegraph.ImageComponentTypeConfig: p.g.AddEdge(imageStreamNode, s, ReferencedImageConfigEdgeKind) - } else { + case imagegraph.ImageComponentTypeLayer: p.g.AddEdge(imageStreamNode, s, ReferencedImageLayerEdgeKind) + case imagegraph.ImageComponentTypeManifest: + p.g.AddEdge(imageStreamNode, s, ReferencedImageManifestEdgeKind) + default: + utilruntime.HandleError(fmt.Errorf("internal error: unhandled image component type %q", cn.Type)) } } } @@ -772,6 +825,88 @@ func (p *pruner) addBuildStrategyImageReferencesToGraph(referrer *kapi.ObjectRef return nil } +func (p *pruner) handleImageStreamEvent(event watch.Event) { + getIsNode := func() (*imageapi.ImageStream, *imagegraph.ImageStreamNode) { + is, ok := event.Object.(*imageapi.ImageStream) + if !ok { + utilruntime.HandleError(fmt.Errorf("internal error: expected ImageStream object in %s event, not %T", event.Type, event.Object)) + return nil, nil + } + n := p.g.Find(imagegraph.ImageStreamNodeName(is)) + if isNode, ok := n.(*imagegraph.ImageStreamNode); ok { + return is, isNode + } + return is, nil + } + + // NOTE: an addition of an imagestream previously deleted from the graph is a noop due to a limitation of + // the current gonum/graph package + switch event.Type { + case watch.Added: + is, isNode := getIsNode() + if is == nil { + return + } + if isNode != nil { + glog.V(4).Infof("Ignoring added ImageStream %s that is already present in the graph", getName(is)) + return + } + glog.V(4).Infof("Adding ImageStream %s to the graph", getName(is)) + p.addImageStreamsToGraph(&imageapi.ImageStreamList{Items: []imageapi.ImageStream{*is}}, p.imageStreamLimits) + + case watch.Modified: + is, isNode := getIsNode() + if is == nil { + return + } + + if isNode != nil { + glog.V(4).Infof("Removing updated ImageStream %s from the graph", getName(is)) + // first remove the current node if present + p.g.RemoveNode(isNode) + } + + glog.V(4).Infof("Adding updated ImageStream %s back to the graph", getName(is)) + p.addImageStreamsToGraph(&imageapi.ImageStreamList{Items: []imageapi.ImageStream{*is}}, p.imageStreamLimits) + } +} + +func (p *pruner) handleImageEvent(event watch.Event) { + getImageNode := func() (*imageapi.Image, *imagegraph.ImageNode) { + img, ok := event.Object.(*imageapi.Image) + if !ok { + utilruntime.HandleError(fmt.Errorf("internal error: expected Image object in %s event, not %T", event.Type, event.Object)) + return nil, nil + } + return img, imagegraph.FindImage(p.g, img.Name) + } + + switch event.Type { + // NOTE: an addition of an image previously deleted from the graph is a noop due to a limitation of the + // current gonum/graph package + case watch.Added: + img, imgNode := getImageNode() + if img == nil { + return + } + if imgNode != nil { + glog.V(4).Infof("Ignoring added Image %s that is already present in the graph", img) + return + } + glog.V(4).Infof("Adding new Image %s to the graph", img.Name) + p.addImagesToGraph(&imageapi.ImageList{Items: []imageapi.Image{*img}}) + + case watch.Deleted: + img, imgNode := getImageNode() + if imgNode == nil { + glog.V(4).Infof("Ignoring event for deleted Image %s that is not present in the graph", img.Name) + return + } + glog.V(4).Infof("Removing deleted image %s from the graph", img.Name) + p.g.RemoveNode(imgNode) + } +} + // getImageNodes returns only nodes of type ImageNode. func getImageNodes(nodes []gonum.Node) map[string]*imagegraph.ImageNode { ret := make(map[string]*imagegraph.ImageNode) @@ -820,83 +955,40 @@ func imageIsPrunable(g genericgraph.Graph, imageNode *imagegraph.ImageNode, algo return true } -// calculatePrunableImages returns the list of prunable images and a -// graph.NodeSet containing the image node IDs. func calculatePrunableImages( g genericgraph.Graph, imageNodes map[string]*imagegraph.ImageNode, algorithm pruneAlgorithm, -) (map[string]*imagegraph.ImageNode, genericgraph.NodeSet) { - prunable := make(map[string]*imagegraph.ImageNode) - ids := make(genericgraph.NodeSet) +) []*imagegraph.ImageNode { + prunable := []*imagegraph.ImageNode{} for _, imageNode := range imageNodes { glog.V(4).Infof("Examining image %q", imageNode.Image.Name) if imageIsPrunable(g, imageNode, algorithm) { glog.V(4).Infof("Image %q is prunable", imageNode.Image.Name) - prunable[imageNode.Image.Name] = imageNode - ids.Add(imageNode.ID()) - } - } - - return prunable, ids -} - -// subgraphWithoutPrunableImages creates a subgraph from g with prunable image -// nodes excluded. -func subgraphWithoutPrunableImages(g genericgraph.Graph, prunableImageIDs genericgraph.NodeSet) genericgraph.Graph { - return g.Subgraph( - func(g genericgraph.Interface, node gonum.Node) bool { - return !prunableImageIDs.Has(node.ID()) - }, - func(g genericgraph.Interface, from, to gonum.Node, edgeKinds sets.String) bool { - if prunableImageIDs.Has(from.ID()) { - return false - } - if prunableImageIDs.Has(to.ID()) { - return false - } - return true - }, - ) -} - -// calculatePrunableImageComponents returns the list of prunable image components. -func calculatePrunableImageComponents(g genericgraph.Graph) []*imagegraph.ImageComponentNode { - components := []*imagegraph.ImageComponentNode{} - nodes := g.Nodes() - - for i := range nodes { - cn, ok := nodes[i].(*imagegraph.ImageComponentNode) - if !ok { - continue - } - - glog.V(4).Infof("Examining %v", cn) - if imageComponentIsPrunable(g, cn) { - glog.V(4).Infof("%v is prunable", cn) - components = append(components, cn) + prunable = append(prunable, imageNode) } } - return components + return prunable } -func getPrunableComponents(g genericgraph.Graph, prunableImageIDs genericgraph.NodeSet) []*imagegraph.ImageComponentNode { - graphWithoutPrunableImages := subgraphWithoutPrunableImages(g, prunableImageIDs) - return calculatePrunableImageComponents(graphWithoutPrunableImages) -} - -// pruneStreams removes references from all image streams' status.tags entries -// to prunable images, invoking streamPruner.UpdateImageStream for each updated -// stream. +// pruneStreams removes references from all image streams' status.tags entries to prunable images, invoking +// streamPruner.UpdateImageStream for each updated stream. func pruneStreams( g genericgraph.Graph, - prunableImageNodes map[string]*imagegraph.ImageNode, + prunableImageNodes []*imagegraph.ImageNode, streamPruner ImageStreamDeleter, keepYoungerThan time.Time, -) error { +) (deletions []Deletion, failures []Failure) { + imageNameToNode := map[string]*imagegraph.ImageNode{} + for _, node := range prunableImageNodes { + imageNameToNode[node.Image.Name] = node + } + + noChangeErr := errors.New("nothing changed") + glog.V(4).Infof("Removing pruned image references from streams") for _, node := range g.Nodes() { streamNode, ok := node.(*imagegraph.ImageStreamNode) @@ -909,7 +1001,7 @@ func pruneStreams( if err != nil { if kerrapi.IsNotFound(err) { glog.V(4).Infof("Unable to get image stream %s: removed during prune", streamName) - return nil + return noChangeErr } return err } @@ -918,7 +1010,7 @@ func pruneStreams( deletedTags := sets.NewString() for tag := range stream.Status.Tags { - if updated, deleted := pruneISTagHistory(g, prunableImageNodes, keepYoungerThan, streamName, stream, tag); deleted { + if updated, deleted := pruneISTagHistory(g, imageNameToNode, keepYoungerThan, streamName, stream, tag); deleted { deletedTags.Insert(tag) } else if updated { updatedTags.Insert(tag) @@ -926,7 +1018,7 @@ func pruneStreams( } if updatedTags.Len() == 0 && deletedTags.Len() == 0 { - return nil + return noChangeErr } updatedStream, err := streamPruner.UpdateImageStream(stream) @@ -943,13 +1035,42 @@ func pruneStreams( return err }) + if err == noChangeErr { + continue + } if err != nil { - return fmt.Errorf("unable to prune stream %s: %v", streamName, err) + failures = append(failures, Failure{Node: streamNode, Err: err}) + } else { + deletions = append(deletions, Deletion{Node: streamNode}) } } glog.V(4).Infof("Done removing pruned image references from streams") - return nil + return +} + +// strengthenReferencesFromFailedImageStreams turns weak references between image streams and images to +// strong. This must be called right after the image stream pruning to prevent images that failed to be +// untagged from being pruned. +func strengthenReferencesFromFailedImageStreams(g genericgraph.Graph, failures []Failure) { + for _, f := range failures { + for _, n := range g.From(f.Node) { + imageNode, ok := n.(*imagegraph.ImageNode) + if !ok { + continue + } + edge := g.Edge(f.Node, imageNode) + if edge == nil { + continue + } + kinds := g.EdgeKinds(edge) + if kinds.Has(ReferencedImageEdgeKind) { + continue + } + g.RemoveEdge(edge) + g.AddEdge(f.Node, imageNode, ReferencedImageEdgeKind) + } + } } // pruneISTagHistory processes tag event list of the given image stream tag. It removes references to images @@ -1011,170 +1132,436 @@ func tagEventIsPrunable( return false, "the tag event is younger than threshold" } -// pruneImages invokes imagePruner.DeleteImage with each image that is prunable. -func pruneImages(g genericgraph.Graph, imageNodes map[string]*imagegraph.ImageNode, imagePruner ImageDeleter) []error { - errs := []error{} +// byLayerCountAndAge sorts a list of image nodes from the largest (by the number of image layers) to the +// smallest. Images with the same number of layers are ordered from the oldest to the youngest. +type byLayerCountAndAge []*imagegraph.ImageNode - for _, imageNode := range imageNodes { - if err := imagePruner.DeleteImage(imageNode.Image); err != nil { - errs = append(errs, fmt.Errorf("error removing image %q: %v", imageNode.Image.Name, err)) +func (b byLayerCountAndAge) Len() int { return len(b) } +func (b byLayerCountAndAge) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b byLayerCountAndAge) Less(i, j int) bool { + fst, snd := b[i].Image, b[j].Image + if len(fst.DockerImageLayers) > len(snd.DockerImageLayers) { + return true + } + if len(fst.DockerImageLayers) < len(snd.DockerImageLayers) { + return false + } + + return fst.CreationTimestamp.Before(&snd.CreationTimestamp) || + (!snd.CreationTimestamp.Before(&fst.CreationTimestamp) && fst.Name < snd.Name) +} + +// nodeItem is an item of a doubly-linked list of image nodes. +type nodeItem struct { + node *imagegraph.ImageNode + prev, next *nodeItem +} + +// pop removes the item from a doubly-linked list and returns the image node it holds and its former next +// neighbour. +func (i *nodeItem) pop() (node *imagegraph.ImageNode, next *nodeItem) { + n, p := i.next, i.prev + if p != nil { + p.next = n + } + if n != nil { + n.prev = p + } + return i.node, n +} + +// insertAfter makes a new list item from the given node and inserts it into the list right after the given +// item. The newly created item is returned. +func insertAfter(item *nodeItem, node *imagegraph.ImageNode) *nodeItem { + newItem := &nodeItem{ + node: node, + prev: item, + } + if item != nil { + if item.next != nil { + item.next.prev = newItem + newItem.next = item.next } + item.next = newItem } + return newItem +} - return errs +// makeQueue makes a doubly-linked list of items out of the given array of image nodes. +func makeQueue(nodes []*imagegraph.ImageNode) *nodeItem { + var head, tail *nodeItem + for i, n := range nodes { + tail = insertAfter(tail, n) + if i == 0 { + head = tail + } + } + return head } -// Run identifies images eligible for pruning, invoking imagePruner for each image, and then it identifies -// image configs and layers eligible for pruning, invoking layerLinkPruner for each registry URL that has -// layers or configs that can be pruned. +// Prune prunes the objects like this: +// 1. it calculates the prunable images and builds a queue +// - the queue does not ever grow, it only shrinks (newly created images are not added) +// 2. it untags the prunable images from image streams +// 3. it spawns workers +// 4. it turns each prunable image into a job for the workers and makes sure they are busy +// 5. it terminates the workers once the queue is empty and reports results func (p *pruner) Prune( - imagePruner ImageDeleter, + imagePrunerFactory ImagePrunerFactoryFunc, streamPruner ImageStreamDeleter, layerLinkPruner LayerLinkDeleter, blobPruner BlobDeleter, manifestPruner ManifestDeleter, -) error { +) (deletions []Deletion, failures []Failure) { allNodes := p.g.Nodes() imageNodes := getImageNodes(allNodes) - if len(imageNodes) == 0 { - return nil + prunable := calculatePrunableImages(p.g, imageNodes, p.algorithm) + + /* Instead of deleting streams in a per-image job, prune them all at once. Otherwise each image stream + * would have to be modified for each prunable image it contains. */ + deletions, failures = pruneStreams(p.g, prunable, streamPruner, p.algorithm.keepYoungerThan) + /* if namespace is specified, prune only ImageStreams and nothing more if we have any errors after + * ImageStreams pruning this may mean that we still have references to images. */ + if len(p.algorithm.namespace) > 0 || len(prunable) == 0 { + return deletions, failures } - prunableImageNodes, prunableImageIDs := calculatePrunableImages(p.g, imageNodes, p.algorithm) + strengthenReferencesFromFailedImageStreams(p.g, failures) - err := pruneStreams(p.g, prunableImageNodes, streamPruner, p.algorithm.keepYoungerThan) - // if namespace is specified prune only ImageStreams and nothing more - // if we have any errors after ImageStreams pruning this may mean that - // we still have references to images. - if len(p.algorithm.namespace) > 0 || err != nil { - return err - } + // Sorting images from the largest (by number of layers) to the smallest is supposed to distribute the + // blob deletion workload equally across whole queue. + // If processed randomly, most probably, job processed in the beginning wouldn't delete any blobs (due to + // too many remaining referers) contrary to the jobs processed at the end. + // The assumption is based on another assumption that images with many layers have a low probability of + // sharing their components with other images. + sort.Sort(byLayerCountAndAge(prunable)) + p.queue = makeQueue(prunable) - var errs []error + var ( + jobChan = make(chan *Job) + resultChan = make(chan JobResult) + ) - if p.algorithm.pruneRegistry { - prunableComponents := getPrunableComponents(p.g, prunableImageIDs) - errs = append(errs, pruneImageComponents(p.g, p.registryClient, p.registryURL, prunableComponents, layerLinkPruner)...) - errs = append(errs, pruneBlobs(p.g, p.registryClient, p.registryURL, prunableComponents, blobPruner)...) - errs = append(errs, pruneManifests(p.g, p.registryClient, p.registryURL, prunableImageNodes, manifestPruner)...) - - if len(errs) > 0 { - // If we had any errors deleting layers, blobs, or manifest data from the registry, - // stop here and don't delete any images. This way, you can rerun prune and retry - // things that failed. - return kerrors.NewAggregate(errs) + defer close(jobChan) + + for i := 0; i < p.numWorkers; i++ { + worker, err := NewWorker( + p.algorithm, + p.registryClientFactory, + p.registryURL, + imagePrunerFactory, + streamPruner, + layerLinkPruner, + blobPruner, + manifestPruner, + ) + if err != nil { + failures = append(failures, Failure{ + Err: fmt.Errorf("failed to initialize worker: %v", err), + }) + return } + go worker.Run(jobChan, resultChan) } - errs = pruneImages(p.g, prunableImageNodes, imagePruner) - return kerrors.NewAggregate(errs) + ds, fs := p.runLoop(jobChan, resultChan) + deletions = append(deletions, ds...) + failures = append(failures, fs...) + + return } -// imageComponentIsPrunable returns true if the image component is not referenced by any images. -func imageComponentIsPrunable(g genericgraph.Graph, cn *imagegraph.ImageComponentNode) bool { - for _, predecessor := range g.To(cn) { - glog.V(4).Infof("Examining predecessor %#v of image config %v", predecessor, cn) - if g.Kind(predecessor) == imagegraph.ImageNodeKind { - glog.V(4).Infof("Config %v has an image predecessor", cn) - return false +// runLoop processes the queue of prunable images until empty. It makes the workers busy and updates the graph +// with each change. +func (p *pruner) runLoop( + jobChan chan<- *Job, + resultChan <-chan JobResult, +) (deletions []Deletion, failures []Failure) { + imgUpdateChan := p.imageWatcher.ResultChan() + isUpdateChan := p.imageStreamWatcher.ResultChan() + for { + // make workers busy + for len(p.processedImages) < p.numWorkers { + job, blocked := p.getNextJob() + if blocked { + break + } + if job == nil { + if len(p.processedImages) == 0 { + return + } + break + } + jobChan <- job + p.processedImages[job.Image] = job } - } - return true + select { + case res := <-resultChan: + p.updateGraphWithResult(&res) + for _, deletion := range res.Deletions { + deletions = append(deletions, deletion) + } + for _, failure := range res.Failures { + failures = append(failures, failure) + } + delete(p.processedImages, res.Job.Image) + case event := <-isUpdateChan: + p.handleImageStreamEvent(event) + case event := <-imgUpdateChan: + p.handleImageEvent(event) + } + } } -// streamReferencingImageComponent returns a list of ImageStreamNodes that reference a -// given ImageComponentNode. -func streamsReferencingImageComponent(g genericgraph.Graph, cn *imagegraph.ImageComponentNode) []*imagegraph.ImageStreamNode { - ret := []*imagegraph.ImageStreamNode{} - for _, predecessor := range g.To(cn) { - if g.Kind(predecessor) != imagegraph.ImageStreamNodeKind { +// getNextJob removes a prunable image from the queue, makes a job out of it and returns it. +// Image may be removed from the queue without being processed if it becomes not prunable (by being referred +// by a new image stream). Image may also be skipped and processed later when it is currently blocked. +// +// Image is blocked when at least one of its components is currently being processed in a running job and +// the component has either: +// - only one remaining strong reference from the blocked image (the other references are being currently +// removed) +// - only one remaining reference in an image stream, where the component is tagged (via image) (the other +// references are being currently removed) +// +// The concept of blocked images attempts to preserve image components until the very last image +// referencing them is deleted. Otherwise an image previously considered as prunable becomes not prunable may +// become not usable since its components have been removed already. +func (p *pruner) getNextJob() (job *Job, blocked bool) { + if p.queue == nil { + return + } + + pop := func(item *nodeItem) (*imagegraph.ImageNode, *nodeItem) { + node, next := item.pop() + if item == p.queue { + p.queue = next + } + return node, next + } + + for item := p.queue; item != nil; { + // something could have changed + if !imageIsPrunable(p.g, item.node, p.algorithm) { + _, item = pop(item) continue } - ret = append(ret, predecessor.(*imagegraph.ImageStreamNode)) + + if components, blocked := getImageComponents(p.g, p.processedImages, item.node); !blocked { + job = &Job{ + Image: item.node, + Components: components, + } + _, item = pop(item) + break + } + item = item.next } - return ret + blocked = job == nil && p.queue != nil + + return } -// pruneImageComponents invokes layerLinkDeleter.DeleteLayerLink for each repository layer link to -// be deleted from the registry. -func pruneImageComponents( - g genericgraph.Graph, - registryClient *http.Client, - registryURL *url.URL, - imageComponents []*imagegraph.ImageComponentNode, - layerLinkDeleter LayerLinkDeleter, -) []error { - errs := []error{} - - for _, cn := range imageComponents { - // get streams that reference config - streamNodes := streamsReferencingImageComponent(g, cn) - - for _, streamNode := range streamNodes { - streamName := getName(streamNode.ImageStream) - glog.V(4).Infof("Pruning repository %s/%s: %s", registryURL.Host, streamName, cn.Describe()) - if err := layerLinkDeleter.DeleteLayerLink(registryClient, registryURL, streamName, cn.Component); err != nil { - errs = append(errs, fmt.Errorf("error pruning layer link %s in the repository %s: %v", cn.Component, streamName, err)) +// updateGraphWithResult updates the graph with the result from completed job. Image nodes are deleted for +// each deleted image. Image components are deleted if they were removed from the global blob store. Unlinked +// imagecomponent (layer/config/manifest link) will cause an edge between image stream and the component to be +// deleted. +func (p *pruner) updateGraphWithResult(res *JobResult) { + imageDeleted := false + for _, d := range res.Deletions { + switch d.Node.(type) { + case *imagegraph.ImageNode: + imageDeleted = true + p.g.RemoveNode(d.Node) + case *imagegraph.ImageComponentNode: + // blob -> delete the node with all the edges + if d.Parent == nil { + p.g.RemoveNode(d.Node) + continue + } + + // link in a repository -> delete just edges + isn, ok := d.Parent.(*imagegraph.ImageStreamNode) + if !ok { + continue } + edge := p.g.Edge(isn, d.Node) + if edge == nil { + continue + } + p.g.RemoveEdge(edge) + case *imagegraph.ImageStreamNode: + // ignore + default: + utilruntime.HandleError(fmt.Errorf("internal error: unhandled graph node %t", d.Node)) } } - return errs + if imageDeleted { + return + } } -// pruneBlobs invokes blobPruner.DeleteBlob for each blob to be deleted from the -// registry. -func pruneBlobs( +// getImageComponents gathers image components with locations, where they can be removed at this time. +// Each component can be prunable in several image streams and in the global blob store. +func getImageComponents( g genericgraph.Graph, - registryClient *http.Client, - registryURL *url.URL, - componentNodes []*imagegraph.ImageComponentNode, - blobPruner BlobDeleter, -) []error { - errs := []error{} + processedImages map[*imagegraph.ImageNode]*Job, + image *imagegraph.ImageNode, +) (components ComponentRetentions, blocked bool) { + components = make(ComponentRetentions) + + for _, node := range g.From(image) { + kinds := g.EdgeKinds(g.Edge(image, node)) + if len(kinds.Intersection(sets.NewString( + ReferencedImageLayerEdgeKind, + ReferencedImageConfigEdgeKind, + ReferencedImageManifestEdgeKind, + ))) == 0 { + continue + } + + imageStrongRefCounter := 0 + imageMarkedForDeletionCounter := 0 + referencingStreams := map[*imagegraph.ImageStreamNode]struct{}{} + referencingImages := map[*imagegraph.ImageNode]struct{}{} + + comp, ok := node.(*imagegraph.ImageComponentNode) + if !ok { + continue + } + + for _, ref := range g.To(comp) { + switch t := ref.(type) { + case (*imagegraph.ImageNode): + imageStrongRefCounter++ + if _, processed := processedImages[t]; processed { + imageMarkedForDeletionCounter++ + } + referencingImages[t] = struct{}{} + + case *imagegraph.ImageStreamNode: + referencingStreams[t] = struct{}{} + + default: + continue + } + } + + switch { + // the component is referenced only by the given image -> prunable globally + case imageStrongRefCounter < 2: + components.Add(comp, true) + // the component can be pruned once the other referencing image that is being deleted is finished; + // don't touch it until then + case imageStrongRefCounter-imageMarkedForDeletionCounter < 2: + return nil, true + // not prunable component + default: + components.Add(comp, false) + } - for _, cn := range componentNodes { - if err := blobPruner.DeleteBlob(registryClient, registryURL, cn.Component); err != nil { - errs = append(errs, fmt.Errorf("error removing blob %s from the registry %s: %v", - cn.Component, registryURL.Host, err)) + if addComponentReferencingStreams( + g, + components, + referencingImages, + referencingStreams, + processedImages, + comp, + ) { + return nil, true } } - return errs + return } -// pruneManifests invokes manifestPruner.DeleteManifest for each repository -// manifest to be deleted from the registry. -func pruneManifests( +// addComponentReferencingStreams records information about prunability of the given component in all the +// streams referencing it (via tagged image). It updates given components attribute. +func addComponentReferencingStreams( g genericgraph.Graph, - registryClient *http.Client, - registryURL *url.URL, - imageNodes map[string]*imagegraph.ImageNode, - manifestPruner ManifestDeleter, -) []error { - errs := []error{} - - for _, imageNode := range imageNodes { - for _, n := range g.To(imageNode) { - streamNode, ok := n.(*imagegraph.ImageStreamNode) - if !ok { + components ComponentRetentions, + referencingImages map[*imagegraph.ImageNode]struct{}, + referencingStreams map[*imagegraph.ImageStreamNode]struct{}, + processedImages map[*imagegraph.ImageNode]*Job, + comp *imagegraph.ImageComponentNode, +) (blocked bool) { +streamLoop: + for stream := range referencingStreams { + refCounter := 0 + markedForDeletionCounter := 0 + + for image := range referencingImages { + edge := g.Edge(stream, image) + if edge == nil { + continue + } + kinds := g.EdgeKinds(edge) + // tagged not prunable image -> keep the component in the stream + if kinds.Has(ReferencedImageEdgeKind) { + components.AddReferencingStreams(comp, false, stream) + continue streamLoop + } + if !kinds.Has(WeakReferencedImageEdgeKind) { continue } - repoName := getName(streamNode.ImageStream) + refCounter++ + if _, processed := processedImages[image]; processed { + markedForDeletionCounter++ + } - glog.V(4).Infof("Pruning manifest %s in the repository %s/%s", imageNode.Image.Name, registryURL.Host, repoName) - if err := manifestPruner.DeleteManifest(registryClient, registryURL, repoName, imageNode.Image.Name); err != nil { - errs = append(errs, fmt.Errorf("error pruning manifest %s in the repository %s/%s: %v", - imageNode.Image.Name, registryURL.Host, repoName, err)) + if refCounter-markedForDeletionCounter > 1 { + components.AddReferencingStreams(comp, false, stream) + continue streamLoop } } + + switch { + // there's just one remaining strong reference from the stream -> unlink + case refCounter < 2: + components.AddReferencingStreams(comp, true, stream) + // there's just one remaining strong reference and at least one another reference now being + // dereferenced in a running job -> wait until it completes + case refCounter-markedForDeletionCounter < 2: + return true + // not yet prunable + default: + components.AddReferencingStreams(comp, false, stream) + } } - return errs + return false +} + +// imageComponentIsPrunable returns true if the image component is not referenced by any images. +func imageComponentIsPrunable(g genericgraph.Graph, cn *imagegraph.ImageComponentNode) bool { + for _, predecessor := range g.To(cn) { + glog.V(4).Infof("Examining predecessor %#v of image config %v", predecessor, cn) + if g.Kind(predecessor) == imagegraph.ImageNodeKind { + glog.V(4).Infof("Config %v has an image predecessor", cn) + return false + } + } + + return true +} + +// streamReferencingImageComponent returns a list of ImageStreamNodes that reference a +// given ImageComponentNode. +func streamsReferencingImageComponent(g genericgraph.Graph, cn *imagegraph.ImageComponentNode) []*imagegraph.ImageStreamNode { + ret := []*imagegraph.ImageStreamNode{} + for _, predecessor := range g.To(cn) { + if g.Kind(predecessor) != imagegraph.ImageStreamNodeKind { + continue + } + ret = append(ret, predecessor.(*imagegraph.ImageStreamNode)) + } + + return ret } // imageDeleter removes an image from OpenShift. @@ -1316,39 +1703,6 @@ func (p *manifestDeleter) DeleteManifest(registryClient *http.Client, registryUR return deleteFromRegistry(registryClient, fmt.Sprintf("%s/v2/%s/manifests/%s", registryURL.String(), repoName, manifest)) } -func getName(obj runtime.Object) string { - accessor, err := kmeta.Accessor(obj) - if err != nil { - glog.V(4).Infof("Error getting accessor for %#v", obj) - return "" - } - ns := accessor.GetNamespace() - if len(ns) == 0 { - return accessor.GetName() - } - return fmt.Sprintf("%s/%s", ns, accessor.GetName()) -} - -func getKindName(obj *kapi.ObjectReference) string { - if obj == nil { - return "unknown object" - } - name := obj.Name - if len(obj.Namespace) > 0 { - name = obj.Namespace + "/" + name - } - return fmt.Sprintf("%s[%s]", obj.Kind, name) -} - -func getRef(obj runtime.Object) *kapi.ObjectReference { - ref, err := kapiref.GetReference(legacyscheme.Scheme, obj) - if err != nil { - glog.Errorf("failed to get reference to object %T: %v", obj, err) - return nil - } - return ref -} - func makeISTag(namespace, name, tag string) *imageapi.ImageStreamTag { return &imageapi.ImageStreamTag{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/oc/admin/prune/imageprune/prune_test.go b/pkg/oc/admin/prune/imageprune/prune_test.go index f02232fa7f7f..d190d033c54a 100644 --- a/pkg/oc/admin/prune/imageprune/prune_test.go +++ b/pkg/oc/admin/prune/imageprune/prune_test.go @@ -8,13 +8,19 @@ import ( "net/http" "net/url" "reflect" + "regexp" + "sort" + "sync" "testing" "time" + "github.com/golang/glog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest/fake" clientgotesting "k8s.io/client-go/testing" kapi "k8s.io/kubernetes/pkg/apis/core" @@ -23,7 +29,8 @@ import ( appsapi "github.com/openshift/origin/pkg/apps/apis/apps" buildapi "github.com/openshift/origin/pkg/build/apis/build" imageapi "github.com/openshift/origin/pkg/image/apis/image" - imageclient "github.com/openshift/origin/pkg/image/generated/internalclientset/fake" + fakeimageclient "github.com/openshift/origin/pkg/image/generated/internalclientset/fake" + imageclient "github.com/openshift/origin/pkg/image/generated/internalclientset/typed/image/internalversion" "github.com/openshift/origin/pkg/oc/admin/prune/imageprune/testutil" "github.com/openshift/origin/pkg/oc/graph/genericgraph" imagegraph "github.com/openshift/origin/pkg/oc/graph/imagegraph/nodes" @@ -44,29 +51,36 @@ func TestImagePruning(t *testing.T) { registryURL := "https://" + registryHost tests := []struct { - name string - pruneOverSizeLimit *bool - allImages *bool - pruneRegistry *bool - ignoreInvalidRefs *bool - keepTagRevisions *int - namespace string - images imageapi.ImageList - pods kapi.PodList - streams imageapi.ImageStreamList - rcs kapi.ReplicationControllerList - bcs buildapi.BuildConfigList - builds buildapi.BuildList - dss kapisext.DaemonSetList - deployments kapisext.DeploymentList - dcs appsapi.DeploymentConfigList - rss kapisext.ReplicaSetList - limits map[string][]*kapi.LimitRange - expectedImageDeletions []string - expectedStreamUpdates []string - expectedLayerLinkDeletions []string - expectedBlobDeletions []string - expectedErrorString string + name string + pruneOverSizeLimit *bool + allImages *bool + pruneRegistry *bool + ignoreInvalidRefs *bool + keepTagRevisions *int + namespace string + images imageapi.ImageList + pods kapi.PodList + streams imageapi.ImageStreamList + rcs kapi.ReplicationControllerList + bcs buildapi.BuildConfigList + builds buildapi.BuildList + dss kapisext.DaemonSetList + deployments kapisext.DeploymentList + dcs appsapi.DeploymentConfigList + rss kapisext.ReplicaSetList + limits map[string][]*kapi.LimitRange + imageDeleterErr error + imageStreamDeleterErr error + layerDeleterErr error + manifestDeleterErr error + blobDeleterErr error + expectedImageDeletions []string + expectedStreamUpdates []string + expectedLayerLinkDeletions []string + expectedManifestLinkDeletions []string + expectedBlobDeletions []string + expectedFailures []string + expectedErrorString string }{ { name: "1 pod - phase pending - don't prune", @@ -110,6 +124,7 @@ func TestImagePruning(t *testing.T) { pods: testutil.PodList(testutil.Pod("foo", "pod1", kapi.PodSucceeded, registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000")), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000", registryURL + "|" + testutil.Layer1, registryURL + "|" + testutil.Layer2, registryURL + "|" + testutil.Layer3, @@ -151,6 +166,7 @@ func TestImagePruning(t *testing.T) { ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000", registryURL + "|" + testutil.Layer1, registryURL + "|" + testutil.Layer2, registryURL + "|" + testutil.Layer3, @@ -169,6 +185,7 @@ func TestImagePruning(t *testing.T) { ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000", registryURL + "|" + testutil.Layer1, registryURL + "|" + testutil.Layer2, registryURL + "|" + testutil.Layer3, @@ -185,6 +202,7 @@ func TestImagePruning(t *testing.T) { ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000", registryURL + "|" + testutil.Layer1, registryURL + "|" + testutil.Layer2, registryURL + "|" + testutil.Layer3, @@ -201,6 +219,7 @@ func TestImagePruning(t *testing.T) { ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000", registryURL + "|" + testutil.Layer1, registryURL + "|" + testutil.Layer2, registryURL + "|" + testutil.Layer3, @@ -217,6 +236,7 @@ func TestImagePruning(t *testing.T) { ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000", registryURL + "|" + testutil.Layer1, registryURL + "|" + testutil.Layer2, registryURL + "|" + testutil.Layer3, @@ -247,6 +267,7 @@ func TestImagePruning(t *testing.T) { ), dss: testutil.DSList(testutil.DS("foo", "rc1", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000")), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000001"}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000001"}, }, { @@ -257,6 +278,7 @@ func TestImagePruning(t *testing.T) { ), rss: testutil.RSList(testutil.RS("foo", "rc1", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000")), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000001"}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000001"}, }, { @@ -267,6 +289,7 @@ func TestImagePruning(t *testing.T) { ), deployments: testutil.DeploymentList(testutil.Deployment("foo", "rc1", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000")), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000001"}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000001"}, }, { @@ -371,8 +394,84 @@ func TestImagePruning(t *testing.T) { ), )), ), - expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000004"}, - expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedManifestLinkDeletions: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedBlobDeletions: []string{registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + }, + + { + name: "continue on blob deletion failure", + images: testutil.ImageList( + testutil.UnmanagedImage("sha256:0000000000000000000000000000000000000000000000000000000000000000", "otherregistry/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000", false, "", ""), + testutil.Image("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002"), + testutil.Image("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003"), + testutil.Image("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), + ), + streams: testutil.StreamList( + testutil.Stream(registryHost, "foo", "bar", testutil.Tags( + testutil.Tag("latest", + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000000", "otherregistry/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), + ), + )), + ), + blobDeleterErr: fmt.Errorf("err"), + expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedManifestLinkDeletions: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedBlobDeletions: []string{registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedFailures: []string{registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000004|err"}, + }, + + { + name: "continue on manifest link deletion failure", + images: testutil.ImageList( + testutil.UnmanagedImage("sha256:0000000000000000000000000000000000000000000000000000000000000000", "otherregistry/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000", false, "", ""), + testutil.Image("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002"), + testutil.Image("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003"), + testutil.Image("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), + ), + streams: testutil.StreamList( + testutil.Stream(registryHost, "foo", "bar", testutil.Tags( + testutil.Tag("latest", + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000000", "otherregistry/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), + ), + )), + ), + manifestDeleterErr: fmt.Errorf("err"), + expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedManifestLinkDeletions: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedBlobDeletions: []string{registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedFailures: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004|err"}, + }, + + { + name: "stop on image stream update failure", + images: testutil.ImageList( + testutil.UnmanagedImage("sha256:0000000000000000000000000000000000000000000000000000000000000000", "otherregistry/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000", false, "", ""), + testutil.Image("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002"), + testutil.Image("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003"), + testutil.Image("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), + ), + streams: testutil.StreamList( + testutil.Stream(registryHost, "foo", "bar", testutil.Tags( + testutil.Tag("latest", + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000000", "otherregistry/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), + ), + )), + ), + imageStreamDeleterErr: fmt.Errorf("err"), + expectedFailures: []string{"foo/bar|err"}, }, { @@ -453,7 +552,7 @@ func TestImagePruning(t *testing.T) { "foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000000", "foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000002", }, - expectedBlobDeletions: []string{registryURL + "|layer1"}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000001", registryURL + "|layer1"}, }, { @@ -471,6 +570,7 @@ func TestImagePruning(t *testing.T) { )), ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000002"}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000002"}, }, { @@ -526,6 +626,18 @@ func TestImagePruning(t *testing.T) { "foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004", "foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000005", }, + expectedManifestLinkDeletions: []string{ + registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000001", + registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000003", + registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004", + registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000005", + }, + expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000001", + registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000003", + registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000004", + registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000005", + }, }, { @@ -558,6 +670,7 @@ func TestImagePruning(t *testing.T) { ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedStreamUpdates: []string{}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000"}, }, { @@ -568,6 +681,7 @@ func TestImagePruning(t *testing.T) { ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedStreamUpdates: []string{}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000"}, }, { @@ -587,6 +701,7 @@ func TestImagePruning(t *testing.T) { ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedStreamUpdates: []string{}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000"}, }, { @@ -608,6 +723,14 @@ func TestImagePruning(t *testing.T) { "sha256:0000000000000000000000000000000000000000000000000000000000000005", }, expectedStreamUpdates: []string{}, + expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000", + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000001", + registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000002", + registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000003", + registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000004", + registryURL + "|" + "sha256:0000000000000000000000000000000000000000000000000000000000000005", + }, }, { @@ -618,6 +741,7 @@ func TestImagePruning(t *testing.T) { ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000000"}, expectedStreamUpdates: []string{}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000"}, }, { @@ -640,6 +764,14 @@ func TestImagePruning(t *testing.T) { "sha256:0000000000000000000000000000000000000000000000000000000000000005", }, expectedStreamUpdates: []string{}, + expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000000", + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000001", + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000002", + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000003", + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000004", + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000005", + }, }, { @@ -693,7 +825,9 @@ func TestImagePruning(t *testing.T) { registryURL + "|foo/bar|layer7", registryURL + "|foo/bar|layer8", }, + expectedManifestLinkDeletions: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000004", registryURL + "|layer5", registryURL + "|layer6", registryURL + "|layer7", @@ -701,6 +835,49 @@ func TestImagePruning(t *testing.T) { }, }, + { + name: "continue on layer link error", + images: testutil.ImageList( + testutil.ImageWithLayers("sha256:0000000000000000000000000000000000000000000000000000000000000001", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000001", &testutil.Config1, "layer1", "layer2", "layer3", "layer4"), + testutil.ImageWithLayers("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002", &testutil.Config2, "layer1", "layer2", "layer3", "layer4"), + testutil.ImageWithLayers("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003", nil, "layer1", "layer2", "layer3", "layer4"), + testutil.ImageWithLayers("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004", nil, "layer5", "layer6", "layer7", "layer8"), + ), + streams: testutil.StreamList( + testutil.Stream(registryHost, "foo", "bar", testutil.Tags( + testutil.Tag("latest", + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000001", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000001"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), + ), + )), + ), + layerDeleterErr: fmt.Errorf("err"), + expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedManifestLinkDeletions: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000004", + registryURL + "|layer5", + registryURL + "|layer6", + registryURL + "|layer7", + registryURL + "|layer8", + }, + expectedLayerLinkDeletions: []string{ + registryURL + "|foo/bar|layer5", + registryURL + "|foo/bar|layer6", + registryURL + "|foo/bar|layer7", + registryURL + "|foo/bar|layer8", + }, + expectedFailures: []string{ + registryURL + "|foo/bar|layer5|err", + registryURL + "|foo/bar|layer6|err", + registryURL + "|foo/bar|layer7|err", + registryURL + "|foo/bar|layer8|err", + }, + }, + { name: "images with duplicate layers and configs", images: testutil.ImageList( @@ -729,7 +906,10 @@ func TestImagePruning(t *testing.T) { registryURL + "|foo/bar|layer7", registryURL + "|foo/bar|layer8", }, + expectedManifestLinkDeletions: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000004", + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000005", registryURL + "|" + testutil.Config2, registryURL + "|layer5", registryURL + "|layer6", @@ -740,6 +920,47 @@ func TestImagePruning(t *testing.T) { }, }, + { + name: "continue on image deletion failure", + images: testutil.ImageList( + testutil.ImageWithLayers("sha256:0000000000000000000000000000000000000000000000000000000000000001", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000001", &testutil.Config1, "layer1", "layer2", "layer3", "layer4"), + testutil.ImageWithLayers("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002", &testutil.Config1, "layer1", "layer2", "layer3", "layer4"), + testutil.ImageWithLayers("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003", &testutil.Config1, "layer1", "layer2", "layer3", "layer4"), + testutil.ImageWithLayers("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004", &testutil.Config2, "layer5", "layer6", "layer7", "layer8"), + testutil.ImageWithLayers("sha256:0000000000000000000000000000000000000000000000000000000000000005", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000005", &testutil.Config2, "layer5", "layer6", "layer9", "layerX"), + ), + streams: testutil.StreamList( + testutil.Stream(registryHost, "foo", "bar", testutil.Tags( + testutil.Tag("latest", + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000001", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000001"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000003", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000004", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), + ), + )), + ), + imageDeleterErr: fmt.Errorf("err"), + expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000004", "sha256:0000000000000000000000000000000000000000000000000000000000000005"}, + expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedLayerLinkDeletions: []string{ + registryURL + "|foo/bar|" + testutil.Config2, + registryURL + "|foo/bar|layer5", + registryURL + "|foo/bar|layer6", + registryURL + "|foo/bar|layer7", + registryURL + "|foo/bar|layer8", + }, + expectedManifestLinkDeletions: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000004", + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000005", + registryURL + "|layer7", + registryURL + "|layer8", + registryURL + "|layer9", + registryURL + "|layerX", + }, + expectedFailures: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000004|err", "sha256:0000000000000000000000000000000000000000000000000000000000000005|err"}, + }, + { name: "layers shared with young images are not pruned", images: testutil.ImageList( @@ -747,6 +968,7 @@ func TestImagePruning(t *testing.T) { testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000002", registryHost+"/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002", 5), ), expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000001"}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000001"}, }, { @@ -769,8 +991,10 @@ func TestImagePruning(t *testing.T) { limits: map[string][]*kapi.LimitRange{ "foo": testutil.LimitList(100, 200), }, - expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000003"}, - expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000003"}, + expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000003"}, + expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000003"}, + expectedManifestLinkDeletions: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000003"}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000003"}, }, { @@ -802,6 +1026,14 @@ func TestImagePruning(t *testing.T) { }, expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000002", "sha256:0000000000000000000000000000000000000000000000000000000000000004"}, expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000002", "bar/foo|sha256:0000000000000000000000000000000000000000000000000000000000000004"}, + expectedManifestLinkDeletions: []string{ + registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000002", + registryURL + "|bar/foo|sha256:0000000000000000000000000000000000000000000000000000000000000004", + }, + expectedBlobDeletions: []string{ + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000002", + registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000004", + }, }, { @@ -876,8 +1108,10 @@ func TestImagePruning(t *testing.T) { limits: map[string][]*kapi.LimitRange{ "foo": testutil.LimitList(100, 200), }, - expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000003"}, - expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000003"}, + expectedImageDeletions: []string{"sha256:0000000000000000000000000000000000000000000000000000000000000003"}, + expectedManifestLinkDeletions: []string{registryURL + "|foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000003"}, + expectedBlobDeletions: []string{registryURL + "|sha256:0000000000000000000000000000000000000000000000000000000000000003"}, + expectedStreamUpdates: []string{"foo/bar|sha256:0000000000000000000000000000000000000000000000000000000000000003"}, }, { @@ -908,20 +1142,23 @@ func TestImagePruning(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { options := PrunerOptions{ - Namespace: test.namespace, - AllImages: test.allImages, - Images: &test.images, - Streams: &test.streams, - Pods: &test.pods, - RCs: &test.rcs, - BCs: &test.bcs, - Builds: &test.builds, - DSs: &test.dss, - Deployments: &test.deployments, - DCs: &test.dcs, - RSs: &test.rss, - LimitRanges: test.limits, - RegistryURL: &url.URL{Scheme: "https", Host: registryHost}, + Namespace: test.namespace, + AllImages: test.allImages, + Images: &test.images, + ImageWatcher: watch.NewFake(), + Streams: &test.streams, + StreamWatcher: watch.NewFake(), + Pods: &test.pods, + RCs: &test.rcs, + BCs: &test.bcs, + Builds: &test.builds, + DSs: &test.dss, + Deployments: &test.deployments, + DCs: &test.dcs, + RSs: &test.rss, + LimitRanges: test.limits, + RegistryClientFactory: FakeRegistryClientFactory, + RegistryURL: &url.URL{Scheme: "https", Host: registryHost}, } if test.pruneOverSizeLimit != nil { options.PruneOverSizeLimit = test.pruneOverSizeLimit @@ -955,14 +1192,33 @@ func TestImagePruning(t *testing.T) { return } - imageDeleter := &fakeImageDeleter{invocations: sets.NewString()} - streamDeleter := &fakeImageStreamDeleter{invocations: sets.NewString()} - layerLinkDeleter := &fakeLayerLinkDeleter{invocations: sets.NewString()} - blobDeleter := &fakeBlobDeleter{invocations: sets.NewString()} - manifestDeleter := &fakeManifestDeleter{invocations: sets.NewString()} - - if err := p.Prune(imageDeleter, streamDeleter, layerLinkDeleter, blobDeleter, manifestDeleter); err != nil { - t.Fatalf("unexpected error: %v", err) + imageDeleter, imageDeleterFactory := newFakeImageDeleter(test.imageDeleterErr) + streamDeleter := &fakeImageStreamDeleter{err: test.imageStreamDeleterErr, invocations: sets.NewString()} + layerLinkDeleter := &fakeLayerLinkDeleter{err: test.layerDeleterErr, invocations: sets.NewString()} + blobDeleter := &fakeBlobDeleter{err: test.blobDeleterErr, invocations: sets.NewString()} + manifestDeleter := &fakeManifestDeleter{err: test.manifestDeleterErr, invocations: sets.NewString()} + + deletions, failures := p.Prune(imageDeleterFactory, streamDeleter, layerLinkDeleter, blobDeleter, manifestDeleter) + + expectedFailures := sets.NewString(test.expectedFailures...) + renderedFailures := sets.NewString() + for _, f := range failures { + rendered := renderFailure(registryURL, &f) + if renderedFailures.Has(rendered) { + t.Errorf("got the following failure more than once: %v", rendered) + continue + } + renderedFailures.Insert(rendered) + } + for f := range renderedFailures { + if expectedFailures.Has(f) { + expectedFailures.Delete(f) + continue + } + t.Errorf("got unexpected failure: %v", f) + } + for f := range expectedFailures { + t.Errorf("the following expected failure was not returned: %v", f) } expectedImageDeletions := sets.NewString(test.expectedImageDeletions...) @@ -980,14 +1236,81 @@ func TestImagePruning(t *testing.T) { t.Errorf("unexpected layer link deletions: %s", diff.ObjectDiff(a, e)) } + expectedManifestLinkDeletions := sets.NewString(test.expectedManifestLinkDeletions...) + if a, e := manifestDeleter.invocations, expectedManifestLinkDeletions; !reflect.DeepEqual(a, e) { + t.Errorf("unexpected manifest link deletions: %s", diff.ObjectDiff(a, e)) + } + expectedBlobDeletions := sets.NewString(test.expectedBlobDeletions...) if a, e := blobDeleter.invocations, expectedBlobDeletions; !reflect.DeepEqual(a, e) { t.Errorf("unexpected blob deletions: %s", diff.ObjectDiff(a, e)) } + + // TODO: shall we return deletion for each layer link unlinked from the image stream?? + imageStreamUpdates := sets.NewString() + expectedAllDeletions := sets.NewString() + for _, s := range []sets.String{expectedImageDeletions, expectedLayerLinkDeletions, expectedBlobDeletions} { + expectedAllDeletions.Insert(s.List()...) + } + for _, d := range deletions { + rendered, isImageStreamUpdate, isManifestLinkDeletion := renderDeletion(registryURL, &d) + if isManifestLinkDeletion { + continue + } + if isImageStreamUpdate { + imageStreamUpdates.Insert(rendered) + continue + } + if expectedAllDeletions.Has(rendered) { + expectedAllDeletions.Delete(rendered) + } else { + t.Errorf("got unexpected deletion: %#+v (rendered: %q)", d, rendered) + } + } + for _, f := range failures { + rendered, _, _ := renderDeletion(registryURL, &Deletion{Node: f.Node, Parent: f.Parent}) + expectedAllDeletions.Delete(rendered) + } + for del, ok := expectedAllDeletions.PopAny(); ok; del, ok = expectedAllDeletions.PopAny() { + t.Errorf("expected deletion %q did not happen", del) + } + + expectedStreamUpdateNames := sets.NewString() + for u := range expectedStreamUpdates { + expectedStreamUpdateNames.Insert(regexp.MustCompile(`[@|:]`).Split(u, 2)[0]) + } + if a, e := imageStreamUpdates, expectedStreamUpdateNames; !reflect.DeepEqual(a, e) { + t.Errorf("unexpected image stream updates in deletions: %s", diff.ObjectDiff(a, e)) + } }) } } +func renderDeletion(registryURL string, deletion *Deletion) (rendered string, isImageStreamUpdate, isManifestLinkDeletion bool) { + switch t := deletion.Node.(type) { + case *imagegraph.ImageNode: + return t.Image.Name, false, false + case *imagegraph.ImageComponentNode: + // deleting blob + if deletion.Parent == nil { + return fmt.Sprintf("%s|%s", registryURL, t.Component), false, false + } + streamName := "unknown" + if sn, ok := deletion.Parent.(*imagegraph.ImageStreamNode); ok { + streamName = getName(sn.ImageStream) + } + return fmt.Sprintf("%s|%s|%s", registryURL, streamName, t.Component), false, t.Type == imagegraph.ImageComponentTypeManifest + case *imagegraph.ImageStreamNode: + return getName(t.ImageStream), true, false + } + return "unknown", false, false +} + +func renderFailure(registryURL string, failure *Failure) string { + rendered, _, _ := renderDeletion(registryURL, &Deletion{Node: failure.Node, Parent: failure.Parent}) + return rendered + "|" + failure.Err.Error() +} + func TestImageDeleter(t *testing.T) { flag.Lookup("v").Value.Set(fmt.Sprint(*logLevel)) @@ -1001,7 +1324,7 @@ func TestImageDeleter(t *testing.T) { } for name, test := range tests { - imageClient := imageclient.Clientset{} + imageClient := fakeimageclient.Clientset{} imageClient.AddReactor("delete", "images", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { return true, nil, test.imageDeletionError }) @@ -1096,6 +1419,7 @@ func TestRegistryPruning(t *testing.T) { "https://registry1.io|foo/bar|layer2", ), expectedBlobDeletions: sets.NewString( + "https://registry1.io|sha256:0000000000000000000000000000000000000000000000000000000000000001", "https://registry1.io|"+testutil.Config1, "https://registry1.io|layer1", "https://registry1.io|layer2", @@ -1132,6 +1456,8 @@ func TestRegistryPruning(t *testing.T) { ), expectedLayerLinkDeletions: sets.NewString(), expectedBlobDeletions: sets.NewString( + "https://registry1.io|sha256:0000000000000000000000000000000000000000000000000000000000000001", + "https://registry1.io|sha256:0000000000000000000000000000000000000000000000000000000000000002", "https://registry1.io|"+testutil.Config1, "https://registry1.io|"+testutil.Config2, "https://registry1.io|layer1", @@ -1169,9 +1495,9 @@ func TestRegistryPruning(t *testing.T) { expectedLayerLinkDeletions: sets.NewString( "https://registry1.io|foo/bar|layer1", "https://registry1.io|foo/bar|layer2", - // TODO: ideally, pruner should remove layers of id2 from foo/bar as well ), expectedBlobDeletions: sets.NewString( + "https://registry1.io|sha256:0000000000000000000000000000000000000000000000000000000000000001", "https://registry1.io|layer1", "https://registry1.io|layer2", ), @@ -1217,7 +1543,9 @@ func TestRegistryPruning(t *testing.T) { KeepTagRevisions: &keepTagRevisions, PruneRegistry: &test.pruneRegistry, Images: &test.images, + ImageWatcher: watch.NewFake(), Streams: &test.streams, + StreamWatcher: watch.NewFake(), Pods: &kapi.PodList{}, RCs: &kapi.ReplicationControllerList{}, BCs: &buildapi.BuildConfigList{}, @@ -1226,20 +1554,21 @@ func TestRegistryPruning(t *testing.T) { Deployments: &kapisext.DeploymentList{}, DCs: &appsapi.DeploymentConfigList{}, RSs: &kapisext.ReplicaSetList{}, - RegistryURL: &url.URL{Scheme: "https", Host: "registry1.io"}, + RegistryClientFactory: FakeRegistryClientFactory, + RegistryURL: &url.URL{Scheme: "https", Host: "registry1.io"}, } p, err := NewPruner(options) if err != nil { t.Fatalf("unexpected error: %v", err) } - imageDeleter := &fakeImageDeleter{invocations: sets.NewString()} + _, imageDeleterFactory := newFakeImageDeleter(nil) streamDeleter := &fakeImageStreamDeleter{invocations: sets.NewString()} layerLinkDeleter := &fakeLayerLinkDeleter{invocations: sets.NewString()} blobDeleter := &fakeBlobDeleter{invocations: sets.NewString()} manifestDeleter := &fakeManifestDeleter{invocations: sets.NewString()} - p.Prune(imageDeleter, streamDeleter, layerLinkDeleter, blobDeleter, manifestDeleter) + p.Prune(imageDeleterFactory, streamDeleter, layerLinkDeleter, blobDeleter, manifestDeleter) if a, e := layerLinkDeleter.invocations, test.expectedLayerLinkDeletions; !reflect.DeepEqual(a, e) { t.Errorf("unexpected layer link deletions: %s", diff.ObjectDiff(a, e)) @@ -1290,16 +1619,18 @@ func TestImageWithStrongAndWeakRefsIsNotPruned(t *testing.T) { rss := testutil.RSList() options := PrunerOptions{ - Images: &images, - Streams: &streams, - Pods: &pods, - RCs: &rcs, - BCs: &bcs, - Builds: &builds, - DSs: &dss, - Deployments: &deployments, - DCs: &dcs, - RSs: &rss, + Images: &images, + ImageWatcher: watch.NewFake(), + Streams: &streams, + StreamWatcher: watch.NewFake(), + Pods: &pods, + RCs: &rcs, + BCs: &bcs, + Builds: &builds, + DSs: &dss, + Deployments: &deployments, + DCs: &dcs, + RSs: &rss, } keepYoungerThan := 24 * time.Hour keepTagRevisions := 2 @@ -1310,14 +1641,19 @@ func TestImageWithStrongAndWeakRefsIsNotPruned(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - imageDeleter := &fakeImageDeleter{invocations: sets.NewString()} + imageDeleter, imageDeleterFactory := newFakeImageDeleter(nil) streamDeleter := &fakeImageStreamDeleter{invocations: sets.NewString()} layerLinkDeleter := &fakeLayerLinkDeleter{invocations: sets.NewString()} blobDeleter := &fakeBlobDeleter{invocations: sets.NewString()} manifestDeleter := &fakeManifestDeleter{invocations: sets.NewString()} - if err := p.Prune(imageDeleter, streamDeleter, layerLinkDeleter, blobDeleter, manifestDeleter); err != nil { - t.Fatalf("unexpected error: %v", err) + deletions, failures := p.Prune(imageDeleterFactory, streamDeleter, layerLinkDeleter, blobDeleter, manifestDeleter) + if len(failures) != 0 { + t.Errorf("got unexpected failures: %#+v", failures) + } + + if len(deletions) > 0 { + t.Fatalf("got unexpected deletions: %#+v", deletions) } if imageDeleter.invocations.Len() > 0 { @@ -1349,11 +1685,358 @@ func TestImageIsPrunable(t *testing.T) { } } +func TestPrunerGetNextJob(t *testing.T) { + flag.Lookup("v").Value.Set(fmt.Sprint(*logLevel)) + + glog.V(2).Infof("debug") + algo := pruneAlgorithm{ + keepYoungerThan: time.Now(), + } + p := &pruner{algorithm: algo, processedImages: make(map[*imagegraph.ImageNode]*Job)} + images := testutil.ImageList( + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000003", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003", 1, "layer1"), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000002", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002", 2, "layer1", "layer2"), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000001", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000001", 3, "Layer1", "Layer2", "Layer3"), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000013", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000013", 4, "Layer1", "LayeR2", "LayeR3"), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000012", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000012", 5, "LayeR1", "LayeR2"), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000011", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000011", 6, "layer1", "Layer2", "LAYER3", "LAYER4"), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000010", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000010", 7, "layer1", "layer2", "layer3", "layer4"), + ) + p.g = genericgraph.New() + err := p.addImagesToGraph(&images) + if err != nil { + t.Fatalf("failed to add images: %v", err) + } + + is := images.Items + imageStreams := testutil.StreamList( + testutil.Stream("example.com", "foo", "bar", testutil.Tags( + testutil.Tag("latest", + testutil.TagEvent(is[3].Name, is[3].DockerImageReference), + testutil.TagEvent(is[4].Name, is[4].DockerImageReference), + testutil.TagEvent(is[5].Name, is[5].DockerImageReference)))), + testutil.Stream("example.com", "foo", "baz", testutil.Tags( + testutil.Tag("devel", + testutil.TagEvent(is[3].Name, is[3].DockerImageReference), + testutil.TagEvent(is[2].Name, is[2].DockerImageReference), + testutil.TagEvent(is[1].Name, is[1].DockerImageReference)), + testutil.Tag("prod", + testutil.TagEvent(is[2].Name, is[2].DockerImageReference))))) + if err := p.addImageStreamsToGraph(&imageStreams, nil); err != nil { + t.Fatalf("failed to add image streams: %v", err) + } + + imageNodes := getImageNodes(p.g.Nodes()) + if len(imageNodes) == 0 { + t.Fatalf("not images nodes") + } + prunable := calculatePrunableImages(p.g, imageNodes, algo) + sort.Sort(byLayerCountAndAge(prunable)) + p.queue = makeQueue(prunable) + + checkQueue := func(desc string, expected ...*imageapi.Image) { + for i, item := 0, p.queue; i < len(expected) || item != nil; i++ { + if i >= len(expected) { + t.Errorf("[%s] unexpected image at #%d: %s", desc, i, item.node.Image.Name) + } else if item == nil { + t.Errorf("[%s] expected image %q not found at #%d", desc, expected[i].Name, i) + } else if item.node.Image.Name != expected[i].Name { + t.Errorf("[%s] unexpected image at #%d: %s != %s", desc, i, item.node.Image.Name, expected[i].Name) + } + if item != nil { + item = item.next + } + } + if t.Failed() { + t.FailNow() + } + } + + /* layerrefs: layer1:4, Layer1:2, LayeR1:1, layer2:2, Layer2:2, LayeR2:2, + * layer3:1, Layer3:1, LayeR3:1, LAYER3:1, layer4:1, LAYER4:1 */ + checkQueue("initial state", &is[6], &is[5], &is[3], &is[2], &is[4], &is[1], &is[0]) + job := expectBlockedOrJob(t, p, "pop first", false, &is[6], []string{"layer4", "layer3"})(p.getNextJob()) + p.processedImages[job.Image] = job + imgnd6 := job.Image + + /* layerrefs: layer1:3, Layer1:2, LayeR1:1, layer2:1, Layer2:2, LayeR2:2, + * layer3:0, Layer3:1, LayeR3:1, LAYER3:1, layer4:0, LAYER4:1 */ + checkQueue("1 removed", &is[5], &is[3], &is[2], &is[4], &is[1], &is[0]) + job = expectBlockedOrJob(t, p, "pop second", false, &is[5], []string{"LAYER3", "LAYER4"})(p.getNextJob()) + p.processedImages[job.Image] = job + imgnd5 := job.Image + + /* layerrefs: layer1:2, Layer1:2, LayeR1:1, layer2:1, Layer2:1, LayeR2:2, + * Layer3:1, LayeR3:1, LAYER3:0, LAYER4:0 */ + checkQueue("2 removed", &is[3], &is[2], &is[4], &is[1], &is[0]) + job = expectBlockedOrJob(t, p, "pop third", false, &is[3], []string{"LayeR3"})(p.getNextJob()) + p.processedImages[job.Image] = job + imgnd3 := job.Image + + // layerrefs: layer1:2, Layer1:1, LayeR1:1, layer2:1, Layer2:1, LayeR2:1, Layer3:1, LayeR3:0 + checkQueue("3 removed", &is[2], &is[4], &is[1], &is[0]) + // all the remaining images are blocked now except for the is[0] + job = expectBlockedOrJob(t, p, "pop fourth", false, &is[0], nil)(p.getNextJob()) + p.processedImages[job.Image] = job + imgnd0 := job.Image + + // layerrefs: layer1:1, Layer1:1, LayeR1:1, layer2:1, Layer2:1, LayeR2:1, Layer3:1 + checkQueue("4 removed and blocked", &is[2], &is[4], &is[1]) + // all the remaining images are blocked now + expectBlockedOrJob(t, p, "blocked", true, nil, nil)(p.getNextJob()) + + // layerrefs: layer1:1, Layer1:2, LayeR1:1, layer2:1, Layer2:1, LayeR2:1, Layer3:1 + checkQueue("3 to go", &is[2], &is[4], &is[1]) + // unblock one of the images + p.g.RemoveNode(imgnd3) + job = expectBlockedOrJob(t, p, "pop fifth", false, &is[4], + []string{"LayeR1", "LayeR2"})(p.getNextJob()) + p.processedImages[job.Image] = job + imgnd4 := job.Image + + // layerrefs: layer1:1, Layer1:2, LayeR1:0, layer2:1, Layer2:1, LayeR2:0, Layer3:1 + checkQueue("2 to go", &is[2], &is[1]) + expectBlockedOrJob(t, p, "blocked with two items#1", true, nil, nil)(p.getNextJob()) + checkQueue("still 2 to go", &is[2], &is[1]) + + p.g.RemoveNode(imgnd0) + delete(p.processedImages, imgnd0) + expectBlockedOrJob(t, p, "blocked with two items#2", true, nil, nil)(p.getNextJob()) + p.g.RemoveNode(imgnd6) + delete(p.processedImages, imgnd6) + expectBlockedOrJob(t, p, "blocked with two items#3", true, nil, nil)(p.getNextJob()) + p.g.RemoveNode(imgnd4) + delete(p.processedImages, imgnd4) + expectBlockedOrJob(t, p, "blocked with two items#4", true, nil, nil)(p.getNextJob()) + p.g.RemoveNode(imgnd5) + delete(p.processedImages, imgnd5) + + job = expectBlockedOrJob(t, p, "pop sixth", false, &is[2], + []string{"Layer1", "Layer2", "Layer3"})(p.getNextJob()) + p.processedImages[job.Image] = job + + // layerrefs: layer1:1, Layer1:0, layer2:1, Layer2:0, Layer3:0 + checkQueue("1 to go", &is[1]) + job = expectBlockedOrJob(t, p, "pop last", false, &is[1], + []string{"layer1", "layer2"})(p.getNextJob()) + p.processedImages[job.Image] = job + + // layerrefs: layer1:0, layer2:0 + checkQueue("queue empty") + expectBlockedOrJob(t, p, "empty", false, nil, nil)(p.getNextJob()) +} + +func expectBlockedOrJob( + t *testing.T, + p *pruner, + desc string, + blocked bool, + image *imageapi.Image, + layers []string, +) func(job *Job, blocked bool) *Job { + return func(job *Job, b bool) *Job { + if b != blocked { + t.Fatalf("[%s] unexpected blocked: %t != %t", desc, b, blocked) + } + + if blocked { + return job + } + + if image == nil && job != nil { + t.Fatalf("[%s] got unexpected job %#+v", desc, job) + } + if image != nil && job == nil { + t.Fatalf("[%s] got nil instead of job", desc) + } + if job == nil { + return nil + } + + if a, e := job.Image.Image.Name, image.Name; a != e { + t.Errorf("[%s] unexpected image in job: %s != %s", desc, a, e) + } + + expLayers := sets.NewString(imagegraph.EnsureImageComponentManifestNode( + p.g, job.Image.Image.Name).(*imagegraph.ImageComponentNode).String()) + for _, l := range layers { + expLayers.Insert(imagegraph.EnsureImageComponentLayerNode( + p.g, l).(*imagegraph.ImageComponentNode).String()) + } + actLayers := sets.NewString() + for c, ret := range job.Components { + if ret.PrunableGlobally { + actLayers.Insert(c.String()) + } + } + if a, e := actLayers, expLayers; !reflect.DeepEqual(a, e) { + t.Errorf("[%s] unexpected image components: %s", desc, diff.ObjectDiff(a.List(), e.List())) + } + + if t.Failed() { + t.FailNow() + } + + return job + } +} + +func TestChangeImageStreamsWhilePruning(t *testing.T) { + flag.Lookup("v").Value.Set(fmt.Sprint(*logLevel)) + + images := testutil.ImageList( + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000001", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000001", 5), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000002", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000002", 4), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000003", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003", 3), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000004", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003", 2), + testutil.AgedImage("sha256:0000000000000000000000000000000000000000000000000000000000000005", "registry1.io/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000003", 1), + ) + + streams := testutil.StreamList(testutil.Stream("registry1", "foo", "bar", testutil.Tags())) + streamWatcher := watch.NewFake() + pods := testutil.PodList() + rcs := testutil.RCList() + bcs := testutil.BCList() + builds := testutil.BuildList() + dss := testutil.DSList() + deployments := testutil.DeploymentList() + dcs := testutil.DCList() + rss := testutil.RSList() + + options := PrunerOptions{ + Images: &images, + ImageWatcher: watch.NewFake(), + Streams: &streams, + StreamWatcher: streamWatcher, + Pods: &pods, + RCs: &rcs, + BCs: &bcs, + Builds: &builds, + DSs: &dss, + Deployments: &deployments, + DCs: &dcs, + RSs: &rss, + RegistryClientFactory: FakeRegistryClientFactory, + RegistryURL: &url.URL{Scheme: "https", Host: "registry1.io"}, + NumWorkers: 1, + } + keepYoungerThan := 30 * time.Second + keepTagRevisions := 2 + options.KeepYoungerThan = &keepYoungerThan + options.KeepTagRevisions = &keepTagRevisions + p, err := NewPruner(options) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + pruneFinished := make(chan struct{}) + deletions, failures := []Deletion{}, []Failure{} + imageDeleter, imageDeleterFactory := newBlockingImageDeleter(t) + + // run the pruning loop in a go routine + go func() { + deletions, failures = p.Prune( + imageDeleterFactory, + &fakeImageStreamDeleter{invocations: sets.NewString()}, + &fakeLayerLinkDeleter{invocations: sets.NewString()}, + &fakeBlobDeleter{invocations: sets.NewString()}, + &fakeManifestDeleter{invocations: sets.NewString()}, + ) + if len(failures) != 0 { + t.Errorf("got unexpected failures: %#+v", failures) + } + close(pruneFinished) + }() + + expectedImageDeletions := sets.NewString() + expectedBlobDeletions := sets.NewString() + + img := imageDeleter.waitForRequest() + if a, e := img.Name, images.Items[0].Name; a != e { + t.Fatalf("got unexpected image deletion request: %s != %s", a, e) + } + expectedImageDeletions.Insert(images.Items[0].Name) + expectedBlobDeletions.Insert("registry1|" + images.Items[0].Name) + + // let the pruner wait for reply and meanwhile reference an image with a new image stream + stream := testutil.Stream("registry1", "foo", "new", testutil.Tags( + testutil.Tag("latest", + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000002", "registry1/foo/new@sha256:0000000000000000000000000000000000000000000000000000000000000002"), + ))) + streamWatcher.Add(&stream) + imageDeleter.unblock() + + // the pruner shall skip the newly referenced image + img = imageDeleter.waitForRequest() + if a, e := img.Name, images.Items[2].Name; a != e { + t.Fatalf("got unexpected image deletion request: %s != %s", a, e) + } + expectedImageDeletions.Insert(images.Items[2].Name) + expectedBlobDeletions.Insert("registry1|" + images.Items[2].Name) + + // now lets modify the existing image stream to reference some more images + stream = testutil.Stream("registry1", "foo", "bar", testutil.Tags( + testutil.Tag("latest", + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000000", "registry1/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000"), + testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000004", "registry1/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), + ))) + streamWatcher.Modify(&stream) + imageDeleter.unblock() + + // the pruner shall skip the newly referenced image + img = imageDeleter.waitForRequest() + if a, e := img.Name, images.Items[4].Name; a != e { + t.Fatalf("got unexpected image deletion request: %s != %s", a, e) + } + expectedImageDeletions.Insert(images.Items[4].Name) + expectedBlobDeletions.Insert("registry1|" + images.Items[4].Name) + imageDeleter.unblock() + + // no more images - wait for the pruner to finish + select { + case <-pruneFinished: + case <-time.After(time.Second): + t.Errorf("tester: timeout while waiting for pruner to finish") + } + + if a, e := imageDeleter.d.invocations, expectedImageDeletions; !reflect.DeepEqual(a, e) { + t.Errorf("unexpected image deletions: %s", diff.ObjectDiff(a, e)) + } + + expectedAllDeletions := sets.NewString( + append(expectedImageDeletions.List(), expectedBlobDeletions.List()...)...) + for _, d := range deletions { + rendered, _, isManifestLinkDeletion := renderDeletion("registry1", &d) + if isManifestLinkDeletion { + // TODO: update tests to count and verify the number of manifest link deletions + continue + } + if expectedAllDeletions.Has(rendered) { + expectedAllDeletions.Delete(rendered) + } else { + t.Errorf("got unexpected deletion: %#+v (rendered: %q)", d, rendered) + } + } + for del, ok := expectedAllDeletions.PopAny(); ok; del, ok = expectedAllDeletions.PopAny() { + t.Errorf("expected deletion %q did not happen", del) + } +} + +func streamListToClient(list *imageapi.ImageStreamList) imageclient.ImageStreamsGetter { + streams := make([]runtime.Object, 0, len(list.Items)) + for i := range list.Items { + streams = append(streams, &list.Items[i]) + } + + return fakeimageclient.NewSimpleClientset(streams...).Image() +} + func keepTagRevisions(n int) *int { return &n } type fakeImageDeleter struct { + mutex sync.Mutex invocations sets.String err error } @@ -1361,11 +2044,68 @@ type fakeImageDeleter struct { var _ ImageDeleter = &fakeImageDeleter{} func (p *fakeImageDeleter) DeleteImage(image *imageapi.Image) error { + p.mutex.Lock() + defer p.mutex.Unlock() p.invocations.Insert(image.Name) return p.err } +func newFakeImageDeleter(err error) (*fakeImageDeleter, ImagePrunerFactoryFunc) { + deleter := &fakeImageDeleter{ + err: err, + invocations: sets.NewString(), + } + return deleter, func() (ImageDeleter, error) { + return deleter, nil + } +} + +type blockingImageDeleter struct { + t *testing.T + d *fakeImageDeleter + requests chan *imageapi.Image + reply chan struct{} +} + +func (bid *blockingImageDeleter) DeleteImage(img *imageapi.Image) error { + bid.requests <- img + select { + case <-bid.reply: + case <-time.After(time.Second): + bid.t.Fatalf("worker: timeout while waiting for image deletion confirmation") + } + return bid.d.DeleteImage(img) +} + +func (bid *blockingImageDeleter) waitForRequest() *imageapi.Image { + select { + case img := <-bid.requests: + return img + case <-time.After(time.Second): + bid.t.Fatalf("tester: timeout while waiting on worker's request") + return nil + } +} + +func (bid *blockingImageDeleter) unblock() { + bid.reply <- struct{}{} +} + +func newBlockingImageDeleter(t *testing.T) (*blockingImageDeleter, ImagePrunerFactoryFunc) { + deleter, _ := newFakeImageDeleter(nil) + blocking := blockingImageDeleter{ + t: t, + d: deleter, + requests: make(chan *imageapi.Image), + reply: make(chan struct{}), + } + return &blocking, func() (ImageDeleter, error) { + return &blocking, nil + } +} + type fakeImageStreamDeleter struct { + mutex sync.Mutex invocations sets.String err error streamImages map[string][]string @@ -1375,6 +2115,8 @@ type fakeImageStreamDeleter struct { var _ ImageStreamDeleter = &fakeImageStreamDeleter{} func (p *fakeImageStreamDeleter) GetImageStream(stream *imageapi.ImageStream) (*imageapi.ImageStream, error) { + p.mutex.Lock() + defer p.mutex.Unlock() if p.streamImages == nil { p.streamImages = make(map[string][]string) } @@ -1425,6 +2167,7 @@ func (p *fakeImageStreamDeleter) NotifyImageStreamPrune(stream *imageapi.ImageSt } type fakeBlobDeleter struct { + mutex sync.Mutex invocations sets.String err error } @@ -1432,11 +2175,14 @@ type fakeBlobDeleter struct { var _ BlobDeleter = &fakeBlobDeleter{} func (p *fakeBlobDeleter) DeleteBlob(registryClient *http.Client, registryURL *url.URL, blob string) error { + p.mutex.Lock() + defer p.mutex.Unlock() p.invocations.Insert(fmt.Sprintf("%s|%s", registryURL.String(), blob)) return p.err } type fakeLayerLinkDeleter struct { + mutex sync.Mutex invocations sets.String err error } @@ -1444,11 +2190,14 @@ type fakeLayerLinkDeleter struct { var _ LayerLinkDeleter = &fakeLayerLinkDeleter{} func (p *fakeLayerLinkDeleter) DeleteLayerLink(registryClient *http.Client, registryURL *url.URL, repo, layer string) error { + p.mutex.Lock() + defer p.mutex.Unlock() p.invocations.Insert(fmt.Sprintf("%s|%s|%s", registryURL.String(), repo, layer)) return p.err } type fakeManifestDeleter struct { + mutex sync.Mutex invocations sets.String err error } @@ -1456,6 +2205,8 @@ type fakeManifestDeleter struct { var _ ManifestDeleter = &fakeManifestDeleter{} func (p *fakeManifestDeleter) DeleteManifest(registryClient *http.Client, registryURL *url.URL, repo, manifest string) error { + p.mutex.Lock() + defer p.mutex.Unlock() p.invocations.Insert(fmt.Sprintf("%s|%s|%s", registryURL.String(), repo, manifest)) return p.err } diff --git a/pkg/oc/admin/prune/imageprune/testutil/util.go b/pkg/oc/admin/prune/imageprune/testutil/util.go index 6590cc22c2ca..ae263b477aa4 100644 --- a/pkg/oc/admin/prune/imageprune/testutil/util.go +++ b/pkg/oc/admin/prune/imageprune/testutil/util.go @@ -38,13 +38,16 @@ func ImageList(images ...imageapi.Image) imageapi.ImageList { } // AgedImage creates a test image with specified age. -func AgedImage(id, ref string, ageInMinutes int64) imageapi.Image { - return CreatedImage(id, ref, time.Now().Add(time.Duration(ageInMinutes)*time.Minute*-1)) +func AgedImage(id, ref string, ageInMinutes int64, layers ...string) imageapi.Image { + return CreatedImage(id, ref, time.Now().Add(time.Duration(ageInMinutes)*time.Minute*-1), layers...) } // CreatedImage creates a test image with the CreationTime set to the given timestamp. -func CreatedImage(id, ref string, created time.Time) imageapi.Image { - image := ImageWithLayers(id, ref, nil, Layer1, Layer2, Layer3, Layer4, Layer5) +func CreatedImage(id, ref string, created time.Time, layers ...string) imageapi.Image { + if len(layers) == 0 { + layers = []string{Layer1, Layer2, Layer3, Layer4, Layer5} + } + image := ImageWithLayers(id, ref, nil, layers...) image.CreationTimestamp = metav1.NewTime(created) return image } diff --git a/pkg/oc/admin/prune/imageprune/worker.go b/pkg/oc/admin/prune/imageprune/worker.go new file mode 100644 index 000000000000..319adf861e1e --- /dev/null +++ b/pkg/oc/admin/prune/imageprune/worker.go @@ -0,0 +1,359 @@ +package imageprune + +import ( + "fmt" + "net/http" + "net/url" + + "github.com/golang/glog" + gonum "github.com/gonum/graph" + + kerrapi "k8s.io/apimachinery/pkg/api/errors" + + imagegraph "github.com/openshift/origin/pkg/oc/graph/imagegraph/nodes" +) + +// ComponentRetention knows all the places where image component needs to be pruned (e.g. global blob store +// and repositories). +type ComponentRetention struct { + ReferencingStreams map[*imagegraph.ImageStreamNode]bool + PrunableGlobally bool +} + +// ComponentRetentions contains prunable locations for all the components of an image. +type ComponentRetentions map[*imagegraph.ImageComponentNode]*ComponentRetention + +func (cr ComponentRetentions) add(comp *imagegraph.ImageComponentNode) *ComponentRetention { + if _, ok := cr[comp]; ok { + return cr[comp] + } + cr[comp] = &ComponentRetention{ + ReferencingStreams: make(map[*imagegraph.ImageStreamNode]bool), + } + return cr[comp] +} + +// Add adds component marked as (not) prunable in the blob store. +func (cr ComponentRetentions) Add( + comp *imagegraph.ImageComponentNode, + globallyPrunable bool, +) *ComponentRetention { + r := cr.add(comp) + r.PrunableGlobally = globallyPrunable + return r +} + +// AddReferencingStreams adds a repository location as (not) prunable to the given component. +func (cr ComponentRetentions) AddReferencingStreams( + comp *imagegraph.ImageComponentNode, + prunable bool, + streams ...*imagegraph.ImageStreamNode, +) *ComponentRetention { + r := cr.add(comp) + for _, n := range streams { + r.ReferencingStreams[n] = prunable + } + return r +} + +// Job is an image pruning job for the Worker. It contains information about single image and related +// components. +type Job struct { + Image *imagegraph.ImageNode + Components ComponentRetentions +} + +func enumerateImageComponents( + crs ComponentRetentions, + compType *imagegraph.ImageComponentType, + withPreserved bool, + handler func(comp *imagegraph.ImageComponentNode, prunable bool), +) { + for c, retention := range crs { + if !withPreserved && !retention.PrunableGlobally { + continue + } + if compType != nil && c.Type != *compType { + continue + } + + handler(c, retention.PrunableGlobally) + } +} + +func enumerateImageStreamComponents( + crs ComponentRetentions, + compType *imagegraph.ImageComponentType, + withPreserved bool, + handler func(comp *imagegraph.ImageComponentNode, stream *imagegraph.ImageStreamNode, prunable bool), +) { + for c, cr := range crs { + if compType != nil && c.Type != *compType { + continue + } + + for s, prunable := range cr.ReferencingStreams { + if withPreserved || prunable { + handler(c, s, prunable) + } + } + } +} + +// Deletion denotes a single deletion of a resource as a result of processing a job. If Parent is nil, the +// deletion occured in the global blob store. Otherwise the parent identities repository location. +type Deletion struct { + Node gonum.Node + Parent gonum.Node +} + +// Failure denotes a pruning failure of a single object. +type Failure struct { + Node gonum.Node + Parent gonum.Node + Err error +} + +var _ error = &Failure{} + +func (pf *Failure) Error() string { return pf.String() } + +func (pf *Failure) String() string { + if pf.Node == nil { + return fmt.Sprintf("failed to prune blob: %v", pf.Err) + } + + switch t := pf.Node.(type) { + case *imagegraph.ImageStreamNode: + return fmt.Sprintf("failed to update ImageStream %s: %v", getName(t.ImageStream), pf.Err) + case *imagegraph.ImageNode: + return fmt.Sprintf("failed to delete Image %s: %v", t.Image.DockerImageReference, pf.Err) + case *imagegraph.ImageComponentNode: + detail := "" + if isn, ok := pf.Parent.(*imagegraph.ImageStreamNode); ok { + detail = " in repository " + getName(isn.ImageStream) + } + switch t.Type { + case imagegraph.ImageComponentTypeConfig: + return fmt.Sprintf("failed to delete image config link %s%s: %v", t.Component, detail, pf.Err) + case imagegraph.ImageComponentTypeLayer: + return fmt.Sprintf("failed to delete image layer link %s%s: %v", t.Component, detail, pf.Err) + case imagegraph.ImageComponentTypeManifest: + return fmt.Sprintf("failed to delete image manifest link %s%s: %v", t.Component, detail, pf.Err) + default: + return fmt.Sprintf("failed to delete %s%s: %v", t.String(), detail, pf.Err) + } + default: + return fmt.Sprintf("failed to delete %v: %v", t, pf.Err) + } +} + +// JobResult is a result of job's processing. +type JobResult struct { + Job *Job + Deletions []Deletion + Failures []Failure +} + +func (jr *JobResult) update(deletions []Deletion, failures []Failure) *JobResult { + jr.Deletions = append(jr.Deletions, deletions...) + jr.Failures = append(jr.Failures, failures...) + return jr +} + +// Worker knows how to prune image and its related components. +type Worker interface { + // Run is supposed to be run as a go-rutine. It terminates when nil is received through the in channel. + Run(in <-chan *Job, out chan<- JobResult) +} + +type worker struct { + algorithm pruneAlgorithm + registryClient *http.Client + registryURL *url.URL + imagePruner ImageDeleter + streamPruner ImageStreamDeleter + layerLinkPruner LayerLinkDeleter + blobPruner BlobDeleter + manifestPruner ManifestDeleter +} + +var _ Worker = &worker{} + +// NewWorker creates a new pruning worker. +func NewWorker( + algorithm pruneAlgorithm, + registryClientFactory RegistryClientFactoryFunc, + registryURL *url.URL, + imagePrunerFactory ImagePrunerFactoryFunc, + streamPruner ImageStreamDeleter, + layerLinkPruner LayerLinkDeleter, + blobPruner BlobDeleter, + manifestPruner ManifestDeleter, +) (Worker, error) { + client, err := registryClientFactory() + if err != nil { + return nil, err + } + + imagePruner, err := imagePrunerFactory() + if err != nil { + return nil, err + } + + return &worker{ + algorithm: algorithm, + registryClient: client, + registryURL: registryURL, + imagePruner: imagePruner, + streamPruner: streamPruner, + layerLinkPruner: layerLinkPruner, + blobPruner: blobPruner, + manifestPruner: manifestPruner, + }, nil +} + +func (w *worker) Run(in <-chan *Job, out chan<- JobResult) { + for { + job, more := <-in + if !more { + return + } + out <- *w.prune(job) + } +} + +func (w *worker) prune(job *Job) *JobResult { + res := &JobResult{Job: job} + + blobDeletions, blobFailures := []Deletion{}, []Failure{} + + if w.algorithm.pruneRegistry { + // NOTE: not found errors are treated as success + res.update(pruneImageComponents( + w.registryClient, + w.registryURL, + job.Components, + w.layerLinkPruner, + )) + + blobDeletions, blobFailures := pruneBlobs( + w.registryClient, + w.registryURL, + job.Components, + w.blobPruner, + ) + res.update(blobDeletions, blobFailures) + + res.update(pruneManifests( + w.registryClient, + w.registryURL, + job.Components, + w.manifestPruner, + )) + } + + // Keep the image object when its blobs could not be deleted and the image is ostensibly (we cannot be + // sure unless we ask the registry for blob's existence) still complete. Thanks to the preservation, the + // blobs can be identified and deleted next time. + if len(blobDeletions) > 0 || len(blobFailures) == 0 { + res.update(pruneImages(job.Image, w.imagePruner)) + } + + return res +} + +// pruneImages invokes imagePruner.DeleteImage with each image that is prunable. +func pruneImages( + imageNode *imagegraph.ImageNode, + imagePruner ImageDeleter, +) (deletions []Deletion, failures []Failure) { + err := imagePruner.DeleteImage(imageNode.Image) + if err != nil { + if kerrapi.IsNotFound(err) { + glog.V(2).Infof("Skipping image %s that no longer exists", imageNode.Image.Name) + } else { + failures = append(failures, Failure{Node: imageNode, Err: err}) + } + } else { + deletions = append(deletions, Deletion{Node: imageNode}) + } + + return +} + +// pruneImageComponents invokes layerLinkDeleter.DeleteLayerLink for each repository layer link to +// be deleted from the registry. +func pruneImageComponents( + registryClient *http.Client, + registryURL *url.URL, + crs ComponentRetentions, + layerLinkDeleter LayerLinkDeleter, +) (deletions []Deletion, failures []Failure) { + enumerateImageStreamComponents(crs, nil, false, func( + comp *imagegraph.ImageComponentNode, + stream *imagegraph.ImageStreamNode, + _ bool, + ) { + if comp.Type == imagegraph.ImageComponentTypeManifest { + return + } + streamName := getName(stream.ImageStream) + glog.V(4).Infof("Pruning repository %s/%s: %s", registryURL.Host, streamName, comp.Describe()) + err := layerLinkDeleter.DeleteLayerLink(registryClient, registryURL, streamName, comp.Component) + if err != nil { + failures = append(failures, Failure{Node: comp, Parent: stream, Err: err}) + } else { + deletions = append(deletions, Deletion{Node: comp, Parent: stream}) + } + }) + + return +} + +// pruneBlobs invokes blobPruner.DeleteBlob for each blob to be deleted from the registry. +func pruneBlobs( + registryClient *http.Client, + registryURL *url.URL, + crs ComponentRetentions, + blobPruner BlobDeleter, +) (deletions []Deletion, failures []Failure) { + enumerateImageComponents(crs, nil, false, func(comp *imagegraph.ImageComponentNode, prunable bool) { + err := blobPruner.DeleteBlob(registryClient, registryURL, comp.Component) + if err != nil { + failures = append(failures, Failure{Node: comp, Err: err}) + } else { + deletions = append(deletions, Deletion{Node: comp}) + } + }) + + return +} + +// pruneManifests invokes manifestPruner.DeleteManifest for each repository +// manifest to be deleted from the registry. +func pruneManifests( + registryClient *http.Client, + registryURL *url.URL, + crs ComponentRetentions, + manifestPruner ManifestDeleter, +) (deletions []Deletion, failures []Failure) { + manifestType := imagegraph.ImageComponentTypeManifest + enumerateImageStreamComponents(crs, &manifestType, false, func( + manifestNode *imagegraph.ImageComponentNode, + stream *imagegraph.ImageStreamNode, + _ bool, + ) { + repoName := getName(stream.ImageStream) + + glog.V(4).Infof("Pruning manifest %s in the repository %s/%s", manifestNode.Component, registryURL.Host, repoName) + err := manifestPruner.DeleteManifest(registryClient, registryURL, repoName, manifestNode.Component) + if err != nil { + failures = append(failures, Failure{Node: manifestNode, Parent: stream, Err: err}) + } else { + deletions = append(deletions, Deletion{Node: manifestNode, Parent: stream}) + } + }) + + return +} diff --git a/pkg/oc/admin/prune/images.go b/pkg/oc/admin/prune/images.go index 0c35df01aa28..766944853744 100644 --- a/pkg/oc/admin/prune/images.go +++ b/pkg/oc/admin/prune/images.go @@ -15,13 +15,17 @@ import ( "text/tabwriter" "time" + "github.com/golang/glog" + gonum "github.com/gonum/graph" "github.com/spf13/cobra" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kutilerrors "k8s.io/apimachinery/pkg/util/errors" knet "k8s.io/apimachinery/pkg/util/net" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" apimachineryversion "k8s.io/apimachinery/pkg/version" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" restclient "k8s.io/client-go/rest" kclientcmd "k8s.io/client-go/tools/clientcmd" @@ -37,6 +41,7 @@ import ( imageclient "github.com/openshift/origin/pkg/image/generated/internalclientset/typed/image/internalversion" "github.com/openshift/origin/pkg/oc/admin/prune/imageprune" "github.com/openshift/origin/pkg/oc/cli/util/clientcmd" + imagegraph "github.com/openshift/origin/pkg/oc/graph/imagegraph/nodes" oserrors "github.com/openshift/origin/pkg/util/errors" "github.com/openshift/origin/pkg/util/netutils" "github.com/openshift/origin/pkg/version" @@ -122,15 +127,16 @@ type PruneImagesOptions struct { PruneRegistry *bool IgnoreInvalidRefs bool - ClientConfig *restclient.Config - AppsClient appsclient.AppsInterface - BuildClient buildclient.BuildInterface - ImageClient imageclient.ImageInterface - DiscoveryClient discovery.DiscoveryInterface - KubeClient kclientset.Interface - Timeout time.Duration - Out io.Writer - ErrOut io.Writer + ClientConfig *restclient.Config + AppsClient appsclient.AppsInterface + BuildClient buildclient.BuildInterface + ImageClient imageclient.ImageInterface + ImageClientFactory func() (imageclient.ImageInterface, error) + DiscoveryClient discovery.DiscoveryInterface + KubeClient kclientset.Interface + Timeout time.Duration + Out io.Writer + ErrOut io.Writer } // NewCmdPruneImages implements the OpenShift cli prune images command. @@ -214,6 +220,7 @@ func (o *PruneImagesOptions) Complete(f *clientcmd.Factory, cmd *cobra.Command, o.AppsClient = appsClient o.BuildClient = buildClient o.ImageClient = imageClient + o.ImageClientFactory = getImageClientFactory(f) o.KubeClient = kubeClient o.DiscoveryClient = kubeClient.Discovery() @@ -250,16 +257,6 @@ func (o PruneImagesOptions) Validate() error { // Run contains all the necessary functionality for the OpenShift cli prune images command. func (o PruneImagesOptions) Run() error { - allImages, err := o.ImageClient.Images().List(metav1.ListOptions{}) - if err != nil { - return err - } - - allStreams, err := o.ImageClient.ImageStreams(o.Namespace).List(metav1.ListOptions{}) - if err != nil { - return err - } - allPods, err := o.KubeClient.Core().Pods(o.Namespace).List(metav1.ListOptions{}) if err != nil { return err @@ -331,10 +328,35 @@ func (o PruneImagesOptions) Run() error { limitRangesMap[limit.Namespace] = limits } + allImages, err := o.ImageClient.Images().List(metav1.ListOptions{}) + if err != nil { + return err + } + imageWatcher, err := o.ImageClient.Images().Watch(metav1.ListOptions{}) + if err != nil { + utilruntime.HandleError(fmt.Errorf("internal error: failed to watch for images: %v"+ + "\n - image changes will not be detected", err)) + imageWatcher = watch.NewFake() + } + + imageStreamWatcher, err := o.ImageClient.ImageStreams(o.Namespace).Watch(metav1.ListOptions{}) + if err != nil { + utilruntime.HandleError(fmt.Errorf("internal error: failed to watch for image streams: %v"+ + "\n - image stream changes will not be detected", err)) + imageStreamWatcher = watch.NewFake() + } + defer imageStreamWatcher.Stop() + + allStreams, err := o.ImageClient.ImageStreams(o.Namespace).List(metav1.ListOptions{}) + if err != nil { + return err + } + var ( - registryHost = o.RegistryUrlOverride - registryClient *http.Client - registryPinger imageprune.RegistryPinger + registryHost = o.RegistryUrlOverride + registryClientFactory imageprune.RegistryClientFactoryFunc + registryClient *http.Client + registryPinger imageprune.RegistryPinger ) if o.Confirm { @@ -351,18 +373,24 @@ func (o PruneImagesOptions) Run() error { strings.HasPrefix(registryHost, "http://") } - registryClient, err = getRegistryClient(o.ClientConfig, o.CABundle, insecure) + registryClientFactory = func() (*http.Client, error) { + return getRegistryClient(o.ClientConfig, o.CABundle, insecure) + } + registryClient, err = registryClientFactory() if err != nil { return err } + registryPinger = &imageprune.DefaultRegistryPinger{ Client: registryClient, Insecure: insecure, } } else { registryPinger = &imageprune.DryRunRegistryPinger{} + registryClientFactory = imageprune.FakeRegistryClientFactory } + // verify the registy connection now to avoid future surprises registryURL, err := registryPinger.Ping(registryHost) if err != nil { if len(o.RegistryUrlOverride) == 0 && regexp.MustCompile(registryURLNotReachable).MatchString(err.Error()) { @@ -372,26 +400,28 @@ func (o PruneImagesOptions) Run() error { } options := imageprune.PrunerOptions{ - KeepYoungerThan: o.KeepYoungerThan, - KeepTagRevisions: o.KeepTagRevisions, - PruneOverSizeLimit: o.PruneOverSizeLimit, - AllImages: o.AllImages, - Images: allImages, - Streams: allStreams, - Pods: allPods, - RCs: allRCs, - BCs: allBCs, - Builds: allBuilds, - DSs: allDSs, - Deployments: allDeployments, - DCs: allDCs, - RSs: allRSs, - LimitRanges: limitRangesMap, - DryRun: o.Confirm == false, - RegistryClient: registryClient, - RegistryURL: registryURL, - PruneRegistry: o.PruneRegistry, - IgnoreInvalidRefs: o.IgnoreInvalidRefs, + KeepYoungerThan: o.KeepYoungerThan, + KeepTagRevisions: o.KeepTagRevisions, + PruneOverSizeLimit: o.PruneOverSizeLimit, + AllImages: o.AllImages, + Images: allImages, + ImageWatcher: imageWatcher, + Streams: allStreams, + StreamWatcher: imageStreamWatcher, + Pods: allPods, + RCs: allRCs, + BCs: allBCs, + Builds: allBuilds, + DSs: allDSs, + Deployments: allDeployments, + DCs: allDCs, + RSs: allRSs, + LimitRanges: limitRangesMap, + DryRun: o.Confirm == false, + RegistryClientFactory: registryClientFactory, + RegistryURL: registryURL, + PruneRegistry: o.PruneRegistry, + IgnoreInvalidRefs: o.IgnoreInvalidRefs, } if o.Namespace != metav1.NamespaceAll { options.Namespace = o.Namespace @@ -402,21 +432,27 @@ func (o PruneImagesOptions) Run() error { return fmt.Errorf("failed to build graph - no changes made") } - w := tabwriter.NewWriter(o.Out, 10, 4, 3, ' ', 0) - defer w.Flush() - - imageDeleter := &describingImageDeleter{w: w, errOut: o.ErrOut} - imageStreamDeleter := &describingImageStreamDeleter{w: w, errOut: o.ErrOut} - layerLinkDeleter := &describingLayerLinkDeleter{w: w, errOut: o.ErrOut} - blobDeleter := &describingBlobDeleter{w: w, errOut: o.ErrOut} - manifestDeleter := &describingManifestDeleter{w: w, errOut: o.ErrOut} + imagePrunerFactory := func() (imageprune.ImageDeleter, error) { + return &describingImageDeleter{w: o.Out, errOut: o.ErrOut}, nil + } + imageStreamDeleter := &describingImageStreamDeleter{w: o.Out, errOut: o.ErrOut} + layerLinkDeleter := &describingLayerLinkDeleter{w: o.Out, errOut: o.ErrOut} + blobDeleter := &describingBlobDeleter{w: o.Out, errOut: o.ErrOut} + manifestDeleter := &describingManifestDeleter{w: o.Out, errOut: o.ErrOut} if o.Confirm { - imageDeleter.delegate = imageprune.NewImageDeleter(o.ImageClient) imageStreamDeleter.delegate = imageprune.NewImageStreamDeleter(o.ImageClient) layerLinkDeleter.delegate = imageprune.NewLayerLinkDeleter() blobDeleter.delegate = imageprune.NewBlobDeleter() manifestDeleter.delegate = imageprune.NewManifestDeleter() + + imagePrunerFactory = func() (imageprune.ImageDeleter, error) { + imageClient, err := o.ImageClientFactory() + if err != nil { + return nil, err + } + return imageprune.NewImageDeleter(imageClient), nil + } } else { fmt.Fprintln(o.ErrOut, "Dry run enabled - no modifications will be made. Add --confirm to remove images") } @@ -425,7 +461,99 @@ func (o PruneImagesOptions) Run() error { fmt.Fprintln(o.Out, "Only API objects will be removed. No modifications to the image registry will be made.") } - return pruner.Prune(imageDeleter, imageStreamDeleter, layerLinkDeleter, blobDeleter, manifestDeleter) + deletions, failures := pruner.Prune( + imagePrunerFactory, + imageStreamDeleter, + layerLinkDeleter, + blobDeleter, + manifestDeleter, + ) + printSummary(o.Out, deletions, failures) + if len(failures) == 1 { + return &failures[0] + } + if len(failures) > 0 { + return fmt.Errorf("failed") + } + return nil +} + +func printSummary(out io.Writer, deletions []imageprune.Deletion, failures []imageprune.Failure) { + // TODO: for higher verbosity, sum by error type + if len(failures) == 0 { + fmt.Fprintf(out, "Deleted %d objects.\n", len(deletions)) + } else { + fmt.Fprintf(out, "Deleted %d objects out of %d.\n", len(deletions), len(deletions)+len(failures)) + fmt.Fprintf(out, "Failed to delete %d objects.\n", len(failures)) + } + if !glog.V(2) { + return + } + + fmt.Fprintf(out, "\n") + + w := tabwriter.NewWriter(out, 10, 4, 3, ' ', 0) + defer w.Flush() + + buckets := make(map[string]struct{ deletions, failures, total uint64 }) + count := func(node gonum.Node, parent gonum.Node, deletions uint64, failures uint64) { + bucket := "" + switch t := node.(type) { + case *imagegraph.ImageStreamNode: + bucket = "is" + case *imagegraph.ImageNode: + bucket = "image" + case *imagegraph.ImageComponentNode: + bucket = "component/" + string(t.Type) + if parent == nil { + bucket = "blob" + } + default: + bucket = fmt.Sprintf("other/%T", t) + } + c := buckets[bucket] + c.deletions += deletions + c.failures += failures + c.total += deletions + failures + buckets[bucket] = c + } + + for _, d := range deletions { + count(d.Node, d.Parent, 1, 0) + } + for _, f := range failures { + count(f.Node, f.Parent, 0, 1) + } + + printAndPopBucket := func(name string, desc string) { + cnt, ok := buckets[name] + if ok { + delete(buckets, name) + } + if cnt.total == 0 { + return + } + fmt.Fprintf(w, "%s:\t%d\n", desc, cnt.deletions) + if cnt.failures == 0 { + return + } + // add padding before failures to make it appear subordinate to the line above + for i := 0; i < len(desc)-len("failures"); i++ { + fmt.Fprintf(w, " ") + } + fmt.Fprintf(w, "failures:\t%d\n", cnt.failures) + } + + printAndPopBucket("is", "Image Stream updates") + printAndPopBucket("image", "Image deletions") + printAndPopBucket("blob", "Blob deletions") + printAndPopBucket("component/"+string(imagegraph.ImageComponentTypeManifest), "Image Manifest Link deletions") + printAndPopBucket("component/"+string(imagegraph.ImageComponentTypeConfig), "Image Config Link deletions") + printAndPopBucket("component/"+string(imagegraph.ImageComponentTypeLayer), "Image Layer Link deletions") + + for name := range buckets { + printAndPopBucket(name, fmt.Sprintf("%s deletions", strings.TrimPrefix(name, "other/"))) + } } func (o *PruneImagesOptions) printGraphBuildErrors(errs kutilerrors.Aggregate) { @@ -487,17 +615,11 @@ func (p *describingImageStreamDeleter) UpdateImageStream(stream *imageapi.ImageS } func (p *describingImageStreamDeleter) NotifyImageStreamPrune(stream *imageapi.ImageStream, updatedTags []string, deletedTags []string) { - if !p.headerPrinted { - p.headerPrinted = true - fmt.Fprintln(p.w, "Deleting references from image streams to images ...") - fmt.Fprintln(p.w, "STREAM\tACTION\tTAGS") - } - if len(updatedTags) > 0 { - fmt.Fprintf(p.w, "%s/%s\tUpdated\t%s\n", stream.Namespace, stream.Name, strings.Join(updatedTags, ", ")) + fmt.Fprintf(p.w, "Updating istags %s/%s: %s\n", stream.Namespace, stream.Name, strings.Join(updatedTags, ", ")) } if len(deletedTags) > 0 { - fmt.Fprintf(p.w, "%s/%s\tDeleted\t%s\n", stream.Namespace, stream.Name, strings.Join(deletedTags, ", ")) + fmt.Fprintf(p.w, "Deleting istags %s/%s: %s\n", stream.Namespace, stream.Name, strings.Join(deletedTags, ", ")) } } @@ -513,13 +635,7 @@ type describingImageDeleter struct { var _ imageprune.ImageDeleter = &describingImageDeleter{} func (p *describingImageDeleter) DeleteImage(image *imageapi.Image) error { - if !p.headerPrinted { - p.headerPrinted = true - fmt.Fprintln(p.w, "\nDeleting images from server ...") - fmt.Fprintln(p.w, "IMAGE") - } - - fmt.Fprintf(p.w, "%s\n", image.Name) + fmt.Fprintf(p.w, "Deleting image %s\n", image.Name) if p.delegate == nil { return nil @@ -545,13 +661,7 @@ type describingLayerLinkDeleter struct { var _ imageprune.LayerLinkDeleter = &describingLayerLinkDeleter{} func (p *describingLayerLinkDeleter) DeleteLayerLink(registryClient *http.Client, registryURL *url.URL, repo, name string) error { - if !p.headerPrinted { - p.headerPrinted = true - fmt.Fprintln(p.w, "\nDeleting registry repository layer links ...") - fmt.Fprintln(p.w, "REPO\tLAYER LINK") - } - - fmt.Fprintf(p.w, "%s\t%s\n", repo, name) + fmt.Fprintf(p.w, "Deleting layer link %s in repository %s\n", name, repo) if p.delegate == nil { return nil @@ -577,13 +687,7 @@ type describingBlobDeleter struct { var _ imageprune.BlobDeleter = &describingBlobDeleter{} func (p *describingBlobDeleter) DeleteBlob(registryClient *http.Client, registryURL *url.URL, layer string) error { - if !p.headerPrinted { - p.headerPrinted = true - fmt.Fprintln(p.w, "\nDeleting registry layer blobs ...") - fmt.Fprintln(p.w, "BLOB") - } - - fmt.Fprintf(p.w, "%s\n", layer) + fmt.Fprintf(p.w, "Deleting blob %s\n", layer) if p.delegate == nil { return nil @@ -610,13 +714,7 @@ type describingManifestDeleter struct { var _ imageprune.ManifestDeleter = &describingManifestDeleter{} func (p *describingManifestDeleter) DeleteManifest(registryClient *http.Client, registryURL *url.URL, repo, manifest string) error { - if !p.headerPrinted { - p.headerPrinted = true - fmt.Fprintln(p.w, "\nDeleting registry repository manifest data ...") - fmt.Fprintln(p.w, "REPO\tIMAGE") - } - - fmt.Fprintf(p.w, "%s\t%s\n", repo, manifest) + fmt.Fprintf(p.w, "Deleting manifest link %s in repository %s\n", manifest, repo) if p.delegate == nil { return nil @@ -624,7 +722,7 @@ func (p *describingManifestDeleter) DeleteManifest(registryClient *http.Client, err := p.delegate.DeleteManifest(registryClient, registryURL, repo, manifest) if err != nil { - fmt.Fprintf(p.errOut, "error deleting manifest %s from repository %s: %v\n", manifest, repo, err) + fmt.Fprintf(p.errOut, "error deleting manifest link %s from repository %s: %v\n", manifest, repo, err) } return err @@ -660,6 +758,17 @@ func getClients(f *clientcmd.Factory) (appsclient.AppsInterface, buildclient.Bui return appsClient, buildClient, imageClient, kubeClient, err } +func getImageClientFactory(f *clientcmd.Factory) func() (imageclient.ImageInterface, error) { + return func() (imageclient.ImageInterface, error) { + clientConfig, err := f.ClientConfig() + if err != nil { + return nil, err + } + + return imageclient.NewForConfig(clientConfig) + } +} + // getRegistryClient returns a registry client. Note that registryCABundle and registryInsecure=true are // mutually exclusive. If registryInsecure=true is specified, the ca bundle is ignored. func getRegistryClient(clientConfig *restclient.Config, registryCABundle string, registryInsecure bool) (*http.Client, error) { diff --git a/pkg/oc/graph/imagegraph/nodes/nodes.go b/pkg/oc/graph/imagegraph/nodes/nodes.go index 61b438635dc1..60ed2b173cbd 100644 --- a/pkg/oc/graph/imagegraph/nodes/nodes.go +++ b/pkg/oc/graph/imagegraph/nodes/nodes.go @@ -198,3 +198,8 @@ func EnsureImageComponentConfigNode(g osgraph.MutableUniqueGraph, name string) g func EnsureImageComponentLayerNode(g osgraph.MutableUniqueGraph, name string) graph.Node { return ensureImageComponentNode(g, name, ImageComponentTypeLayer) } + +// EnsureImageComponentLayerNode adds a graph node for the image layer if it does not already exist. +func EnsureImageComponentManifestNode(g osgraph.MutableUniqueGraph, name string) graph.Node { + return ensureImageComponentNode(g, name, ImageComponentTypeManifest) +} diff --git a/pkg/oc/graph/imagegraph/nodes/types.go b/pkg/oc/graph/imagegraph/nodes/types.go index 5482b757a99d..989f0e3478f7 100644 --- a/pkg/oc/graph/imagegraph/nodes/types.go +++ b/pkg/oc/graph/imagegraph/nodes/types.go @@ -13,8 +13,9 @@ type ImageComponentType string const ( ImageComponentNodeKind = "ImageComponent" - ImageComponentTypeConfig ImageComponentType = `Config` - ImageComponentTypeLayer ImageComponentType = `Layer` + ImageComponentTypeConfig ImageComponentType = `Config` + ImageComponentTypeLayer ImageComponentType = `Layer` + ImageComponentTypeManifest ImageComponentType = `Manifest` ) var ( diff --git a/test/testdata/bootstrappolicy/bootstrap_cluster_roles.yaml b/test/testdata/bootstrappolicy/bootstrap_cluster_roles.yaml index 53d0a6b8154e..099973360db5 100644 --- a/test/testdata/bootstrappolicy/bootstrap_cluster_roles.yaml +++ b/test/testdata/bootstrappolicy/bootstrap_cluster_roles.yaml @@ -1612,6 +1612,7 @@ items: verbs: - get - list + - watch - apiGroups: - "" - image.openshift.io diff --git a/test/testdata/bootstrappolicy/bootstrap_policy_file.yaml b/test/testdata/bootstrappolicy/bootstrap_policy_file.yaml index bd96d95b2a95..21a31a1b8cf7 100644 --- a/test/testdata/bootstrappolicy/bootstrap_policy_file.yaml +++ b/test/testdata/bootstrappolicy/bootstrap_policy_file.yaml @@ -1612,6 +1612,7 @@ items: verbs: - get - list + - watch - apiGroups: - "" - image.openshift.io