Skip to content

Commit

Permalink
feat: add force kill signal handler that dumps stacks
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed May 5, 2024
1 parent be0cdd5 commit 386f46a
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 9 deletions.
29 changes: 24 additions & 5 deletions backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"os"
"os/signal"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"

"github.com/garethgeorge/backrest/gen/go/v1/v1connect"
Expand Down Expand Up @@ -46,7 +48,8 @@ func main() {
}

ctx, cancel := context.WithCancel(context.Background())
go onterm(cancel)
go onterm(os.Interrupt, cancel)
go onterm(os.Interrupt, newForceKillHandler())

// Load the configuration
configStore := createConfigProvider()
Expand Down Expand Up @@ -146,11 +149,13 @@ func createConfigProvider() config.ConfigStore {
}
}

func onterm(callback func()) {
func onterm(s os.Signal, callback func()) {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt, syscall.SIGTERM)
<-sigchan
callback()
signal.Notify(sigchan, s, syscall.SIGTERM)
for {
<-sigchan
callback()
}
}

func getSecret() []byte {
Expand All @@ -174,3 +179,17 @@ func getSecret() []byte {
}
return secret
}

func newForceKillHandler() func() {
var times atomic.Int32
return func() {
if times.Load() > 0 {
buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
os.Stderr.Write(buf)
zap.S().Fatal("dumped all running coroutine stack traces, forcing termination")
}
times.Add(1)
zap.S().Warn("attempting graceful shutdown, to force termination press Ctrl+C again")
}
}
2 changes: 1 addition & 1 deletion internal/orchestrator/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewRepoOrchestrator(config *v1.Config, repoConfig *v1.Repo, resticPath stri

var opts []restic.GenericOption
opts = append(opts, restic.WithEnviron())
opts = append(opts, restic.WithEnv("RESTIC_PROGRESS_FPS=0.5"))
opts = append(opts, restic.WithEnv("RESTIC_PROGRESS_FPS=2"))

if env := repoConfig.GetEnv(); len(env) != 0 {
for _, e := range env {
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/tasks/taskbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (t *BackupTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunne
zap.S().Warnf("unexpected message type %q in backup progress entry", entry.MessageType)
}

if time.Since(lastSent) < 1*time.Second {
if time.Since(lastSent) <= 1000*time.Millisecond {
return
}
lastSent = time.Now()
Expand Down
2 changes: 0 additions & 2 deletions internal/orchestrator/tasks/taskindexsnapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func indexSnapshotsHelper(ctx context.Context, st ScheduledTask, taskRunner Task
t := st.Task
oplog := taskRunner.OpLog()

config := taskRunner.Config()

repo, err := taskRunner.GetRepoOrchestrator(t.RepoID())
if err != nil {
return fmt.Errorf("couldn't get repo %q: %w", t.RepoID(), err)
Expand Down

0 comments on commit 386f46a

Please sign in to comment.