Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEP-0011: Add StdoutConfig and StderrConfig to steps. #4882

Merged
merged 1 commit into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/entrypoint/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ The following flags are available:
- `-wait_file_content`: expects the `wait_file` to contain actual
contents. It will continue watching for `wait_file` until it has
content.
- `-stdout_path`: If specified, the stdout of the sub-process will be
copied to the given path on the local filesystem.
- `-stderr_path`: If specified, the stderr of the sub-process will be
copied to the given path on the local filesystem. It can be set to the
same value as `{{stdout_path}}` so both streams are copied to the same
file. However, there is no ordering guarantee on data copied from both
streams.

Any extra positional arguments are passed to the original entrypoint command.

Expand Down
121 changes: 121 additions & 0 deletions cmd/entrypoint/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright 2022 The Tekton Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main
bradbeck marked this conversation as resolved.
Show resolved Hide resolved

import (
"errors"
"io"
"math"
"os"
"time"
)

type ioResult struct {
numBytes int
err error
}

// readAsync implements a non-blocking read.
func readAsync(r io.Reader, p []byte) <-chan ioResult {
resultCh := make(chan ioResult, 1)
go func() {
defer close(resultCh)
n, err := r.Read(p)
resultCh <- ioResult{n, err}
}()
return resultCh
}

// copyAsync performs a non-blocking copy from src to dst.
func copyAsync(dst io.Writer, src io.Reader, stopCh <-chan struct{}) <-chan ioResult {
resultCh := make(chan ioResult, 1)
go func() {
defer close(resultCh)

buf := make([]byte, 1024)
result := ioResult{}
readCh := readAsync(src, buf)
stopped := false
done := false
timer := time.NewTimer(time.Duration(math.MaxInt64))
defer timer.Stop()

for !done {
// If the stop channel is signalled, continue the loop to read the rest of the available
// data with a short timeout instead of a non-blocking read to mitigate the race between
// this loop and Read() running in another goroutine.
if stopped {
if !timer.Stop() {
<-timer.C
}
timer.Reset(100 * time.Millisecond)
}
select {
case r := <-readCh:
if r.numBytes != 0 {
nw, err := dst.Write(buf[:r.numBytes])
result.numBytes += nw
if err != nil {
result.err = err
done = true
} else if nw < r.numBytes {
result.err = io.ErrShortWrite
done = true
}
}
if r.err != nil {
if !errors.Is(r.err, io.EOF) {
result.err = r.err
}
done = true
}
if !done {
readCh = readAsync(src, buf)
}
case <-stopCh:
stopped = true
stopCh = nil
case <-timer.C:
done = true
}
}

resultCh <- result
}()
return resultCh
}

// asyncWriter creates a write that duplicates its writes to the provided writer asynchronously.
func asyncWriter(w io.Writer, stopCh <-chan struct{}) (io.Writer, <-chan error, error) {
pr, pw, err := os.Pipe()
if err != nil {
return nil, nil, err
}

doneCh := make(chan error, 1)
go func() {
defer close(doneCh)

if err := (<-copyAsync(w, pr, stopCh)).err; err != nil {
doneCh <- err
}
pr.Close()
pw.Close()
}()

return pw, doneCh, nil
}
92 changes: 92 additions & 0 deletions cmd/entrypoint/io_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2022 The Tekton Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main
bradbeck marked this conversation as resolved.
Show resolved Hide resolved

import (
"bytes"
"errors"
"io"
"testing"
)

func TestCopyAsyncEOF(t *testing.T) {
stopCh := make(chan struct{}, 1)
defer close(stopCh)

pr, pw := io.Pipe()
defer pr.Close()

buf := &bytes.Buffer{}
copyCh := copyAsync(buf, pr, stopCh)

expectedString := "hello world"
pw.Write([]byte(expectedString))
pw.Close()

if c := <-copyCh; c.err != nil {
t.Fatalf("Unexpected error: %v", c.err)
}
if buf.String() != expectedString {
t.Errorf("got: %v, wanted: %v", buf.String(), expectedString)
}
}

func TestCopyAsyncStop(t *testing.T) {
stopCh := make(chan struct{}, 1)

pr, pw := io.Pipe()
defer pr.Close()
defer pw.Close()

buf := &bytes.Buffer{}
copyCh := copyAsync(buf, pr, stopCh)

expectedString := "hello world"
pw.Write([]byte(expectedString))

close(stopCh)

if c := <-copyCh; c.err != nil {
t.Fatalf("Unexpected error: %v", c.err)
}
if buf.String() != expectedString {
t.Errorf("got: %v, wanted: %v", buf.String(), expectedString)
}
}

func TestCopyAsyncError(t *testing.T) {
stopCh := make(chan struct{}, 1)
defer close(stopCh)

pr, pw := io.Pipe()
defer pr.Close()

buf := &bytes.Buffer{}
copyCh := copyAsync(buf, pr, stopCh)

expectedString := "hello world"
expectedError := errors.New("test error")
pw.Write([]byte(expectedString))
pw.CloseWithError(expectedError)

if c := <-copyCh; !errors.Is(c.err, expectedError) {
t.Errorf("Expected error %v but got %v", expectedError, c.err)
}
if buf.String() != expectedString {
t.Errorf("got: %v, wanted: %v", buf.String(), expectedString)
}
}
19 changes: 12 additions & 7 deletions cmd/entrypoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var (
terminationPath = flag.String("termination_path", "/tekton/termination", "If specified, file to write upon termination")
results = flag.String("results", "", "If specified, list of file names that might contain task results")
timeout = flag.Duration("timeout", time.Duration(0), "If specified, sets timeout for step")
stdoutPath = flag.String("stdout_path", "", "If specified, file to copy stdout to")
stderrPath = flag.String("stderr_path", "", "If specified, file to copy stderr to")
breakpointOnFailure = flag.Bool("breakpoint_on_failure", false, "If specified, expect steps to not skip on failure")
onError = flag.String("on_error", "", "Set to \"continue\" to ignore an error and continue when a container terminates with a non-zero exit code."+
" Set to \"stopAndFail\" to declare a failure with a step error and stop executing the rest of the steps.")
Expand Down Expand Up @@ -130,13 +132,16 @@ func main() {
}

e := entrypoint.Entrypointer{
Command: append(cmd, commandArgs...),
WaitFiles: strings.Split(*waitFiles, ","),
WaitFileContent: *waitFileContent,
PostFile: *postFile,
TerminationPath: *terminationPath,
Waiter: &realWaiter{waitPollingInterval: defaultWaitPollingInterval, breakpointOnFailure: *breakpointOnFailure},
Runner: &realRunner{},
Command: append(cmd, commandArgs...),
WaitFiles: strings.Split(*waitFiles, ","),
WaitFileContent: *waitFileContent,
PostFile: *postFile,
TerminationPath: *terminationPath,
Waiter: &realWaiter{waitPollingInterval: defaultWaitPollingInterval, breakpointOnFailure: *breakpointOnFailure},
Runner: &realRunner{
stdoutPath: *stdoutPath,
stderrPath: *stderrPath,
},
PostWriter: &realPostWriter{},
Results: strings.Split(*results, ","),
Timeout: timeout,
Expand Down
94 changes: 92 additions & 2 deletions cmd/entrypoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ package main

import (
"context"
"io"
"log"
"os"
"os/exec"
"os/signal"
"path/filepath"
"sync"
"syscall"

"github.com/tektoncd/pipeline/pkg/entrypoint"
Expand All @@ -34,11 +38,34 @@ import (

// realRunner actually runs commands.
type realRunner struct {
signals chan os.Signal
sync.Mutex
signals chan os.Signal
signalsClosed bool
stdoutPath string
stderrPath string
}

var _ entrypoint.Runner = (*realRunner)(nil)

// close closes the signals channel which is used to receive system signals.
func (rr *realRunner) close() {
rr.Lock()
defer rr.Unlock()
if rr.signals != nil && !rr.signalsClosed {
close(rr.signals)
rr.signalsClosed = true
}
}

// signal allows the caller to simulate the sending of a system signal.
func (rr *realRunner) signal(signal os.Signal) {
rr.Lock()
defer rr.Unlock()
if rr.signals != nil && !rr.signalsClosed {
rr.signals <- signal
}
}

// Run executes the entrypoint.
func (rr *realRunner) Run(ctx context.Context, args ...string) error {
if len(args) == 0 {
Expand All @@ -50,13 +77,76 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error {
if rr.signals == nil {
rr.signals = make(chan os.Signal, 1)
}
defer close(rr.signals)
defer rr.close()
signal.Notify(rr.signals)
defer signal.Reset()

cmd := exec.CommandContext(ctx, name, args...)
stopCh := make(chan struct{}, 1)
defer close(stopCh)

cmd.Stdout = os.Stdout
var stdoutFile *os.File
if rr.stdoutPath != "" {
var err error
var doneCh <-chan error
// Create directory if it doesn't already exist
if err = os.MkdirAll(filepath.Dir(rr.stdoutPath), os.ModePerm); err != nil {
return err
}
if stdoutFile, err = os.Create(rr.stdoutPath); err != nil {
return err
}
// We use os.Pipe in asyncWriter to copy stdout instead of cmd.StdoutPipe or providing an
// io.Writer directly because otherwise Go would wait for the underlying fd to be closed by the
// child process before returning from cmd.Wait even if the process is no longer running. This
// would cause a deadlock if the child spawns a long running descendant process before exiting.
if cmd.Stdout, doneCh, err = asyncWriter(io.MultiWriter(os.Stdout, stdoutFile), stopCh); err != nil {
return err
}
go func() {
if err := <-doneCh; err != nil {
log.Fatalf("Copying stdout: %v", err)
}
stdoutFile.Close()
}()
}

cmd.Stderr = os.Stderr
var stderrFile *os.File
if rr.stderrPath != "" {
var err error
var doneCh <-chan error
if rr.stderrPath == rr.stdoutPath {
fd, err := syscall.Dup(int(stdoutFile.Fd()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why copy stdout file descriptor and create a new file stderrFile? Can we rely on the already initialized file instance stdoutFile? I prefer implementing this way to keep both stdout and stderr independent of each other but just wondering if it's possible to reuse.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is so that the file descriptors can be closed independently, but still point to the same file. Will do some additional checking to confirm.

if err != nil {
return err
}
stderrFile = os.NewFile(uintptr(fd), rr.stderrPath)
} else {
// Create directory if it doesn't already exist
if err = os.MkdirAll(filepath.Dir(rr.stderrPath), os.ModePerm); err != nil {
return err
}
if stderrFile, err = os.Create(rr.stderrPath); err != nil {
return err
}
}
// We use os.Pipe in asyncWriter to copy stderr instead of cmd.StderrPipe or providing an
// io.Writer directly because otherwise Go would wait for the underlying fd to be closed by the
// child process before returning from cmd.Wait even if the process is no longer running. This
// would cause a deadlock if the child spawns a long running descendant process before exiting.
if cmd.Stderr, doneCh, err = asyncWriter(io.MultiWriter(os.Stderr, stderrFile), stopCh); err != nil {
return err
}
go func() {
if err := <-doneCh; err != nil {
log.Fatalf("Copying stderr: %v", err)
}
stderrFile.Close()
}()
}

// dedicated PID group used to forward signals to
// main process and all children
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
Expand Down
Loading