Skip to content

Commit

Permalink
feat: support ImportSnapshotOperation in oplog
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Nov 16, 2023
1 parent 2c421d6 commit 89f95b3
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 55 deletions.
6 changes: 2 additions & 4 deletions internal/database/indexutil/indexutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,18 @@ import (
bolt "go.etcd.io/bbolt"
)

// IndexByteValue indexes a value and recordId tuple creating multimap from value to lists of associated recordIds.
func IndexByteValue(b *bolt.Bucket, value []byte, recordId int64) error {
key := serializationutil.BytesToKey(value)
key = append(key, serializationutil.Itob(recordId)...)
return b.Put(key, []byte{})
}

// IndexSearchByteValue searches the index given a value and returns an iterator over the associated recordIds.
func IndexSearchByteValue(b *bolt.Bucket, value []byte) *IndexSearchIterator {
return newSearchIterator(b, serializationutil.BytesToKey(value))
}

func IndexSearchIntValue(b *bolt.Bucket, value int64) *IndexSearchIterator {
return newSearchIterator(b, serializationutil.Itob(value))
}

type IndexSearchIterator struct {
c *bolt.Cursor
k []byte
Expand Down
45 changes: 45 additions & 0 deletions internal/database/indexutil/indexutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package indexutil

import (
"fmt"
"testing"

"go.etcd.io/bbolt"
)

func TestIndexing(t *testing.T) {
db, err := bbolt.Open(t.TempDir() + "/test.boltdb", 0600, nil)
if err != nil {
t.Fatalf("error opening database: %s", err)
}

if err := db.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucket([]byte("test"))
if err != nil {
return fmt.Errorf("error creating bucket: %s", err)
}
for id := 0; id < 100; id += 1 {
if err := IndexByteValue(b, []byte("document"), int64(id)); err != nil {
return err
}
}
return nil
}); err != nil {
t.Fatalf("db.Update error: %v", err)
}

if err := db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte("test"))
ids := IndexSearchByteValue(b, []byte("document")).ToSlice()
if len(ids) != 100 {
t.Errorf("want 100 ids, got %d", len(ids))
}
ids = IndexSearchByteValue(b, []byte("other")).ToSlice()
if len(ids) != 0 {
t.Errorf("want 0 ids, got %d", len(ids))
}
return nil
}); err != nil {
t.Fatalf("db.View error: %v", err)
}
}
133 changes: 82 additions & 51 deletions internal/database/oplog/oplog.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package oplog

import (
"bytes"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -30,6 +29,7 @@ var (
OpLogBucket = []byte("oplog.log") // oplog stores the operations themselves
RepoIndexBucket = []byte("oplog.repo_idx") // repo_index tracks IDs of operations affecting a given repo
PlanIndexBucket = []byte("oplog.plan_idx") // plan_index tracks IDs of operations affecting a given plan
IndexedSnapshotsSetBucket = []byte("oplog.indexed_snapshots") // indexed_snapshots is a set of snapshot IDs that have been indexed
)

// OpLog represents a log of operations performed.
Expand All @@ -53,7 +53,9 @@ func NewOpLog(databasePath string) (*OpLog, error) {

// Create the buckets if they don't exist
if err := db.Update(func(tx *bolt.Tx) error {
for _, bucket := range [][]byte{SystemBucket, OpLogBucket, RepoIndexBucket, PlanIndexBucket} {
for _, bucket := range [][]byte{
SystemBucket, OpLogBucket, RepoIndexBucket, PlanIndexBucket, IndexedSnapshotsSetBucket,
} {
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return fmt.Errorf("error creating bucket %s: %s", string(bucket), err)
}
Expand All @@ -70,48 +72,104 @@ func (o *OpLog) Close() error {
return o.db.Close()
}

// Add adds a generic operation to the operation log.
func (o *OpLog) Add(op *v1.Operation) error {
if op.Id != 0 {
return errors.New("operation already has an ID, OpLog.Add is expected to set the ID")
}

err := o.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(OpLogBucket)

id, err := b.NextSequence()
err := o.addOperationHelper(tx, op)
if err != nil {
return fmt.Errorf("error getting next sequence: %w", err)
return err
}
return nil
})
if err == nil {
o.notifyHelper(EventTypeOpCreated, op)
}

op.Id = int64(id)
return err
}

bytes, err := proto.Marshal(op)
if err != nil {
return fmt.Errorf("error marshalling operation: %w", err)
func (o *OpLog) BulkAdd(ops []*v1.Operation) {
o.db.Update(func(tx *bolt.Tx) error {
for _, op := range ops {
if err := o.addOperationHelper(tx, op); err != nil {
return err
}
}
return nil
})
}

func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
b := tx.Bucket(OpLogBucket)

if err := b.Put(serializationutil.Itob(op.Id), bytes); err != nil {
return fmt.Errorf("error putting operation into bucket: %w", err)
id, err := b.NextSequence()
if err != nil {
return fmt.Errorf("error getting next sequence: %w", err)
}

op.Id = int64(id)

bytes, err := proto.Marshal(op)
if err != nil {
return fmt.Errorf("error marshalling operation: %w", err)
}


if err := b.Put(serializationutil.Itob(op.Id), bytes); err != nil {
return fmt.Errorf("error putting operation into bucket: %w", err)
}

// Update always universal indices
if op.RepoId != "" {
if err := indexutil.IndexByteValue(tx.Bucket(RepoIndexBucket), []byte(op.RepoId), op.Id); err != nil {
return fmt.Errorf("error adding operation to repo index: %w", err)
}
}
if op.PlanId != "" {
if err := indexutil.IndexByteValue(tx.Bucket(PlanIndexBucket), []byte(op.PlanId), op.Id); err != nil {
return fmt.Errorf("error adding operation to repo index: %w", err)
}
}

if op.RepoId != "" {
if err := indexutil.IndexByteValue(tx.Bucket(RepoIndexBucket), []byte(op.RepoId), op.Id); err != nil {
return fmt.Errorf("error adding operation to repo index: %w", err)
}
// Update operation type dependent indices.
switch wrappedOp := op.Op.(type) {
case *v1.Operation_OperationBackup:
// Nothing extra to be done.
case *v1.Operation_OperationIndexSnapshot:
if wrappedOp.OperationIndexSnapshot == nil || wrappedOp.OperationIndexSnapshot.Snapshot == nil {
return errors.New("op.OperationIndexSnapshot or op.OperationIndexSnapshot.Snapshot is nil")
}
if op.PlanId != "" {
if err := indexutil.IndexByteValue(tx.Bucket(PlanIndexBucket), []byte(op.PlanId), op.Id); err != nil {
return fmt.Errorf("error adding operation to repo index: %w", err)
}
snapshotId := serializationutil.NormalizeSnapshotId(wrappedOp.OperationIndexSnapshot.Snapshot.Id)
key := serializationutil.BytesToKey([]byte(snapshotId))
if err := tx.Bucket(IndexedSnapshotsSetBucket).Put(key, serializationutil.Itob(op.Id)); err != nil {
return fmt.Errorf("error adding OperationIndexSnapshot to indexed snapshots set: %w", err)
}
default:
return fmt.Errorf("unknown operation type: %T", wrappedOp)
}

return nil
}

func (o *OpLog) HasIndexedSnapshot(snapshotId string) (int64, error) {
var id int64
if err := o.db.View(func(tx *bolt.Tx) error {
key := serializationutil.BytesToKey([]byte(snapshotId))
idBytes := tx.Bucket(IndexedSnapshotsSetBucket).Get(key)
if idBytes == nil {
id = -1
} else {
id = serializationutil.Btoi(idBytes)
}
return nil
})
if err == nil {
o.notifyHelper(EventTypeOpCreated, op)
}); err != nil {
return 0, err
}
return err
return id, nil
}

func (o *OpLog) Update(op *v1.Operation) error {
Expand Down Expand Up @@ -237,33 +295,6 @@ func (o *OpLog) Unsubscribe(callback *func(EventType, *v1.Operation)) {
}
}

// addOpToIndexBucket adds the given operation ID to the given index bucket for the given key ID.
func (o *OpLog) addOpToIndexBucket(tx *bolt.Tx, bucket []byte, indexId string, opId int64) error {
b := tx.Bucket(bucket)

var key []byte
key = append(key, serializationutil.Stob(indexId)...)
key = append(key, serializationutil.Itob(opId)...)
if err := b.Put(key, []byte{}); err != nil {
return fmt.Errorf("error adding operation to repo index: %w", err)
}
return nil
}

// readOpsFromIndexBucket reads all operations from the given index bucket for the given key ID.
func (o *OpLog) readOpsFromIndexBucket(tx *bolt.Tx, bucket []byte, indexId string) ([]int64, error) {
b := tx.Bucket(bucket)

var ops []int64
c := b.Cursor()
prefix := serializationutil.Stob(indexId)
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
ops = append(ops, serializationutil.Btoi(k[len(prefix):]))
}

return ops, nil
}

type Filter func([]int64)[]int64

func FilterKeepAll() Filter {
Expand Down
39 changes: 39 additions & 0 deletions internal/database/oplog/oplog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,45 @@ func TestBigIO(t *testing.T) {
}
}

func TestIndexSnapshot(t *testing.T) {
t.Parallel()
log, err := NewOpLog(t.TempDir() + "/test.boltdb")
if err != nil {
t.Fatalf("error creating oplog: %s", err)
}

op := &v1.Operation{
PlanId: "plan1",
RepoId: "repo1",
Op: &v1.Operation_OperationIndexSnapshot{
OperationIndexSnapshot: &v1.OperationIndexSnapshot{
Snapshot: &v1.ResticSnapshot{
Id: "abcdefghijklmnop",
},
},
},
}
if err := log.Add(op); err != nil {
t.Fatalf("error adding operation: %s", err)
}

id, err := log.HasIndexedSnapshot("abcdefgh")
if err != nil {
t.Fatalf("error checking for snapshot: %s", err)
}
if id != op.Id {
t.Fatalf("want id %d, got %d", op.Id, id)
}

id, err = log.HasIndexedSnapshot("notfound")
if err != nil {
t.Fatalf("error checking for snapshot: %s", err)
}
if id != -1 {
t.Fatalf("want id -1, got %d", id)
}
}

func collectMessages(ops []*v1.Operation) []string {
var messages []string
for _, op := range ops {
Expand Down

0 comments on commit 89f95b3

Please sign in to comment.