Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(servstate): use WaitDelay to avoid Command.Wait blocking on stdin/out/err #275

Merged
merged 10 commits into from
Aug 18, 2023
4 changes: 2 additions & 2 deletions internals/daemon/api_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r logsResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
err = encoder.Encode(newJSONLog(<-fifo))
}
if err != nil {
logger.Noticef("error writing logs: %v", err)
logger.Noticef("Cannot write logs: %v", err)
benhoyt marked this conversation as resolved.
Show resolved Hide resolved
return false
}
flushWriter(w)
Expand Down Expand Up @@ -186,7 +186,7 @@ func (r logsResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Otherwise encode and output log directly.
err := encoder.Encode(newJSONLog(log))
if err != nil {
logger.Noticef("error writing logs: %v", err)
logger.Noticef("Cannot write logs: %v", err)
return
}
if follow {
Expand Down
8 changes: 4 additions & 4 deletions internals/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (c *Command) canAccess(r *http.Request, user *UserState) accessResult {
if err == nil {
isUser = true
} else if err != errNoID {
logger.Noticef("unexpected error when attempting to get UID: %s", err)
logger.Noticef("Cannot parse UID from remote address %q: %s", r.RemoteAddr, err)
return accessForbidden
}

Expand Down Expand Up @@ -530,7 +530,7 @@ func (d *Daemon) HandleRestart(t restart.RestartType) {
defer d.mu.Unlock()
d.restartSocket = true
default:
logger.Noticef("internal error: restart handler called with unknown restart type: %v", t)
logger.Noticef("Internal error: restart handler called with unknown restart type: %v", t)
}
d.tomb.Kill(nil)
}
Expand Down Expand Up @@ -760,12 +760,12 @@ func (d *Daemon) RebootIsMissing(st *state.State) error {
// might get rolled back!!
restart.ClearReboot(st)
clearReboot(st)
logger.Noticef("pebble was restarted while a system restart was expected, pebble retried to schedule and waited again for a system restart %d times and is giving up", rebootMaxTentatives)
logger.Noticef("Pebble was restarted while a system restart was expected, pebble retried to schedule and waited again for a system restart %d times and is giving up", rebootMaxTentatives)
return nil
}
st.Set("daemon-system-restart-tentative", nTentative)
d.state = st
logger.Noticef("pebble was restarted while a system restart was expected, pebble will try to schedule and wait for a system restart again (tenative %d/%d)", nTentative, rebootMaxTentatives)
logger.Noticef("Pebble was restarted while a system restart was expected, pebble will try to schedule and wait for a system restart again (tenative %d/%d)", nTentative, rebootMaxTentatives)
return errExpectedReboot
}

Expand Down
2 changes: 1 addition & 1 deletion internals/daemon/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (r *resp) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
status := r.Status
bs, err := r.MarshalJSON()
if err != nil {
logger.Noticef("cannot marshal %#v to JSON: %v", *r, err)
logger.Noticef("Cannot marshal %#v to JSON: %v", *r, err)
bs = nil
status = 500
}
Expand Down
3 changes: 3 additions & 0 deletions internals/overlord/checkstate/checkers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"syscall"
"time"

"github.com/canonical/x-go/strutil/shlex"

Expand All @@ -38,6 +39,7 @@ import (
const (
maxErrorBytes = 10 * 1024
maxErrorLines = 20
execWaitDelay = time.Second
)

// httpChecker is a checker that ensures an HTTP GET at a specified URL returns 20x.
Expand Down Expand Up @@ -167,6 +169,7 @@ func (c *execChecker) check(ctx context.Context) error {
defer ringBuffer.Close()
cmd.Stdout = ringBuffer
cmd.Stderr = ringBuffer
cmd.WaitDelay = execWaitDelay
err = reaper.StartCommand(cmd)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions internals/overlord/cmdstate/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
const (
connectTimeout = 5 * time.Second
handshakeTimeout = 5 * time.Second
waitDelay = time.Second

wsControl = "control"
wsStdio = "stdio"
Expand Down Expand Up @@ -343,6 +344,7 @@ func (e *execution) do(ctx context.Context, task *state.Task) error {
cmd.Stdin = stdin
cmd.Stdout = stdout
cmd.Stderr = stderr
cmd.WaitDelay = waitDelay

cmd.SysProcAttr = &syscall.SysProcAttr{}
if e.userID != nil && e.groupID != nil {
Expand Down
21 changes: 18 additions & 3 deletions internals/overlord/servstate/handlers.go
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,21 @@ func (s *serviceData) startInternal() error {
s.cmd.Stdout = logWriter
s.cmd.Stderr = logWriter

// Add WaitDelay to ensure cmd.Wait() returns in a reasonable timeframe if
// the goroutines that cmd.Start() uses to copy Stdin/Stdout/Stderr are
// blocked when copying due to a sub-subprocess holding onto them. This
// only happens if the sub-subprocess uses setsid or setpgid to change
// the process group. Read more details in these issues:
//
// - https://github.com/golang/go/issues/23019
// - https://github.com/golang/go/issues/50436
//
// This isn't the original intent of kill-delay, but it seems reasonable
// to reuse it in this context. Use a value slightly less than the kill
// delay (90% of it) to avoid racing with trying to send SIGKILL (to a
// process that has already exited).
s.cmd.WaitDelay = s.killDelay() * 9 / 10 // will only overflow if kill-delay is 32 years!

// Start the process!
logger.Noticef("Service %q starting: %s", serviceName, s.config.Command)
err = reaper.StartCommand(s.cmd)
Expand Down Expand Up @@ -616,9 +631,9 @@ func (s *serviceData) sendSignal(signal string) error {
}

// killDelay reports the duration that this service should be given when being
// asked to shutdown gracefully before being force terminated. The value
// returned will either be the services pre configured value or the default
// kill delay for pebble.
// asked to shut down gracefully before being force-terminated. The value
// returned will either be the service's pre-configured value, or the default
// kill delay if that is not set.
func (s *serviceData) killDelay() time.Duration {
if s.config.KillDelay.IsSet {
return s.config.KillDelay.Value
Expand Down
117 changes: 79 additions & 38 deletions internals/overlord/servstate/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (
)

func TestMain(m *testing.M) {
// See TestReaper
// Used by TestReapZombies
if os.Getenv("PEBBLE_TEST_CREATE_ZOMBIE") == "1" {
err := createZombie()
if err != nil {
Expand All @@ -65,6 +65,21 @@ func TestMain(m *testing.M) {
return
}

// Used by TestWaitDelay
if os.Getenv("PEBBLE_TEST_WAITDELAY") == "1" {
// To get WaitDelay to kick in, we need to start a new process with
// setsid (to ensure it has a new process group ID) and passing
// os.Stdout down to the (grand)child process.
cmd := exec.Command("sleep", "10")
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
cmd.Stdout = os.Stdout
err := cmd.Run()
if err != nil {
panic(err)
}
return
}

os.Exit(m.Run())
}

Expand Down Expand Up @@ -378,18 +393,14 @@ func (s *S) TestStartBadCommand(c *C) {
}

func (s *S) TestCurrentUserGroup(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

current, err := user.Current()
c.Assert(err, IsNil)
group, err := user.LookupGroupId(current.Gid)
c.Assert(err, IsNil)

outputPath := filepath.Join(dir, "output")
outputPath := filepath.Join(c.MkDir(), "output")
layer := parseLayer(c, 0, "layer", fmt.Sprintf(`
services:
usrtest:
Expand Down Expand Up @@ -540,11 +551,7 @@ services:
}

func (s *S) TestAppendLayer(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

// Append a layer when there are no layers.
layer := parseLayer(c, 0, "label1", `
Expand All @@ -553,7 +560,7 @@ services:
override: replace
command: /bin/sh
`)
err = s.manager.AppendLayer(layer)
err := s.manager.AppendLayer(layer)
c.Assert(err, IsNil)
c.Assert(layer.Order, Equals, 1)
c.Assert(planYAML(c, s.manager), Equals, `
Expand Down Expand Up @@ -622,11 +629,7 @@ services:
}

func (s *S) TestCombineLayer(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

// "Combine" layer with no layers should just append.
layer := parseLayer(c, 0, "label1", `
Expand All @@ -635,7 +638,7 @@ services:
override: replace
command: /bin/sh
`)
err = s.manager.CombineLayer(layer)
err := s.manager.CombineLayer(layer)
c.Assert(err, IsNil)
c.Assert(layer.Order, Equals, 1)
c.Assert(planYAML(c, s.manager), Equals, `
Expand Down Expand Up @@ -735,11 +738,7 @@ services:
}

func (s *S) TestSetServiceArgs(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

// Append a layer with a few services having default args.
layer := parseLayer(c, 0, "base-layer", `
Expand All @@ -754,7 +753,7 @@ services:
override: replace
command: foo
`)
err = s.manager.AppendLayer(layer)
err := s.manager.AppendLayer(layer)
c.Assert(err, IsNil)
c.Assert(layer.Order, Equals, 1)
s.planLayersHasLen(c, s.manager, 1)
Expand Down Expand Up @@ -1601,20 +1600,16 @@ func (s *S) TestStopRunningNoServices(c *C) {
}

func (s *S) TestNoWorkingDir(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

outputPath := filepath.Join(dir, "output")
outputPath := filepath.Join(c.MkDir(), "output")
layer := parseLayer(c, 0, "layer", fmt.Sprintf(`
services:
nowrkdir:
override: replace
command: /bin/sh -c "pwd >%s; %s; sleep %g"
`, outputPath, s.insertDoneCheck(c, "nowrkdir"), shortOkayDelay.Seconds()+0.01))
err = s.manager.AppendLayer(layer)
err := s.manager.AppendLayer(layer)
c.Assert(err, IsNil)

// Service command should run in current directory (package directory)
Expand All @@ -1632,12 +1627,9 @@ services:
}

func (s *S) TestWorkingDir(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

dir := c.MkDir()
outputPath := filepath.Join(dir, "output")
layer := parseLayer(c, 0, "layer", fmt.Sprintf(`
services:
Expand All @@ -1646,7 +1638,7 @@ services:
command: /bin/sh -c "pwd >%s; %s; sleep %g"
working-dir: %s
`, outputPath, s.insertDoneCheck(c, "wrkdir"), shortOkayDelay.Seconds()+0.01, dir))
err = s.manager.AppendLayer(layer)
err := s.manager.AppendLayer(layer)
c.Assert(err, IsNil)

chg := s.startServices(c, []string{"wrkdir"}, 1)
Expand All @@ -1661,6 +1653,44 @@ services:
c.Check(string(output), Equals, dir+"\n")
}

func (s *S) TestWaitDelay(c *C) {
s.setupEmptyServiceManager(c)

// Run the test binary with PEBBLE_TEST_WAITDELAY=1 (see TestMain).
testExecutable, err := os.Executable()
c.Assert(err, IsNil)
layer := parseLayer(c, 0, "layer", fmt.Sprintf(`
services:
waitdelay:
override: replace
command: %s
environment:
PEBBLE_TEST_WAITDELAY: 1
kill-delay: 50ms
`, testExecutable))
err = s.manager.AppendLayer(layer)
c.Assert(err, IsNil)

// Start service and wait for it to be started
chg := s.startServices(c, []string{"waitdelay"}, 1)
s.st.Lock()
c.Assert(chg.Err(), IsNil)
s.st.Unlock()
s.waitUntilService(c, "waitdelay", func(svc *servstate.ServiceInfo) bool {
return svc.Current == servstate.StatusActive
})

// Try to stop the service; it will only stop if WaitDelay logic is working,
// otherwise the goroutine waiting for the child's stdout will never finish.
chg = s.stopServices(c, []string{"waitdelay"}, 1)
s.st.Lock()
c.Assert(chg.Err(), IsNil)
s.st.Unlock()
s.waitUntilService(c, "waitdelay", func(svc *servstate.ServiceInfo) bool {
return svc.Current == servstate.StatusInactive
})
}

// setupDefaultServiceManager provides a basic setup that can be used by many
// of the unit tests without having to create a custom setup.
func (s *S) setupDefaultServiceManager(c *C) {
Expand All @@ -1677,6 +1707,16 @@ func (s *S) setupDefaultServiceManager(c *C) {
c.Assert(err, IsNil)
}

// setupEmptyServiceManager sets up a service manager with no layers for tests
// that want to customize them.
func (s *S) setupEmptyServiceManager(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
}

// Make sure services are all stopped before the next test starts.
func (s *S) stopRunningServices(c *C) {
taskSet, err := servstate.StopRunning(s.st, s.manager)
Expand Down Expand Up @@ -1898,6 +1938,7 @@ func getChildSubreaper() (bool, error) {
}

func createZombie() error {
// Run the test binary with PEBBLE_TEST_ZOMBIE_CHILD=1 (see TestMain)
testExecutable, err := os.Executable()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internals/overlord/stateengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (se *StateEngine) Ensure() error {
for _, m := range se.managers {
err := m.Ensure()
if err != nil {
logger.Noticef("state ensure error: %v", err)
logger.Noticef("State ensure error: %v", err)
errs = append(errs, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion internals/reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func StartCommand(cmd *exec.Cmd) error {
case ch <- -1:
default:
}
logger.Noticef("internal error: new PID %d observed while still being tracked", cmd.Process.Pid)
logger.Noticef("Internal error: new PID %d observed while still being tracked", cmd.Process.Pid)
}
// Channel is 1-buffered so the send in reapOnce never blocks, if for
// some reason someone forgets to call WaitCommand.
Expand Down
Loading