From 65dcd7482f0fffb1b972d4921a6a2f15b98b80e5 Mon Sep 17 00:00:00 2001 From: garethgeorge Date: Wed, 15 Nov 2023 23:30:23 -0800 Subject: [PATCH] feat: snapshot items are viewable in the UI and minor element ordering fixes --- DESIGN-NOTES.md | 18 ---- cmd/resticui/resticui.go | 6 +- internal/api/server.go | 1 - internal/config/jsonstore.go | 6 +- internal/database/indexutil/indexutil.go | 6 +- internal/database/oplog/oplog.go | 85 ++++++++++--------- .../serializationutil/serializationutil.go | 26 ++++-- internal/orchestrator/repo.go | 4 +- internal/orchestrator/tasks.go | 4 +- pkg/restic/outputs.go | 20 +++-- webui/src/components/OperationList.tsx | 26 ++---- webui/src/state/oplog.ts | 61 ++++++------- webui/src/views/PlanView.tsx | 10 +-- 13 files changed, 137 insertions(+), 136 deletions(-) delete mode 100644 DESIGN-NOTES.md diff --git a/DESIGN-NOTES.md b/DESIGN-NOTES.md deleted file mode 100644 index 568cec79..00000000 --- a/DESIGN-NOTES.md +++ /dev/null @@ -1,18 +0,0 @@ -# Datastructures - - - config - - user provided configuration, is potentially updated either on startup or by set config rpc - - configures - - repos - a list of restic repos to which data may be backed up - - plans - a list of backup plans which consist of - - directories - - schedule - - retention policy - - cache - - the cache is a local cache of the restic repo's properties e.g. output from listing snapshots, etc. This may be held in ram or on disk? TBD: decide. - - state - - state is tracked plan-by-plan and is persisted to disk - - stores recent operations done for a plan e.g. last backup, last prune, last check, etc. - - stores status and errors for each plan - - history is fixed size and is flushed to disk periodically (e.g. every 60 seconds). - - the state of a repo is the merge of the states of the plans that reference it. \ No newline at end of file diff --git a/cmd/resticui/resticui.go b/cmd/resticui/resticui.go index 3f3d79a9..e374fa3d 100644 --- a/cmd/resticui/resticui.go +++ b/cmd/resticui/resticui.go @@ -41,9 +41,11 @@ func main() { // Create and serve API server - oplog, err := oplog.NewOpLog(path.Join(dataPath(), "oplog.boltdb")) + oplogFile := path.Join(dataPath(), "oplog.boltdb") + oplog, err := oplog.NewOpLog(oplogFile) if err != nil { - zap.S().Fatalf("Error creating oplog: %v", err) + zap.S().Warnf("Operation log may be corrupted, if errors recur delete the file %q and restart. Your backups stored in your repos are safe.", oplogFile) + zap.S().Fatalf("Error creating oplog : %v", err) } defer oplog.Close() diff --git a/internal/api/server.go b/internal/api/server.go index 462c375d..e88091a3 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -135,7 +135,6 @@ func (s *Server) GetOperationEvents(_ *emptypb.Empty, stream v1.ResticUI_GetOper errorChan := make(chan error) defer close(errorChan) callback := func(eventType oplog.EventType, op *v1.Operation) { - zap.S().Debug("Sending an event") var eventTypeMapped v1.OperationEventType switch eventType { case oplog.EventTypeOpCreated: diff --git a/internal/config/jsonstore.go b/internal/config/jsonstore.go index aa51581b..01e55641 100644 --- a/internal/config/jsonstore.go +++ b/internal/config/jsonstore.go @@ -52,7 +52,11 @@ func (f *JsonFileStore) Update(config *v1.Config) error { return fmt.Errorf("invalid config: %w", err) } - data, err := protojson.Marshal(config) + data, err := protojson.MarshalOptions{ + Indent: " ", + Multiline: true, + EmitUnpopulated: true, + }.Marshal(config) if err != nil { return fmt.Errorf("failed to marshal config: %w", err) } diff --git a/internal/database/indexutil/indexutil.go b/internal/database/indexutil/indexutil.go index fffc1f59..4cfdd0da 100644 --- a/internal/database/indexutil/indexutil.go +++ b/internal/database/indexutil/indexutil.go @@ -39,7 +39,11 @@ func (i *IndexSearchIterator) Next() (int64, bool) { if i.k == nil || !bytes.HasPrefix(i.k, i.prefix) { return 0, false } - id := serializationutil.Btoi(i.k[len(i.prefix):]) + id, err := serializationutil.Btoi(i.k[len(i.prefix):]) + if err != nil { + // this sholud never happen, if it does it indicates database corruption. + return 0, false + } i.k, _ = i.c.Next() return id, true } diff --git a/internal/database/oplog/oplog.go b/internal/database/oplog/oplog.go index 26957f65..8bc3ff2a 100644 --- a/internal/database/oplog/oplog.go +++ b/internal/database/oplog/oplog.go @@ -16,20 +16,19 @@ import ( "google.golang.org/protobuf/proto" ) - type EventType int const ( - EventTypeUnknown = EventType(iota) + EventTypeUnknown = EventType(iota) EventTypeOpCreated = EventType(iota) EventTypeOpUpdated = EventType(iota) ) var ( - SystemBucket = []byte("oplog.system") // system stores metadata - OpLogBucket = []byte("oplog.log") // oplog stores the operations themselves - RepoIndexBucket = []byte("oplog.repo_idx") // repo_index tracks IDs of operations affecting a given repo - PlanIndexBucket = []byte("oplog.plan_idx") // plan_index tracks IDs of operations affecting a given plan + SystemBucket = []byte("oplog.system") // system stores metadata + OpLogBucket = []byte("oplog.log") // oplog stores the operations themselves + RepoIndexBucket = []byte("oplog.repo_idx") // repo_index tracks IDs of operations affecting a given repo + PlanIndexBucket = []byte("oplog.plan_idx") // plan_index tracks IDs of operations affecting a given plan IndexedSnapshotsSetBucket = []byte("oplog.indexed_snapshots") // indexed_snapshots is a set of snapshot IDs that have been indexed ) @@ -39,7 +38,7 @@ type OpLog struct { db *bolt.DB subscribersMu sync.RWMutex - subscribers []*func(EventType, *v1.Operation) + subscribers []*func(EventType, *v1.Operation) } func NewOpLog(databasePath string) (*OpLog, error) { @@ -52,47 +51,47 @@ func NewOpLog(databasePath string) (*OpLog, error) { return nil, fmt.Errorf("error opening database: %s", err) } - // Create the buckets if they don't exist if err := db.Update(func(tx *bolt.Tx) error { + // Create the buckets if they don't exist for _, bucket := range [][]byte{ SystemBucket, OpLogBucket, RepoIndexBucket, PlanIndexBucket, IndexedSnapshotsSetBucket, } { if _, err := tx.CreateBucketIfNotExists(bucket); err != nil { return fmt.Errorf("creating bucket %s: %s", string(bucket), err) } + } - // Validate the operation log on startup. - sysBucket := tx.Bucket(SystemBucket) - opLogBucket := tx.Bucket(OpLogBucket) - c := opLogBucket.Cursor() - if lastValidated := sysBucket.Get([]byte("last_validated")); lastValidated != nil { - c.Seek(lastValidated) + // Validate the operation log on startup. + sysBucket := tx.Bucket(SystemBucket) + opLogBucket := tx.Bucket(OpLogBucket) + c := opLogBucket.Cursor() + if lastValidated := sysBucket.Get([]byte("last_validated")); lastValidated != nil { + c.Seek(lastValidated) + } + for k, v := c.First(); k != nil; k, v = c.Next() { + op := &v1.Operation{} + if err := proto.Unmarshal(v, op); err != nil { + zap.L().Error("error unmarshalling operation, there may be corruption in the oplog", zap.Error(err)) + continue } - for k, v := c.First(); k != nil; k, v = c.Next() { - op := &v1.Operation{} - if err := proto.Unmarshal(v, op); err != nil { - zap.L().Error("error unmarshalling operation, there may be corruption in the oplog", zap.Error(err)) - continue + if op.Status == v1.OperationStatus_STATUS_INPROGRESS { + op.Status = v1.OperationStatus_STATUS_ERROR + op.DisplayMessage = "Operation timeout." + bytes, err := proto.Marshal(op) + if err != nil { + return fmt.Errorf("marshalling operation: %w", err) } - if op.Status == v1.OperationStatus_STATUS_INPROGRESS { - op.Status = v1.OperationStatus_STATUS_ERROR - op.DisplayMessage = "Operation timeout." - bytes, err := proto.Marshal(op) - if err != nil { - return fmt.Errorf("marshalling operation: %w", err) - } - if err := opLogBucket.Put(k, bytes); err != nil { - return fmt.Errorf("putting operation into bucket: %w", err) - } + if err := opLogBucket.Put(k, bytes); err != nil { + return fmt.Errorf("putting operation into bucket: %w", err) } } - if lastValidated, _ := c.Last(); lastValidated != nil { - if err := sysBucket.Put([]byte("last_validated"), lastValidated); err != nil { - return fmt.Errorf("checkpointing last_validated key: %w", err) - } + } + if lastValidated, _ := c.Last(); lastValidated != nil { + if err := sysBucket.Put([]byte("last_validated"), lastValidated); err != nil { + return fmt.Errorf("checkpointing last_validated key: %w", err) } - } + return nil }); err != nil { return nil, err @@ -156,7 +155,6 @@ func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error { return fmt.Errorf("error marshalling operation: %w", err) } - if err := b.Put(serializationutil.Itob(op.Id), bytes); err != nil { return fmt.Errorf("error putting operation into bucket: %w", err) } @@ -196,12 +194,17 @@ func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error { func (o *OpLog) HasIndexedSnapshot(snapshotId string) (int64, error) { var id int64 if err := o.db.View(func(tx *bolt.Tx) error { + snapshotId := serializationutil.NormalizeSnapshotId(snapshotId) key := serializationutil.BytesToKey([]byte(snapshotId)) idBytes := tx.Bucket(IndexedSnapshotsSetBucket).Get(key) if idBytes == nil { id = -1 } else { - id = serializationutil.Btoi(idBytes) + var err error + id, err = serializationutil.Btoi(idBytes) + if err != nil { + return fmt.Errorf("database corrupt, couldn't convert ID bytes to int: %w", err) + } } return nil }); err != nil { @@ -266,7 +269,7 @@ func (o *OpLog) Get(id int64) (*v1.Operation, error) { if err := o.db.View(func(tx *bolt.Tx) error { var err error op, err = o.getHelper(tx.Bucket(OpLogBucket), id) - return err + return err }); err != nil { return nil, err } @@ -319,7 +322,7 @@ func (o *OpLog) GetByPlan(planId string, filter Filter) ([]*v1.Operation, error) func (o *OpLog) Subscribe(callback *func(EventType, *v1.Operation)) { o.subscribersMu.Lock() - defer o.subscribersMu.Unlock() + defer o.subscribersMu.Unlock() o.subscribers = append(o.subscribers, callback) } @@ -329,13 +332,13 @@ func (o *OpLog) Unsubscribe(callback *func(EventType, *v1.Operation)) { subs := o.subscribers for i, c := range subs { if c == callback { - subs[i] = subs[len(subs) - 1] - o.subscribers = subs[:len(o.subscribers) - 1] + subs[i] = subs[len(subs)-1] + o.subscribers = subs[:len(o.subscribers)-1] } } } -type Filter func([]int64)[]int64 +type Filter func([]int64) []int64 func FilterKeepAll() Filter { return func(ids []int64) []int64 { diff --git a/internal/database/serializationutil/serializationutil.go b/internal/database/serializationutil/serializationutil.go index d7a4bf97..ef104ee1 100644 --- a/internal/database/serializationutil/serializationutil.go +++ b/internal/database/serializationutil/serializationutil.go @@ -1,6 +1,11 @@ package serializationutil -import "encoding/binary" +import ( + "encoding/binary" + "errors" +) + +var ErrInvalidLength = errors.New("invalid length") func Itob(v int64) []byte { b := make([]byte, 8) @@ -8,8 +13,11 @@ func Itob(v int64) []byte { return b } -func Btoi(b []byte) int64 { - return int64(binary.BigEndian.Uint64(b)) +func Btoi(b []byte) (int64, error) { + if len(b) != 8 { + return 0, ErrInvalidLength + } + return int64(binary.BigEndian.Uint64(b)), nil } func Stob(v string) []byte { @@ -19,9 +27,15 @@ func Stob(v string) []byte { return b } -func Btos(b []byte) (string, int64) { - length := Btoi(b[:8]) - return string(b[8:8+length]), 8+length +func Btos(b []byte) (string, int64, error) { + if len(b) < 8 { + return "", 0, ErrInvalidLength + } + length, _ := Btoi(b[:8]) + if int64(len(b)) < 8+length { + return "", 0, ErrInvalidLength + } + return string(b[8:8+length]), 8+length, nil } func BytesToKey(b []byte) []byte { diff --git a/internal/orchestrator/repo.go b/internal/orchestrator/repo.go index a058b40a..6c7a385d 100644 --- a/internal/orchestrator/repo.go +++ b/internal/orchestrator/repo.go @@ -66,11 +66,11 @@ func (r *RepoOrchestrator) updateSnapshotsIfNeeded(ctx context.Context, force bo } sort.SliceStable(snapshots, func(i, j int) bool { - return snapshots[i].Time < snapshots[j].Time + return snapshots[i].UnixTimeMs() < snapshots[j].UnixTimeMs() }) r.snapshots = snapshots - zap.L().Debug("Updated snapshots", zap.String("repo", r.repoConfig.Id), zap.Duration("duration", time.Since(startTime))) + zap.L().Debug("updated snapshots", zap.String("repo", r.repoConfig.Id), zap.Duration("duration", time.Since(startTime))) return nil } diff --git a/internal/orchestrator/tasks.go b/internal/orchestrator/tasks.go index 0c2dee22..2adeb51b 100644 --- a/internal/orchestrator/tasks.go +++ b/internal/orchestrator/tasks.go @@ -160,12 +160,14 @@ func indexSnapshotsHelper(ctx context.Context, orchestrator *Orchestrator, plan opTime := curTimeMillis() var indexOps []*v1.Operation for _, snapshot := range snapshots { + zap.L().Debug("checking if snapshot has been indexed", zap.String("snapshot", snapshot.Id)) opid, err := orchestrator.oplog.HasIndexedSnapshot(snapshot.Id) if err != nil { return fmt.Errorf("HasIndexSnapshot for snapshot %q: %w", snapshot.Id, err) } - if opid < 0 { + if opid >= 0 { + alreadyIndexed += 1 continue } diff --git a/pkg/restic/outputs.go b/pkg/restic/outputs.go index 245e7b4c..c2df020b 100644 --- a/pkg/restic/outputs.go +++ b/pkg/restic/outputs.go @@ -35,16 +35,14 @@ type Snapshot struct { Username string `json:"username"` Tags []string `json:"tags"` Parent string `json:"parent"` + unixTimeMs int64 `json:"-"` } func (s *Snapshot) ToProto() *v1.ResticSnapshot { - t, err := time.Parse(time.RFC3339Nano, s.Time) - if err != nil { - t = time.Unix(0, 0) - } + return &v1.ResticSnapshot{ Id: s.Id, - UnixTimeMs: t.UnixMilli(), + UnixTimeMs: s.UnixTimeMs(), Tree: s.Tree, Paths: s.Paths, Hostname: s.Hostname, @@ -54,6 +52,18 @@ func (s *Snapshot) ToProto() *v1.ResticSnapshot { } } +func (s *Snapshot) UnixTimeMs() int64 { + if s.unixTimeMs != 0 { + return s.unixTimeMs + } + t, err := time.Parse(time.RFC3339Nano, s.Time) + if err != nil { + t = time.Unix(0, 0) + } + s.unixTimeMs = t.UnixMilli() + return s.unixTimeMs +} + type BackupProgressEntry struct { // Common fields MessageType string `json:"message_type"` // "summary" or "status" diff --git a/webui/src/components/OperationList.tsx b/webui/src/components/OperationList.tsx index 9c175754..c7e0f89f 100644 --- a/webui/src/components/OperationList.tsx +++ b/webui/src/components/OperationList.tsx @@ -3,28 +3,18 @@ import { Operation, OperationStatus } from "../../gen/ts/v1/operations.pb"; import { Col, Collapse, Empty, List, Progress, Row, Typography } from "antd"; import { AlertOutlined, DatabaseOutlined } from "@ant-design/icons"; import { BackupProgressEntry } from "../../gen/ts/v1/restic.pb"; +import { EOperation } from "../state/oplog"; export const OperationList = ({ operations, -}: React.PropsWithoutRef<{ operations: Operation[] }>) => { - interface OpWrapper { - startTimeMs: number; - operation: Operation; - } - const ops = operations.map((operation) => { - return { - time: parseInt(operation.unixTimeStartMs!), - operation, - }; - }); - - ops.sort((a, b) => b.time - a.time); +}: React.PropsWithoutRef<{ operations: EOperation[] }>) => { + operations.sort((a, b) => b.parsedTime - a.parsedTime); - const elems = ops.map(({ operation }) => ( + const elems = operations.map((operation) => ( )); - if (ops.length === 0) { + if (operations.length === 0) { return ( ( - + )} /> ); @@ -47,7 +37,7 @@ export const OperationList = ({ export const OperationRow = ({ operation, -}: React.PropsWithoutRef<{ operation: Operation }>) => { +}: React.PropsWithoutRef<{ operation: EOperation }>) => { let contents: React.ReactNode; let color = "grey"; diff --git a/webui/src/state/oplog.ts b/webui/src/state/oplog.ts index 116168a0..be811c49 100644 --- a/webui/src/state/oplog.ts +++ b/webui/src/state/oplog.ts @@ -9,6 +9,11 @@ import { GetOperationsRequest, ResticUI } from "../../gen/ts/v1/service.pb"; import { EventEmitter } from "events"; import { useAlertApi } from "../components/Alerts"; +export type EOperation = Operation & { + parsedId: number; + parsedTime: number; +}; + const subscribers: ((event: OperationEvent) => void)[] = []; // Start fetching and emitting operations. @@ -71,24 +76,28 @@ export const unsubscribeFromOperations = ( export const buildOperationListListener = ( req: GetOperationsRequest, - callback: (event: OperationEvent | null, list: Operation[]) => void + callback: ( + event: OperationEventType | null, + operation: EOperation | null, + list: EOperation[] + ) => void ) => { - let operations: Operation[] = []; + let operations: EOperation[] = []; (async () => { - const opsFromServer = await getOperations(req); + let opsFromServer = (await getOperations(req)).map(toEop); operations = opsFromServer.filter( - (o) => !operations.find((op) => op.id === o.id) + (o) => !operations.find((op) => op.parsedId === o.parsedId) ); operations.sort((a, b) => { - return parseInt(a.id!) - parseInt(b.id!); + return a.parsedId - b.parsedId; }); - callback(null, operations); + callback(null, null, operations); })(); return (event: OperationEvent) => { - const op = event.operation!; + const op = toEop(event.operation!); const type = event.type!; if (!!req.planId && op.planId !== req.planId) { return; @@ -110,35 +119,17 @@ export const buildOperationListListener = ( operations.push(op); } - callback(event, operations); + callback(event.type || null, op, operations); }; }; -// OperationsStateTracker tracks the state of operations starting with an initial query -export class OperationListSubscriber { - private listener: ((event: OperationEvent) => void) | null = null; - private operations: Operation[] = []; - private eventEmitter = new EventEmitter(); - constructor(private req: GetOperationsRequest) { - this.listener = (event: OperationEvent) => { - this.eventEmitter.emit("changed"); - }; - subscribeToOperations(this.listener); - getOperations(req).then((ops) => { - this.operations = ops; - this.eventEmitter.emit("changed"); - }); - } - - getOperations() { - return this.operations; - } +const toEop = (op: Operation): EOperation => { + const time = + op.operationIndexSnapshot?.snapshot?.unixTimeMs || op.unixTimeStartMs; - onChange(callback: () => void) { - this.eventEmitter.on("changed", callback); - } - - destroy() { - unsubscribeFromOperations(this.listener!); - } -} \ No newline at end of file + return { + ...op, + parsedId: parseInt(op.id!), + parsedTime: parseInt(time!), + }; +}; diff --git a/webui/src/views/PlanView.tsx b/webui/src/views/PlanView.tsx index bb5210c1..bbf82f51 100644 --- a/webui/src/views/PlanView.tsx +++ b/webui/src/views/PlanView.tsx @@ -8,8 +8,8 @@ import { useRecoilValue } from "recoil"; import { configState } from "../state/config"; import { useAlertApi } from "../components/Alerts"; import { ResticUI } from "../../gen/ts/v1/service.pb"; -import { Operation } from "../../gen/ts/v1/operations.pb"; import { + EOperation, buildOperationListListener, subscribeToOperations, unsubscribeFromOperations, @@ -19,12 +19,12 @@ import { OperationList } from "../components/OperationList"; export const PlanView = ({ plan }: React.PropsWithChildren<{ plan: Plan }>) => { const showModal = useShowModal(); const alertsApi = useAlertApi()!; - const [operations, setOperations] = useState([]); + const [operations, setOperations] = useState([]); useEffect(() => { const listener = buildOperationListListener( - { planId: plan.id, lastN: "100" }, - (event, operations) => { + { planId: plan.id, lastN: "1000" }, + (event, changedOp, operations) => { setOperations([...operations]); } ); @@ -78,7 +78,7 @@ export const PlanView = ({ plan }: React.PropsWithChildren<{ plan: Plan }>) => { Prune Now -

Operations List

+

Backup Action History ({operations.length} loaded)

);