feat: overhaul task interface and introduce 'flow ID' for simpler grouping of operations (#253)
garethgeorge authored Apr 29, 2024
1 parent 84b4b68 commit 7a10bdc
Showing 44 changed files with 1,996 additions and 1,763 deletions.
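
The headline change is the new FlowId field on operations: every operation produced by one logical flow (a backup plus the index, forget, and hook operations it triggers) shares a single ID, so related operations can be fetched with one query instead of being stitched together by snapshot ID. A minimal self-contained sketch of the idea follows; Operation here is a pared-down stand-in for the real v1.Operation proto (whose generated diff below is too large to render), not the actual backrest type.

package main

import "fmt"

// Operation is an illustrative stand-in for v1.Operation: the new FlowId
// field ties together every operation spawned by one logical flow.
type Operation struct {
	ID         int64
	FlowID     int64
	SnapshotID string
	Kind       string
}

// opsByFlow mirrors what the new oplog.ForEachByFlowId index provides: all
// operations in a flow, regardless of kind or snapshot.
func opsByFlow(ops []Operation, flowID int64) []Operation {
	var out []Operation
	for _, op := range ops {
		if op.FlowID == flowID {
			out = append(out, op)
		}
	}
	return out
}

func main() {
	ops := []Operation{
		{ID: 1, FlowID: 10, Kind: "backup", SnapshotID: "abc123"},
		{ID: 2, FlowID: 10, Kind: "index-snapshot", SnapshotID: "abc123"},
		{ID: 3, FlowID: 10, Kind: "forget"}, // no snapshot ID, yet still in the flow
	}
	for _, op := range opsByFlow(ops, 10) {
		fmt.Println(op.Kind)
	}
}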
277 changes: 150 additions & 127 deletions gen/go/v1/operations.pb.go

Large diffs are not rendered by default.

216 changes: 113 additions & 103 deletions gen/go/v1/service.pb.go

Large diffs are not rendered by default.

75 changes: 36 additions & 39 deletions internal/api/backresthandler.go
@@ -6,7 +6,6 @@ import (
"fmt"
"os"
"path"
"sync"
"time"

"connectrpc.com/connect"
@@ -17,6 +16,8 @@ import (
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
"github.com/garethgeorge/backrest/internal/orchestrator"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
"github.com/garethgeorge/backrest/internal/orchestrator/tasks"
"github.com/garethgeorge/backrest/internal/protoutil"
"github.com/garethgeorge/backrest/internal/resticinstaller"
"github.com/garethgeorge/backrest/internal/rotatinglog"
@@ -107,7 +108,7 @@ func (s *BackrestHandler) AddRepo(ctx context.Context, req *connect.Request[v1.R
return nil, fmt.Errorf("failed to find or install restic binary: %w", err)
}

r, err := orchestrator.NewRepoOrchestrator(req.Msg, bin)
r, err := repo.NewRepoOrchestrator(req.Msg, bin)
if err != nil {
return nil, fmt.Errorf("failed to configure repo: %w", err)
}
@@ -127,7 +128,7 @@ func (s *BackrestHandler) AddRepo(ctx context.Context, req *connect.Request[v1.R

// index snapshots for the newly added repository.
zap.L().Debug("scheduling index snapshots task")
s.orchestrator.ScheduleTask(orchestrator.NewOneoffIndexSnapshotsTask(s.orchestrator, req.Msg.Id, time.Now()), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityIndexSnapshots)
s.orchestrator.ScheduleTask(tasks.NewOneoffIndexSnapshotsTask(req.Msg.Id, time.Now()), tasks.TaskPriorityInteractive+tasks.TaskPriorityIndexSnapshots)

zap.L().Debug("done add repo")
return connect.NewResponse(c), nil
@@ -136,7 +137,7 @@ func (s *BackrestHandler) AddRepo(ctx context.Context, req *connect.Request[v1.R
// ListSnapshots implements POST /v1/snapshots
func (s *BackrestHandler) ListSnapshots(ctx context.Context, req *connect.Request[v1.ListSnapshotsRequest]) (*connect.Response[v1.ResticSnapshotList], error) {
query := req.Msg
repo, err := s.orchestrator.GetRepo(query.RepoId)
repo, err := s.orchestrator.GetRepoOrchestrator(query.RepoId)
if err != nil {
return nil, fmt.Errorf("failed to get repo: %w", err)
}
@@ -170,7 +171,7 @@ func (s *BackrestHandler) ListSnapshots(ctx context.Context, req *connect.Reques

func (s *BackrestHandler) ListSnapshotFiles(ctx context.Context, req *connect.Request[v1.ListSnapshotFilesRequest]) (*connect.Response[v1.ListSnapshotFilesResponse], error) {
query := req.Msg
repo, err := s.orchestrator.GetRepo(query.RepoId)
repo, err := s.orchestrator.GetRepoOrchestrator(query.RepoId)
if err != nil {
return nil, fmt.Errorf("failed to get repo: %w", err)
}
@@ -258,6 +259,8 @@ func (s *BackrestHandler) GetOperations(ctx context.Context, req *connect.Reques
err = s.oplog.ForEachByRepo(req.Msg.RepoId, idCollector, opCollector)
} else if req.Msg.SnapshotId != "" {
err = s.oplog.ForEachBySnapshotId(req.Msg.SnapshotId, idCollector, opCollector)
} else if req.Msg.FlowId != 0 {
err = s.oplog.ForEachByFlowId(req.Msg.FlowId, idCollector, opCollector)
} else if len(req.Msg.Ids) > 0 {
ops = make([]*v1.Operation, 0, len(req.Msg.Ids))
for i, id := range req.Msg.Ids {
@@ -285,7 +288,7 @@ func (s *BackrestHandler) IndexSnapshots(ctx context.Context, req *connect.Reque
return nil, fmt.Errorf("failed to get repo %q: %w", req.Msg.Value, err)
}

s.orchestrator.ScheduleTask(orchestrator.NewOneoffIndexSnapshotsTask(s.orchestrator, req.Msg.Value, time.Now()), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityIndexSnapshots)
s.orchestrator.ScheduleTask(tasks.NewOneoffIndexSnapshotsTask(req.Msg.Value, time.Now()), tasks.TaskPriorityInteractive+tasks.TaskPriorityIndexSnapshots)

return connect.NewResponse(&emptypb.Empty{}), nil
}
@@ -295,38 +298,36 @@ func (s *BackrestHandler) Backup(ctx context.Context, req *connect.Request[types
if err != nil {
return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.Value, err)
}
var wg sync.WaitGroup
wg.Add(1)
s.orchestrator.ScheduleTask(orchestrator.NewOneoffBackupTask(s.orchestrator, plan, time.Now()), orchestrator.TaskPriorityInteractive, func(e error) {
wait := make(chan struct{})
s.orchestrator.ScheduleTask(tasks.NewOneoffBackupTask(plan, time.Now()), tasks.TaskPriorityInteractive, func(e error) {
err = e
wg.Done()
close(wait)
})
wg.Wait()
<-wait
return connect.NewResponse(&emptypb.Empty{}), err
}

func (s *BackrestHandler) Forget(ctx context.Context, req *connect.Request[v1.ForgetRequest]) (*connect.Response[emptypb.Empty], error) {
at := time.Now()
var err error
_, err := s.orchestrator.GetPlan(req.Msg.PlanId)
if err != nil {
return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.PlanId, err)
}

if req.Msg.SnapshotId != "" && req.Msg.PlanId != "" && req.Msg.RepoId != "" {
wait := make(chan struct{})
s.orchestrator.ScheduleTask(
orchestrator.NewOneoffForgetSnapshotTask(s.orchestrator, req.Msg.RepoId, req.Msg.PlanId, req.Msg.SnapshotId, at),
orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget, func(e error) {
tasks.NewOneoffForgetSnapshotTask(req.Msg.RepoId, req.Msg.PlanId, 0, at, req.Msg.SnapshotId),
tasks.TaskPriorityInteractive+tasks.TaskPriorityForget, func(e error) {
err = e
close(wait)
})
<-wait
} else if req.Msg.RepoId != "" && req.Msg.PlanId != "" {
plan, err := s.orchestrator.GetPlan(req.Msg.PlanId)
if err != nil {
return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.PlanId, err)
}

wait := make(chan struct{})
s.orchestrator.ScheduleTask(
orchestrator.NewOneoffForgetTask(s.orchestrator, plan, "", at),
orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget, func(e error) {
tasks.NewOneoffForgetTask(req.Msg.RepoId, req.Msg.PlanId, 0, at),
tasks.TaskPriorityInteractive+tasks.TaskPriorityForget, func(e error) {
err = e
close(wait)
})
@@ -348,13 +349,12 @@ func (s *BackrestHandler) Prune(ctx context.Context, req *connect.Request[types.
}

at := time.Now()
var wg sync.WaitGroup
wg.Add(1)
s.orchestrator.ScheduleTask(orchestrator.NewOneoffPruneTask(s.orchestrator, plan, at, true), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityPrune, func(e error) {
wait := make(chan struct{})
s.orchestrator.ScheduleTask(tasks.NewOneoffPruneTask(plan.Repo, plan.Id, 0, at, true), tasks.TaskPriorityInteractive+tasks.TaskPriorityPrune, func(e error) {
err = e
wg.Done()
close(wait)
})
wg.Wait()
<-wait

return connect.NewResponse(&emptypb.Empty{}), nil
}
@@ -375,19 +375,17 @@ func (s *BackrestHandler) Restore(ctx context.Context, req *connect.Request[v1.R

at := time.Now()

s.orchestrator.ScheduleTask(orchestrator.NewOneoffRestoreTask(s.orchestrator, orchestrator.RestoreTaskOpts{
RepoId: req.Msg.RepoId,
PlanId: req.Msg.PlanId,
SnapshotId: req.Msg.SnapshotId,
Path: req.Msg.Path,
Target: target,
}, at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityDefault)
flowID, err := tasks.FlowIDForSnapshotID(s.oplog, req.Msg.SnapshotId)
if err != nil {
return nil, fmt.Errorf("failed to get flow ID for snapshot %q: %w", req.Msg.SnapshotId, err)
}
s.orchestrator.ScheduleTask(tasks.NewOneoffRestoreTask(req.Msg.RepoId, req.Msg.PlanId, flowID, at, req.Msg.SnapshotId, req.Msg.Path, target), tasks.TaskPriorityInteractive+tasks.TaskPriorityDefault)

return connect.NewResponse(&emptypb.Empty{}), nil
}

func (s *BackrestHandler) Unlock(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) {
repo, err := s.orchestrator.GetRepo(req.Msg.Value)
repo, err := s.orchestrator.GetRepoOrchestrator(req.Msg.Value)
if err != nil {
return nil, fmt.Errorf("failed to get repo %q: %w", req.Msg.Value, err)
}
@@ -402,13 +400,12 @@ func (s *BackrestHandler) Unlock(ctx context.Context, req *connect.Request[types
func (s *BackrestHandler) Stats(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) {
at := time.Now()
var err error
var wg sync.WaitGroup
wg.Add(1)
s.orchestrator.ScheduleTask(orchestrator.NewOneoffStatsTask(s.orchestrator, req.Msg.Value, orchestrator.PlanForUnassociatedOperations, at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityStats, func(e error) {
wait := make(chan struct{})
s.orchestrator.ScheduleTask(tasks.NewOneoffStatsTask(req.Msg.Value, tasks.PlanForUnassociatedOperations, at), tasks.TaskPriorityInteractive+tasks.TaskPriorityStats, func(e error) {
err = e
wg.Done()
close(wait)
})
wg.Wait()
<-wait
return connect.NewResponse(&emptypb.Empty{}), err

}
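
A pattern repeated throughout this handler file: synchronous endpoints previously blocked on a sync.WaitGroup and now block on a channel closed from the task's completion callback, which is why the "sync" import is dropped above. A minimal runnable sketch of that pattern, with scheduleTask as an illustrative stand-in for the real orchestrator.ScheduleTask:

package main

import (
	"errors"
	"fmt"
)

// runAndWait captures the callback-to-channel idiom: the scheduler invokes the
// callback exactly once when the task finishes, so closing a channel is enough
// to block the handler until completion and capture the task's error.
func runAndWait(scheduleTask func(onDone func(error))) error {
	var err error
	wait := make(chan struct{})
	scheduleTask(func(e error) {
		err = e
		close(wait)
	})
	<-wait
	return err
}

func main() {
	err := runAndWait(func(onDone func(error)) {
		go onDone(errors.New("simulated task failure")) // fake async completion
	})
	fmt.Println(err) // prints: simulated task failure
}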
27 changes: 14 additions & 13 deletions internal/api/backresthandler_test.go
@@ -134,23 +134,22 @@ func TestBackup(t *testing.T) {
}

// Wait for the index snapshot operation to appear in the oplog.
var snapshotId string
var snapshotOp *v1.Operation
if err := retry(t, 10, 2*time.Second, func() error {
operations := getOperations(t, sut.oplog)
if index := slices.IndexFunc(operations, func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationIndexSnapshot)
return op.Status == v1.OperationStatus_STATUS_SUCCESS && ok
}); index != -1 {
op := operations[index]
snapshotId = op.SnapshotId
snapshotOp = operations[index]
return nil
}
return errors.New("snapshot not indexed")
}); err != nil {
t.Fatalf("Couldn't find snapshot in oplog")
}

if snapshotId == "" {
if snapshotOp.SnapshotId == "" {
t.Fatalf("snapshotId must be set")
}

@@ -162,8 +161,8 @@
return op.Status == v1.OperationStatus_STATUS_SUCCESS && ok
}); index != -1 {
op := operations[index]
if op.SnapshotId != snapshotId {
t.Fatalf("Snapshot ID mismatch on forget operation")
if op.FlowId != snapshotOp.FlowId {
t.Fatalf("Flow ID mismatch on forget operation")
}
return nil
}
@@ -460,30 +459,29 @@ func TestRestore(t *testing.T) {
}

// Wait for the index snapshot operation to appear in the oplog.
var snapshotId string
var snapshotOp *v1.Operation
if err := retry(t, 10, 2*time.Second, func() error {
operations := getOperations(t, sut.oplog)
if index := slices.IndexFunc(operations, func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationIndexSnapshot)
return op.Status == v1.OperationStatus_STATUS_SUCCESS && ok
}); index != -1 {
op := operations[index]
snapshotId = op.SnapshotId
snapshotOp = operations[index]
return nil
}
return errors.New("snapshot not indexed")
}); err != nil {
t.Fatalf("Couldn't find snapshot in oplog")
}

if snapshotId == "" {
if snapshotOp.SnapshotId == "" {
t.Fatalf("snapshotId must be set")
}

restoreTarget := t.TempDir()

_, err = sut.handler.Restore(context.Background(), connect.NewRequest(&v1.RestoreSnapshotRequest{
SnapshotId: snapshotId,
SnapshotId: snapshotOp.SnapshotId,
PlanId: "test",
RepoId: "local",
Target: restoreTarget,
@@ -500,8 +498,11 @@
return op.Status == v1.OperationStatus_STATUS_SUCCESS && ok
}); index != -1 {
op := operations[index]
if op.SnapshotId != snapshotId {
t.Fatalf("Snapshot ID mismatch on restore operation")
if op.FlowId != snapshotOp.FlowId {
t.Errorf("Flow ID mismatch on restore operation")
}
if op.SnapshotId != snapshotOp.SnapshotId {
t.Errorf("Snapshot ID mismatch on restore operation")
}
return nil
}
11 changes: 7 additions & 4 deletions internal/hook/hook.go
@@ -35,11 +35,12 @@ func NewHookExecutor(oplog *oplog.OpLog, bigOutputStore *rotatinglog.RotatingLog

// ExecuteHooks schedules tasks for the hooks subscribed to the given event. The vars map is used to substitute variables
// Hooks are pulled both from the provided plan and from the repo config.
func (e *HookExecutor) ExecuteHooks(repo *v1.Repo, plan *v1.Plan, events []v1.Hook_Condition, vars HookVars) error {
func (e *HookExecutor) ExecuteHooks(flowID int64, repo *v1.Repo, plan *v1.Plan, events []v1.Hook_Condition, vars HookVars) error {
operationBase := v1.Operation{
Status: v1.OperationStatus_STATUS_INPROGRESS,
PlanId: plan.GetId(),
RepoId: repo.GetId(),
FlowId: flowID,
}

vars.Repo = repo
@@ -59,10 +60,11 @@ func (e *HookExecutor) ExecuteHooks(repo *v1.Repo, plan *v1.Plan, events []v1.Ho
operation.UnixTimeStartMs = curTimeMs()
operation.Op = &v1.Operation_OperationRunHook{
OperationRunHook: &v1.OperationRunHook{
Name: name,
Name: name,
Condition: event,
},
}
zap.L().Info("running hook", zap.String("plan", plan.Id), zap.Int64("opId", operation.Id), zap.String("hook", name))
zap.L().Info("running hook", zap.String("plan", repo.Id), zap.Int64("opId", operation.Id), zap.String("hook", name))
if err := e.executeHook(operation, h, event, vars); err != nil {
zap.S().Errorf("error on repo hook %v on condition %v: %v", idx, event.String(), err)
if isHaltingError(err) {
@@ -84,7 +86,8 @@ func (e *HookExecutor) ExecuteHooks(repo *v1.Repo, plan *v1.Plan, events []v1.Ho
operation.UnixTimeStartMs = curTimeMs()
operation.Op = &v1.Operation_OperationRunHook{
OperationRunHook: &v1.OperationRunHook{
Name: name,
Name: name,
Condition: event,
},
}

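The ExecuteHooks change above threads the caller's flow ID into every hook operation and records the condition that fired the hook. A small illustrative sketch under those assumptions; the types and names here are stand-ins, not the real backrest APIs:

package main

import "fmt"

// RunHookOp mirrors what hook.go now records per hook run: the triggering
// task's flow ID plus the condition that fired the hook.
type RunHookOp struct {
	FlowID    int64
	Name      string
	Condition string
}

// executeHooks stamps every hook operation with the caller's flow ID, so hook
// runs group with their parent backup/prune/forget operation in the oplog.
func executeHooks(flowID int64, event string, hookNames []string) []RunHookOp {
	ops := make([]RunHookOp, 0, len(hookNames))
	for _, name := range hookNames {
		ops = append(ops, RunHookOp{FlowID: flowID, Name: name, Condition: event})
	}
	return ops
}

func main() {
	for _, op := range executeHooks(10, "CONDITION_SNAPSHOT_END", []string{"notify"}) {
		fmt.Printf("hook %s fired by %s in flow %d\n", op.Name, op.Condition, op.FlowID)
	}
}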
13 changes: 13 additions & 0 deletions internal/ioutil/ioutil.go
@@ -110,3 +110,16 @@ func (w *OutputCapturer) Bytes() []byte {

return buf.Bytes()
}

type SynchronizedWriter struct {
mu sync.Mutex
W io.Writer
}

var _ io.Writer = &SynchronizedWriter{}

func (w *SynchronizedWriter) Write(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()
return w.W.Write(p)
}
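
A hedged usage sketch for the new SynchronizedWriter: several goroutines sharing one buffer through the wrapper, as concurrent tasks might share a log. Only the wrapper type comes from the diff above; this test harness is illustrative.

package ioutil_test

import (
	"bytes"
	"fmt"
	"sync"
	"testing"

	"github.com/garethgeorge/backrest/internal/ioutil"
)

// TestSynchronizedWriterSketch fans four goroutines into one buffer through
// the wrapper; without the mutex their writes could interleave mid-line.
func TestSynchronizedWriterSketch(t *testing.T) {
	var buf bytes.Buffer
	w := &ioutil.SynchronizedWriter{W: &buf}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Fprintf(w, "task %d done\n", i) // each write holds the mutex
		}(i)
	}
	wg.Wait()

	if buf.Len() == 0 {
		t.Fatal("expected output from all writers")
	}
}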