Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kubernetes): Coordinate Runtime's RunContainer/TailContainer via a channel #286

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Engine interface {
ExecService(context.Context, *pipeline.Container) error
// StreamService defines a function that
// tails the output for a service.
StreamService(context.Context, *pipeline.Container) error
StreamService(context.Context, *pipeline.Container, chan struct{}) error
// DestroyService defines a function that
// cleans up the service after execution.
DestroyService(context.Context, *pipeline.Container) error
Expand Down Expand Up @@ -103,7 +103,7 @@ type Engine interface {
ExecStep(context.Context, *pipeline.Container) error
// StreamStep defines a function that
// tails the output for a step.
StreamStep(context.Context, *pipeline.Container) error
StreamStep(context.Context, *pipeline.Container, chan struct{}) error
// DestroyStep defines a function that
// cleans up the step after execution.
DestroyStep(context.Context, *pipeline.Container) error
Expand Down
46 changes: 35 additions & 11 deletions executor/linux/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/go-vela/worker/internal/step"

"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

// secretSvc handles communication with secret processes during a build.
Expand Down Expand Up @@ -128,21 +129,44 @@ func (s *secretSvc) exec(ctx context.Context, p *pipeline.SecretSlice) error {
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
logger := s.client.Logger.WithField("secret", _secret.Origin.Name)

logger.Debug("running container")
// run the runtime container
err := s.client.Runtime.RunContainer(ctx, _secret.Origin, s.client.pipeline)
if err != nil {
return err
}
// create a channel so Runtime can optionally synchronize RunContainer and TailContainer
runtimeChannel := make(chan struct{})

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
run, runCtx := errgroup.WithContext(ctx)

go func() {
run.Go(func() error {
logger.Debug("running container")
// run the runtime container
err := s.client.Runtime.RunContainer(runCtx, _secret.Origin, s.client.pipeline, runtimeChannel)
if err != nil {
return err
}
return nil
})

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
logs, logCtx := errgroup.WithContext(ctx)

logs.Go(func() error {
logger.Debug("stream logs for container")
// stream logs from container
err = s.client.secret.stream(ctx, _secret.Origin)
err = s.client.secret.stream(logCtx, _secret.Origin, runtimeChannel)
if err != nil {
logger.Error(err)
}
}()
return nil
})

// ensure that RunContainer has finished requesting the runtime container
err = run.Wait()
if err != nil {
return err
}

logger.Debug("waiting for container")
// wait for the runtime container
Expand Down Expand Up @@ -250,7 +274,7 @@ func (s *secretSvc) pull(secret *pipeline.Secret) (*library.Secret, error) {
}

// stream tails the output for a secret plugin.
func (s *secretSvc) stream(ctx context.Context, ctn *pipeline.Container) error {
func (s *secretSvc) stream(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) error {
// stream all the logs to the init step
_log, err := step.LoadLogs(s.client.init, &s.client.stepLogs)
if err != nil {
Expand Down Expand Up @@ -288,7 +312,7 @@ func (s *secretSvc) stream(ctx context.Context, ctn *pipeline.Container) error {

logger.Debug("tailing container")
// tail the runtime container
rc, err := s.client.Runtime.TailContainer(ctx, ctn)
rc, err := s.client.Runtime.TailContainer(ctx, ctn, runtimeChannel)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion executor/linux/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ func TestLinux_Secret_stream(t *testing.T) {
},
}

// these tests use docker runtime, so simulate docker runtime behavior by
// passing a closed channel to let TailContainer start right away
runtimeChannel := make(chan struct{})
close(runtimeChannel)

// run tests
for _, test := range tests {
_engine, err := New(
Expand All @@ -540,7 +545,7 @@ func TestLinux_Secret_stream(t *testing.T) {
// add init container info to client
_ = _engine.CreateBuild(context.Background())

err = _engine.secret.stream(context.Background(), test.container)
err = _engine.secret.stream(context.Background(), test.container, runtimeChannel)

if test.failure {
if err == nil {
Expand Down
38 changes: 27 additions & 11 deletions executor/linux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,23 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
// https://pkg.go.dev/github.com/go-vela/worker/internal/service#Snapshot
defer func() { service.Snapshot(ctn, c.build, c.Vela, c.Logger, c.repo, _service) }()

logger.Debug("running container")
// run the runtime container
err = c.Runtime.RunContainer(ctx, ctn, c.pipeline)
if err != nil {
return err
}
// create a channel so Runtime can optionally synchronize RunContainer and TailContainer
runtimeChannel := make(chan struct{})

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
run, runCtx := errgroup.WithContext(ctx)

run.Go(func() error {
logger.Debug("running container")
// run the runtime container
err = c.Runtime.RunContainer(runCtx, ctn, c.pipeline, runtimeChannel)
if err != nil {
return err
}
return nil
})

// create an error group with the parent context
//
Expand All @@ -151,21 +162,26 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
logs.Go(func() error {
logger.Debug("streaming logs for container")
// stream logs from container
err := c.StreamService(logCtx, ctn)
err := c.StreamService(logCtx, ctn, runtimeChannel)
if err != nil {
logger.Error(err)
}

return nil
})

// ensure that RunContainer has finished requesting the runtime container
err = run.Wait()
if err != nil {
return err
}

return nil
}

// StreamService tails the output for a service.
//
// nolint: funlen // ignore function length
func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) error {
func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) error {
// update engine logger with service metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
Expand All @@ -181,7 +197,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err

defer func() {
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, ctn)
rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel)
if err != nil {
logger.Errorf("unable to tail container output for upload: %v", err)

Expand Down Expand Up @@ -221,7 +237,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err

logger.Debug("tailing container")
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, ctn)
rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion executor/linux/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,11 @@ func TestLinux_StreamService(t *testing.T) {
},
}

// these tests use docker runtime, so simulate docker runtime behavior by
// passing a closed channel to let TailContainer start right away
runtimeChannel := make(chan struct{})
close(runtimeChannel)

// run tests
for _, test := range tests {
_engine, err := New(
Expand All @@ -371,7 +376,7 @@ func TestLinux_StreamService(t *testing.T) {
_engine.serviceLogs.Store(test.container.ID, new(library.Log))
}

err = _engine.StreamService(context.Background(), test.container)
err = _engine.StreamService(context.Background(), test.container, runtimeChannel)

if test.failure {
if err == nil {
Expand Down
37 changes: 27 additions & 10 deletions executor/linux/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,23 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
// https://pkg.go.dev/github.com/go-vela/worker/internal/step#Snapshot
defer func() { step.Snapshot(ctn, c.build, c.Vela, c.Logger, c.repo, _step) }()

logger.Debug("running container")
// run the runtime container
err = c.Runtime.RunContainer(ctx, ctn, c.pipeline)
if err != nil {
return err
}
// create a channel so Runtime can optionally synchronize RunContainer and TailContainer
runtimeChannel := make(chan struct{})

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
run, runCtx := errgroup.WithContext(ctx)

run.Go(func() error {
logger.Debug("running container")
// run the runtime container
err = c.Runtime.RunContainer(runCtx, ctn, c.pipeline, runtimeChannel)
if err != nil {
return err
}
return nil
})

// create an error group with the parent context
//
Expand All @@ -163,14 +174,20 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
logs.Go(func() error {
logger.Debug("streaming logs for container")
// stream logs from container
err := c.StreamStep(logCtx, ctn)
err := c.StreamStep(logCtx, ctn, runtimeChannel)
if err != nil {
logger.Error(err)
}

return nil
})

// ensure that RunContainer has finished requesting the runtime container
err = run.Wait()
if err != nil {
return err
}

// do not wait for detached containers
if ctn.Detach {
return nil
Expand All @@ -196,7 +213,7 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
// StreamStep tails the output for a step.
//
// nolint: funlen // ignore function length
func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error {
func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) error {
// TODO: remove hardcoded reference
if ctn.Name == "init" {
return nil
Expand All @@ -219,7 +236,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error

defer func() {
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, ctn)
rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel)
if err != nil {
logger.Errorf("unable to tail container output for upload: %v", err)

Expand Down Expand Up @@ -264,7 +281,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error

logger.Debug("tailing container")
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, ctn)
rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion executor/linux/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,11 @@ func TestLinux_StreamStep(t *testing.T) {
},
}

// these tests use docker runtime, so simulate docker runtime behavior by
// passing a closed channel to let TailContainer start right away
runtimeChannel := make(chan struct{})
close(runtimeChannel)

// run tests
for _, test := range tests {
_engine, err := New(
Expand All @@ -417,7 +422,7 @@ func TestLinux_StreamStep(t *testing.T) {
_engine.stepLogs.Store(test.container.ID, new(library.Log))
}

err = _engine.StreamStep(context.Background(), test.container)
err = _engine.StreamStep(context.Background(), test.container, runtimeChannel)

if test.failure {
if err == nil {
Expand Down
45 changes: 35 additions & 10 deletions executor/local/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"

"golang.org/x/sync/errgroup"
)

// create a service logging pattern.
Expand Down Expand Up @@ -90,27 +92,50 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
// https://pkg.go.dev/github.com/go-vela/worker/internal/service#Snapshot
defer func() { service.Snapshot(ctn, c.build, nil, nil, nil, _service) }()

// run the runtime container
err = c.Runtime.RunContainer(ctx, ctn, c.pipeline)
if err != nil {
return err
}
// create a channel so Runtime can optionally synchronize RunContainer and TailContainer
runtimeChannel := make(chan struct{})

go func() {
// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
run, runCtx := errgroup.WithContext(ctx)

run.Go(func() error {
// run the runtime container
err = c.Runtime.RunContainer(runCtx, ctn, c.pipeline, runtimeChannel)
if err != nil {
return err
}
return nil
})

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
logs, logCtx := errgroup.WithContext(ctx)

logs.Go(func() error {
// stream logs from container
err := c.StreamService(context.Background(), ctn)
err := c.StreamService(logCtx, ctn, runtimeChannel)
if err != nil {
fmt.Fprintln(os.Stdout, "unable to stream logs for service:", err)
}
}()
return nil
})

// ensure that RunContainer has finished requesting the runtime container
err = run.Wait()
if err != nil {
return err
}

return nil
}

// StreamService tails the output for a service.
func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) error {
func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container, runtimeChannel chan struct{}) error {
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, ctn)
rc, err := c.Runtime.TailContainer(ctx, ctn, runtimeChannel)
if err != nil {
return err
}
Expand Down
Loading