Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force closing of pipe to child process #4336

Merged
merged 2 commits into from
May 29, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 72 additions & 19 deletions client/driver/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ type UniversalExecutor struct {
processExited chan interface{}
fsIsolationEnforced bool

lre *logging.FileRotator
lro *logging.FileRotator
lre *logRotatorWrapper
lro *logRotatorWrapper
rotatorLock sync.Mutex

syslogServer *logging.SyslogServer
Expand Down Expand Up @@ -252,8 +252,8 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
if err := e.configureLoggers(); err != nil {
return nil, err
}
e.cmd.Stdout = e.lro
e.cmd.Stderr = e.lre
e.cmd.Stdout = e.lro.processOutWriter
e.cmd.Stderr = e.lre.processOutWriter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be processErrWriter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No that is just the writer end of the pipe. The difference is e.lro and e.lre which are log rotator output and error respectively


// Look up the binary path and make it executable
absPath, err := e.lookupBin(e.ctx.TaskEnv.ReplaceEnv(command.Cmd))
Expand Down Expand Up @@ -348,7 +348,12 @@ func (e *UniversalExecutor) configureLoggers() error {
if err != nil {
return fmt.Errorf("error creating new stdout log file for %q: %v", e.ctx.Task.Name, err)
}
e.lro = lro

r, err := NewLogRotatorWrapper(lro)
if err != nil {
return err
}
e.lro = r
}

if e.lre == nil {
Expand All @@ -357,7 +362,12 @@ func (e *UniversalExecutor) configureLoggers() error {
if err != nil {
return fmt.Errorf("error creating new stderr log file for %q: %v", e.ctx.Task.Name, err)
}
e.lre = lre

r, err := NewLogRotatorWrapper(lre)
if err != nil {
return err
}
e.lre = r
}
return nil
}
Expand All @@ -375,14 +385,14 @@ func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error
if e.lro == nil {
return fmt.Errorf("log rotator for stdout doesn't exist")
}
e.lro.MaxFiles = logConfig.MaxFiles
e.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
e.lro.rotatorWriter.MaxFiles = logConfig.MaxFiles
e.lro.rotatorWriter.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)

if e.lre == nil {
return fmt.Errorf("log rotator for stderr doesn't exist")
}
e.lre.MaxFiles = logConfig.MaxFiles
e.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
e.lre.rotatorWriter.MaxFiles = logConfig.MaxFiles
e.lre.rotatorWriter.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
return nil
}

Expand All @@ -393,10 +403,10 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
e.rotatorLock.Lock()
if e.lro != nil && e.lre != nil {
fileSize := int64(task.LogConfig.MaxFileSizeMB * 1024 * 1024)
e.lro.MaxFiles = task.LogConfig.MaxFiles
e.lro.FileSize = fileSize
e.lre.MaxFiles = task.LogConfig.MaxFiles
e.lre.FileSize = fileSize
e.lro.rotatorWriter.MaxFiles = task.LogConfig.MaxFiles
e.lro.rotatorWriter.FileSize = fileSize
e.lre.rotatorWriter.MaxFiles = task.LogConfig.MaxFiles
e.lre.rotatorWriter.FileSize = fileSize
}
e.rotatorLock.Unlock()
return nil
Expand Down Expand Up @@ -799,7 +809,7 @@ func (e *UniversalExecutor) LaunchSyslogServer() (*SyslogServerState, error) {

e.syslogServer = logging.NewSyslogServer(l, e.syslogChan, e.logger)
go e.syslogServer.Start()
go e.collectLogs(e.lre, e.lro)
go e.collectLogs(e.lre.rotatorWriter, e.lro.rotatorWriter)
syslogAddr := fmt.Sprintf("%s://%s", l.Addr().Network(), l.Addr().String())
return &SyslogServerState{Addr: syslogAddr}, nil
}
Expand All @@ -809,11 +819,54 @@ func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) {
// If the severity of the log line is err then we write to stderr
// otherwise all messages go to stdout
if logParts.Severity == syslog.LOG_ERR {
e.lre.Write(logParts.Message)
e.lre.Write([]byte{'\n'})
we.Write(logParts.Message)
we.Write([]byte{'\n'})
} else {
e.lro.Write(logParts.Message)
e.lro.Write([]byte{'\n'})
wo.Write(logParts.Message)
wo.Write([]byte{'\n'})
}
}
}

// logRotatorWrapper wraps our log rotator and exposes a pipe that can feed the
// log rotator data. The processOutWriter should be attached to the process and
// data will be copied from the reader to the rotator.
type logRotatorWrapper struct {
processOutWriter *os.File
processOutReader *os.File
rotatorWriter *logging.FileRotator
}

// NewLogRotatorWrapper takes a rotator and returns a wrapper that has the
// processOutWriter to attach to the processes stdout or stderr.
func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("failed to create os.Pipe for extracting logs: %v", err)
}

wrap := &logRotatorWrapper{
processOutWriter: w,
processOutReader: r,
rotatorWriter: rotator,
}
wrap.start()
return wrap, nil
}

// start starts a go-routine that copies from the pipe into the rotator. This is
// called by the constructor and not the user of the wrapper.
func (l *logRotatorWrapper) start() {
go func() {
io.Copy(l.rotatorWriter, l.processOutReader)
l.processOutReader.Close() // in case io.Copy stopped due to write error
}()
return
}

// Close closes the rotator and the process writer to ensure that the Wait
// command exits.
func (l *logRotatorWrapper) Close() error {
l.rotatorWriter.Close()
return l.processOutWriter.Close()
}