Skip to content

Commit

Permalink
feat: support nice/ionice as a repo setting (#309)
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge authored Jun 1, 2024
1 parent 43fb254 commit 0c9f366
Show file tree
Hide file tree
Showing 17 changed files with 1,011 additions and 439 deletions.
877 changes: 540 additions & 337 deletions gen/go/v1/config.pb.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions internal/orchestrator/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package logging
import (
"context"
"io"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type contextKey int
Expand All @@ -22,3 +25,20 @@ func WriterFromContext(ctx context.Context) io.Writer {
func ContextWithWriter(ctx context.Context, logger io.Writer) context.Context {
return context.WithValue(ctx, contextKeyLogWriter, logger)
}

// Logger returns a logger from the context, or the global logger if none is found.
// this is somewhat expensive, it should be called once per task.
func Logger(ctx context.Context) *zap.Logger {
writer := WriterFromContext(ctx)
if writer == nil {
return zap.L()
}
p := zap.NewProductionEncoderConfig()
p.EncodeTime = zapcore.ISO8601TimeEncoder
fe := zapcore.NewJSONEncoder(p)
l := zap.New(zapcore.NewTee(
zap.L().Core(),
zapcore.NewCore(fe, zapcore.AddSync(writer), zapcore.DebugLevel),
))
return l
}
52 changes: 52 additions & 0 deletions internal/orchestrator/repo/command_prefix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package repo

import (
"errors"
"os/exec"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/pkg/restic"
)

func niceAvailable() bool {
_, err := exec.LookPath("nice")
return err == nil
}

func ioniceAvailable() bool {
_, err := exec.LookPath("ionice")
return err == nil
}

// resolveCommandPrefix returns a list of restic.GenericOption that should be applied to a restic command based on the given prefix.
func resolveCommandPrefix(prefix *v1.CommandPrefix) ([]restic.GenericOption, error) {
var opts []restic.GenericOption

if prefix.GetCpuNice() != v1.CommandPrefix_CPU_DEFAULT {
if !niceAvailable() {
return nil, errors.New("nice not available, cpu_nice cannot be used")
}
switch prefix.GetCpuNice() {
case v1.CommandPrefix_CPU_HIGH:
opts = append(opts, restic.WithPrefixCommand("nice", "-n", "-10"))
case v1.CommandPrefix_CPU_LOW:
opts = append(opts, restic.WithPrefixCommand("nice", "-n", "10"))
}
}

if prefix.GetIoNice() != v1.CommandPrefix_IO_DEFAULT {
if !ioniceAvailable() {
return nil, errors.New("ionice not available, io_nice cannot be used")
}
switch prefix.GetIoNice() {
case v1.CommandPrefix_IO_IDLE:
opts = append(opts, restic.WithPrefixCommand("ionice", "-c", "3")) // idle priority, only runs when other IO is not queued.
case v1.CommandPrefix_IO_BEST_EFFORT_LOW:
opts = append(opts, restic.WithPrefixCommand("ionice", "-c", "2", "-n", "7")) // best effort, low priority. Default is -n 4.
case v1.CommandPrefix_IO_BEST_EFFORT_HIGH:
opts = append(opts, restic.WithPrefixCommand("ionice", "-c", "2", "-n", "0")) // best effort, high(er) than default priority. Default is -n 4.
}
}

return opts, nil
}
9 changes: 8 additions & 1 deletion internal/orchestrator/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,18 @@ func NewRepoOrchestrator(config *v1.Config, repoConfig *v1.Repo, resticPath stri
for _, f := range repoConfig.GetFlags() {
args, err := shlex.Split(f)
if err != nil {
return nil, fmt.Errorf("failed to parse flag %q for repo %q: %w", f, repoConfig.Id, err)
return nil, fmt.Errorf("parse flag %q for repo %q: %w", f, repoConfig.Id, err)
}
opts = append(opts, restic.WithFlags(args...))
}

// Resolve command prefix
if extraOpts, err := resolveCommandPrefix(repoConfig.GetCommandPrefix()); err != nil {
return nil, fmt.Errorf("resolve command prefix: %w", err)
} else {
opts = append(opts, extraOpts...)
}

// Add BatchMode=yes to sftp.args if it's not already set.
if slices.IndexFunc(repoConfig.GetFlags(), func(a string) bool {
return strings.Contains(a, "sftp.args")
Expand Down
83 changes: 58 additions & 25 deletions internal/orchestrator/repo/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"os"
"runtime"
"slices"
"strings"
"testing"
Expand All @@ -20,39 +21,71 @@ var configForTest = &v1.Config{
func TestBackup(t *testing.T) {
t.Parallel()

repo := t.TempDir()
testData := test.CreateTestData(t)

// create a new repo with cache disabled for testing
r := &v1.Repo{
Id: "test",
Uri: repo,
Password: "test",
Flags: []string{"--no-cache"},
tcs := []struct {
name string
repo *v1.Repo
plan *v1.Plan
unixOnly bool
}{
{
name: "backup",
repo: &v1.Repo{
Id: "test",
Uri: t.TempDir(),
Password: "test",
},
plan: &v1.Plan{
Id: "test",
Repo: "test",
Paths: []string{testData},
},
},
{
name: "backup with ionice",
repo: &v1.Repo{
Id: "test",
Uri: t.TempDir(),
Password: "test",
CommandPrefix: &v1.CommandPrefix{
IoNice: v1.CommandPrefix_IO_BEST_EFFORT_LOW,
CpuNice: v1.CommandPrefix_CPU_LOW,
},
},
plan: &v1.Plan{
Id: "test",
Repo: "test",
Paths: []string{testData},
},
unixOnly: true,
},
}

plan := &v1.Plan{
Id: "test",
Repo: "test",
Paths: []string{testData},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
if tc.unixOnly && runtime.GOOS == "windows" {
t.Skip("skipping on windows")
}

orchestrator, err := NewRepoOrchestrator(configForTest, r, helpers.ResticBinary(t))
if err != nil {
t.Fatalf("failed to create repo orchestrator: %v", err)
}
orchestrator, err := NewRepoOrchestrator(configForTest, tc.repo, helpers.ResticBinary(t))
if err != nil {
t.Fatalf("failed to create repo orchestrator: %v", err)
}

summary, err := orchestrator.Backup(context.Background(), plan, nil)
if err != nil {
t.Fatalf("backup error: %v", err)
}
summary, err := orchestrator.Backup(context.Background(), tc.plan, nil)
if err != nil {
t.Fatalf("backup error: %v", err)
}

if summary.SnapshotId == "" {
t.Fatal("expected snapshot id")
}
if summary.SnapshotId == "" {
t.Fatal("expected snapshot id")
}

if summary.FilesNew != 100 {
t.Fatalf("expected 100 new files, got %d", summary.FilesNew)
if summary.FilesNew != 100 {
t.Fatalf("expected 100 new files, got %d", summary.FilesNew)
}
})
}
}

Expand Down
7 changes: 7 additions & 0 deletions internal/orchestrator/taskrunnerimpl.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package orchestrator

import (
"context"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/hook"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/orchestrator/logging"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
"github.com/garethgeorge/backrest/internal/orchestrator/tasks"
"go.uber.org/zap"
)

// taskRunnerImpl is an implementation of TaskRunner for the default orchestrator.
Expand Down Expand Up @@ -117,3 +120,7 @@ func (t *taskRunnerImpl) Config() *v1.Config {
t.config = t.orchestrator.Config()
return t.config
}

func (t *taskRunnerImpl) Logger(ctx context.Context) *zap.Logger {
return logging.Logger(ctx).Named(t.t.Name())
}
26 changes: 0 additions & 26 deletions internal/orchestrator/tasks/loggingutil.go

This file was deleted.

3 changes: 3 additions & 0 deletions internal/orchestrator/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/garethgeorge/backrest/internal/hook"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -46,6 +47,8 @@ type TaskRunner interface {
ScheduleTask(task Task, priority int) error
// Config returns the current config.
Config() *v1.Config
// Logger returns the logger.
Logger(ctx context.Context) *zap.Logger
}

// ScheduledTask is a task that is scheduled to run at a specific time.
Expand Down
8 changes: 4 additions & 4 deletions internal/orchestrator/tasks/taskbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (t *BackupTask) Next(now time.Time, runner TaskRunner) (ScheduledTask, erro
}

func (t *BackupTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner) error {
l := Logger(ctx, st.Task)
l := runner.Logger(ctx)

startTime := time.Now()
op := st.Op
Expand Down Expand Up @@ -133,10 +133,10 @@ func (t *BackupTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunne

backupOp.OperationBackup.LastStatus = protoutil.BackupProgressEntryToProto(entry)
} else if entry.MessageType == "error" {
zap.S().Warnf("an unknown error was encountered in processing item: %v", entry.Item)
l.Sugar().Warnf("an unknown error was encountered in processing item: %v", entry.Item)
backupError, err := protoutil.BackupProgressEntryToBackupError(entry)
if err != nil {
zap.S().Errorf("failed to convert backup progress entry to backup error: %v", err)
l.Sugar().Errorf("failed to convert backup progress entry to backup error: %v", err)
return
}
if len(backupOp.OperationBackup.Errors) > maxBackupErrorHistoryLength ||
Expand All @@ -158,7 +158,7 @@ func (t *BackupTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunne
sendWg.Add(1)
go func() {
if err := runner.UpdateOperation(op); err != nil {
zap.S().Errorf("failed to update oplog with progress for backup: %v", err)
l.Sugar().Errorf("failed to update oplog with progress for backup: %v", err)
}
sendWg.Done()
}()
Expand Down
15 changes: 8 additions & 7 deletions internal/orchestrator/tasks/taskforget.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewOneoffForgetTask(repoID, planID string, flowID int64, at time.Time) Task
func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner) error {
t := st.Task
log := taskRunner.OpLog()
l := taskRunner.Logger(ctx)

r, err := taskRunner.GetRepoOrchestrator(t.RepoID())
if err != nil {
Expand All @@ -69,13 +70,13 @@ func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner)
}

tags := []string{repo.TagForPlan(t.PlanID())}
if compat, err := useLegacyCompatMode(log, t.RepoID(), t.PlanID()); err != nil {
if compat, err := useLegacyCompatMode(l, log, t.RepoID(), t.PlanID()); err != nil {
return fmt.Errorf("check legacy compat mode: %w", err)
} else if !compat {
tags = append(tags, repo.TagForInstance(taskRunner.Config().Instance))
} else {
zap.L().Warn("forgetting snapshots without instance ID, using legacy behavior (e.g. --tags not including instance ID)")
zap.S().Warnf("to avoid this warning, tag all snapshots with the instance ID e.g. by running: \r\n"+
l.Warn("forgetting snapshots without instance ID, using legacy behavior (e.g. --tags not including instance ID)")
l.Sugar().Warnf("to avoid this warning, tag all snapshots with the instance ID e.g. by running: \r\n"+
"restic tag --set '%s' --set '%s' --tag '%s'", repo.TagForPlan(t.PlanID()), repo.TagForInstance(taskRunner.Config().Instance), repo.TagForPlan(t.PlanID()))
}

Expand Down Expand Up @@ -103,7 +104,7 @@ func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner)
}
}

zap.S().Debugf("found %v snapshots were forgotten, marking this in oplog", len(ops))
l.Sugar().Debugf("found %v snapshots were forgotten, marking this in oplog", len(ops))

for _, op := range ops {
if indexOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
Expand All @@ -120,7 +121,7 @@ func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner)

// useLegacyCompatMode checks if there are any snapshots that were created without a `created-by` tag still exist in the repo.
// The property is overridden if mixed `created-by` tag values are found.
func useLegacyCompatMode(log *oplog.OpLog, repoID, planID string) (bool, error) {
func useLegacyCompatMode(l *zap.Logger, log *oplog.OpLog, repoID, planID string) (bool, error) {
instanceIDs := make(map[string]struct{})
if err := log.ForEach(oplog.Query{RepoId: repoID, PlanId: planID}, indexutil.CollectAll(), func(op *v1.Operation) error {
if snapshotOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok && !snapshotOp.OperationIndexSnapshot.GetForgot() {
Expand All @@ -136,9 +137,9 @@ func useLegacyCompatMode(log *oplog.OpLog, repoID, planID string) (bool, error)
}
delete(instanceIDs, "")
if len(instanceIDs) > 1 {
zap.L().Warn("found mixed instance IDs in indexed snapshots, overriding legacy forget behavior to include instance ID tags. This may result in unexpected behavior -- please inspect the tags on your snapshots.")
l.Sugar().Warn("found mixed instance IDs in indexed snapshots, overriding legacy forget behavior to include instance ID tags. This may result in unexpected behavior -- please inspect the tags on your snapshots.")
return false, nil
}
zap.L().Warn("found legacy snapshots without instance ID, recommending legacy forget behavior.")
l.Sugar().Warn("found legacy snapshots without instance ID, recommending legacy forget behavior.")
return true, nil
}
3 changes: 2 additions & 1 deletion internal/orchestrator/tasks/taskindexsnapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewOneoffIndexSnapshotsTask(repoID string, at time.Time) Task {
func indexSnapshotsHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner) error {
t := st.Task
oplog := taskRunner.OpLog()
l := taskRunner.Logger(ctx)

repo, err := taskRunner.GetRepoOrchestrator(t.RepoID())
if err != nil {
Expand Down Expand Up @@ -148,7 +149,7 @@ func indexSnapshotsHelper(ctx context.Context, st ScheduledTask, taskRunner Task
}

// Print stats at the end of indexing.
zap.L().Debug("indexed snapshots",
l.Debug("indexed snapshots",
zap.String("repo", t.RepoID()),
zap.Duration("duration", time.Since(startTime)),
zap.Int("alreadyIndexed", len(foundIds)),
Expand Down
Loading

0 comments on commit 0c9f366

Please sign in to comment.