Skip to content

Commit

Permalink
feat: improved stats visualization with graphs and cleanup operation …
Browse files Browse the repository at this point in the history
…filtering
  • Loading branch information
garethgeorge committed Feb 29, 2024
1 parent 3176f6e commit 5b362cc
Show file tree
Hide file tree
Showing 14 changed files with 1,771 additions and 598 deletions.
1 change: 0 additions & 1 deletion internal/orchestrator/taskbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func backupHelper(ctx context.Context, t Task, orchestrator *Orchestrator, plan
orchestrator.ScheduleTask(NewOneoffForgetTask(orchestrator, plan, op.SnapshotId, at), TaskPriorityForget)
}
orchestrator.ScheduleTask(NewOneoffIndexSnapshotsTask(orchestrator, plan.Repo, at), TaskPriorityIndexSnapshots)
orchestrator.ScheduleTask(NewOneoffStatsTask(orchestrator, plan, op.SnapshotId, at), TaskPriorityStats)

return nil
}
11 changes: 10 additions & 1 deletion internal/orchestrator/taskcollectgarbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const (
// - it has a forgotten snapshot associated with it
gcHistoryAge = 30 * 24 * time.Hour
gcHistoryMaxCount = 1000
// keep stats operations for 1 year (they're small and useful for long term trends)
gcHistoryStatsAge = 365 * 24 * time.Hour
)

type CollectGarbageTask struct {
Expand Down Expand Up @@ -72,9 +74,11 @@ func (t *CollectGarbageTask) gcOperations() error {
operationsByPlan := make(map[string][]gcOpInfo)
if err := oplog.ForAll(func(op *v1.Operation) error {
if op.SnapshotId == "" || snapshotIsForgotten[op.SnapshotId] {
_, isStats := op.Op.(*v1.Operation_OperationStats)
operationsByPlan[op.PlanId] = append(operationsByPlan[op.PlanId], gcOpInfo{
id: op.Id,
timestamp: op.UnixTimeStartMs,
isStats: isStats,
})
}
return nil
Expand All @@ -94,7 +98,11 @@ func (t *CollectGarbageTask) gcOperations() error {

// check if each operation timestamp is old.
for _, opInfo := range opInfos {
if curTime-opInfo.timestamp > gcHistoryAge.Milliseconds() {
maxAgeForType := gcHistoryAge.Milliseconds()
if opInfo.isStats {
maxAgeForType = gcHistoryStatsAge.Milliseconds()
}
if curTime-opInfo.timestamp > maxAgeForType {
gcOps = append(gcOps, opInfo.id)
}
}
Expand Down Expand Up @@ -122,4 +130,5 @@ func (t *CollectGarbageTask) OperationId() int64 {
// gcOpInfo is the small per-operation record the garbage collector keeps
// while deciding which operations are old enough to delete.
type gcOpInfo struct {
	// id is the operation ID; timestamp is the operation's start time in
	// unix milliseconds.
	id, timestamp int64
	// isStats marks stats operations.
	isStats bool
}
3 changes: 3 additions & 0 deletions internal/orchestrator/taskprune.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ func (t *PruneTask) Run(ctx context.Context) error {
})
return err
}

t.orch.ScheduleTask(NewOneoffStatsTask(t.orch, t.plan, time.Now()), TaskPriorityStats)

return nil
}

Expand Down
61 changes: 5 additions & 56 deletions internal/orchestrator/taskstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,91 +8,40 @@ import (

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/hook"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
"go.uber.org/zap"
)

// statBytesThreshold is the number of bytes added by backups since the last
// stats operation above which shouldRun requests a new stats run.
// The value is 10 * 1024^3 bytes, i.e. 10 GiB.
var statBytesThreshold int64 = 10 * 1024 * 1024 * 1024 // 10 GB added.
// statOperationsThreshold is the number of completed operations counted back
// from the newest one at or above which shouldRun requests a new stats run.
var statOperationsThreshold int = 100 // run a stat command every 100 operations.

// StatsTask tracks a restic stats operation.
type StatsTask struct {
TaskWithOperation
plan *v1.Plan
linkSnapshot string // snapshot to link the task to (if any)
at *time.Time
plan *v1.Plan
at *time.Time
}

var _ Task = &StatsTask{}

func NewOneoffStatsTask(orchestrator *Orchestrator, plan *v1.Plan, linkSnapshot string, at time.Time) *StatsTask {
func NewOneoffStatsTask(orchestrator *Orchestrator, plan *v1.Plan, at time.Time) *StatsTask {
return &StatsTask{
TaskWithOperation: TaskWithOperation{
orch: orchestrator,
},
plan: plan,
at: &at,
linkSnapshot: linkSnapshot,
plan: plan,
at: &at,
}
}

// Name returns a human-readable label for this task that includes the
// quoted ID of the plan it belongs to.
func (t *StatsTask) Name() string {
	planID := t.plan.Id
	return fmt.Sprintf("stats for plan %q", planID)
}

// shouldRun reports whether a new stats operation is warranted for the
// plan's repo.
//
// It iterates the repo's operation log through a reversed collector
// (presumably newest-first; confirm against indexutil), skipping operations
// that are still pending or in progress, and stops at the first stats
// operation it meets. While walking it counts the operations visited and
// sums the DataAdded reported by backup summaries.
//
// It returns true when any of the following holds:
//   - at least statOperationsThreshold operations were visited,
//   - no earlier stats operation exists for the repo,
//   - more than statBytesThreshold bytes were added since the last stats
//     operation.
//
// A non-nil error is returned only when iterating the oplog fails.
func (t *StatsTask) shouldRun() (bool, error) {
	// "Was a stats operation found" is tracked separately from the byte
	// total. Using -1 as both a sentinel and the accumulator's starting
	// value loses the sentinel as soon as a backup adds bytes and leaves the
	// total off by one.
	var (
		bytesSinceLastStat int64
		howFarBack         int
		foundStat          bool
	)

	err := t.orch.OpLog.ForEachByRepo(t.plan.Repo, indexutil.Reversed(indexutil.CollectAll()), func(op *v1.Operation) error {
		if op.Status == v1.OperationStatus_STATUS_PENDING || op.Status == v1.OperationStatus_STATUS_INPROGRESS {
			// Unfinished operations carry no final numbers yet.
			return nil
		}
		howFarBack++

		switch typed := op.Op.(type) {
		case *v1.Operation_OperationStats:
			// Everything before this point is already covered by that run.
			foundStat = true
			return oplog.ErrStopIteration
		case *v1.Operation_OperationBackup:
			if typed.OperationBackup.LastStatus == nil {
				return nil
			}
			if summary, ok := typed.OperationBackup.LastStatus.Entry.(*v1.BackupProgressEntry_Summary); ok {
				bytesSinceLastStat += summary.Summary.DataAdded
			}
		}
		return nil
	})
	if err != nil {
		return false, fmt.Errorf("iterate oplog: %w", err)
	}

	zap.L().Debug("distance since last stat", zap.Int64("bytes", bytesSinceLastStat), zap.String("repo", t.plan.Repo), zap.Int("opsBack", howFarBack), zap.Bool("foundStat", foundStat))
	if howFarBack >= statOperationsThreshold {
		zap.S().Debugf("distance since last stat (%v) exceeds threshold (%v)", howFarBack, statOperationsThreshold)
		return true, nil
	}
	if !foundStat {
		zap.S().Debugf("no previous stats operation found for repo %v", t.plan.Repo)
		return true, nil
	}
	if bytesSinceLastStat > statBytesThreshold {
		zap.S().Debugf("bytes since last stat (%v) exceeds threshold (%v)", bytesSinceLastStat, statBytesThreshold)
		return true, nil
	}
	return false, nil
}

func (t *StatsTask) Next(now time.Time) *time.Time {
ret := t.at
if ret != nil {
t.at = nil

shouldRun, err := t.shouldRun()
if err != nil {
zap.S().Errorf("task %v failed to check if it should run: %v", t.Name(), err)
}
if !shouldRun {
return nil
}

if err := t.setOperation(&v1.Operation{
PlanId: t.plan.Id,
RepoId: t.plan.Repo,
SnapshotId: t.linkSnapshot,
UnixTimeStartMs: timeToUnixMillis(*ret),
Status: v1.OperationStatus_STATUS_PENDING,
Op: &v1.Operation_OperationStats{},
Expand Down
Loading

0 comments on commit 5b362cc

Please sign in to comment.