Skip to content

Commit

Permalink
feat: allow hook exit codes to control backup execution (e.g fail, sk…
Browse files Browse the repository at this point in the history
…ip, etc)
  • Loading branch information
garethgeorge committed Apr 12, 2024
1 parent e96f403 commit c4ae5b3
Show file tree
Hide file tree
Showing 23 changed files with 723 additions and 286 deletions.
65 changes: 58 additions & 7 deletions gen/go/types/value.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

563 changes: 353 additions & 210 deletions gen/go/v1/config.pb.go

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions internal/hook/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package hook

import "fmt"

// HookErrorCancel requests that the calling operation cancel itself. It must be handled explicitly caller. Subsequent hooks will be skipped.
type HookErrorRequestCancel struct {
Err error
}

func (e HookErrorRequestCancel) Error() string {
return fmt.Sprintf("cancel: %v", e.Err.Error())
}

func (e HookErrorRequestCancel) Unwrap() error {
return e.Err
}

// HookErrorFatal stops evaluation of subsequent hooks and will propagate to the hook flow's caller
type HookErrorFatal struct {
Err error
}

func (e HookErrorFatal) Error() string {
return fmt.Sprintf("fatal: %v", e.Err.Error())
}

func (e HookErrorFatal) Unwrap() error {
return e.Err
}
77 changes: 59 additions & 18 deletions internal/hook/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hook

import (
"bytes"
"errors"
"fmt"
"io"
"slices"
Expand Down Expand Up @@ -34,15 +35,13 @@ func NewHookExecutor(oplog *oplog.OpLog, bigOutputStore *rotatinglog.RotatingLog

// ExecuteHooks schedules tasks for the hooks subscribed to the given event. The vars map is used to substitute variables
// Hooks are pulled both from the provided plan and from the repo config.
func (e *HookExecutor) ExecuteHooks(repo *v1.Repo, plan *v1.Plan, snapshotId string, events []v1.Hook_Condition, vars HookVars) {
func (e *HookExecutor) ExecuteHooks(repo *v1.Repo, plan *v1.Plan, events []v1.Hook_Condition, vars HookVars) error {
operationBase := v1.Operation{
Status: v1.OperationStatus_STATUS_INPROGRESS,
PlanId: plan.GetId(),
RepoId: repo.GetId(),
SnapshotId: snapshotId,
Status: v1.OperationStatus_STATUS_INPROGRESS,
PlanId: plan.GetId(),
RepoId: repo.GetId(),
}

vars.SnapshotId = snapshotId
vars.Repo = repo
vars.Plan = plan
vars.CurTime = time.Now()
Expand All @@ -56,14 +55,20 @@ func (e *HookExecutor) ExecuteHooks(repo *v1.Repo, plan *v1.Plan, snapshotId str

name := fmt.Sprintf("repo/%v/hook/%v", repo.Id, idx)
operation := proto.Clone(&operationBase).(*v1.Operation)
operation.DisplayMessage = "running " + name
operation.UnixTimeStartMs = curTimeMs()
operation.Op = &v1.Operation_OperationRunHook{
OperationRunHook: &v1.OperationRunHook{
Name: name,
},
}
zap.L().Info("running hook", zap.String("plan", plan.Id), zap.Int64("opId", operation.Id), zap.String("hook", name))
e.executeHook(operation, h, event, vars)
if err := e.executeHook(operation, h, event, vars); err != nil {
zap.S().Errorf("error on repo hook %v on condition %v: %v", idx, event.String(), err)
if isHaltingError(err) {
return fmt.Errorf("repo hook %v on condition %v: %w", idx, event.String(), err)
}
}
}

for idx, hook := range plan.GetHooks() {
Expand All @@ -75,15 +80,23 @@ func (e *HookExecutor) ExecuteHooks(repo *v1.Repo, plan *v1.Plan, snapshotId str

name := fmt.Sprintf("plan/%v/hook/%v", plan.Id, idx)
operation := proto.Clone(&operationBase).(*v1.Operation)
operation.DisplayMessage = "running " + name
operation.UnixTimeStartMs = curTimeMs()
operation.Op = &v1.Operation_OperationRunHook{
OperationRunHook: &v1.OperationRunHook{
Name: name,
},
}

zap.L().Info("running hook", zap.String("plan", plan.Id), zap.Int64("opId", operation.Id), zap.String("hook", name))
e.executeHook(operation, h, event, vars)
if err := e.executeHook(operation, h, event, vars); err != nil {
zap.S().Errorf("error on plan hook %v on condition %v: %v", idx, event.String(), err)
if isHaltingError(err) {
return fmt.Errorf("plan hook %v on condition %v: %w", idx, event.String(), err)
}
}
}
return nil
}

func firstMatchingCondition(hook *Hook, events []v1.Hook_Condition) v1.Hook_Condition {
Expand All @@ -95,35 +108,43 @@ func firstMatchingCondition(hook *Hook, events []v1.Hook_Condition) v1.Hook_Cond
return v1.Hook_CONDITION_UNKNOWN
}

func (e *HookExecutor) executeHook(op *v1.Operation, hook *Hook, event v1.Hook_Condition, vars HookVars) {
func (e *HookExecutor) executeHook(op *v1.Operation, hook *Hook, event v1.Hook_Condition, vars HookVars) error {
if err := e.oplog.Add(op); err != nil {
zap.S().Errorf("execute hook: add operation: %v", err)
return
return errors.New("couldn't create operation")
}

output := &bytes.Buffer{}
fmt.Fprintf(output, "triggering condition: %v\n", event.String())

var retErr error
if err := hook.Do(event, vars, io.MultiWriter(output)); err != nil {
output.Write([]byte(fmt.Sprintf("Error: %v", err)))
op.DisplayMessage = err.Error()
op.Status = v1.OperationStatus_STATUS_ERROR
zap.S().Errorf("execute hook: %v", err)
err = applyHookErrorPolicy(hook.OnError, err)
var cancelErr *HookErrorRequestCancel
if errors.As(err, &cancelErr) {
// if it was a cancel then it successfully indicated it's intent to the caller
// no error should be displayed in the UI.
op.Status = v1.OperationStatus_STATUS_SUCCESS
} else {
op.Status = v1.OperationStatus_STATUS_ERROR
}
retErr = err
} else {
op.Status = v1.OperationStatus_STATUS_SUCCESS
}

outputRef, err := e.logStore.Write(output.Bytes())
if err != nil {
zap.S().Errorf("execute hook: write log: %v", err)
return
retErr = errors.Join(retErr, fmt.Errorf("write logstore: %w", err))
}
op.Op.(*v1.Operation_OperationRunHook).OperationRunHook.OutputLogref = outputRef
op.Logref = outputRef

op.UnixTimeEndMs = curTimeMs()
if err := e.oplog.Update(op); err != nil {
zap.S().Errorf("execute hook: update operation: %v", err)
return
retErr = errors.Join(retErr, fmt.Errorf("update oplog: %w", err))
}
return retErr
}

func curTimeMs() int64 {
Expand Down Expand Up @@ -175,3 +196,23 @@ func (h *Hook) renderTemplateOrDefault(template string, defaultTmpl string, vars
}
return h.renderTemplate(template, vars)
}

func applyHookErrorPolicy(onError v1.Hook_OnError, err error) error {
if err == nil || errors.As(err, &HookErrorFatal{}) || errors.As(err, &HookErrorRequestCancel{}) {
return err
}

if onError == v1.Hook_ON_ERROR_CANCEL {
return &HookErrorRequestCancel{Err: err}
} else if onError == v1.Hook_ON_ERROR_FATAL {
return &HookErrorFatal{Err: err}
}
return err
}

// isHaltingError returns true if the error is a fatal error or a request to cancel the operation
func isHaltingError(err error) bool {
var fatalErr *HookErrorFatal
var cancelErr *HookErrorRequestCancel
return errors.As(err, &fatalErr) || errors.As(err, &cancelErr)
}
28 changes: 28 additions & 0 deletions internal/hook/hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hook

import (
"bytes"
"errors"
"os/exec"
"runtime"
"testing"
Expand Down Expand Up @@ -56,3 +57,30 @@ exit $counter`,
t.Fatalf("expected exit code 3, got %v", err.(*exec.ExitError).ExitCode())
}
}

func TestCommandHookErrorHandling(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping test on windows")
}

hook := Hook(v1.Hook{
Conditions: []v1.Hook_Condition{
v1.Hook_CONDITION_SNAPSHOT_START,
},
Action: &v1.Hook_ActionCommand{
ActionCommand: &v1.Hook_Command{
Command: "exit 1",
},
},
OnError: v1.Hook_ON_ERROR_CANCEL,
})

err := applyHookErrorPolicy(hook.OnError, hook.Do(v1.Hook_CONDITION_SNAPSHOT_START, HookVars{}, &bytes.Buffer{}))
if err == nil {
t.Fatal("expected error")
}
var cancelErr *HookErrorRequestCancel
if !errors.As(err, &cancelErr) {
t.Fatalf("expected HookErrorRequestCancel, got %v", err)
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
39 changes: 32 additions & 7 deletions internal/orchestrator/taskbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,30 @@ func backupHelper(ctx context.Context, t Task, orchestrator *Orchestrator, plan
return fmt.Errorf("couldn't get repo %q: %w", plan.Repo, err)
}

orchestrator.hookExecutor.ExecuteHooks(repo.Config(), plan, "", []v1.Hook_Condition{
// Run start hooks e.g. preflight checks and backup start notifications.
if err := orchestrator.hookExecutor.ExecuteHooks(repo.Config(), plan, []v1.Hook_Condition{
v1.Hook_CONDITION_SNAPSHOT_START,
}, hook.HookVars{
Task: t.Name(),
})
}); err != nil {
var cancelErr *hook.HookErrorRequestCancel
if errors.As(err, &cancelErr) {
op.Status = v1.OperationStatus_STATUS_USER_CANCELLED // user visible cancelled status
op.DisplayMessage = err.Error()
return nil
}

// If the snapshot start hook fails we trigger error notification hooks.
retErr := fmt.Errorf("hook failed: %w", err)
_ = orchestrator.hookExecutor.ExecuteHooks(repo.Config(), plan, []v1.Hook_Condition{
v1.Hook_CONDITION_ANY_ERROR,
}, hook.HookVars{
Task: t.Name(),
Error: retErr.Error(),
})

return retErr
}

var sendWg sync.WaitGroup
lastSent := time.Now() // debounce progress updates, these can endup being very frequent.
Expand Down Expand Up @@ -160,24 +179,30 @@ func backupHelper(ctx context.Context, t Task, orchestrator *Orchestrator, plan
}()
})

sendWg.Wait()

if summary == nil {
summary = &restic.BackupProgressEntry{}
}

vars := hook.HookVars{
Task: t.Name(),
SnapshotStats: summary,
SnapshotId: summary.SnapshotId,
}
if err != nil {
vars.Error = err.Error()
orchestrator.hookExecutor.ExecuteHooks(repo.Config(), plan, "", []v1.Hook_Condition{
v1.Hook_CONDITION_SNAPSHOT_ERROR, v1.Hook_CONDITION_ANY_ERROR,
}, vars)

if !errors.Is(err, restic.ErrPartialBackup) {
_ = orchestrator.hookExecutor.ExecuteHooks(repo.Config(), plan, []v1.Hook_Condition{
v1.Hook_CONDITION_SNAPSHOT_ERROR, v1.Hook_CONDITION_ANY_ERROR,
}, vars)
return fmt.Errorf("repo.Backup for repo %q: %w", plan.Repo, err)
}
op.Status = v1.OperationStatus_STATUS_WARNING
op.DisplayMessage = "Partial backup, some files may not have been read completely."
}

orchestrator.hookExecutor.ExecuteHooks(repo.Config(), plan, summary.SnapshotId, []v1.Hook_Condition{
orchestrator.hookExecutor.ExecuteHooks(repo.Config(), plan, []v1.Hook_Condition{
v1.Hook_CONDITION_SNAPSHOT_END,
}, vars)

Expand Down
Loading

0 comments on commit c4ae5b3

Please sign in to comment.