diff --git a/.bruno/Snapshot.bru b/.bruno/Snapshot.bru index ce1e13a8..a774fb00 100644 --- a/.bruno/Snapshot.bru +++ b/.bruno/Snapshot.bru @@ -14,6 +14,6 @@ body:json { { "database-path": "/home/kirusfg/Code/litestream/test.db", "replica-name": "s3", - "retain": true + "cleanup": true } } diff --git a/.bruno/Checkpoint.bru b/.bruno/Sync.bru similarity index 63% rename from .bruno/Checkpoint.bru rename to .bruno/Sync.bru index 55a7227b..b1bfdd5f 100644 --- a/.bruno/Checkpoint.bru +++ b/.bruno/Sync.bru @@ -1,11 +1,11 @@ meta { - name: Checkpoint + name: Sync type: http seq: 2 } post { - url: http://localhost:9091/checkpoint + url: http://localhost:9091/sync body: json auth: none } @@ -14,7 +14,7 @@ body:json { { "database-path": "/home/kirusfg/Code/litestream/test.db", "replica-name": "s3", - "mode": "PASSIVE", - "sync": true + "checkpoint": true, + "checkpoint-mode": "PASSIVE" } } diff --git a/cmd/litestream/checkpoint.go b/cmd/litestream/checkpoint.go deleted file mode 100644 index ad886ec3..00000000 --- a/cmd/litestream/checkpoint.go +++ /dev/null @@ -1,138 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "net/http" - - "github.com/benbjohnson/litestream" -) - -type CheckpointRequest struct { - DatabasePath string `json:"database-path"` - ReplicaName string `json:"replica-name"` - Mode string `json:"mode"` - Sync bool `json:"sync"` -} - -type CheckpointResponse struct { - Status string `json:"status"` - Error string `json:"error"` -} - -type CheckpointHandler struct { - // Context to execute checkpoints in - ctx context.Context - - // The command running the replication process - c *ReplicateCommand - - // Where to send log messages, defaults to log.Default() - Logger *slog.Logger -} - -func NewCheckpointHandler(ctx context.Context, c *ReplicateCommand) *CheckpointHandler { - return &CheckpointHandler{ - ctx: ctx, - c: c, - Logger: slog.Default(), - } -} - -func (h *CheckpointHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if h.c == nil { - w.WriteHeader(500) - res := CheckpointResponse{Status: "error", Error: "checkpoint handler has not been initialized properly (ReplicateCommand is nil)"} - json.NewEncoder(w).Encode(res) - return - } - - // Check if the request is a POST - if r.Method != "POST" { - w.WriteHeader(405) - res := CheckpointResponse{Status: "error", Error: "method not allowed"} - json.NewEncoder(w).Encode(res) - return - } - - // Parse request - var req CheckpointRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - w.WriteHeader(400) - res := CheckpointResponse{Status: "error", Error: "invalid request"} - json.NewEncoder(w).Encode(res) - return - } - - // Check if the requested database is being replicated - var db *litestream.DB - for _, cdb := range h.c.DBs { - if cdb.Path() == req.DatabasePath { - db = cdb - break - } - } - - if db == nil { - h.Logger.Info(fmt.Sprintf("database %s not found", req.DatabasePath)) - w.WriteHeader(404) - res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("database %s not found", req.DatabasePath)} - json.NewEncoder(w).Encode(res) - return - } - - // Check if the requested database replica exists - var rep *litestream.Replica - for _, crep := range db.Replicas { - if crep.Name() == req.ReplicaName { - rep = crep - break - } - } - - if rep == nil { - h.Logger.Info(fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)) - w.WriteHeader(404) - res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)} - json.NewEncoder(w).Encode(res) - return - } - - // Check if the requested mode is valid - mode := litestream.CheckpointModePassive - if req.Mode == "FULL" { - mode = litestream.CheckpointModeFull - } else if req.Mode == "RESTART" { - mode = litestream.CheckpointModeRestart - } else if req.Mode == "TRUNCATE" { - mode = litestream.CheckpointModeTruncate - } - - // Issue checkpoint - h.Logger.Info(fmt.Sprintf("issuing checkpoint on database %s", req.DatabasePath)) - if err := db.Checkpoint(h.ctx, mode); err != nil { - h.Logger.Info(fmt.Sprintf("error issuing checkpoint on database %s: %s", req.DatabasePath, err)) - w.WriteHeader(500) - res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("error issuing checkpoint on database %s: %s", req.DatabasePath, err)} - json.NewEncoder(w).Encode(res) - return - } - - // Issue sync - if req.Sync { - h.Logger.Info(fmt.Sprintf("issuing sync on replica %s for database %s", req.ReplicaName, req.DatabasePath)) - if err := rep.Sync(h.ctx); err != nil { - h.Logger.Info(fmt.Sprintf("error issuing sync on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)) - w.WriteHeader(500) - res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("error issuing sync on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)} - json.NewEncoder(w).Encode(res) - return - } - } - - w.WriteHeader(200) - res := CheckpointResponse{Status: "ok"} - json.NewEncoder(w).Encode(res) -} diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index dac8b7f1..8ee02e92 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -184,8 +184,8 @@ type HTTPConfig struct { // Whether to handle configuration update signals via HTTP or not. ConfigUpdates bool `yaml:"config-updates"` - // Whether to handle checkpoint signals via HTTP or not. This disables timeout-based checkpoints. - Checkpoint bool `yaml:"checkpoint"` + // Whether to handle sync signals via HTTP or not. This disables timeout-based checkpoints. + Sync bool `yaml:"sync"` // Whether to handle snapshot signals via HTTP or not. This disables timeout-based snapshots. Snapshot bool `yaml:"snapshot"` diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index ef97f520..87507787 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -149,9 +149,9 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { http.Handle("/config", NewConfigHandler(c)) start = true } - if c.Config.HTTP.Checkpoint { - slog.Info("watching for checkpoint signals on", "url", fmt.Sprintf("http://%s/checkpoint", hostport)) - http.Handle("/checkpoint", NewCheckpointHandler(ctx, c)) + if c.Config.HTTP.Sync { + slog.Info("watching for sync signals on", "url", fmt.Sprintf("http://%s/sync", hostport)) + http.Handle("/sync", NewSyncHandler(ctx, c)) start = true } if c.Config.HTTP.Snapshot { diff --git a/cmd/litestream/snapshot.go b/cmd/litestream/snapshot.go index a79d38dd..2b22e34e 100644 --- a/cmd/litestream/snapshot.go +++ b/cmd/litestream/snapshot.go @@ -13,7 +13,7 @@ import ( type SnapshotRequest struct { DatabasePath string `json:"database-path"` ReplicaName string `json:"replica-name"` - Retain bool `json:"retain"` + Cleanup bool `json:"cleanup"` } type SnapshotResponse struct { @@ -60,7 +60,7 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var req SnapshotRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { w.WriteHeader(400) - res := CheckpointResponse{Status: "error", Error: "invalid request"} + res := SnapshotResponse{Status: "error", Error: "invalid request"} json.NewEncoder(w).Encode(res) return } @@ -77,7 +77,7 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if db == nil { h.Logger.Info(fmt.Sprintf("database %s not found", req.DatabasePath)) w.WriteHeader(404) - res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("database %s not found", req.DatabasePath)} + res := SnapshotResponse{Status: "error", Error: fmt.Sprintf("database %s not found", req.DatabasePath)} json.NewEncoder(w).Encode(res) return } @@ -94,7 +94,7 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if rep == nil { h.Logger.Info(fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)) w.WriteHeader(404) - res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)} + res := SnapshotResponse{Status: "error", Error: fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)} json.NewEncoder(w).Encode(res) return } @@ -104,23 +104,23 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if _, err := rep.Snapshot(h.ctx); err != nil { h.Logger.Info(fmt.Sprintf("error creating snapshot of replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)) w.WriteHeader(500) - res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("error issuing snapshot on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)} + res := SnapshotResponse{Status: "error", Error: fmt.Sprintf("error issuing snapshot on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)} json.NewEncoder(w).Encode(res) return } - if req.Retain { + if req.Cleanup { h.Logger.Info(fmt.Sprintf("retaining snapshots of replica %s for database %s", req.ReplicaName, req.DatabasePath)) if err := rep.EnforceRetention(h.ctx); err != nil { h.Logger.Info(fmt.Sprintf("error retaining snapshots of replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)) w.WriteHeader(500) - res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("error issuing retain on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)} + res := SnapshotResponse{Status: "error", Error: fmt.Sprintf("error issuing retain on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)} json.NewEncoder(w).Encode(res) return } } w.WriteHeader(200) - res := CheckpointResponse{Status: "ok"} + res := SnapshotResponse{Status: "ok"} json.NewEncoder(w).Encode(res) } diff --git a/cmd/litestream/sync.go b/cmd/litestream/sync.go new file mode 100644 index 00000000..ff56874b --- /dev/null +++ b/cmd/litestream/sync.go @@ -0,0 +1,138 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + + "github.com/benbjohnson/litestream" +) + +type SyncRequest struct { + DatabasePath string `json:"database-path"` + ReplicaName string `json:"replica-name"` + Checkpoint bool `json:"checkpoint"` + CheckpointMode string `json:"checkpoint-mode"` +} + +type SyncResponse struct { + Status string `json:"status"` + Error string `json:"error"` +} + +type SyncHandler struct { + // Context to execute checkpoints in + ctx context.Context + + // The command running the replication process + c *ReplicateCommand + + // Where to send log messages, defaults to log.Default() + Logger *slog.Logger +} + +func NewSyncHandler(ctx context.Context, c *ReplicateCommand) *SyncHandler { + return &SyncHandler{ + ctx: ctx, + c: c, + Logger: slog.Default(), + } +} + +func (h *SyncHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if h.c == nil { + w.WriteHeader(500) + res := SyncResponse{Status: "error", Error: "sync handler has not been initialized properly (ReplicateCommand is nil)"} + json.NewEncoder(w).Encode(res) + return + } + + // Check if the request is a POST + if r.Method != "POST" { + w.WriteHeader(405) + res := SyncResponse{Status: "error", Error: "method not allowed"} + json.NewEncoder(w).Encode(res) + return + } + + // Parse request + var req SyncRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + w.WriteHeader(400) + res := SyncResponse{Status: "error", Error: "invalid request"} + json.NewEncoder(w).Encode(res) + return + } + + // Check if the requested database is being replicated + var db *litestream.DB + for _, cdb := range h.c.DBs { + if cdb.Path() == req.DatabasePath { + db = cdb + break + } + } + + if db == nil { + h.Logger.Info(fmt.Sprintf("database %s not found", req.DatabasePath)) + w.WriteHeader(404) + res := SyncResponse{Status: "error", Error: fmt.Sprintf("database %s not found", req.DatabasePath)} + json.NewEncoder(w).Encode(res) + return + } + + // Check if the requested database replica exists + var rep *litestream.Replica + for _, crep := range db.Replicas { + if crep.Name() == req.ReplicaName { + rep = crep + break + } + } + + if rep == nil { + h.Logger.Info(fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)) + w.WriteHeader(404) + res := SyncResponse{Status: "error", Error: fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)} + json.NewEncoder(w).Encode(res) + return + } + + // Check the requested checkpoint mode + mode := litestream.CheckpointModePassive + if req.CheckpointMode == "FULL" { + mode = litestream.CheckpointModeFull + } else if req.CheckpointMode == "RESTART" { + mode = litestream.CheckpointModeRestart + } else if req.CheckpointMode == "TRUNCATE" { + mode = litestream.CheckpointModeTruncate + } + + // Issue sync + h.Logger.Info(fmt.Sprintf("issuing sync on replica %s for database %s", req.ReplicaName, req.DatabasePath)) + if err := rep.Sync(h.ctx); err != nil { + h.Logger.Info(fmt.Sprintf("error issuing sync on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)) + w.WriteHeader(500) + res := SyncResponse{Status: "error", Error: fmt.Sprintf("error issuing sync on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)} + json.NewEncoder(w).Encode(res) + return + } + + // Issue checkpoint + if req.Checkpoint { + h.Logger.Info(fmt.Sprintf("issuing checkpoint on database %s", req.DatabasePath)) + if err := db.Checkpoint(h.ctx, mode); err != nil { + h.Logger.Info(fmt.Sprintf("error issuing checkpoint on database %s: %s", req.DatabasePath, err)) + w.WriteHeader(500) + res := SyncResponse{Status: "error", Error: fmt.Sprintf("error issuing checkpoint on database %s: %s", req.DatabasePath, err)} + json.NewEncoder(w).Encode(res) + return + } + } + + w.WriteHeader(200) + res := SyncResponse{Status: "ok"} + json.NewEncoder(w).Encode(res) +}