From 386f46a090e6df28f28cbca15d992ce4ad6d5dd5 Mon Sep 17 00:00:00 2001 From: Gareth Date: Sun, 5 May 2024 02:28:45 -0700 Subject: [PATCH] feat: add force kill signal handler that dumps stacks --- backrest.go | 29 +++++++++++++++---- internal/orchestrator/repo/repo.go | 2 +- internal/orchestrator/tasks/taskbackup.go | 2 +- .../orchestrator/tasks/taskindexsnapshots.go | 2 -- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/backrest.go b/backrest.go index 59d5be1f..773b5e8b 100644 --- a/backrest.go +++ b/backrest.go @@ -9,8 +9,10 @@ import ( "os" "os/signal" "path" + "runtime" "strings" "sync" + "sync/atomic" "syscall" "github.com/garethgeorge/backrest/gen/go/v1/v1connect" @@ -46,7 +48,8 @@ func main() { } ctx, cancel := context.WithCancel(context.Background()) - go onterm(cancel) + go onterm(os.Interrupt, cancel) + go onterm(os.Interrupt, newForceKillHandler()) // Load the configuration configStore := createConfigProvider() @@ -146,11 +149,13 @@ func createConfigProvider() config.ConfigStore { } } -func onterm(callback func()) { +func onterm(s os.Signal, callback func()) { sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, os.Interrupt, syscall.SIGTERM) - <-sigchan - callback() + signal.Notify(sigchan, s, syscall.SIGTERM) + for { + <-sigchan + callback() + } } func getSecret() []byte { @@ -174,3 +179,17 @@ func getSecret() []byte { } return secret } + +func newForceKillHandler() func() { + var times atomic.Int32 + return func() { + if times.Load() > 0 { + buf := make([]byte, 1<<16) + n := runtime.Stack(buf, true) // n is the length of the trace; the rest of buf is unused zero padding + os.Stderr.Write(buf[:n]) + zap.S().Fatal("dumped all running coroutine stack traces, forcing termination") + } + times.Add(1) + zap.S().Warn("attempting graceful shutdown, to force termination press Ctrl+C again") + } +} diff --git a/internal/orchestrator/repo/repo.go b/internal/orchestrator/repo/repo.go index 38bb28e8..f70cb979 100644 --- a/internal/orchestrator/repo/repo.go +++ b/internal/orchestrator/repo/repo.go @@ -37,7 +37,7 @@ func 
NewRepoOrchestrator(config *v1.Config, repoConfig *v1.Repo, resticPath stri var opts []restic.GenericOption opts = append(opts, restic.WithEnviron()) - opts = append(opts, restic.WithEnv("RESTIC_PROGRESS_FPS=0.5")) + opts = append(opts, restic.WithEnv("RESTIC_PROGRESS_FPS=2")) if env := repoConfig.GetEnv(); len(env) != 0 { for _, e := range env { diff --git a/internal/orchestrator/tasks/taskbackup.go b/internal/orchestrator/tasks/taskbackup.go index 82a4cbbe..f69c7747 100644 --- a/internal/orchestrator/tasks/taskbackup.go +++ b/internal/orchestrator/tasks/taskbackup.go @@ -146,7 +146,7 @@ func (t *BackupTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunne zap.S().Warnf("unexpected message type %q in backup progress entry", entry.MessageType) } - if time.Since(lastSent) < 1*time.Second { + if time.Since(lastSent) <= 1000*time.Millisecond { return } lastSent = time.Now() diff --git a/internal/orchestrator/tasks/taskindexsnapshots.go b/internal/orchestrator/tasks/taskindexsnapshots.go index 302b1564..ea109b9a 100644 --- a/internal/orchestrator/tasks/taskindexsnapshots.go +++ b/internal/orchestrator/tasks/taskindexsnapshots.go @@ -50,8 +50,6 @@ func indexSnapshotsHelper(ctx context.Context, st ScheduledTask, taskRunner Task t := st.Task oplog := taskRunner.OpLog() - config := taskRunner.Config() - repo, err := taskRunner.GetRepoOrchestrator(t.RepoID()) if err != nil { return fmt.Errorf("couldn't get repo %q: %w", t.RepoID(), err)