Skip to content

Commit

Permalink
fix: run stats after every prune operation
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed May 20, 2024
1 parent b22028e commit 7fce593
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 21 deletions.
8 changes: 4 additions & 4 deletions internal/api/backresthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,16 +384,16 @@ func (s *BackrestHandler) Unlock(ctx context.Context, req *connect.Request[types
}

func (s *BackrestHandler) Stats(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) {
at := time.Now()
var err error
wait := make(chan struct{})
s.orchestrator.ScheduleTask(tasks.NewOneoffStatsTask(req.Msg.Value, tasks.PlanForUnassociatedOperations, at), tasks.TaskPriorityInteractive+tasks.TaskPriorityStats, func(e error) {
if err := s.orchestrator.ScheduleTask(tasks.NewStatsTask(req.Msg.Value, tasks.PlanForSystemTasks, true), tasks.TaskPriorityInteractive+tasks.TaskPriorityStats, func(e error) {
err = e
close(wait)
})
}); err != nil {
return nil, err
}
<-wait
return connect.NewResponse(&emptypb.Empty{}), err

}

func (s *BackrestHandler) RunCommand(ctx context.Context, req *connect.Request[v1.RunCommandRequest], resp *connect.ServerStream[types.BytesValue]) error {
Expand Down
6 changes: 6 additions & 0 deletions internal/orchestrator/tasks/taskprune.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (t *PruneTask) Next(now time.Time, runner TaskRunner) (ScheduledTask, error
if repo.PrunePolicy.GetSchedule() == nil {
return NeverScheduledTask, nil
}

var lastRan time.Time
if err := runner.OpLog().ForEach(oplog.Query{RepoId: t.RepoID()}, indexutil.Reversed(indexutil.CollectAll()), func(op *v1.Operation) error {
if _, ok := op.Op.(*v1.Operation_OperationPrune); ok {
Expand Down Expand Up @@ -157,5 +158,10 @@ func (t *PruneTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner

opPrune.OperationPrune.Output = output

// Run a stats task after a successful prune
if err := runner.ScheduleTask(NewStatsTask(t.RepoID(), PlanForSystemTasks, false), TaskPriorityStats); err != nil {
zap.L().Error("schedule stats task", zap.Error(err))
}

return nil
}
79 changes: 62 additions & 17 deletions internal/orchestrator/tasks/taskstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,79 @@ import (

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/hook"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
)

func NewOneoffStatsTask(repoID, planID string, at time.Time) Task {
return &GenericOneoffTask{
type StatsTask struct {
BaseTask
force bool
didRun bool
}

func NewStatsTask(repoID, planID string, force bool) Task {
return &StatsTask{
BaseTask: BaseTask{
TaskName: fmt.Sprintf("stats for repo %q", repoID),
TaskRepoID: repoID,
TaskPlanID: planID,
},
OneoffTask: OneoffTask{
RunAt: at,
ProtoOp: &v1.Operation{
force: force,
}
}

func (t *StatsTask) Next(now time.Time, runner TaskRunner) (ScheduledTask, error) {
if t.force {
if t.didRun {
return NeverScheduledTask, nil
}
t.didRun = true
return ScheduledTask{
Task: t,
RunAt: now,
Op: &v1.Operation{
Op: &v1.Operation_OperationStats{},
},
}, nil
}

// TODO: make the "stats" schedule configurable.
var lastRan time.Time
if err := runner.OpLog().ForEach(oplog.Query{RepoId: t.RepoID()}, indexutil.Reversed(indexutil.CollectAll()), func(op *v1.Operation) error {
if _, ok := op.Op.(*v1.Operation_OperationStats); ok {
lastRan = time.Unix(0, op.UnixTimeEndMs*int64(time.Millisecond))
return oplog.ErrStopIteration
}
return nil
}); err != nil {
return NeverScheduledTask, fmt.Errorf("finding last backup run time: %w", err)
}

// Runs every 30 days
if now.Sub(lastRan) < 30*24*time.Hour {
return ScheduledTask{}, nil
}
return ScheduledTask{
Task: t,
RunAt: now,
Op: &v1.Operation{
Op: &v1.Operation_OperationStats{},
},
Do: func(ctx context.Context, st ScheduledTask, taskRunner TaskRunner) error {
if err := statsHelper(ctx, st, taskRunner); err != nil {
taskRunner.ExecuteHooks([]v1.Hook_Condition{
v1.Hook_CONDITION_ANY_ERROR,
}, hook.HookVars{
Task: st.Task.Name(),
Error: err.Error(),
})
return err
}
return nil
},
}, nil
}

func (t *StatsTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner) error {
if err := statsHelper(ctx, st, runner); err != nil {
runner.ExecuteHooks([]v1.Hook_Condition{
v1.Hook_CONDITION_ANY_ERROR,
}, hook.HookVars{
Task: st.Task.Name(),
Error: err.Error(),
})
return err
}

return nil
}

func statsHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner) error {
Expand Down

0 comments on commit 7fce593

Please sign in to comment.