feat: implement snapshot indexing
garethgeorge committed Nov 16, 2023
1 parent 6491dbe commit a90b30e
Showing 3 changed files with 135 additions and 38 deletions.
41 changes: 22 additions & 19 deletions internal/orchestrator/orchestrator.go
@@ -95,26 +95,28 @@ func (o *Orchestrator) GetPlan(planId string) (*v1.Plan, error) {
 
 // Run is the main orchestration loop. Cancel the context to stop the loop.
 func (o *Orchestrator) Run(mainCtx context.Context) error {
-    zap.L().Info("Starting orchestrator loop")
+    zap.L().Info("starting orchestrator loop")
 
     o.mu.Lock()
     o.configUpdates = make(chan *v1.Config)
     o.mu.Unlock()
 
-    for mainCtx.Err() == nil {
+    for {
         o.mu.Lock()
         config := o.config
         o.mu.Unlock()
-        o.runVersion(mainCtx, config)
-        zap.L().Info("Restarting orchestrator loop")
+        if o.runVersion(mainCtx, config) {
+            zap.L().Info("restarting orchestrator loop")
+        } else {
+            zap.L().Info("exiting orchestrator loop, context cancelled.")
+            break
+        }
     }
-    zap.L().Info("Exited orchestrator loop, context cancelled.")
 
     return nil
 }
 
 // runImmutable is a helper function for Run() that runs the orchestration loop with a single version of the config.
-func (o *Orchestrator) runVersion(mainCtx context.Context, config *v1.Config) {
+func (o *Orchestrator) runVersion(mainCtx context.Context, config *v1.Config) bool {
     lock := sync.Mutex{}
     ctx, cancel := context.WithCancel(mainCtx)
 
@@ -126,35 +128,35 @@ func (o *Orchestrator) runVersion(mainCtx context.Context, config *v1.Config) {
 
         runAt := t.Next(curTime)
         if runAt == nil {
-            zap.L().Debug("Task has no next run, not scheduling.", zap.String("task", t.Name()))
+            zap.L().Debug("task has no next run, not scheduling.", zap.String("task", t.Name()))
             return
         }
 
         timer := time.NewTimer(runAt.Sub(curTime))
-        zap.L().Debug("Scheduling task", zap.String("task", t.Name()), zap.String("runAt", runAt.Format(time.RFC3339)))
+        zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", runAt.Format(time.RFC3339)))
 
         wg.Add(1)
         go func() {
            defer wg.Done()
            select {
            case <-ctx.Done():
                timer.Stop()
-               zap.L().Debug("Cancelling scheduled (but not running) task, orchestrator context is cancelled.", zap.String("task", t.Name()))
+               zap.L().Debug("cancelled scheduled (but not running) task, orchestrator context is cancelled.", zap.String("task", t.Name()))
                return
            case <-timer.C:
                lock.Lock()
                defer lock.Unlock()
-               zap.L().Debug("Running task", zap.String("task", t.Name()))
+               zap.L().Info("running task", zap.String("task", t.Name()))
 
                // Task execution runs with mainCtx meaning config changes do not interrupt it, but cancelling the orchestration loop will.
                if err := t.Run(mainCtx); err != nil {
-                   zap.L().Error("Task failed", zap.String("task", t.Name()), zap.Error(err))
+                   zap.L().Error("task failed", zap.String("task", t.Name()), zap.Error(err))
                } else {
-                   zap.L().Debug("Task finished", zap.String("task", t.Name()))
+                   zap.L().Debug("task finished", zap.String("task", t.Name()))
                }
 
                if ctx.Err() != nil {
-                   zap.L().Debug("Not attempting to reschedule task, orchestrator context is cancelled.", zap.String("task", t.Name()))
+                   zap.L().Debug("not attempting to reschedule task, orchestrator context is cancelled.", zap.String("task", t.Name()))
                    return
                }
 
@@ -167,7 +169,7 @@ func (o *Orchestrator) runVersion(mainCtx context.Context, config *v1.Config) {
     for _, plan := range config.Plans {
         t, err := NewScheduledBackupTask(o, plan)
         if err != nil {
-            zap.L().Error("Failed to create backup task for plan", zap.String("plan", plan.Id), zap.Error(err))
+            zap.L().Error("failed to create backup task for plan", zap.String("plan", plan.Id), zap.Error(err))
         }
 
         execTask(t)
@@ -178,15 +180,16 @@ func (o *Orchestrator) runVersion(mainCtx context.Context, config *v1.Config) {
         select {
         case t := <-o.externTasks:
             execTask(t)
-        case <-ctx.Done():
+        case <-mainCtx.Done():
+            zap.L().Info("orchestrator context cancelled, shutting down orchestrator")
             cancel()
             wg.Wait()
-            return
+            return false
         case <-o.configUpdates:
-            zap.L().Info("Orchestrator received config change, waiting for in-progress operations")
+            zap.L().Info("orchestrator received config change, waiting for in-progress operations then restarting")
             cancel()
             wg.Wait()
-            return
+            return true
         }
     }
 }
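Note (not part of the commit): the Run/runVersion split above follows a "run one immutable config version until something changes" pattern — the inner run reports whether it stopped because of a config update (restart) or a cancelled context (exit). A minimal standalone Go sketch of that pattern; the names runOnce, run, config, and updates here are illustrative stand-ins, not this repository's API.

package main

import (
	"context"
	"fmt"
	"time"
)

type config struct{ version int }

// runOnce blocks until either the context is cancelled (returns false) or a
// config update arrives (returns true, meaning "restart me with new config").
func runOnce(ctx context.Context, cfg config, updates <-chan config) bool {
	fmt.Println("running with config version", cfg.version)
	select {
	case <-ctx.Done():
		return false
	case <-updates:
		return true
	}
}

func run(ctx context.Context, updates <-chan config) {
	cfg := config{version: 1}
	for {
		if runOnce(ctx, cfg, updates) {
			cfg.version++ // pick up the new config and restart the loop
			continue
		}
		return // context cancelled: exit for good
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	updates := make(chan config)
	go func() { updates <- config{} }() // simulate one config change
	run(ctx, updates)
}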
57 changes: 43 additions & 14 deletions internal/orchestrator/repo.go
@@ -10,6 +10,7 @@ import (
 
     v1 "github.com/garethgeorge/resticui/gen/go/v1"
     "github.com/garethgeorge/resticui/pkg/restic"
+    "go.uber.org/zap"
 )
 
 // RepoOrchestrator is responsible for managing a single repo.
@@ -19,8 +20,10 @@ type RepoOrchestrator struct {
     repoConfig *v1.Repo
     repo *restic.Repo
 
+    // TODO: decide if snapshot caching is a good idea. We gain performance but
+    // increase background memory use by a small amount at all times (probably on the order of 1MB).
     snapshotsMu sync.Mutex // enable very fast snapshot access IF no update is required.
-    snapshotsAge time.Time
+    snapshotsResetTimer *time.Timer
     snapshots []*restic.Snapshot
 }
 
@@ -31,21 +34,33 @@ func newRepoOrchestrator(repoConfig *v1.Repo, repo *restic.Repo) *RepoOrchestrat
     }
 }
 
-func (r *RepoOrchestrator) updateSnapshotsIfNeeded(ctx context.Context) error {
-    r.snapshotsMu.Lock()
-    defer r.snapshotsMu.Unlock()
-    if time.Since(r.snapshotsAge) > 10 * time.Minute {
-        r.snapshots = nil
+func (r *RepoOrchestrator) updateSnapshotsIfNeeded(ctx context.Context, force bool) error {
+    if r.snapshots != nil {
+        return nil
     }
 
+    if r.snapshotsResetTimer != nil {
+        if !r.snapshotsResetTimer.Stop() {
+            <-r.snapshotsResetTimer.C
+        }
+    }
+
+    r.snapshotsResetTimer = time.AfterFunc(10 * time.Minute, func() {
+        r.snapshotsMu.Lock()
+        defer r.snapshotsMu.Unlock()
+        r.snapshots = nil
+    })
+
     if r.snapshots != nil {
         return nil
     }
 
     r.mu.Lock()
     defer r.mu.Unlock()
 
-    snapshots, err := r.repo.Snapshots(ctx, restic.WithPropagatedEnvVars(restic.EnvToPropagate...))
+    startTime := time.Now()
+
+    snapshots, err := r.repo.Snapshots(ctx, restic.WithPropagatedEnvVars(restic.EnvToPropagate...), restic.WithFlags("--latest", "1000"))
     if err != nil {
         return fmt.Errorf("failed to update snapshots: %w", err)
     }
@@ -55,26 +70,29 @@ func (r *RepoOrchestrator) updateSnapshotsIfNeeded(ctx context.Context) error {
     })
     r.snapshots = snapshots
 
+    zap.L().Debug("Updated snapshots", zap.String("repo", r.repoConfig.Id), zap.Duration("duration", time.Since(startTime)))
+
     return nil
 }
 
 func (r *RepoOrchestrator) Snapshots(ctx context.Context) ([]*restic.Snapshot, error) {
-    if err := r.updateSnapshotsIfNeeded(ctx); err != nil {
+    r.snapshotsMu.Lock()
+    defer r.snapshotsMu.Unlock()
+    if err := r.updateSnapshotsIfNeeded(ctx, false); err != nil {
         return nil, err
     }
 
-    r.snapshotsMu.Lock()
-    defer r.snapshotsMu.Unlock()
     return r.snapshots, nil
 }
 
 func (r *RepoOrchestrator) SnapshotsForPlan(ctx context.Context, plan *v1.Plan) ([]*restic.Snapshot, error) {
-    if err := r.updateSnapshotsIfNeeded(ctx); err != nil {
-        return nil, err
-    }
-
     r.snapshotsMu.Lock()
     defer r.snapshotsMu.Unlock()
 
+    if err := r.updateSnapshotsIfNeeded(ctx, false); err != nil {
+        return nil, err
+    }
+
     return filterSnapshotsForPlan(r.snapshots, plan), nil
 }
+
@@ -87,6 +105,8 @@ func (r *RepoOrchestrator) Backup(ctx context.Context, plan *v1.Plan, progressCa
     r.mu.Lock()
     defer r.mu.Unlock()
 
+    startTime := time.Now()
+
     var opts []restic.BackupOption
     opts = append(opts, restic.WithBackupPaths(plan.Paths...))
     opts = append(opts, restic.WithBackupExcludes(plan.Excludes...))
@@ -101,6 +121,15 @@ func (r *RepoOrchestrator) Backup(ctx context.Context, plan *v1.Plan, progressCa
     if err != nil {
         return nil, fmt.Errorf("failed to backup: %w", err)
     }
+
+    // Reset snapshots since a new backup has been added.
+    r.snapshotsMu.Lock()
+    r.snapshots = nil
+    r.snapshotsMu.Unlock()
+
+    zap.L().Debug("Backup completed", zap.String("repo", r.repoConfig.Id), zap.Duration("duration", time.Since(startTime)))
+
+
     return summary, nil
 }
 
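Note (not part of the commit): the repo.go changes above replace the snapshotsAge timestamp with a time.AfterFunc reset timer — cached snapshots are served until the timer clears them, and the cache is also dropped explicitly right after a backup. A rough standalone sketch of that caching pattern follows; the snapshotCache type and its methods are illustrative stand-ins, not this repository's API, and it simply calls Stop() on the old timer because timers created by time.AfterFunc do not use the C channel.

package main

import (
	"fmt"
	"sync"
	"time"
)

type snapshotCache struct {
	mu         sync.Mutex
	resetTimer *time.Timer
	snapshots  []string
}

// set stores a fresh snapshot listing and arms a timer that expires it after ttl.
func (c *snapshotCache) set(snaps []string, ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Stop any previous expiry timer before arming a new one. Timers from
	// time.AfterFunc don't use the C channel, so Stop() alone is enough here.
	if c.resetTimer != nil {
		c.resetTimer.Stop()
	}

	c.snapshots = snaps
	c.resetTimer = time.AfterFunc(ttl, func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		c.snapshots = nil // expire the cache after the TTL
	})
}

// get returns the cached listing, if any.
func (c *snapshotCache) get() ([]string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.snapshots, c.snapshots != nil
}

// invalidate drops the cache immediately, e.g. after a new backup adds a snapshot.
func (c *snapshotCache) invalidate() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.snapshots = nil
}

func main() {
	c := &snapshotCache{}
	c.set([]string{"snap-1", "snap-2"}, 10*time.Minute)
	if snaps, ok := c.get(); ok {
		fmt.Println("cached:", snaps)
	}
	c.invalidate() // mirror the explicit reset done right after a backup
	if _, ok := c.get(); !ok {
		fmt.Println("cache cleared, next read must list snapshots again")
	}
}

Clearing the cache right after a backup, rather than waiting for the TTL, keeps snapshot reads consistent with the repository immediately after new data is written.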
75 changes: 70 additions & 5 deletions internal/orchestrator/tasks.go
@@ -98,11 +98,13 @@ func backupHelper(ctx context.Context, orchestrator *Orchestrator, plan *v1.Plan
         Op: backupOp,
     }
 
-    return WithOperation(orchestrator.oplog, op, func() error {
+    startTime := time.Now()
+
+    err := WithOperation(orchestrator.oplog, op, func() error {
         zap.L().Info("Starting backup", zap.String("plan", plan.Id))
         repo, err := orchestrator.GetRepo(plan.Repo)
         if err != nil {
-            return fmt.Errorf("failed to get repo %q: %w", plan.Repo, err)
+            return fmt.Errorf("couldn't get repo %q: %w", plan.Repo, err)
         }
 
         lastSent := time.Now() // debounce progress updates, these can endup being very frequent.
@@ -116,20 +118,83 @@ if err := orchestrator.oplog.Update(op); err != nil {
            if err := orchestrator.oplog.Update(op); err != nil {
                zap.S().Errorf("failed to update oplog with progress for backup: %v", err)
            }
-           zap.L().Debug("Backup progress", zap.Float64("progress", entry.PercentDone))
+           zap.L().Debug("backup progress", zap.Float64("progress", entry.PercentDone))
         })
         if err != nil {
-            return fmt.Errorf("failed to backup repo %q: %w", plan.Repo, err)
+            return fmt.Errorf("repo.Backup for repo %q: %w", plan.Repo, err)
         }
 
         backupOp.OperationBackup.LastStatus = summary.ToProto()
         if err := orchestrator.oplog.Update(op); err != nil {
            return fmt.Errorf("update oplog with summary for backup: %v", err)
         }
 
-        zap.L().Info("Backup complete", zap.String("plan", plan.Id))
+        zap.L().Info("backup complete", zap.String("plan", plan.Id), zap.Duration("duration", time.Since(startTime)))
         return nil
     })
+    if err != nil {
+        return fmt.Errorf("backup operation: %w", err)
+    }
+
+    // this could alternatively be a separate operation, but it probably makes sense to index snapshots immediately after a backup.
+    if err := indexSnapshotsHelper(ctx, orchestrator, plan); err != nil {
+        return fmt.Errorf("reindexing snapshots after backup operation: %w", err)
+    }
+
+    return nil
 }
+
+func indexSnapshotsHelper(ctx context.Context, orchestrator *Orchestrator, plan *v1.Plan) error {
+    repo, err := orchestrator.GetRepo(plan.Repo)
+    if err != nil {
+        return fmt.Errorf("couldn't get repo %q: %w", plan.Repo, err)
+    }
+
+    snapshots, err := repo.SnapshotsForPlan(ctx, plan)
+    if err != nil {
+        return fmt.Errorf("get snapshots for plan %q: %w", plan.Id, err)
+    }
+
+    startTime := time.Now()
+    alreadyIndexed := 0
+    opTime := curTimeMillis()
+    var indexOps []*v1.Operation
+    for _, snapshot := range snapshots {
+        opid, err := orchestrator.oplog.HasIndexedSnapshot(snapshot.Id)
+        if err != nil {
+            return fmt.Errorf("HasIndexSnapshot for snapshot %q: %w", snapshot.Id, err)
+        }
+
+        if opid < 0 {
+            continue
+        }
+
+        indexOps = append(indexOps, &v1.Operation{
+            RepoId: plan.Repo,
+            PlanId: plan.Id,
+            UnixTimeStartMs: opTime,
+            UnixTimeEndMs: opTime,
+            Status: v1.OperationStatus_STATUS_SUCCESS,
+            Op: &v1.Operation_OperationIndexSnapshot{
+                OperationIndexSnapshot: &v1.OperationIndexSnapshot{
+                    Snapshot: snapshot.ToProto(),
+                },
+            },
+        })
+    }
+
+    if err := orchestrator.oplog.BulkAdd(indexOps); err != nil {
+        return fmt.Errorf("BulkAdd snapshot operations: %w", err)
+    }
+
+    zap.L().Debug("Indexed snapshots",
+        zap.String("plan", plan.Id),
+        zap.Duration("duration", time.Since(startTime)),
+        zap.Int("alreadyIndexed", alreadyIndexed),
+        zap.Int("newlyAdded", len(snapshots) - alreadyIndexed),
+    )
+
+    return err
+}
 
 // WithOperation is a utility that creates an operation to track the function's execution.
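Note (not part of the commit): the new indexSnapshotsHelper above wires snapshot indexing into backupHelper — after a backup it lists the plan's snapshots, checks the operation log for ones that are already indexed, and bulk-adds an index operation for each new snapshot. A simplified standalone sketch of that flow; the Oplog interface, indexOp struct, and function names below are stand-ins, not the project's actual types.

package main

import "fmt"

type indexOp struct {
	RepoID, PlanID, SnapshotID string
}

// Oplog is a stand-in for the operation log: it can answer whether a snapshot
// is already indexed and accept a batch of new operations.
type Oplog interface {
	HasIndexedSnapshot(snapshotID string) (bool, error)
	BulkAdd(ops []indexOp) error
}

// indexSnapshots records an index operation for every snapshot that the
// operation log has not seen yet.
func indexSnapshots(oplog Oplog, repoID, planID string, snapshotIDs []string) error {
	var newOps []indexOp
	for _, id := range snapshotIDs {
		indexed, err := oplog.HasIndexedSnapshot(id)
		if err != nil {
			return fmt.Errorf("check index state for snapshot %q: %w", id, err)
		}
		if indexed {
			continue // already tracked; nothing to do
		}
		newOps = append(newOps, indexOp{RepoID: repoID, PlanID: planID, SnapshotID: id})
	}
	if len(newOps) == 0 {
		return nil
	}
	// One batched write keeps the log consistent and avoids per-snapshot I/O.
	return oplog.BulkAdd(newOps)
}

type memOplog struct{ indexed map[string]bool }

func (m *memOplog) HasIndexedSnapshot(id string) (bool, error) { return m.indexed[id], nil }

func (m *memOplog) BulkAdd(ops []indexOp) error {
	for _, op := range ops {
		m.indexed[op.SnapshotID] = true
		fmt.Println("indexed snapshot", op.SnapshotID)
	}
	return nil
}

func main() {
	log := &memOplog{indexed: map[string]bool{"a1": true}}
	_ = indexSnapshots(log, "repo-1", "plan-1", []string{"a1", "b2", "c3"})
}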
