Skip to content

Commit

Permalink
feat: improve support for instance ID tag
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed May 5, 2024
1 parent f314c7c commit be0cdd5
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 89 deletions.
16 changes: 0 additions & 16 deletions internal/config/stringutil/stringutil.go

This file was deleted.

27 changes: 7 additions & 20 deletions internal/config/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ import (
"strings"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/config/stringutil"
"github.com/garethgeorge/backrest/internal/config/validationutil"
"github.com/gitploy-io/cronexpr"
"github.com/hashicorp/go-multierror"
)

func ValidateConfig(c *v1.Config) error {
var err error

c.Instance, err = validateID(c.Instance)
if err != nil {
err = multierror.Append(err, fmt.Errorf("instance ID: %w", err))
if e := validationutil.ValidateID(c.Instance, validationutil.IDMaxLen); e != nil {
err = multierror.Append(err, fmt.Errorf("instance ID %q invalid: %w", c.Instance, e))
}

repos := make(map[string]*v1.Repo)
Expand Down Expand Up @@ -63,9 +62,8 @@ func ValidateConfig(c *v1.Config) error {

func validateRepo(repo *v1.Repo) error {
var err error

if repo.Id == "" || !stringutil.ValidateID(repo.Id) {
err = multierror.Append(err, fmt.Errorf("id %q contains invalid characters (or empty)", repo.Id))
if e := validationutil.ValidateID(repo.Id, 0); e != nil {
err = multierror.Append(err, fmt.Errorf("id %q invalid: %w", repo.Id, e))
}

if repo.Uri == "" {
Expand All @@ -85,12 +83,8 @@ func validateRepo(repo *v1.Repo) error {

func validatePlan(plan *v1.Plan, repos map[string]*v1.Repo) error {
var err error
if plan.Paths == nil || len(plan.Paths) == 0 {
err = multierror.Append(err, fmt.Errorf("path is required"))
}

if plan.Id == "" || !stringutil.ValidateID(plan.Id) {
err = multierror.Append(err, fmt.Errorf("id %q contains invalid characters (or empty)", plan.Id))
if e := validationutil.ValidateID(plan.Id, 0); e != nil {
err = multierror.Append(err, fmt.Errorf("id %q invalid: %w", plan.Id, e))
}

for idx, p := range plan.Paths {
Expand Down Expand Up @@ -121,10 +115,3 @@ func validatePlan(plan *v1.Plan, repos map[string]*v1.Repo) error {

return err
}

func validateID(id string) (string, error) {
if len(id) > 32 {
return "", fmt.Errorf("id %q is too long", id)
}
return stringutil.SanitizeID(id), nil
}
33 changes: 33 additions & 0 deletions internal/config/validationutil/validationutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package validationutil

import (
"errors"
"fmt"
"regexp"
)

var (
IDMaxLen = 50 // maximum length of an ID
sanitizeIDRegex = regexp.MustCompile(`[^a-zA-Z0-9_\-\.]+`) // matches invalid characters in an ID
idRegex = regexp.MustCompile(`[a-zA-Z0-9_\-\.]*`) // matches a valid ID (including empty string)
)

func SanitizeID(id string) string {
return sanitizeIDRegex.ReplaceAllString(id, "_")
}

// ValidateID checks if an ID is valid.
// It returns an error if the ID contains invalid characters, is empty, or is too long.
// The maxLen parameter is the maximum length of the ID. If maxLen is 0, the ID length is not checked.
func ValidateID(id string, maxLen int) error {
if !idRegex.MatchString(id) {
return errors.New("contains invalid characters")
}
if len(id) == 0 {
return errors.New("empty")
}
if maxLen > 0 && len(id) > maxLen {
return fmt.Errorf("too long (> %d chars)", maxLen)
}
return nil
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package stringutil
package validationutil

import "testing"
import (
"testing"
)

func TestSanitizeID(t *testing.T) {
tcs := []struct {
Expand Down
38 changes: 18 additions & 20 deletions internal/orchestrator/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type RepoOrchestrator struct {

// NewRepoOrchestrator accepts a config and a repo that is configured with the properties of that config object.
func NewRepoOrchestrator(config *v1.Config, repoConfig *v1.Repo, resticPath string) (*RepoOrchestrator, error) {
if config.Instance == "" {
return nil, errors.New("instance is a required field in the backrest config")
}

var opts []restic.GenericOption
opts = append(opts, restic.WithEnviron())
opts = append(opts, restic.WithEnv("RESTIC_PROGRESS_FPS=0.5"))
Expand Down Expand Up @@ -93,7 +97,7 @@ func (r *RepoOrchestrator) SnapshotsForPlan(ctx context.Context, plan *v1.Plan)
ctx, flush := forwardResticLogs(ctx)
defer flush()

snapshots, err := r.repo.Snapshots(ctx, restic.WithFlags("--tag", tagForPlan(plan), "--tag", r.config.Instance))
snapshots, err := r.repo.Snapshots(ctx, restic.WithFlags("--tag", TagForPlan(plan.Id), "--tag", TagForInstance(r.config.Instance)))
if err != nil {
return nil, fmt.Errorf("get snapshots for plan %q: %w", plan.Id, err)
}
Expand All @@ -119,19 +123,17 @@ func (r *RepoOrchestrator) Backup(ctx context.Context, plan *v1.Plan, progressCa
return nil, fmt.Errorf("failed to get snapshots for plan: %w", err)
}

r.l.Debug("got snapshots for plan", zap.String("repo", r.repoConfig.Id), zap.Int("count", len(snapshots)), zap.String("plan", plan.Id), zap.String("tag", tagForPlan(plan)))
r.l.Debug("got snapshots for plan", zap.String("repo", r.repoConfig.Id), zap.Int("count", len(snapshots)), zap.String("plan", plan.Id), zap.String("tag", TagForPlan(plan.Id)))

startTime := time.Now()

var opts []restic.GenericOption
opts = append(opts, restic.WithFlags("--exclude-caches"))
opts = append(opts, restic.WithFlags("--tag", tagForPlan(plan)))
if r.config.Instance != "" {
opts = append(opts, restic.WithFlags("--host", r.config.Instance))
opts = append(opts, restic.WithFlags("--tag", tagForInstance(r.config.Instance)))
} else {
return nil, errors.New("host is a required field in the backrest config")
}
opts = append(opts, restic.WithFlags(
"--exclude-caches",
"--tag", TagForPlan(plan.Id),
"--tag", TagForInstance(r.config.Instance),
"--host", r.config.Instance),
)

for _, exclude := range plan.Excludes {
opts = append(opts, restic.WithFlags("--exclude", exclude))
Expand Down Expand Up @@ -180,7 +182,7 @@ func (r *RepoOrchestrator) ListSnapshotFiles(ctx context.Context, snapshotId str
return lsEnts, nil
}

func (r *RepoOrchestrator) Forget(ctx context.Context, plan *v1.Plan) ([]*v1.ResticSnapshot, error) {
func (r *RepoOrchestrator) Forget(ctx context.Context, plan *v1.Plan, tags []string) ([]*v1.ResticSnapshot, error) {
r.mu.Lock()
defer r.mu.Unlock()
ctx, flush := forwardResticLogs(ctx)
Expand All @@ -191,9 +193,13 @@ func (r *RepoOrchestrator) Forget(ctx context.Context, plan *v1.Plan) ([]*v1.Res
return nil, fmt.Errorf("plan %q has no retention policy", plan.Id)
}

if r.config.Instance == "" {
return nil, errors.New("instance is a required field in the backrest config")
}

result, err := r.repo.Forget(
ctx, protoutil.RetentionPolicyFromProto(plan.Retention),
restic.WithFlags("--tag", tagForPlan(plan)+","+tagForInstance(r.config.Instance)),
restic.WithFlags("--tag", strings.Join(tags, ",")),
restic.WithFlags("--group-by", ""),
)
if err != nil {
Expand Down Expand Up @@ -339,14 +345,6 @@ func (r *RepoOrchestrator) Config() *v1.Repo {
return r.repoConfig
}

func tagForPlan(plan *v1.Plan) string {
return fmt.Sprintf("plan:%s", plan.Id)
}

func tagForInstance(host string) string {
return fmt.Sprintf("created-by:%s", host)
}

func sortSnapshotsByTime(snapshots []*restic.Snapshot) {
sort.SliceStable(snapshots, func(i, j int) bool {
return snapshots[i].UnixTimeMs() < snapshots[j].UnixTimeMs()
Expand Down
36 changes: 36 additions & 0 deletions internal/orchestrator/repo/tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package repo

import (
"fmt"
"strings"
)

// TagForPlan returns a tag for the plan.
func TagForPlan(planId string) string {
return fmt.Sprintf("plan:%s", planId)
}

// TagForInstance returns a tag for the instance.
func TagForInstance(instanceId string) string {
return fmt.Sprintf("created-by:%s", instanceId)
}

// InstanceIDFromTags returns the instance ID from the tags, or an empty string if not found.
func InstanceIDFromTags(tags []string) string {
for _, tag := range tags {
if strings.HasPrefix(tag, "created-by:") {
return tag[len("created-by:"):]
}
}
return ""
}

// PlanFromTags returns the plan ID from the tags, or an empty string if not found.
func PlanFromTags(tags []string) string {
for _, tag := range tags {
if strings.HasPrefix(tag, "plan:") {
return tag[len("plan:"):]
}
}
return ""
}
46 changes: 43 additions & 3 deletions internal/orchestrator/tasks/taskforget.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/hook"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
"github.com/hashicorp/go-multierror"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -49,13 +51,14 @@ func NewOneoffForgetTask(repoID, planID string, flowID int64, at time.Time) Task

func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner) error {
t := st.Task
oplog := taskRunner.OpLog()

repo, err := taskRunner.GetRepoOrchestrator(t.RepoID())
r, err := taskRunner.GetRepoOrchestrator(t.RepoID())
if err != nil {
return fmt.Errorf("get repo %q: %w", t.RepoID(), err)
}

err = repo.UnlockIfAutoEnabled(ctx)
err = r.UnlockIfAutoEnabled(ctx)
if err != nil {
return fmt.Errorf("auto unlock repo %q: %w", t.RepoID(), err)
}
Expand All @@ -65,7 +68,19 @@ func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner)
return fmt.Errorf("get plan %q: %w", t.PlanID(), err)
}

forgot, err := repo.Forget(ctx, plan)
tags := []string{repo.TagForPlan(t.PlanID())}
if compat, err := useLegacyCompatMode(oplog, t.PlanID()); err != nil {
return fmt.Errorf("check legacy compat mode: %w", err)
} else if !compat {
tags = append(tags, repo.TagForInstance(taskRunner.Config().Instance))
} else {
zap.L().Warn("forgetting snapshots without instance ID, using legacy behavior (e.g. --tags not including instance ID)")
zap.S().Warnf("to avoid this warning, tag all snapshots with the instance ID e.g. by running: \r\n"+
"restic tag --set '%s' --set '%s' --tag '%s'", repo.TagForPlan(t.PlanID()), repo.TagForInstance(taskRunner.Config().Instance), repo.TagForPlan(t.PlanID()))
}

// check if any other instance IDs exist in the repo (unassociated don't count)
forgot, err := r.Forget(ctx, plan, tags)
if err != nil {
return fmt.Errorf("forget: %w", err)
}
Expand Down Expand Up @@ -108,3 +123,28 @@ func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner)

return err
}

// useLegacyCompatMode checks if there are any snapshots that were created without a `created-by` tag still exist in the repo.
// The property is overridden if mixed `created-by` tag values are found.
func useLegacyCompatMode(oplog *oplog.OpLog, planID string) (bool, error) {
instanceIDs := make(map[string]struct{})
if err := oplog.ForEachByPlan(planID, indexutil.CollectAll(), func(op *v1.Operation) error {
if snapshotOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
tags := snapshotOp.OperationIndexSnapshot.GetSnapshot().GetTags()
instanceIDs[repo.InstanceIDFromTags(tags)] = struct{}{}
}
return nil
}); err != nil {
return false, err
}
if _, ok := instanceIDs[""]; !ok {
return false, nil
}
delete(instanceIDs, "")
if len(instanceIDs) > 1 {
zap.L().Warn("found mixed instance IDs in indexed snapshots, forcing forget to use new behavior (e.g. --tags including instance ID) despite the presence of legacy (e.g. untagged) snapshots.")
return false, nil
}
zap.L().Warn("found legacy snapshots without instance ID, forget will use legacy behavior e.g. --tags not including instance ID")
return true, nil
}
47 changes: 20 additions & 27 deletions internal/orchestrator/tasks/taskindexsnapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/garethgeorge/backrest/internal/protoutil"
"github.com/garethgeorge/backrest/pkg/restic"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

func NewOneoffIndexSnapshotsTask(repoID string, at time.Time) Task {
Expand Down Expand Up @@ -71,20 +70,20 @@ func indexSnapshotsHelper(ctx context.Context, st ScheduledTask, taskRunner Task
}

// check if any migrations are required
if migrated, err := tryMigrate(ctx, repo, config, snapshots); err != nil {
return fmt.Errorf("migrate snapshots for repo %q: %w", t.RepoID(), err)
} else if migrated {
// Delete snapshot operations
if err := oplog.Delete(maps.Values(currentIds)...); err != nil {
return fmt.Errorf("delete prior indexed operations: %w", err)
}

snapshots, err = repo.Snapshots(ctx)
if err != nil {
return fmt.Errorf("get snapshots for repo %q: %w", t.RepoID(), err)
}
currentIds = nil
}
// if migrated, err := tryMigrate(ctx, repo, config, snapshots); err != nil {
// return fmt.Errorf("migrate snapshots for repo %q: %w", t.RepoID(), err)
// } else if migrated {
// // Delete snapshot operations
// if err := oplog.Delete(maps.Values(currentIds)...); err != nil {
// return fmt.Errorf("delete prior indexed operations: %w", err)
// }

// snapshots, err = repo.Snapshots(ctx)
// if err != nil {
// return fmt.Errorf("get snapshots for repo %q: %w", t.RepoID(), err)
// }
// currentIds = nil
// }

foundIds := make(map[string]struct{})

Expand All @@ -103,7 +102,7 @@ func indexSnapshotsHelper(ctx context.Context, st ScheduledTask, taskRunner Task
return fmt.Errorf("get flow ID for snapshot %q: %w", snapshot.Id, err)
}
planId := planForSnapshot(snapshotProto)
instanceID := hostForSnapshot(snapshotProto)
instanceID := instanceIDForSnapshot(snapshotProto)
indexOps = append(indexOps, &v1.Operation{
RepoId: t.RepoID(),
PlanId: planId,
Expand Down Expand Up @@ -185,21 +184,15 @@ func indexCurrentSnapshotIdsForRepo(log *oplog.OpLog, repoId string) (map[string
}

func planForSnapshot(snapshot *v1.ResticSnapshot) string {
for _, tag := range snapshot.Tags {
if strings.HasPrefix(tag, "plan:") {
return tag[len("plan:"):]
}
p := repo.PlanFromTags(snapshot.Tags)
if p != "" {
return p
}
return PlanForUnassociatedOperations
}

func hostForSnapshot(snapshot *v1.ResticSnapshot) string {
for _, tag := range snapshot.Tags {
if strings.HasPrefix(tag, "created-by:") {
return tag[len("created-by:"):]
}
}
return ""
func instanceIDForSnapshot(snapshot *v1.ResticSnapshot) string {
return repo.InstanceIDFromTags(snapshot.Tags)
}

// tryMigrate checks if the snapshots use the latest backrest tag set and migrates them if necessary.
Expand Down
Loading

0 comments on commit be0cdd5

Please sign in to comment.