Skip to content

Commit

Permalink
Merge #2418
Browse files Browse the repository at this point in the history
2418: buildah, bud: support --jobs=N for parallel execution r=rhatdan a=giuseppe

<!--
Please label this pull request according to what type of issue you are
addressing, especially if this is a release targeted pull request.

Uncomment only one `/kind <>` line, hit enter to put that in a new line, and
remove leading whitespace from that line:
-->

/kind feature

#### What this PR does / why we need it:

it enables running the stages of multi-stage Containerfiles in parallel

#### How to verify it

buildah bud --jobs=$(nproc) .....

#### Which issue(s) this PR fixes:

None


#### Special notes for your reviewer:

#### Does this PR introduce a user-facing change?

```release-note
add support for running the stages of multi-stage Containerfiles in parallel
```



Co-authored-by: Giuseppe Scrivano <[email protected]>
  • Loading branch information
bors[bot] and giuseppe committed Jun 30, 2020
2 parents 9fd14e6 + f8f6565 commit 1289798
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 43 deletions.
12 changes: 10 additions & 2 deletions cmd/buildah/bud.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error {

defaultsMountFile, _ := c.PersistentFlags().GetString("defaults-mount-file")

os, arch, err := parse.PlatformFromOptions(c)
imageOS, arch, err := parse.PlatformFromOptions(c)
if err != nil {
return err
}
Expand All @@ -301,6 +301,13 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error {
return errors.Wrapf(err, "unable to obtain decrypt config")
}

if iopts.Jobs > 1 {
stdin, err = os.OpenFile("/dev/null", os.O_RDONLY|os.O_CREATE, 0000)
if err != nil {
return err
}
}

options := imagebuildah.BuildOptions{
AddCapabilities: iopts.CapAdd,
AdditionalTags: tags,
Expand Down Expand Up @@ -328,7 +335,7 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error {
MaxPullPushRetries: maxPullPushRetries,
NamespaceOptions: namespaceOptions,
NoCache: iopts.NoCache,
OS: os,
OS: imageOS,
Out: stdout,
Output: output,
OutputFormat: format,
Expand All @@ -346,6 +353,7 @@ func budCmd(c *cobra.Command, inputArgs []string, iopts budOptions) error {
Target: iopts.Target,
TransientMounts: iopts.Volumes,
OciDecryptConfig: decConfig,
Jobs: iopts.Jobs,
}

if iopts.Quiet {
Expand Down
4 changes: 2 additions & 2 deletions contrib/cirrus/lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,8 @@ execute_local_registry() {
cp $authdirpath/domain.crt $certdirpath/localhost:5000/domain.crt
echo "Creating http credentials file"
podman run --entrypoint htpasswd $REGISTRY_FQIN \
-Bbn testuser testpassword \
podman run --entrypoint sh $REGISTRY_FQIN \
-c 'apk add apache2-utils > /dev/null && htpasswd -Bbn testuser testpassword' \
> $authdirpath/htpasswd
echo "Starting up the local 'registry' container"
Expand Down
6 changes: 6 additions & 0 deletions docs/buildah-bud.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ container technology).
Note: You can also override the default isolation type by setting the
BUILDAH\_ISOLATION environment variable. `export BUILDAH_ISOLATION=oci`

**--jobs** *N*

Run up to N concurrent stages in parallel. If the number of jobs is greater than 1,
stdin will be read from /dev/null. If 0 is specified, then there is
no limit on the number of jobs that run in parallel.

**--label** *label*

Add an image *label* (e.g. label=*value*) to the image metadata. Can be used multiple times.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/vishvananda/netlink v1.1.0 // indirect
go.etcd.io/bbolt v1.3.4
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299
)

Expand Down
3 changes: 3 additions & 0 deletions imagebuildah/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ type BuildOptions struct {
// OciDecryptConfig contains the config that can be used to decrypt an image if it is
// encrypted if non-nil. If nil, it does not attempt to decrypt an image.
OciDecryptConfig *encconfig.DecryptConfig

// Jobs is the number of stages to run in parallel. If not specified it defaults to 1.
Jobs int
}

// BuildDockerfiles parses a set of one or more Dockerfiles (which may be
Expand Down
193 changes: 154 additions & 39 deletions imagebuildah/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/containers/buildah"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/openshift/imagebuilder/dockerfile/parser"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

// builtinAllowedBuildArgs is list of built-in allowed build args. Normally we
Expand Down Expand Up @@ -102,6 +104,11 @@ type Executor struct {
maxPullPushRetries int
retryPullPushDelay time.Duration
ociDecryptConfig *encconfig.DecryptConfig
lastError error
terminatedStage map[string]struct{}
stagesLock sync.Mutex
stagesSemaphore *semaphore.Weighted
jobs int
}

// NewExecutor creates a new instance of the imagebuilder.Executor interface.
Expand Down Expand Up @@ -191,6 +198,8 @@ func NewExecutor(store storage.Store, options BuildOptions, mainNode *parser.Nod
maxPullPushRetries: options.MaxPullPushRetries,
retryPullPushDelay: options.PullPushRetryDelay,
ociDecryptConfig: options.OciDecryptConfig,
terminatedStage: make(map[string]struct{}),
jobs: options.Jobs,
}
if exec.err == nil {
exec.err = os.Stderr
Expand Down Expand Up @@ -277,6 +286,35 @@ func (b *Executor) resolveNameToImageRef(output string) (types.ImageReference, e
return imageRef, nil
}

// waitForStage blocks until the stage registered under name has terminated
// (recorded in b.terminatedStage), or until an executor-wide error has been
// recorded in b.lastError, or until the context is cancelled.  It is used by
// a stage that depends on another stage's output (e.g. COPY --from) to wait
// for that stage to finish when stages run in parallel.
func (b *Executor) waitForStage(ctx context.Context, name string) error {
stage := b.stages[name]
if stage == nil {
return errors.Errorf("unknown stage %q", name)
}
// Poll until the awaited stage terminates or a build error is recorded.
for {
// NOTE(review): b.lastError is read here without holding stagesLock,
// while other goroutines assign it elsewhere -- looks like a data
// race; confirm with `go test -race`.
if b.lastError != nil {
return b.lastError
}
// A nil inner stage means there is nothing to wait for.
if stage.stage == nil {
return nil
}

// terminatedStage is shared with the result-collection loop, so it
// must be read under stagesLock.
b.stagesLock.Lock()
_, terminated := b.terminatedStage[name]
b.stagesLock.Unlock()

if terminated {
return nil
}

// Temporarily give back our job slot so the stage we are waiting on
// can be scheduled, then sleep briefly before re-checking.
b.stagesSemaphore.Release(1)
time.Sleep(time.Millisecond * 10)
if err := b.stagesSemaphore.Acquire(ctx, 1); err != nil {
return err
}
}
}

// getImageHistory returns the history of imageID.
func (b *Executor) getImageHistory(ctx context.Context, imageID string) ([]v1.History, error) {
imageRef, err := is.Transport.ParseStoreReference(b.store, "@"+imageID)
Expand All @@ -295,6 +333,53 @@ func (b *Executor) getImageHistory(ctx context.Context, imageID string) ([]v1.Hi
return oci.History, nil
}

// buildStage builds the single stage at stageIndex.  If it is the final
// stage, the image it produces is given the executor's configured output
// name.  The stage's build container is registered in cleanupStages (under
// stagesLock, since stages may build concurrently) whenever the caller is
// expected to remove it later.  It returns the ID and canonical reference
// of the image the stage committed.
func (b *Executor) buildStage(ctx context.Context, cleanupStages map[int]*StageExecutor, stages imagebuilder.Stages, stageIndex int) (imageID string, ref reference.Canonical, err error) {
	stage := stages[stageIndex]
	ib := stage.Builder
	node := stage.Node

	// Resolve the stage's base image, and bail out right away on failure
	// instead of doing unrelated work first.
	base, err := ib.From(node)
	if err != nil {
		logrus.Debugf("Build(node.Children=%#v)", node.Children)
		return "", nil, err
	}

	// If this is the last stage, then the image that we produce at
	// its end should be given the desired output name.
	output := ""
	if stageIndex == len(stages)-1 {
		output = b.output
	}

	stageExecutor := b.startStage(&stage, len(stages), output)

	// If this a single-layer build, or if it's a multi-layered
	// build and b.forceRmIntermediateCtrs is set, make sure we
	// remove the intermediate/build containers, regardless of
	// whether or not the stage's build fails.
	if b.forceRmIntermediateCtrs || !b.layers {
		b.stagesLock.Lock()
		cleanupStages[stage.Position] = stageExecutor
		b.stagesLock.Unlock()
	}

	// Build this stage.
	if imageID, ref, err = stageExecutor.Execute(ctx, base); err != nil {
		return "", nil, err
	}

	// The stage succeeded, so remove its build container if we're
	// told to delete successful intermediate/build containers for
	// multi-layered builds.
	if b.removeIntermediateCtrs {
		b.stagesLock.Lock()
		cleanupStages[stage.Position] = stageExecutor
		b.stagesLock.Unlock()
	}

	return imageID, ref, nil
}

// Build takes care of the details of running Prepare/Execute/Commit/Delete
// over each of the one or more parsed Dockerfiles and stages.
func (b *Executor) Build(ctx context.Context, stages imagebuilder.Stages) (imageID string, ref reference.Canonical, err error) {
Expand All @@ -314,12 +399,16 @@ func (b *Executor) Build(ctx context.Context, stages imagebuilder.Stages) (image
// Clean up any containers associated with the final container
// built by a stage, for stages that succeeded, since we no
// longer need their filesystem contents.

b.stagesLock.Lock()
for _, stage := range cleanupStages {
if err := stage.Delete(); err != nil {
logrus.Debugf("Failed to cleanup stage containers: %v", err)
lastErr = err
}
}
b.stagesLock.Unlock()

cleanupStages = nil
// Clean up any builders that we used to get data from images.
for _, builder := range b.containerMap {
Expand Down Expand Up @@ -404,61 +493,87 @@ func (b *Executor) Build(ctx context.Context, stages imagebuilder.Stages) (image
}
}

// Run through the build stages, one at a time.
for stageIndex, stage := range stages {
var lastErr error
type Result struct {
Index int
ImageID string
Ref reference.Canonical
Error error
}

ib := stage.Builder
node := stage.Node
base, err := ib.From(node)
if err != nil {
logrus.Debugf("Build(node.Children=%#v)", node.Children)
return "", nil, err
}
ch := make(chan Result)

// If this is the last stage, then the image that we produce at
// its end should be given the desired output name.
output := ""
if stageIndex == len(stages)-1 {
output = b.output
}
jobs := int64(b.jobs)
if jobs < 0 {
return "", nil, errors.New("error building: invalid value for jobs. It must be a positive integer")
} else if jobs == 0 {
jobs = int64(len(stages))
}

stageExecutor := b.startStage(&stage, len(stages), output)
b.stagesSemaphore = semaphore.NewWeighted(jobs)

// If this a single-layer build, or if it's a multi-layered
// build and b.forceRmIntermediateCtrs is set, make sure we
// remove the intermediate/build containers, regardless of
// whether or not the stage's build fails.
if b.forceRmIntermediateCtrs || !b.layers {
cleanupStages[stage.Position] = stageExecutor
}
var wg sync.WaitGroup
wg.Add(len(stages))

// Build this stage.
if imageID, ref, err = stageExecutor.Execute(ctx, base); err != nil {
lastErr = err
}
if lastErr != nil {
return "", nil, lastErr
go func() {
for stageIndex := range stages {
index := stageIndex
// Acquire the semaphore before creating the goroutine so we are sure they
// run in the specified order.
if err := b.stagesSemaphore.Acquire(ctx, 1); err != nil {
b.lastError = err
return
}
go func() {
defer b.stagesSemaphore.Release(1)
defer wg.Done()
imageID, ref, err = b.buildStage(ctx, cleanupStages, stages, index)
if err != nil {
ch <- Result{
Index: index,
Error: err,
}
return
}

ch <- Result{
Index: index,
ImageID: imageID,
Ref: ref,
Error: nil,
}
}()
}
}()
go func() {
wg.Wait()
close(ch)
}()

// The stage succeeded, so remove its build container if we're
// told to delete successful intermediate/build containers for
// multi-layered builds.
if b.removeIntermediateCtrs {
cleanupStages[stage.Position] = stageExecutor
for r := range ch {
stage := stages[r.Index]

b.stagesLock.Lock()
b.terminatedStage[stage.Name] = struct{}{}
b.stagesLock.Unlock()

if r.Error != nil {
b.lastError = r.Error
return "", nil, r.Error
}

// If this is an intermediate stage, make a note of the ID, so
// that we can look it up later.
if stageIndex < len(stages)-1 && imageID != "" {
b.imageMap[stage.Name] = imageID
if r.Index < len(stages)-1 && r.ImageID != "" {
b.imageMap[stage.Name] = r.ImageID
// We're not populating the cache with intermediate
// images, so add this one to the list of images that
// we'll remove later.
if !b.layers {
cleanupImages = append(cleanupImages, imageID)
cleanupImages = append(cleanupImages, r.ImageID)
}
imageID = ""
}
if r.Index == len(stages)-1 {
imageID = r.ImageID
}
}

Expand Down
3 changes: 3 additions & 0 deletions imagebuildah/stage_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,9 @@ func (s *StageExecutor) Execute(ctx context.Context, base string) (imgID string,
return "", nil, errors.Errorf("%s: invalid --from flag, should be --from=<name|stage>", command)
}
if otherStage, ok := s.executor.stages[arr[1]]; ok && otherStage.index < s.index {
if err := s.executor.waitForStage(ctx, otherStage.name); err != nil {
return "", nil, err
}
mountPoint = otherStage.mountPoint
} else if mountPoint, err = s.getImageRootfs(ctx, arr[1]); err != nil {
return "", nil, errors.Errorf("%s --from=%s: no stage or image found with that name", command, arr[1])
Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type BudResults struct {
Tag []string
Target string
TLSVerify bool
Jobs int
}

// FromAndBugResults represents the results for common flags
Expand Down Expand Up @@ -182,6 +183,7 @@ func GetBudFlags(flags *BudResults) pflag.FlagSet {
fs.StringArrayVarP(&flags.Tag, "tag", "t", []string{}, "tagged `name` to apply to the built image")
fs.StringVar(&flags.Target, "target", "", "set the target build stage to build")
fs.BoolVar(&flags.TLSVerify, "tls-verify", true, "require HTTPS and verify certificates when accessing the registry")
fs.IntVar(&flags.Jobs, "jobs", 1, "how many stages to run in parallel")
return fs
}

Expand Down
Loading

0 comments on commit 1289798

Please sign in to comment.