From 7943dea9e8149ec5107ff671e36809ca8d990744 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 22 Jun 2020 10:53:02 +0200 Subject: [PATCH] buildah, bud: support --jobs=N for parallel execution it enables running multi stages Containerfiles in parallel. Signed-off-by: Giuseppe Scrivano --- cmd/buildah/bud.go | 12 ++- docs/buildah-bud.md | 6 ++ go.mod | 1 + imagebuildah/build.go | 3 + imagebuildah/executor.go | 129 ++++++++++++++++++++++++++++++--- imagebuildah/stage_executor.go | 3 + pkg/cli/common.go | 2 + tests/bud.bats | 17 +++++ 8 files changed, 161 insertions(+), 12 deletions(-) diff --git a/cmd/buildah/bud.go b/cmd/buildah/bud.go index 7d39b4158f4..a7c6caa28e5 100644 --- a/cmd/buildah/bud.go +++ b/cmd/buildah/bud.go @@ -291,7 +291,7 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error { defaultsMountFile, _ := c.PersistentFlags().GetString("defaults-mount-file") - os, arch, err := parse.PlatformFromOptions(c) + imageOS, arch, err := parse.PlatformFromOptions(c) if err != nil { return err } @@ -301,6 +301,13 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error { return errors.Wrapf(err, "unable to obtain decrypt config") } + if iopts.Jobs > 1 { + stdin, err = os.OpenFile("/dev/null", os.O_RDONLY|os.O_CREATE, 0000) + if err != nil { + return err + } + } + options := imagebuildah.BuildOptions{ AddCapabilities: iopts.CapAdd, AdditionalTags: tags, @@ -328,7 +335,7 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error { MaxPullPushRetries: maxPullPushRetries, NamespaceOptions: namespaceOptions, NoCache: iopts.NoCache, - OS: os, + OS: imageOS, Out: stdout, Output: output, OutputFormat: format, @@ -346,6 +353,7 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error { Target: iopts.Target, TransientMounts: iopts.Volumes, OciDecryptConfig: decConfig, + Jobs: iopts.Jobs, } if iopts.Quiet { diff --git a/docs/buildah-bud.md b/docs/buildah-bud.md index ac84290efe5..333546d9cf3 100644 --- a/docs/buildah-bud.md +++ b/docs/buildah-bud.md @@ -278,6 +278,12 @@ container technology). Note: You can also override the default isolation type by setting the BUILDAH\_ISOLATION environment variable. `export BUILDAH_ISOLATION=oci` +**--jobs** *N* + +Run up to N concurrent stages in parallel. If the number of jobs is greater than 1, +stdin will be read from /dev/null. If 0 is specified, then there is +no limit in the number of jobs that run in parallel. + **--label** *label* Add an image *label* (e.g. label=*value*) to the image metadata. Can be used multiple times. diff --git a/go.mod b/go.mod index daf5a4f19ff..271e454c02c 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/vishvananda/netlink v1.1.0 // indirect go.etcd.io/bbolt v1.3.4 golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 + golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 ) diff --git a/imagebuildah/build.go b/imagebuildah/build.go index a9e0641c063..9d6f59cdc03 100644 --- a/imagebuildah/build.go +++ b/imagebuildah/build.go @@ -177,6 +177,9 @@ type BuildOptions struct { // OciDecryptConfig contains the config that can be used to decrypt an image if it is // encrypted if non-nil. If nil, it does not attempt to decrypt an image. OciDecryptConfig *encconfig.DecryptConfig + + // Jobs is the number of stages to run in parallel. If not specified it defaults to 1. + Jobs int } // BuildDockerfiles parses a set of one or more Dockerfiles (which may be diff --git a/imagebuildah/executor.go b/imagebuildah/executor.go index 1e31173b6d2..b187c2ec3aa 100644 --- a/imagebuildah/executor.go +++ b/imagebuildah/executor.go @@ -9,6 +9,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/containers/buildah" @@ -29,6 +30,7 @@ import ( "github.com/openshift/imagebuilder/dockerfile/parser" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" ) // builtinAllowedBuildArgs is list of built-in allowed build args. Normally we @@ -102,6 +104,11 @@ type Executor struct { maxPullPushRetries int retryPullPushDelay time.Duration ociDecryptConfig *encconfig.DecryptConfig + lastError error + terminatedStage map[string]struct{} + stagesLock sync.Mutex + stagesSemaphore *semaphore.Weighted + jobs int } // NewExecutor creates a new instance of the imagebuilder.Executor interface. @@ -191,6 +198,8 @@ func NewExecutor(store storage.Store, options BuildOptions, mainNode *parser.Nod maxPullPushRetries: options.MaxPullPushRetries, retryPullPushDelay: options.PullPushRetryDelay, ociDecryptConfig: options.OciDecryptConfig, + terminatedStage: make(map[string]struct{}), + jobs: options.Jobs, } if exec.err == nil { exec.err = os.Stderr @@ -277,6 +286,35 @@ func (b *Executor) resolveNameToImageRef(output string) (types.ImageReference, e return imageRef, nil } +func (b *Executor) waitForStage(ctx context.Context, name string) error { + stage := b.stages[name] + if stage == nil { + return errors.Errorf("unknown stage %q", name) + } + for { + if b.lastError != nil { + return b.lastError + } + if stage.stage == nil { + return nil + } + + b.stagesLock.Lock() + _, terminated := b.terminatedStage[name] + b.stagesLock.Unlock() + + if terminated { + return nil + } + + b.stagesSemaphore.Release(1) + time.Sleep(time.Millisecond * 10) + if err := b.stagesSemaphore.Acquire(ctx, 1); err != nil { + return err + } + } +} + // getImageHistory returns the history of imageID. func (b *Executor) getImageHistory(ctx context.Context, imageID string) ([]v1.History, error) { imageRef, err := is.Transport.ParseStoreReference(b.store, "@"+imageID) @@ -320,7 +358,9 @@ func (b *Executor) buildStage(ctx context.Context, cleanupStages map[int]*StageE // remove the intermediate/build containers, regardless of // whether or not the stage's build fails. if b.forceRmIntermediateCtrs || !b.layers { + b.stagesLock.Lock() cleanupStages[stage.Position] = stageExecutor + b.stagesLock.Unlock() } // Build this stage. @@ -332,7 +372,9 @@ func (b *Executor) buildStage(ctx context.Context, cleanupStages map[int]*StageE // told to delete successful intermediate/build containers for // multi-layered builds. if b.removeIntermediateCtrs { - cleanupStages[stage.Position] = stageExecutor + b.stagesLock.Lock() + cleanupStages[stage.Position] = stageExecutor + b.stagesLock.Unlock() } return imageID, ref, nil @@ -357,12 +399,16 @@ func (b *Executor) Build(ctx context.Context, stages imagebuilder.Stages) (image // Clean up any containers associated with the final container // built by a stage, for stages that succeeded, since we no // longer need their filesystem contents. + + b.stagesLock.Lock() for _, stage := range cleanupStages { if err := stage.Delete(); err != nil { logrus.Debugf("Failed to cleanup stage containers: %v", err) lastErr = err } } + b.stagesLock.Unlock() + cleanupStages = nil // Clean up any builders that we used to get data from images. for _, builder := range b.containerMap { @@ -447,24 +493,87 @@ func (b *Executor) Build(ctx context.Context, stages imagebuilder.Stages) (image } } - // Run through the build stages, one at a time. - for stageIndex, stage := range stages { - imageID, ref, err = b.buildStage(ctx, cleanupStages, stages, stageIndex) - if err != nil { - return imageID, ref, err + type Result struct { + Index int + ImageID string + Ref reference.Canonical + Error error + } + + ch := make(chan Result) + + jobs := int64(b.jobs) + if jobs < 0 { + return "", nil, errors.New("error building: invalid value for jobs. It must be a positive integer") + } else if jobs == 0 { + jobs = int64(len(stages)) + } + + b.stagesSemaphore = semaphore.NewWeighted(jobs) + + var wg sync.WaitGroup + wg.Add(len(stages)) + + go func() { + for stageIndex := range stages { + index := stageIndex + // Acquire the sempaphore before creating the goroutine so we are sure they + // run in the specified order. + if err := b.stagesSemaphore.Acquire(ctx, 1); err != nil { + b.lastError = err + return + } + go func() { + defer b.stagesSemaphore.Release(1) + defer wg.Done() + imageID, ref, err = b.buildStage(ctx, cleanupStages, stages, index) + if err != nil { + ch <- Result{ + Index: index, + Error: err, + } + return + } + + ch <- Result{ + Index: index, + ImageID: imageID, + Ref: ref, + Error: nil, + } + }() + } + }() + go func() { + wg.Wait() + close(ch) + }() + + for r := range ch { + stage := stages[r.Index] + + b.stagesLock.Lock() + b.terminatedStage[stage.Name] = struct{}{} + b.stagesLock.Unlock() + + if r.Error != nil { + b.lastError = r.Error + return "", nil, r.Error } // If this is an intermediate stage, make a note of the ID, so // that we can look it up later. - if stageIndex < len(stages)-1 && imageID != "" { - b.imageMap[stage.Name] = imageID + if r.Index < len(stages)-1 && r.ImageID != "" { + b.imageMap[stage.Name] = r.ImageID // We're not populating the cache with intermediate // images, so add this one to the list of images that // we'll remove later. if !b.layers { - cleanupImages = append(cleanupImages, imageID) + cleanupImages = append(cleanupImages, r.ImageID) } - imageID = "" + } + if r.Index == len(stages)-1 { + imageID = r.ImageID } } diff --git a/imagebuildah/stage_executor.go b/imagebuildah/stage_executor.go index 7ba5e2e964b..eafe5abbf97 100644 --- a/imagebuildah/stage_executor.go +++ b/imagebuildah/stage_executor.go @@ -870,6 +870,9 @@ func (s *StageExecutor) Execute(ctx context.Context, base string) (imgID string, return "", nil, errors.Errorf("%s: invalid --from flag, should be --from=", command) } if otherStage, ok := s.executor.stages[arr[1]]; ok && otherStage.index < s.index { + if err := s.executor.waitForStage(ctx, arr[1]); err != nil { + return "", nil, err + } mountPoint = otherStage.mountPoint } else if mountPoint, err = s.getImageRootfs(ctx, arr[1]); err != nil { return "", nil, errors.Errorf("%s --from=%s: no stage or image found with that name", command, arr[1]) diff --git a/pkg/cli/common.go b/pkg/cli/common.go index 1a457f34c4b..977013a39fc 100644 --- a/pkg/cli/common.go +++ b/pkg/cli/common.go @@ -80,6 +80,7 @@ type BudResults struct { Tag []string Target string TLSVerify bool + Jobs int } // FromAndBugResults represents the results for common flags @@ -182,6 +183,7 @@ func GetBudFlags(flags *BudResults) pflag.FlagSet { fs.StringArrayVarP(&flags.Tag, "tag", "t", []string{}, "tagged `name` to apply to the built image") fs.StringVar(&flags.Target, "target", "", "set the target build stage to build") fs.BoolVar(&flags.TLSVerify, "tls-verify", true, "require HTTPS and verify certificates when accessing the registry") + fs.IntVar(&flags.Jobs, "jobs", 1, "how many stages to run in parallel") return fs } diff --git a/tests/bud.bats b/tests/bud.bats index d17ba83268a..62648afc809 100644 --- a/tests/bud.bats +++ b/tests/bud.bats @@ -1382,8 +1382,25 @@ function _test_http() { @test "bud with copy-from referencing the base image" { _prefetch busybox target=busybox-derived + target_mt=busybox-mt-derived run_buildah bud --signature-policy ${TESTSDIR}/policy.json -t ${target} -f ${TESTSDIR}/bud/copy-from/Dockerfile3 ${TESTSDIR}/bud/copy-from + run_buildah bud --signature-policy ${TESTSDIR}/policy.json --jobs 4 -t ${target} -f ${TESTSDIR}/bud/copy-from/Dockerfile3 ${TESTSDIR}/bud/copy-from + run_buildah bud --signature-policy ${TESTSDIR}/policy.json -t ${target} -f ${TESTSDIR}/bud/copy-from/Dockerfile4 ${TESTSDIR}/bud/copy-from + run_buildah bud --no-cache --signature-policy ${TESTSDIR}/policy.json --jobs 4 -t ${target_mt} -f ${TESTSDIR}/bud/copy-from/Dockerfile4 ${TESTSDIR}/bud/copy-from + + run_buildah from --quiet ${target} + cid=$output + run_buildah mount ${cid} + root_single_job=$output + + run_buildah from --quiet ${target_mt} + cid=$output + run_buildah mount ${cid} + root_multi_job=$output + + # Check that both the version with --jobs 1 and --jobs=N have the same number of files + test $(find $root_single_job -type f | wc -l) = $(find $root_multi_job -type f | wc -l) } @test "bud with copy-from referencing the current stage" {