diff --git a/executor/engine.go b/executor/engine.go index b3fba545..e3c78ca0 100644 --- a/executor/engine.go +++ b/executor/engine.go @@ -70,7 +70,7 @@ type Engine interface { ExecService(context.Context, *pipeline.Container) error // StreamService defines a function that // tails the output for a service. - StreamService(context.Context, *pipeline.Container) error + StreamService(context.Context, *pipeline.Container, chan struct{}) error // DestroyService defines a function that // cleans up the service after execution. DestroyService(context.Context, *pipeline.Container) error @@ -103,7 +103,7 @@ type Engine interface { ExecStep(context.Context, *pipeline.Container) error // StreamStep defines a function that // tails the output for a step. - StreamStep(context.Context, *pipeline.Container) error + StreamStep(context.Context, *pipeline.Container, chan struct{}) error // DestroyStep defines a function that // cleans up the step after execution. DestroyStep(context.Context, *pipeline.Container) error diff --git a/executor/linux/secret.go b/executor/linux/secret.go index ad2c2b25..a879b868 100644 --- a/executor/linux/secret.go +++ b/executor/linux/secret.go @@ -19,6 +19,7 @@ import ( "github.com/go-vela/worker/internal/step" "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) // secretSvc handles communication with secret processes during a build. @@ -128,21 +129,44 @@ func (s *secretSvc) exec(ctx context.Context, p *pipeline.SecretSlice) error { // https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField logger := s.client.Logger.WithField("secret", _secret.Origin.Name) - logger.Debug("running container") - // run the runtime container - err := s.client.Runtime.RunContainer(ctx, _secret.Origin, s.client.pipeline) - if err != nil { - return err - } + // create a channel so Runtime can optionally synchronize RunContainer and TailContainer + runtimeChannel := make(chan struct{}) + + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + run, runCtx := errgroup.WithContext(ctx) - go func() { + run.Go(func() error { + logger.Debug("running container") + // run the runtime container + err := s.client.Runtime.RunContainer(runCtx, _secret.Origin, s.client.pipeline, runtimeChannel) + if err != nil { + return err + } + return nil + }) + + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + logs, logCtx := errgroup.WithContext(ctx) + + logs.Go(func() error { logger.Debug("stream logs for container") // stream logs from container - err = s.client.secret.stream(ctx, _secret.Origin) + err = s.client.secret.stream(logCtx, _secret.Origin, runtimeChannel) if err != nil { logger.Error(err) } - }() + return nil + }) + + // ensure that RunContainer has finished requesting the runtime container + err = run.Wait() + if err != nil { + return err + } logger.Debug("waiting for container") // wait for the runtime container @@ -250,7 +274,7 @@ func (s *secretSvc) pull(secret *pipeline.Secret) (*library.Secret, error) { } // stream tails the output for a secret plugin. -func (s *secretSvc) stream(ctx context.Context, ctn *pipeline.Container) error { +func (s *secretSvc) stream(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) error { // stream all the logs to the init step _log, err := step.LoadLogs(s.client.init, &s.client.stepLogs) if err != nil { @@ -288,7 +312,7 @@ func (s *secretSvc) stream(ctx context.Context, ctn *pipeline.Container) error { logger.Debug("tailing container") // tail the runtime container - rc, err := s.client.Runtime.TailContainer(ctx, ctn) + rc, err := s.client.Runtime.TailContainer(ctx, ctn, runtimeChannel) if err != nil { return err } diff --git a/executor/linux/secret_test.go b/executor/linux/secret_test.go index 66ebd4da..70d5808a 100644 --- a/executor/linux/secret_test.go +++ b/executor/linux/secret_test.go @@ -523,6 +523,11 @@ func TestLinux_Secret_stream(t *testing.T) { }, } + // these tests use docker runtime, so simulate docker runtime behavior by + // passing a closed channel to let TailContainer start right away + runtimeChannel := make(chan struct{}) + close(runtimeChannel) + // run tests for _, test := range tests { _engine, err := New( @@ -540,7 +545,7 @@ func TestLinux_Secret_stream(t *testing.T) { // add init container info to client _ = _engine.CreateBuild(context.Background()) - err = _engine.secret.stream(context.Background(), test.container) + err = _engine.secret.stream(context.Background(), test.container, runtimeChannel) if test.failure { if err == nil { diff --git a/executor/linux/service.go b/executor/linux/service.go index 66fa3309..a5a6ea21 100644 --- a/executor/linux/service.go +++ b/executor/linux/service.go @@ -136,12 +136,23 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error // https://pkg.go.dev/github.com/go-vela/worker/internal/service#Snapshot defer func() { service.Snapshot(ctn, c.build, c.Vela, c.Logger, c.repo, _service) }() - logger.Debug("running container") - // run the runtime container - err = c.Runtime.RunContainer(ctx, ctn, c.pipeline) - if err != nil { - return err - } + // create a channel so Runtime can optionally synchronize RunContainer and TailContainer + runtimeChannel := make(chan struct{}) + + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + run, runCtx := errgroup.WithContext(ctx) + + run.Go(func() error { + logger.Debug("running container") + // run the runtime container + err = c.Runtime.RunContainer(runCtx, ctn, c.pipeline, runtimeChannel) + if err != nil { + return err + } + return nil + }) // create an error group with the parent context // @@ -151,21 +162,26 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error logs.Go(func() error { logger.Debug("streaming logs for container") // stream logs from container - err := c.StreamService(logCtx, ctn) + err := c.StreamService(logCtx, ctn, runtimeChannel) if err != nil { logger.Error(err) } - return nil }) + // ensure that RunContainer has finished requesting the runtime container + err = run.Wait() + if err != nil { + return err + } + return nil } // StreamService tails the output for a service. // // nolint: funlen // ignore function length -func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) error { +func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) error { // update engine logger with service metadata // // https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField @@ -181,7 +197,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err defer func() { // tail the runtime container - rc, err := c.Runtime.TailContainer(ctx, ctn) + rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel) if err != nil { logger.Errorf("unable to tail container output for upload: %v", err) @@ -221,7 +237,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err logger.Debug("tailing container") // tail the runtime container - rc, err := c.Runtime.TailContainer(ctx, ctn) + rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel) if err != nil { return err } diff --git a/executor/linux/service_test.go b/executor/linux/service_test.go index ab9fee11..81b72111 100644 --- a/executor/linux/service_test.go +++ b/executor/linux/service_test.go @@ -352,6 +352,11 @@ func TestLinux_StreamService(t *testing.T) { }, } + // these tests use docker runtime, so simulate docker runtime behavior by + // passing a closed channel to let TailContainer start right away + runtimeChannel := make(chan struct{}) + close(runtimeChannel) + // run tests for _, test := range tests { _engine, err := New( @@ -371,7 +376,7 @@ func TestLinux_StreamService(t *testing.T) { _engine.serviceLogs.Store(test.container.ID, new(library.Log)) } - err = _engine.StreamService(context.Background(), test.container) + err = _engine.StreamService(context.Background(), test.container, runtimeChannel) if test.failure { if err == nil { diff --git a/executor/linux/step.go b/executor/linux/step.go index f823aede..1e0a5905 100644 --- a/executor/linux/step.go +++ b/executor/linux/step.go @@ -148,12 +148,23 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error { // https://pkg.go.dev/github.com/go-vela/worker/internal/step#Snapshot defer func() { step.Snapshot(ctn, c.build, c.Vela, c.Logger, c.repo, _step) }() - logger.Debug("running container") - // run the runtime container - err = c.Runtime.RunContainer(ctx, ctn, c.pipeline) - if err != nil { - return err - } + // create a channel so Runtime can optionally synchronize RunContainer and TailContainer + runtimeChannel := make(chan struct{}) + + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + run, runCtx := errgroup.WithContext(ctx) + + run.Go(func() error { + logger.Debug("running container") + // run the runtime container + err = c.Runtime.RunContainer(runCtx, ctn, c.pipeline, runtimeChannel) + if err != nil { + return err + } + return nil + }) // create an error group with the parent context // @@ -163,7 +174,7 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error { logs.Go(func() error { logger.Debug("streaming logs for container") // stream logs from container - err := c.StreamStep(logCtx, ctn) + err := c.StreamStep(logCtx, ctn, runtimeChannel) if err != nil { logger.Error(err) } @@ -171,6 +182,12 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error { return nil }) + // ensure that RunContainer has finished requesting the runtime container + err = run.Wait() + if err != nil { + return err + } + // do not wait for detached containers if ctn.Detach { return nil @@ -196,7 +213,7 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error { // StreamStep tails the output for a step. // // nolint: funlen // ignore function length -func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error { +func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) error { // TODO: remove hardcoded reference if ctn.Name == "init" { return nil @@ -219,7 +236,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error defer func() { // tail the runtime container - rc, err := c.Runtime.TailContainer(ctx, ctn) + rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel) if err != nil { logger.Errorf("unable to tail container output for upload: %v", err) @@ -264,7 +281,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error logger.Debug("tailing container") // tail the runtime container - rc, err := c.Runtime.TailContainer(ctx, ctn) + rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel) if err != nil { return err } diff --git a/executor/linux/step_test.go b/executor/linux/step_test.go index ace74ba0..71d9a8e7 100644 --- a/executor/linux/step_test.go +++ b/executor/linux/step_test.go @@ -397,6 +397,11 @@ func TestLinux_StreamStep(t *testing.T) { }, } + // these tests use docker runtime, so simulate docker runtime behavior by + // passing a closed channel to let TailContainer start right away + runtimeChannel := make(chan struct{}) + close(runtimeChannel) + // run tests for _, test := range tests { _engine, err := New( @@ -417,7 +422,7 @@ func TestLinux_StreamStep(t *testing.T) { _engine.stepLogs.Store(test.container.ID, new(library.Log)) } - err = _engine.StreamStep(context.Background(), test.container) + err = _engine.StreamStep(context.Background(), test.container, runtimeChannel) if test.failure { if err == nil { diff --git a/executor/local/service.go b/executor/local/service.go index bc5269c5..f3047594 100644 --- a/executor/local/service.go +++ b/executor/local/service.go @@ -16,6 +16,8 @@ import ( "github.com/go-vela/types/constants" "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" + + "golang.org/x/sync/errgroup" ) // create a service logging pattern. @@ -90,27 +92,50 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error // https://pkg.go.dev/github.com/go-vela/worker/internal/service#Snapshot defer func() { service.Snapshot(ctn, c.build, nil, nil, nil, _service) }() - // run the runtime container - err = c.Runtime.RunContainer(ctx, ctn, c.pipeline) - if err != nil { - return err - } + // create a channel so Runtime can optionally synchronize RunContainer and TailContainer + runtimeChannel := make(chan struct{}) - go func() { + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + run, runCtx := errgroup.WithContext(ctx) + + run.Go(func() error { + // run the runtime container + err = c.Runtime.RunContainer(runCtx, ctn, c.pipeline, runtimeChannel) + if err != nil { + return err + } + return nil + }) + + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + logs, logCtx := errgroup.WithContext(ctx) + + logs.Go(func() error { // stream logs from container - err := c.StreamService(context.Background(), ctn) + err := c.StreamService(logCtx, ctn, runtimeChannel) if err != nil { fmt.Fprintln(os.Stdout, "unable to stream logs for service:", err) } - }() + return nil + }) + + // ensure that RunContainer has finished requesting the runtime container + err = run.Wait() + if err != nil { + return err + } return nil } // StreamService tails the output for a service. -func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) error { +func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) error { // tail the runtime container - rc, err := c.Runtime.TailContainer(ctx, ctn) + rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel) if err != nil { return err } diff --git a/executor/local/service_test.go b/executor/local/service_test.go index c6bd1ae9..118dc3be 100644 --- a/executor/local/service_test.go +++ b/executor/local/service_test.go @@ -277,6 +277,11 @@ func TestLocal_StreamService(t *testing.T) { }, } + // these tests use docker runtime, so simulate docker runtime behavior by + // passing a closed channel to let TailContainer start right away + runtimeChannel := make(chan struct{}) + close(runtimeChannel) + // run tests for _, test := range tests { _engine, err := New( @@ -290,7 +295,7 @@ func TestLocal_StreamService(t *testing.T) { t.Errorf("unable to create executor engine: %v", err) } - err = _engine.StreamService(context.Background(), test.container) + err = _engine.StreamService(context.Background(), test.container, runtimeChannel) if test.failure { if err == nil { diff --git a/executor/local/step.go b/executor/local/step.go index 032ccbbf..3ec2788b 100644 --- a/executor/local/step.go +++ b/executor/local/step.go @@ -15,6 +15,8 @@ import ( "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" "github.com/go-vela/worker/internal/step" + + "golang.org/x/sync/errgroup" ) // create a step logging pattern. @@ -97,20 +99,43 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error { // https://pkg.go.dev/github.com/go-vela/worker/internal/step#Snapshot defer func() { step.Snapshot(ctn, c.build, nil, nil, nil, _step) }() - // run the runtime container - err = c.Runtime.RunContainer(ctx, ctn, c.pipeline) - if err != nil { - return err - } + // create a channel so Runtime can optionally synchronize RunContainer and TailContainer + runtimeChannel := make(chan struct{}) - go func() { + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + run, runCtx := errgroup.WithContext(ctx) + + run.Go(func() error { + // run the runtime container + err = c.Runtime.RunContainer(runCtx, ctn, c.pipeline, runtimeChannel) + if err != nil { + return err + } + return nil + }) + + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + logs, logCtx := errgroup.WithContext(ctx) + + logs.Go(func() error { // stream logs from container - err := c.StreamStep(context.Background(), ctn) + err := c.StreamStep(logCtx, ctn, runtimeChannel) if err != nil { // TODO: Should this be changed or removed? fmt.Println(err) } - }() + return nil + }) + + // ensure that RunContainer has finished requesting the runtime container + err = run.Wait() + if err != nil { + return err + } // do not wait for detached containers if ctn.Detach { @@ -133,14 +158,14 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error { } // StreamStep tails the output for a step. -func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error { +func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) error { // TODO: remove hardcoded reference if ctn.Name == "init" { return nil } // tail the runtime container - rc, err := c.Runtime.TailContainer(ctx, ctn) + rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel) if err != nil { return err } diff --git a/executor/local/step_test.go b/executor/local/step_test.go index c6dad3b4..2ccda9bf 100644 --- a/executor/local/step_test.go +++ b/executor/local/step_test.go @@ -326,6 +326,11 @@ func TestLocal_StreamStep(t *testing.T) { }, } + // these tests use docker runtime, so simulate docker runtime behavior by + // passing a closed channel to let TailContainer start right away + runtimeChannel := make(chan struct{}) + close(runtimeChannel) + // run tests for _, test := range tests { _engine, err := New( @@ -339,7 +344,7 @@ func TestLocal_StreamStep(t *testing.T) { t.Errorf("unable to create executor engine: %v", err) } - err = _engine.StreamStep(context.Background(), test.container) + err = _engine.StreamStep(context.Background(), test.container, runtimeChannel) if test.failure { if err == nil { diff --git a/runtime/docker/container.go b/runtime/docker/container.go index 9f5106ec..45f5f657 100644 --- a/runtime/docker/container.go +++ b/runtime/docker/container.go @@ -89,9 +89,11 @@ func (c *client) RemoveContainer(ctx context.Context, ctn *pipeline.Container) e } // RunContainer creates and starts the pipeline container. -func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *pipeline.Build) error { +func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *pipeline.Build, runtimeChannel chan struct{}) error { c.Logger.Tracef("running container %s", ctn.ID) + defer close(runtimeChannel) + // allocate new container config from pipeline container containerConf := ctnConfig(ctn) // allocate new host config with volume data @@ -233,7 +235,10 @@ func (c *client) SetupContainer(ctx context.Context, ctn *pipeline.Container) er } // TailContainer captures the logs for the pipeline container. -func (c *client) TailContainer(ctx context.Context, ctn *pipeline.Container) (io.ReadCloser, error) { +func (c *client) TailContainer(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) (io.ReadCloser, error) { + // wait for RunContainer to finish before continuing + <-runtimeChannel + c.Logger.Tracef("tailing output for container %s", ctn.ID) // create options for capturing container logs diff --git a/runtime/docker/container_test.go b/runtime/docker/container_test.go index da704f0f..4e966cd4 100644 --- a/runtime/docker/container_test.go +++ b/runtime/docker/container_test.go @@ -208,7 +208,9 @@ func TestDocker_RunContainer(t *testing.T) { _engine.config.Volumes = test.volumes } - err = _engine.RunContainer(context.Background(), test.container, test.pipeline) + runtimeChannel := make(chan struct{}) + + err = _engine.RunContainer(context.Background(), test.container, test.pipeline, runtimeChannel) if test.failure { if err == nil { @@ -330,9 +332,13 @@ func TestDocker_TailContainer(t *testing.T) { }, } + // pass a closed channel to let TailContainer start right away + runtimeChannel := make(chan struct{}) + close(runtimeChannel) + // run tests for _, test := range tests { - _, err = _engine.TailContainer(context.Background(), test.container) + _, err = _engine.TailContainer(context.Background(), test.container, runtimeChannel) if test.failure { if err == nil { diff --git a/runtime/engine.go b/runtime/engine.go index 918f55c1..4d001f68 100644 --- a/runtime/engine.go +++ b/runtime/engine.go @@ -46,13 +46,13 @@ type Engine interface { RemoveContainer(context.Context, *pipeline.Container) error // RunContainer defines a function that creates // and starts the pipeline container. - RunContainer(context.Context, *pipeline.Container, *pipeline.Build) error + RunContainer(context.Context, *pipeline.Container, *pipeline.Build, chan struct{}) error // SetupContainer defines a function that prepares // the image for the pipeline container. SetupContainer(context.Context, *pipeline.Container) error // TailContainer defines a function that captures // the logs on the pipeline container. - TailContainer(context.Context, *pipeline.Container) (io.ReadCloser, error) + TailContainer(context.Context, *pipeline.Container, chan struct{}) (io.ReadCloser, error) // WaitContainer defines a function that blocks // until the pipeline container completes. WaitContainer(context.Context, *pipeline.Container) error diff --git a/runtime/kubernetes/container.go b/runtime/kubernetes/container.go index d40c1a9c..8b3ba158 100644 --- a/runtime/kubernetes/container.go +++ b/runtime/kubernetes/container.go @@ -72,7 +72,9 @@ func (c *client) RemoveContainer(ctx context.Context, ctn *pipeline.Container) e } // RunContainer creates and starts the pipeline container. -func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *pipeline.Build) error { +func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *pipeline.Build, runtimeChannel chan struct{}) error { + // wait for TailContainer to establish logs stream before continuing + <-runtimeChannel c.Logger.Tracef("running container %s", ctn.ID) // parse image from step _image, err := image.ParseWithError(ctn.Image) @@ -222,9 +224,11 @@ func (c *client) setupContainerEnvironment(ctn *pipeline.Container) error { } // TailContainer captures the logs for the pipeline container. -func (c *client) TailContainer(ctx context.Context, ctn *pipeline.Container) (io.ReadCloser, error) { +func (c *client) TailContainer(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) (io.ReadCloser, error) { c.Logger.Tracef("tailing output for container %s", ctn.ID) + defer close(runtimeChannel) + // create object to store container logs var logs io.ReadCloser @@ -243,7 +247,7 @@ func (c *client) TailContainer(ctx context.Context, ctn *pipeline.Container) (io // Pods get deleted after job completion, and names for // pod+container don't get reused. So, previous // should only retrieve logs for the current build step. - Previous: true, + //Previous: true, Timestamps: false, } diff --git a/runtime/kubernetes/container_test.go b/runtime/kubernetes/container_test.go index 46a53950..80da847f 100644 --- a/runtime/kubernetes/container_test.go +++ b/runtime/kubernetes/container_test.go @@ -115,6 +115,10 @@ func TestKubernetes_RunContainer(t *testing.T) { }, } + // pass a closed channel to let RunContainer start right away + runtimeChannel := make(chan struct{}) + close(runtimeChannel) + // run tests for _, test := range tests { _engine, err := NewMock(test.pod) @@ -126,7 +130,7 @@ func TestKubernetes_RunContainer(t *testing.T) { _engine.config.Volumes = test.volumes } - err = _engine.RunContainer(context.Background(), test.container, test.pipeline) + err = _engine.RunContainer(context.Background(), test.container, test.pipeline, runtimeChannel) if test.failure { if err == nil {