Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow signal agent sends to bootstrap on cancel to be customized #1041

Merged
merged 3 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions agent/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/buildkite/agent/api"
"github.com/buildkite/agent/logger"
"github.com/buildkite/agent/metrics"
"github.com/buildkite/agent/process"
"github.com/buildkite/agent/proctitle"
"github.com/buildkite/agent/retry"
)
Expand All @@ -19,6 +20,9 @@ type AgentWorkerConfig struct {
// Whether to set debug in the job
Debug bool

// What signal to use for worker cancellation
CancelSignal process.Signal

// The configuration of the agent from the CLI
AgentConfiguration AgentConfiguration
}
Expand Down Expand Up @@ -51,6 +55,9 @@ type AgentWorker struct {
// Whether to enable debug
debug bool

// The signal to use for cancellation
cancelSig process.Signal

// Stop controls
stop chan struct{}
stopping bool
Expand All @@ -71,6 +78,7 @@ func NewAgentWorker(l logger.Logger, a *api.AgentRegisterResponse, m *metrics.Co
debug: c.Debug,
agentConfiguration: c.AgentConfiguration,
stop: make(chan struct{}),
cancelSig: c.CancelSignal,
}
}

Expand Down Expand Up @@ -373,6 +381,7 @@ func (a *AgentWorker) AcceptAndRun(job *api.Job) error {
// Now that the job has been accepted, we can start it.
a.jobRunner, err = NewJobRunner(a.logger, jobMetricsScope, a.agent, accepted, a.apiClient, JobRunnerConfig{
Debug: a.debug,
CancelSignal: a.cancelSig,
AgentConfiguration: a.agentConfiguration,
})

Expand Down
16 changes: 10 additions & 6 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type JobRunnerConfig struct {
// The configuration of the agent from the CLI
AgentConfiguration AgentConfiguration

// What signal to use for worker cancellation
CancelSignal process.Signal

// Whether to set debug in the job
Debug bool
}
Expand Down Expand Up @@ -181,12 +184,13 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe

// The process that will run the bootstrap script
runner.process = process.New(l, process.Config{
Path: cmd[0],
Args: cmd[1:],
Env: processEnv,
PTY: conf.AgentConfiguration.RunInPty,
Stdout: processWriter,
Stderr: processWriter,
Path: cmd[0],
Args: cmd[1:],
Env: processEnv,
PTY: conf.AgentConfiguration.RunInPty,
Stdout: processWriter,
Stderr: processWriter,
InterruptSignal: conf.CancelSignal,
})

// Kick off our callback when the process starts
Expand Down
14 changes: 14 additions & 0 deletions clicommand/agent_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/buildkite/agent/experiments"
"github.com/buildkite/agent/logger"
"github.com/buildkite/agent/metrics"
"github.com/buildkite/agent/process"
"github.com/buildkite/shellwords"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -80,6 +81,7 @@ type AgentStartConfig struct {
MetricsDatadogHost string `cli:"metrics-datadog-host"`
Spawn int `cli:"spawn"`
LogFormat string `cli:"log-format"`
CancelSignal string `cli:"cancel-signal"`

// Global flags
Debug bool `cli:"debug"`
Expand Down Expand Up @@ -353,6 +355,12 @@ var AgentStartCommand = cli.Command{
Value: 1,
EnvVar: "BUILDKITE_AGENT_SPAWN",
},
cli.StringFlag{
Name: "cancel-signal",
Usage: "The signal to use for cancellation",
EnvVar: "BUILDKITE_CANCEL_SIGNAL",
Value: "SIGTERM",
},

// API Flags
AgentRegisterTokenFlag,
Expand Down Expand Up @@ -590,6 +598,11 @@ var AgentStartCommand = cli.Command{
l.Info("Agents will disconnect after %d seconds of inactivity", agentConf.DisconnectAfterIdleTimeout)
}

cancelSig, err := process.ParseSignal(cfg.CancelSignal)
if err != nil {
l.Fatal("Failed to parse cancel-signal: %v", err)
}

// Create the API client
client := api.NewClient(l, loadAPIClientConfig(cfg, `Token`))

Expand Down Expand Up @@ -630,6 +643,7 @@ var AgentStartCommand = cli.Command{
agent.NewAgentWorker(
l.WithFields(logger.StringField(`agent`, ag.Name)), ag, mc, client, agent.AgentWorkerConfig{
AgentConfiguration: agentConf,
CancelSignal: cancelSig,
Debug: cfg.Debug,
}))
}
Expand Down
66 changes: 53 additions & 13 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"io"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -18,17 +20,55 @@ const (
termType = `xterm-256color`
)

// Signal identifies the signal delivered to a process when it is interrupted.
// It is declared here, rather than reusing syscall.Signal, so the type and its
// constants are available on every platform this package builds on. The zero
// value is not a named signal; ParseSignal returns it alongside an error.
type Signal int

// Named signals that can be selected for interruption.
//
// NOTE(review): the numeric values are hard-coded. 10 and 12 are the Linux
// numbers for SIGUSR1/SIGUSR2; other Unix systems number them differently
// (Darwin uses 30 and 31), so converting a Signal straight to syscall.Signal
// may deliver a different signal there. Confirm against the target platforms.
const (
	SIGHUP  Signal = 1
	SIGINT  Signal = 2
	SIGQUIT Signal = 3
	SIGUSR1 Signal = 10
	SIGUSR2 Signal = 12
	SIGTERM Signal = 15
)

// signalMap maps canonical upper-case signal names to their Signal value.
// Every value appears exactly once, which keeps the reverse lookup in
// Signal.String deterministic despite random map iteration order.
var signalMap = map[string]Signal{
	`SIGHUP`:  SIGHUP,
	`SIGINT`:  SIGINT,
	`SIGQUIT`: SIGQUIT,
	`SIGUSR1`: SIGUSR1,
	`SIGUSR2`: SIGUSR2,
	`SIGTERM`: SIGTERM,
}

// String returns the canonical name of the signal (for example "SIGTERM"), or
// its decimal value when the signal has no entry in signalMap.
func (s Signal) String() string {
	for name, sig := range signalMap {
		if sig == s {
			return name
		}
	}
	return strconv.FormatInt(int64(s), 10)
}

// ParseSignal converts a signal name into a Signal.
//
// Matching is case-insensitive, ignores surrounding whitespace, and accepts the
// name with or without the "SIG" prefix, so "SIGTERM", "sigterm", " TERM " and
// "term" all yield SIGTERM. Any other input returns Signal(0) and an error that
// quotes the original, unmodified input.
func ParseSignal(sig string) (Signal, error) {
	name := strings.ToUpper(strings.TrimSpace(sig))

	// Exact canonical name, e.g. "SIGTERM".
	if s, ok := signalMap[name]; ok {
		return s, nil
	}

	// Short form without the prefix, e.g. "TERM". An empty or bare "SIG"
	// input becomes "SIG"/"SIGSIG" here and is correctly rejected.
	if s, ok := signalMap[`SIG`+name]; ok {
		return s, nil
	}

	return Signal(0), fmt.Errorf("Unknown signal %q", sig)
}

// Configuration for a Process
type Config struct {
PTY bool
Timestamp bool
Path string
Args []string
Env []string
Stdout io.Writer
Stderr io.Writer
Dir string
Context context.Context
PTY bool
Timestamp bool
Path string
Args []string
Env []string
Stdout io.Writer
Stderr io.Writer
Dir string
Context context.Context
InterruptSignal Signal
}

// Process is an operating system level process
Expand Down Expand Up @@ -78,7 +118,7 @@ func (p *Process) Run() error {
// Setup the process to create a process group if supported
// See https://github.com/kr/pty/issues/35 for context
if !p.conf.PTY {
SetupProcessGroup(p.command)
p.setupProcessGroup()
}

// Configure working dir and fail if it doesn't exist, otherwise
Expand Down Expand Up @@ -258,11 +298,11 @@ func (p *Process) Interrupt() error {
}

// interrupt the process group (configured InterruptSignal, SIGTERM by default; CTRL-BREAK on Windows)
if err := InterruptProcessGroup(p.command.Process, p.logger); err != nil {
if err := p.interruptProcessGroup(); err != nil {
p.logger.Error("[Process] Failed to interrupt process %d: %v", p.pid, err)

// Fallback to terminating if we get an error
if termErr := TerminateProcessGroup(p.command.Process, p.logger); termErr != nil {
if termErr := p.terminateProcessGroup(); termErr != nil {
return termErr
}
}
Expand All @@ -280,7 +320,7 @@ func (p *Process) Terminate() error {
return nil
}

return TerminateProcessGroup(p.command.Process, p.logger)
return p.terminateProcessGroup()
}

func timeoutWait(waitGroup *sync.WaitGroup) error {
Expand Down
49 changes: 48 additions & 1 deletion process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ func TestProcessInterrupts(t *testing.T) {
// give the signal handler some time to install
time.Sleep(time.Millisecond * 50)

p.Interrupt()
err := p.Interrupt()
if err != nil {
t.Error(err)
}
}()

if err := p.Run(); err != nil {
Expand All @@ -183,6 +186,50 @@ func TestProcessInterrupts(t *testing.T) {
assertProcessDoesntExist(t, p)
}

// TestProcessInterruptsWithCustomSignal checks that Interrupt delivers the
// signal set in Config.InterruptSignal (SIGINT here). It re-executes the test
// binary with TEST_MAIN=tester-signal and expects the child to write
// `SIG interrupt` to stdout before exiting.
// NOTE(review): presumably the tester-signal mode prints the signal it
// receives; that helper is not visible here — confirm.
func TestProcessInterruptsWithCustomSignal(t *testing.T) {
if runtime.GOOS == `windows` {
t.Skip("Works in windows, but not in docker")
}

// Captures everything the child process writes to stdout.
b := &bytes.Buffer{}

p := process.New(logger.Discard, process.Config{
Path: os.Args[0],
Env: []string{"TEST_MAIN=tester-signal"},
Stdout: b,
InterruptSignal: process.SIGINT,
})

// The WaitGroup lets the test wait for the interrupting goroutine after Run
// returns.
var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
// Block until the process reports that it has started.
<-p.Started()

// give the signal handler some time to install
time.Sleep(time.Millisecond * 50)

err := p.Interrupt()
if err != nil {
t.Error(err)
}
}()

// Run is called on the test goroutine while the goroutine above interrupts
// the process.
if err := p.Run(); err != nil {
t.Fatal(err)
}

wg.Wait()

output := b.String()
if output != `SIG interrupt` {
t.Fatalf("Bad output: %q", output)
}

assertProcessDoesntExist(t, p)
}

func TestProcessSetsProcessGroupID(t *testing.T) {
if runtime.GOOS == `windows` {
t.Skip("Process groups not supported on windows")
Expand Down
32 changes: 18 additions & 14 deletions process/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,34 @@
package process

import (
"os"
"os/exec"
"syscall"

"github.com/buildkite/agent/logger"
)

func SetupProcessGroup(cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
func (p *Process) setupProcessGroup() {
// See https://github.com/kr/pty/issues/35 for context
if !p.conf.PTY {
p.command.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
}
}
}

func TerminateProcessGroup(p *os.Process, l logger.Logger) error {
l.Debug("[Process] Sending signal SIGKILL to PGID: %d", p.Pid)
return syscall.Kill(-p.Pid, syscall.SIGKILL)
// terminateProcessGroup hard-kills the process group by sending SIGKILL to the
// negated pid, which kill(2) treats as a process group ID. It returns whatever
// error syscall.Kill reports.
func (p *Process) terminateProcessGroup() error {
	// Log p.pid, the same integer that is signalled below, so the %d verb
	// always receives a number and the log matches the group that is killed.
	p.logger.Debug("[Process] Sending signal SIGKILL to PGID: %d", p.pid)
	return syscall.Kill(-p.pid, syscall.SIGKILL)
}

func InterruptProcessGroup(p *os.Process, l logger.Logger) error {
l.Debug("[Process] Sending signal SIGTERM to PGID: %d", p.Pid)
func (p *Process) interruptProcessGroup() error {
intSignal := p.conf.InterruptSignal

// TODO: this should be SIGINT, but will be a breaking change
return syscall.Kill(-p.Pid, syscall.SIGTERM)
if intSignal == Signal(0) {
intSignal = SIGTERM
}

p.logger.Debug("[Process] Sending signal %s to PGID: %d", intSignal, p.Pid)
return syscall.Kill(-p.pid, syscall.Signal(intSignal))
}

func GetPgid(pid int) (int, error) {
Expand Down
17 changes: 7 additions & 10 deletions process/signal_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package process

import (
"errors"
"os"
"os/exec"
"strconv"
"syscall"

"github.com/buildkite/agent/logger"
)

// Windows has no concept of parent/child processes or signals. The best we can do
Expand All @@ -26,24 +23,24 @@ const (
createNewProcessGroupFlag = 0x00000200
)

func SetupProcessGroup(cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{
func (p *Process) setupProcessGroup() {
p.command.SysProcAttr = &syscall.SysProcAttr{
CreationFlags: syscall.CREATE_UNICODE_ENVIRONMENT | createNewProcessGroupFlag,
}
}

func TerminateProcessGroup(p *os.Process, l logger.Logger) error {
l.Debug("[Process] Terminating process tree with TASKKILL.EXE PID: %d", p.Pid)
func (p *Process) terminateProcessGroup() error {
p.logger.Debug("[Process] Terminating process tree with TASKKILL.EXE PID: %d", p.Pid)

// taskkill.exe with /F will call TerminateProcess and hard-kill the process and
// anything left in its process tree.
return exec.Command("CMD", "/C", "TASKKILL.EXE", "/F", "/T", "/PID", strconv.Itoa(p.Pid)).Run()
return exec.Command("CMD", "/C", "TASKKILL.EXE", "/F", "/T", "/PID", strconv.Itoa(p.pid)).Run()
}

func InterruptProcessGroup(p *os.Process, l logger.Logger) error {
func (p *Process) interruptProcessGroup() error {
// Sends a CTRL-BREAK signal to the process group id, which is the same as the process PID
// For some reason I cannot fathom, this returns "Incorrect function" in docker for windows
r1, _, err := procGenerateConsoleCtrlEvent.Call(syscall.CTRL_BREAK_EVENT, uintptr(p.Pid))
r1, _, err := procGenerateConsoleCtrlEvent.Call(syscall.CTRL_BREAK_EVENT, uintptr(p.pid))
if r1 == 0 {
return err
}
Expand Down