Skip to content

Commit

Permalink
buildah, bud: support --jobs=N for parallel execution
Browse files Browse the repository at this point in the history
it enables running multi stages Containerfiles in parallel.

Signed-off-by: Giuseppe Scrivano <[email protected]>
  • Loading branch information
giuseppe committed Jun 27, 2020
1 parent a27c525 commit 7943dea
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 12 deletions.
12 changes: 10 additions & 2 deletions cmd/buildah/bud.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error {

defaultsMountFile, _ := c.PersistentFlags().GetString("defaults-mount-file")

os, arch, err := parse.PlatformFromOptions(c)
imageOS, arch, err := parse.PlatformFromOptions(c)
if err != nil {
return err
}
Expand All @@ -301,6 +301,13 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error {
return errors.Wrapf(err, "unable to obtain decrypt config")
}

if iopts.Jobs > 1 {
stdin, err = os.OpenFile("/dev/null", os.O_RDONLY|os.O_CREATE, 0000)
if err != nil {
return err
}
}

options := imagebuildah.BuildOptions{
AddCapabilities: iopts.CapAdd,
AdditionalTags: tags,
Expand Down Expand Up @@ -328,7 +335,7 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error {
MaxPullPushRetries: maxPullPushRetries,
NamespaceOptions: namespaceOptions,
NoCache: iopts.NoCache,
OS: os,
OS: imageOS,
Out: stdout,
Output: output,
OutputFormat: format,
Expand All @@ -346,6 +353,7 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error {
Target: iopts.Target,
TransientMounts: iopts.Volumes,
OciDecryptConfig: decConfig,
Jobs: iopts.Jobs,
}

if iopts.Quiet {
Expand Down
6 changes: 6 additions & 0 deletions docs/buildah-bud.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ container technology).
Note: You can also override the default isolation type by setting the
BUILDAH\_ISOLATION environment variable. `export BUILDAH_ISOLATION=oci`

**--jobs** *N*

Run up to N concurrent stages in parallel. If the number of jobs is greater than 1,
stdin will be read from /dev/null. If 0 is specified, then there is
no limit on the number of jobs that can run in parallel.

**--label** *label*

Add an image *label* (e.g. label=*value*) to the image metadata. Can be used multiple times.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/vishvananda/netlink v1.1.0 // indirect
go.etcd.io/bbolt v1.3.4
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299
)

Expand Down
3 changes: 3 additions & 0 deletions imagebuildah/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ type BuildOptions struct {
// OciDecryptConfig contains the config that can be used to decrypt an image if it is
// encrypted if non-nil. If nil, it does not attempt to decrypt an image.
OciDecryptConfig *encconfig.DecryptConfig

// Jobs is the number of stages to run in parallel. If not specified it defaults to 1.
Jobs int
}

// BuildDockerfiles parses a set of one or more Dockerfiles (which may be
Expand Down
129 changes: 119 additions & 10 deletions imagebuildah/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/containers/buildah"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/openshift/imagebuilder/dockerfile/parser"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

// builtinAllowedBuildArgs is list of built-in allowed build args. Normally we
Expand Down Expand Up @@ -102,6 +104,11 @@ type Executor struct {
maxPullPushRetries int
retryPullPushDelay time.Duration
ociDecryptConfig *encconfig.DecryptConfig
lastError error
terminatedStage map[string]struct{}
stagesLock sync.Mutex
stagesSemaphore *semaphore.Weighted
jobs int
}

// NewExecutor creates a new instance of the imagebuilder.Executor interface.
Expand Down Expand Up @@ -191,6 +198,8 @@ func NewExecutor(store storage.Store, options BuildOptions, mainNode *parser.Nod
maxPullPushRetries: options.MaxPullPushRetries,
retryPullPushDelay: options.PullPushRetryDelay,
ociDecryptConfig: options.OciDecryptConfig,
terminatedStage: make(map[string]struct{}),
jobs: options.Jobs,
}
if exec.err == nil {
exec.err = os.Stderr
Expand Down Expand Up @@ -277,6 +286,35 @@ func (b *Executor) resolveNameToImageRef(output string) (types.ImageReference, e
return imageRef, nil
}

// waitForStage blocks until the stage with the given name has finished
// executing (or until the overall build has recorded an error), so that a
// stage which consumes another stage's filesystem (e.g. COPY --from) does
// not proceed before its dependency is ready when stages run in parallel.
// It returns nil once the stage has terminated, or the first build error
// encountered, or the context's error if acquisition is interrupted.
func (b *Executor) waitForStage(ctx context.Context, name string) error {
	stage := b.stages[name]
	if stage == nil {
		return errors.Errorf("unknown stage %q", name)
	}
	for {
		// NOTE(review): lastError is read here without holding stagesLock,
		// while other goroutines assign it — confirm whether this benign-looking
		// race needs synchronization (run with -race to check).
		if b.lastError != nil {
			return b.lastError
		}
		// presumably a nil inner stage means there is nothing to wait for —
		// TODO confirm against where stage.stage is populated/cleared.
		if stage.stage == nil {
			return nil
		}

		b.stagesLock.Lock()
		_, terminated := b.terminatedStage[name]
		b.stagesLock.Unlock()

		if terminated {
			return nil
		}

		// Give up our own semaphore slot while we sleep so the stage we are
		// waiting on can make progress even when the --jobs limit is
		// saturated, then re-acquire before polling again.
		b.stagesSemaphore.Release(1)
		time.Sleep(time.Millisecond * 10)
		if err := b.stagesSemaphore.Acquire(ctx, 1); err != nil {
			return err
		}
	}
}

// getImageHistory returns the history of imageID.
func (b *Executor) getImageHistory(ctx context.Context, imageID string) ([]v1.History, error) {
imageRef, err := is.Transport.ParseStoreReference(b.store, "@"+imageID)
Expand Down Expand Up @@ -320,7 +358,9 @@ func (b *Executor) buildStage(ctx context.Context, cleanupStages map[int]*StageE
// remove the intermediate/build containers, regardless of
// whether or not the stage's build fails.
if b.forceRmIntermediateCtrs || !b.layers {
b.stagesLock.Lock()
cleanupStages[stage.Position] = stageExecutor
b.stagesLock.Unlock()
}

// Build this stage.
Expand All @@ -332,7 +372,9 @@ func (b *Executor) buildStage(ctx context.Context, cleanupStages map[int]*StageE
// told to delete successful intermediate/build containers for
// multi-layered builds.
if b.removeIntermediateCtrs {
cleanupStages[stage.Position] = stageExecutor
b.stagesLock.Lock()
cleanupStages[stage.Position] = stageExecutor
b.stagesLock.Unlock()
}

return imageID, ref, nil
Expand All @@ -357,12 +399,16 @@ func (b *Executor) Build(ctx context.Context, stages imagebuilder.Stages) (image
// Clean up any containers associated with the final container
// built by a stage, for stages that succeeded, since we no
// longer need their filesystem contents.

b.stagesLock.Lock()
for _, stage := range cleanupStages {
if err := stage.Delete(); err != nil {
logrus.Debugf("Failed to cleanup stage containers: %v", err)
lastErr = err
}
}
b.stagesLock.Unlock()

cleanupStages = nil
// Clean up any builders that we used to get data from images.
for _, builder := range b.containerMap {
Expand Down Expand Up @@ -447,24 +493,87 @@ func (b *Executor) Build(ctx context.Context, stages imagebuilder.Stages) (image
}
}

// Run through the build stages, one at a time.
for stageIndex, stage := range stages {
imageID, ref, err = b.buildStage(ctx, cleanupStages, stages, stageIndex)
if err != nil {
return imageID, ref, err
type Result struct {
Index int
ImageID string
Ref reference.Canonical
Error error
}

ch := make(chan Result)

jobs := int64(b.jobs)
if jobs < 0 {
return "", nil, errors.New("error building: invalid value for jobs. It must be a positive integer")
} else if jobs == 0 {
jobs = int64(len(stages))
}

b.stagesSemaphore = semaphore.NewWeighted(jobs)

var wg sync.WaitGroup
wg.Add(len(stages))

go func() {
for stageIndex := range stages {
index := stageIndex
// Acquire the semaphore before creating the goroutine so we are sure
// they run in the specified order.
if err := b.stagesSemaphore.Acquire(ctx, 1); err != nil {
b.lastError = err
return
}
go func() {
defer b.stagesSemaphore.Release(1)
defer wg.Done()
imageID, ref, err = b.buildStage(ctx, cleanupStages, stages, index)
if err != nil {
ch <- Result{
Index: index,
Error: err,
}
return
}

ch <- Result{
Index: index,
ImageID: imageID,
Ref: ref,
Error: nil,
}
}()
}
}()
go func() {
wg.Wait()
close(ch)
}()

for r := range ch {
stage := stages[r.Index]

b.stagesLock.Lock()
b.terminatedStage[stage.Name] = struct{}{}
b.stagesLock.Unlock()

if r.Error != nil {
b.lastError = r.Error
return "", nil, r.Error
}

// If this is an intermediate stage, make a note of the ID, so
// that we can look it up later.
if stageIndex < len(stages)-1 && imageID != "" {
b.imageMap[stage.Name] = imageID
if r.Index < len(stages)-1 && r.ImageID != "" {
b.imageMap[stage.Name] = r.ImageID
// We're not populating the cache with intermediate
// images, so add this one to the list of images that
// we'll remove later.
if !b.layers {
cleanupImages = append(cleanupImages, imageID)
cleanupImages = append(cleanupImages, r.ImageID)
}
imageID = ""
}
if r.Index == len(stages)-1 {
imageID = r.ImageID
}
}

Expand Down
3 changes: 3 additions & 0 deletions imagebuildah/stage_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,9 @@ func (s *StageExecutor) Execute(ctx context.Context, base string) (imgID string,
return "", nil, errors.Errorf("%s: invalid --from flag, should be --from=<name|stage>", command)
}
if otherStage, ok := s.executor.stages[arr[1]]; ok && otherStage.index < s.index {
if err := s.executor.waitForStage(ctx, arr[1]); err != nil {
return "", nil, err
}
mountPoint = otherStage.mountPoint
} else if mountPoint, err = s.getImageRootfs(ctx, arr[1]); err != nil {
return "", nil, errors.Errorf("%s --from=%s: no stage or image found with that name", command, arr[1])
Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type BudResults struct {
Tag []string
Target string
TLSVerify bool
Jobs int
}

// FromAndBugResults represents the results for common flags
Expand Down Expand Up @@ -182,6 +183,7 @@ func GetBudFlags(flags *BudResults) pflag.FlagSet {
fs.StringArrayVarP(&flags.Tag, "tag", "t", []string{}, "tagged `name` to apply to the built image")
fs.StringVar(&flags.Target, "target", "", "set the target build stage to build")
fs.BoolVar(&flags.TLSVerify, "tls-verify", true, "require HTTPS and verify certificates when accessing the registry")
fs.IntVar(&flags.Jobs, "jobs", 1, "how many stages to run in parallel")
return fs
}

Expand Down
17 changes: 17 additions & 0 deletions tests/bud.bats
Original file line number Diff line number Diff line change
Expand Up @@ -1382,8 +1382,25 @@ function _test_http() {
@test "bud with copy-from referencing the base image" {
_prefetch busybox
target=busybox-derived
target_mt=busybox-mt-derived
run_buildah bud --signature-policy ${TESTSDIR}/policy.json -t ${target} -f ${TESTSDIR}/bud/copy-from/Dockerfile3 ${TESTSDIR}/bud/copy-from
run_buildah bud --signature-policy ${TESTSDIR}/policy.json --jobs 4 -t ${target} -f ${TESTSDIR}/bud/copy-from/Dockerfile3 ${TESTSDIR}/bud/copy-from

run_buildah bud --signature-policy ${TESTSDIR}/policy.json -t ${target} -f ${TESTSDIR}/bud/copy-from/Dockerfile4 ${TESTSDIR}/bud/copy-from
run_buildah bud --no-cache --signature-policy ${TESTSDIR}/policy.json --jobs 4 -t ${target_mt} -f ${TESTSDIR}/bud/copy-from/Dockerfile4 ${TESTSDIR}/bud/copy-from

run_buildah from --quiet ${target}
cid=$output
run_buildah mount ${cid}
root_single_job=$output

run_buildah from --quiet ${target_mt}
cid=$output
run_buildah mount ${cid}
root_multi_job=$output

# Check that both the version with --jobs 1 and --jobs=N have the same number of files
test $(find $root_single_job -type f | wc -l) = $(find $root_multi_job -type f | wc -l)
}

@test "bud with copy-from referencing the current stage" {
Expand Down

0 comments on commit 7943dea

Please sign in to comment.