Skip to content

Commit

Permalink
feat: snapshot items are viewable in the UI and minor element orderin…
Browse files Browse the repository at this point in the history
…g fixes
  • Loading branch information
garethgeorge committed Nov 16, 2023
1 parent a90b30e commit 65dcd74
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 136 deletions.
18 changes: 0 additions & 18 deletions DESIGN-NOTES.md

This file was deleted.

6 changes: 4 additions & 2 deletions cmd/resticui/resticui.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ func main() {


// Create and serve API server
oplog, err := oplog.NewOpLog(path.Join(dataPath(), "oplog.boltdb"))
oplogFile := path.Join(dataPath(), "oplog.boltdb")
oplog, err := oplog.NewOpLog(oplogFile)
if err != nil {
zap.S().Fatalf("Error creating oplog: %v", err)
zap.S().Warnf("Operation log may be corrupted, if errors recur delete the file %q and restart. Your backups stored in your repos are safe.", oplogFile)
zap.S().Fatalf("Error creating oplog : %v", err)
}
defer oplog.Close()

Expand Down
1 change: 0 additions & 1 deletion internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func (s *Server) GetOperationEvents(_ *emptypb.Empty, stream v1.ResticUI_GetOper
errorChan := make(chan error)
defer close(errorChan)
callback := func(eventType oplog.EventType, op *v1.Operation) {
zap.S().Debug("Sending an event")
var eventTypeMapped v1.OperationEventType
switch eventType {
case oplog.EventTypeOpCreated:
Expand Down
6 changes: 5 additions & 1 deletion internal/config/jsonstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ func (f *JsonFileStore) Update(config *v1.Config) error {
return fmt.Errorf("invalid config: %w", err)
}

data, err := protojson.Marshal(config)
data, err := protojson.MarshalOptions{
Indent: " ",
Multiline: true,
EmitUnpopulated: true,
}.Marshal(config)
if err != nil {
return fmt.Errorf("failed to marshal config: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/database/indexutil/indexutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ func (i *IndexSearchIterator) Next() (int64, bool) {
if i.k == nil || !bytes.HasPrefix(i.k, i.prefix) {
return 0, false
}
id := serializationutil.Btoi(i.k[len(i.prefix):])
id, err := serializationutil.Btoi(i.k[len(i.prefix):])
if err != nil {
// this sholud never happen, if it does it indicates database corruption.
return 0, false
}
i.k, _ = i.c.Next()
return id, true
}
Expand Down
85 changes: 44 additions & 41 deletions internal/database/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,19 @@ import (
"google.golang.org/protobuf/proto"
)


type EventType int

const (
EventTypeUnknown = EventType(iota)
EventTypeUnknown = EventType(iota)
EventTypeOpCreated = EventType(iota)
EventTypeOpUpdated = EventType(iota)
)

var (
SystemBucket = []byte("oplog.system") // system stores metadata
OpLogBucket = []byte("oplog.log") // oplog stores the operations themselves
RepoIndexBucket = []byte("oplog.repo_idx") // repo_index tracks IDs of operations affecting a given repo
PlanIndexBucket = []byte("oplog.plan_idx") // plan_index tracks IDs of operations affecting a given plan
SystemBucket = []byte("oplog.system") // system stores metadata
OpLogBucket = []byte("oplog.log") // oplog stores the operations themselves
RepoIndexBucket = []byte("oplog.repo_idx") // repo_index tracks IDs of operations affecting a given repo
PlanIndexBucket = []byte("oplog.plan_idx") // plan_index tracks IDs of operations affecting a given plan
IndexedSnapshotsSetBucket = []byte("oplog.indexed_snapshots") // indexed_snapshots is a set of snapshot IDs that have been indexed
)

Expand All @@ -39,7 +38,7 @@ type OpLog struct {
db *bolt.DB

subscribersMu sync.RWMutex
subscribers []*func(EventType, *v1.Operation)
subscribers []*func(EventType, *v1.Operation)
}

func NewOpLog(databasePath string) (*OpLog, error) {
Expand All @@ -52,47 +51,47 @@ func NewOpLog(databasePath string) (*OpLog, error) {
return nil, fmt.Errorf("error opening database: %s", err)
}

// Create the buckets if they don't exist
if err := db.Update(func(tx *bolt.Tx) error {
// Create the buckets if they don't exist
for _, bucket := range [][]byte{
SystemBucket, OpLogBucket, RepoIndexBucket, PlanIndexBucket, IndexedSnapshotsSetBucket,
} {
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return fmt.Errorf("creating bucket %s: %s", string(bucket), err)
}
}

// Validate the operation log on startup.
sysBucket := tx.Bucket(SystemBucket)
opLogBucket := tx.Bucket(OpLogBucket)
c := opLogBucket.Cursor()
if lastValidated := sysBucket.Get([]byte("last_validated")); lastValidated != nil {
c.Seek(lastValidated)
// Validate the operation log on startup.
sysBucket := tx.Bucket(SystemBucket)
opLogBucket := tx.Bucket(OpLogBucket)
c := opLogBucket.Cursor()
if lastValidated := sysBucket.Get([]byte("last_validated")); lastValidated != nil {
c.Seek(lastValidated)
}
for k, v := c.First(); k != nil; k, v = c.Next() {
op := &v1.Operation{}
if err := proto.Unmarshal(v, op); err != nil {
zap.L().Error("error unmarshalling operation, there may be corruption in the oplog", zap.Error(err))
continue
}
for k, v := c.First(); k != nil; k, v = c.Next() {
op := &v1.Operation{}
if err := proto.Unmarshal(v, op); err != nil {
zap.L().Error("error unmarshalling operation, there may be corruption in the oplog", zap.Error(err))
continue
if op.Status == v1.OperationStatus_STATUS_INPROGRESS {
op.Status = v1.OperationStatus_STATUS_ERROR
op.DisplayMessage = "Operation timeout."
bytes, err := proto.Marshal(op)
if err != nil {
return fmt.Errorf("marshalling operation: %w", err)
}
if op.Status == v1.OperationStatus_STATUS_INPROGRESS {
op.Status = v1.OperationStatus_STATUS_ERROR
op.DisplayMessage = "Operation timeout."
bytes, err := proto.Marshal(op)
if err != nil {
return fmt.Errorf("marshalling operation: %w", err)
}
if err := opLogBucket.Put(k, bytes); err != nil {
return fmt.Errorf("putting operation into bucket: %w", err)
}
if err := opLogBucket.Put(k, bytes); err != nil {
return fmt.Errorf("putting operation into bucket: %w", err)
}
}
if lastValidated, _ := c.Last(); lastValidated != nil {
if err := sysBucket.Put([]byte("last_validated"), lastValidated); err != nil {
return fmt.Errorf("checkpointing last_validated key: %w", err)
}
}
if lastValidated, _ := c.Last(); lastValidated != nil {
if err := sysBucket.Put([]byte("last_validated"), lastValidated); err != nil {
return fmt.Errorf("checkpointing last_validated key: %w", err)
}

}

return nil
}); err != nil {
return nil, err
Expand Down Expand Up @@ -156,7 +155,6 @@ func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
return fmt.Errorf("error marshalling operation: %w", err)
}


if err := b.Put(serializationutil.Itob(op.Id), bytes); err != nil {
return fmt.Errorf("error putting operation into bucket: %w", err)
}
Expand Down Expand Up @@ -196,12 +194,17 @@ func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
func (o *OpLog) HasIndexedSnapshot(snapshotId string) (int64, error) {
var id int64
if err := o.db.View(func(tx *bolt.Tx) error {
snapshotId := serializationutil.NormalizeSnapshotId(snapshotId)
key := serializationutil.BytesToKey([]byte(snapshotId))
idBytes := tx.Bucket(IndexedSnapshotsSetBucket).Get(key)
if idBytes == nil {
id = -1
} else {
id = serializationutil.Btoi(idBytes)
var err error
id, err = serializationutil.Btoi(idBytes)
if err != nil {
return fmt.Errorf("database corrupt, couldn't convert ID bytes to int: %w", err)
}
}
return nil
}); err != nil {
Expand Down Expand Up @@ -266,7 +269,7 @@ func (o *OpLog) Get(id int64) (*v1.Operation, error) {
if err := o.db.View(func(tx *bolt.Tx) error {
var err error
op, err = o.getHelper(tx.Bucket(OpLogBucket), id)
return err
return err
}); err != nil {
return nil, err
}
Expand Down Expand Up @@ -319,7 +322,7 @@ func (o *OpLog) GetByPlan(planId string, filter Filter) ([]*v1.Operation, error)

func (o *OpLog) Subscribe(callback *func(EventType, *v1.Operation)) {
o.subscribersMu.Lock()
defer o.subscribersMu.Unlock()
defer o.subscribersMu.Unlock()
o.subscribers = append(o.subscribers, callback)
}

Expand All @@ -329,13 +332,13 @@ func (o *OpLog) Unsubscribe(callback *func(EventType, *v1.Operation)) {
subs := o.subscribers
for i, c := range subs {
if c == callback {
subs[i] = subs[len(subs) - 1]
o.subscribers = subs[:len(o.subscribers) - 1]
subs[i] = subs[len(subs)-1]
o.subscribers = subs[:len(o.subscribers)-1]
}
}
}

type Filter func([]int64)[]int64
type Filter func([]int64) []int64

func FilterKeepAll() Filter {
return func(ids []int64) []int64 {
Expand Down
26 changes: 20 additions & 6 deletions internal/database/serializationutil/serializationutil.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package serializationutil

import "encoding/binary"
import (
"encoding/binary"
"errors"
)

var ErrInvalidLength = errors.New("invalid length")

func Itob(v int64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(v))
return b
}

func Btoi(b []byte) int64 {
return int64(binary.BigEndian.Uint64(b))
func Btoi(b []byte) (int64, error) {
if len(b) != 8 {
return 0, ErrInvalidLength
}
return int64(binary.BigEndian.Uint64(b)), nil
}

func Stob(v string) []byte {
Expand All @@ -19,9 +27,15 @@ func Stob(v string) []byte {
return b
}

func Btos(b []byte) (string, int64) {
length := Btoi(b[:8])
return string(b[8:8+length]), 8+length
func Btos(b []byte) (string, int64, error) {
if len(b) < 8 {
return "", 0, ErrInvalidLength
}
length, _ := Btoi(b[:8])
if int64(len(b)) < 8+length {
return "", 0, ErrInvalidLength
}
return string(b[8:8+length]), 8+length, nil
}

func BytesToKey(b []byte) []byte {
Expand Down
4 changes: 2 additions & 2 deletions internal/orchestrator/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ func (r *RepoOrchestrator) updateSnapshotsIfNeeded(ctx context.Context, force bo
}

sort.SliceStable(snapshots, func(i, j int) bool {
return snapshots[i].Time < snapshots[j].Time
return snapshots[i].UnixTimeMs() < snapshots[j].UnixTimeMs()
})
r.snapshots = snapshots

zap.L().Debug("Updated snapshots", zap.String("repo", r.repoConfig.Id), zap.Duration("duration", time.Since(startTime)))
zap.L().Debug("updated snapshots", zap.String("repo", r.repoConfig.Id), zap.Duration("duration", time.Since(startTime)))

return nil
}
Expand Down
4 changes: 3 additions & 1 deletion internal/orchestrator/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,14 @@ func indexSnapshotsHelper(ctx context.Context, orchestrator *Orchestrator, plan
opTime := curTimeMillis()
var indexOps []*v1.Operation
for _, snapshot := range snapshots {
zap.L().Debug("checking if snapshot has been indexed", zap.String("snapshot", snapshot.Id))
opid, err := orchestrator.oplog.HasIndexedSnapshot(snapshot.Id)
if err != nil {
return fmt.Errorf("HasIndexSnapshot for snapshot %q: %w", snapshot.Id, err)
}

if opid < 0 {
if opid >= 0 {
alreadyIndexed += 1
continue
}

Expand Down
20 changes: 15 additions & 5 deletions pkg/restic/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,14 @@ type Snapshot struct {
Username string `json:"username"`
Tags []string `json:"tags"`
Parent string `json:"parent"`
unixTimeMs int64 `json:"-"`
}

func (s *Snapshot) ToProto() *v1.ResticSnapshot {
t, err := time.Parse(time.RFC3339Nano, s.Time)
if err != nil {
t = time.Unix(0, 0)
}

return &v1.ResticSnapshot{
Id: s.Id,
UnixTimeMs: t.UnixMilli(),
UnixTimeMs: s.UnixTimeMs(),
Tree: s.Tree,
Paths: s.Paths,
Hostname: s.Hostname,
Expand All @@ -54,6 +52,18 @@ func (s *Snapshot) ToProto() *v1.ResticSnapshot {
}
}

func (s *Snapshot) UnixTimeMs() int64 {
if s.unixTimeMs != 0 {
return s.unixTimeMs
}
t, err := time.Parse(time.RFC3339Nano, s.Time)
if err != nil {
t = time.Unix(0, 0)
}
s.unixTimeMs = t.UnixMilli()
return s.unixTimeMs
}

type BackupProgressEntry struct {
// Common fields
MessageType string `json:"message_type"` // "summary" or "status"
Expand Down
Loading

0 comments on commit 65dcd74

Please sign in to comment.