From 2f4c6f06b18713c67a11f19b413de705a5338309 Mon Sep 17 00:00:00 2001 From: Jason Hall Date: Wed, 10 Mar 2021 21:13:05 -0500 Subject: [PATCH] Support WithJobs in remote.Write (#958) --- pkg/v1/remote/write.go | 75 +++++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go index 261e9ec44..8f354f747 100644 --- a/pkg/v1/remote/write.go +++ b/pkg/v1/remote/write.go @@ -64,39 +64,60 @@ func Write(ref name.Reference, img v1.Image, options ...Option) error { context: o.context, } + // Upload individual blobs and collect any errors. + blobChan := make(chan v1.Layer, 2*o.jobs) + g, ctx := errgroup.WithContext(o.context) + for i := 0; i < o.jobs; i++ { + // Start N workers consuming blobs to upload. + g.Go(func() error { + for b := range blobChan { + if err := w.uploadOne(b); err != nil { + return err + } + } + return nil + }) + } + // Upload individual layers in goroutines and collect any errors. // If we can dedupe by the layer digest, try to do so. If we can't determine // the digest for whatever reason, we can't dedupe and might re-upload. - var g errgroup.Group - uploaded := map[v1.Hash]bool{} - for _, l := range ls { - l := l - - // Handle foreign layers. - mt, err := l.MediaType() - if err != nil { - return err - } - if !mt.IsDistributable() && !o.allowNondistributableArtifacts { - continue - } + g.Go(func() error { + defer close(blobChan) + uploaded := map[v1.Hash]bool{} + for _, l := range ls { + l := l + + // Handle foreign layers. + mt, err := l.MediaType() + if err != nil { + return err + } + if !mt.IsDistributable() && !o.allowNondistributableArtifacts { + continue + } - // Streaming layers calculate their digests while uploading them. Assume - // an error here indicates we need to upload the layer. - h, err := l.Digest() - if err == nil { - // If we can determine the layer's digest ahead of - // time, use it to dedupe uploads. - if uploaded[h] { - continue // Already uploading. + // Streaming layers calculate their digests while uploading them. Assume + // an error here indicates we need to upload the layer. + h, err := l.Digest() + if err == nil { + // If we can determine the layer's digest ahead of + // time, use it to dedupe uploads. + if uploaded[h] { + continue // Already uploading. + } + uploaded[h] = true + } + select { + case blobChan <- l: + case <-ctx.Done(): + return ctx.Err() } - uploaded[h] = true } - - // TODO(#803): Pipe through remote.WithJobs and upload these in parallel. - g.Go(func() error { - return w.uploadOne(l) - }) + return nil + }) + if err := g.Wait(); err != nil { + return err } if l, err := partial.ConfigLayer(img); err != nil {