Skip to content

Commit

Permalink
fix: remove /checkpoint in favor of /sync
Browse files Browse the repository at this point in the history
  • Loading branch information
kirusfg committed Jul 27, 2024
1 parent d07c0ea commit 0dde438
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 156 deletions.
2 changes: 1 addition & 1 deletion .bruno/Snapshot.bru
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ body:json {
{
"database-path": "/home/kirusfg/Code/litestream/test.db",
"replica-name": "s3",
"retain": true
"cleanup": true
}
}
8 changes: 4 additions & 4 deletions .bruno/Checkpoint.bru → .bruno/Sync.bru
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
meta {
name: Checkpoint
name: Sync
type: http
seq: 2
}

post {
url: http://localhost:9091/checkpoint
url: http://localhost:9091/sync
body: json
auth: none
}
Expand All @@ -14,7 +14,7 @@ body:json {
{
"database-path": "/home/kirusfg/Code/litestream/test.db",
"replica-name": "s3",
"mode": "PASSIVE",
"sync": true
"checkpoint": true,
"checkpoint-mode": "PASSIVE"
}
}
138 changes: 0 additions & 138 deletions cmd/litestream/checkpoint.go

This file was deleted.

4 changes: 2 additions & 2 deletions cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ type HTTPConfig struct {
// Whether to handle configuration update signals via HTTP or not.
ConfigUpdates bool `yaml:"config-updates"`

// Whether to handle checkpoint signals via HTTP or not. This disables timeout-based checkpoints.
Checkpoint bool `yaml:"checkpoint"`
// Whether to handle sync signals via HTTP or not. This disables timeout-based checkpoints.
Sync bool `yaml:"sync"`

// Whether to handle snapshot signals via HTTP or not. This disables timeout-based snapshots.
Snapshot bool `yaml:"snapshot"`
Expand Down
6 changes: 3 additions & 3 deletions cmd/litestream/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
http.Handle("/config", NewConfigHandler(c))
start = true
}
if c.Config.HTTP.Checkpoint {
slog.Info("watching for checkpoint signals on", "url", fmt.Sprintf("http://%s/checkpoint", hostport))
http.Handle("/checkpoint", NewCheckpointHandler(ctx, c))
if c.Config.HTTP.Sync {
slog.Info("watching for sync signals on", "url", fmt.Sprintf("http://%s/sync", hostport))
http.Handle("/sync", NewSyncHandler(ctx, c))
start = true
}
if c.Config.HTTP.Snapshot {
Expand Down
16 changes: 8 additions & 8 deletions cmd/litestream/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type SnapshotRequest struct {
DatabasePath string `json:"database-path"`
ReplicaName string `json:"replica-name"`
Retain bool `json:"retain"`
Cleanup bool `json:"cleanup"`
}

type SnapshotResponse struct {
Expand Down Expand Up @@ -60,7 +60,7 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var req SnapshotRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
w.WriteHeader(400)
res := CheckpointResponse{Status: "error", Error: "invalid request"}
res := SnapshotResponse{Status: "error", Error: "invalid request"}
json.NewEncoder(w).Encode(res)
return
}
Expand All @@ -77,7 +77,7 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if db == nil {
h.Logger.Info(fmt.Sprintf("database %s not found", req.DatabasePath))
w.WriteHeader(404)
res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("database %s not found", req.DatabasePath)}
res := SnapshotResponse{Status: "error", Error: fmt.Sprintf("database %s not found", req.DatabasePath)}
json.NewEncoder(w).Encode(res)
return
}
Expand All @@ -94,7 +94,7 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if rep == nil {
h.Logger.Info(fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath))
w.WriteHeader(404)
res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)}
res := SnapshotResponse{Status: "error", Error: fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)}
json.NewEncoder(w).Encode(res)
return
}
Expand All @@ -104,23 +104,23 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if _, err := rep.Snapshot(h.ctx); err != nil {
h.Logger.Info(fmt.Sprintf("error creating snapshot of replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err))
w.WriteHeader(500)
res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("error issuing snapshot on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)}
res := SnapshotResponse{Status: "error", Error: fmt.Sprintf("error issuing snapshot on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)}
json.NewEncoder(w).Encode(res)
return
}

if req.Retain {
if req.Cleanup {
h.Logger.Info(fmt.Sprintf("retaining snapshots of replica %s for database %s", req.ReplicaName, req.DatabasePath))
if err := rep.EnforceRetention(h.ctx); err != nil {
h.Logger.Info(fmt.Sprintf("error retaining snapshots of replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err))
w.WriteHeader(500)
res := CheckpointResponse{Status: "error", Error: fmt.Sprintf("error issuing retain on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)}
res := SnapshotResponse{Status: "error", Error: fmt.Sprintf("error issuing retain on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)}
json.NewEncoder(w).Encode(res)
return
}
}

w.WriteHeader(200)
res := CheckpointResponse{Status: "ok"}
res := SnapshotResponse{Status: "ok"}
json.NewEncoder(w).Encode(res)
}
138 changes: 138 additions & 0 deletions cmd/litestream/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"

"github.com/benbjohnson/litestream"
)

type SyncRequest struct {
DatabasePath string `json:"database-path"`
ReplicaName string `json:"replica-name"`
Checkpoint bool `json:"checkpoint"`
CheckpointMode string `json:"checkpoint-mode"`
}

type SyncResponse struct {
Status string `json:"status"`
Error string `json:"error"`
}

type SyncHandler struct {
// Context to execute checkpoints in
ctx context.Context

// The command running the replication process
c *ReplicateCommand

// Where to send log messages, defaults to log.Default()
Logger *slog.Logger
}

func NewSyncHandler(ctx context.Context, c *ReplicateCommand) *SyncHandler {
return &SyncHandler{
ctx: ctx,
c: c,
Logger: slog.Default(),
}
}

func (h *SyncHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if h.c == nil {
w.WriteHeader(500)
res := SyncResponse{Status: "error", Error: "sync handler has not been initialized properly (ReplicateCommand is nil)"}
json.NewEncoder(w).Encode(res)
return
}

// Check if the request is a POST
if r.Method != "POST" {
w.WriteHeader(405)
res := SyncResponse{Status: "error", Error: "method not allowed"}
json.NewEncoder(w).Encode(res)
return
}

// Parse request
var req SyncRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
w.WriteHeader(400)
res := SyncResponse{Status: "error", Error: "invalid request"}
json.NewEncoder(w).Encode(res)
return
}

// Check if the requested database is being replicated
var db *litestream.DB
for _, cdb := range h.c.DBs {
if cdb.Path() == req.DatabasePath {
db = cdb
break
}
}

if db == nil {
h.Logger.Info(fmt.Sprintf("database %s not found", req.DatabasePath))
w.WriteHeader(404)
res := SyncResponse{Status: "error", Error: fmt.Sprintf("database %s not found", req.DatabasePath)}
json.NewEncoder(w).Encode(res)
return
}

// Check if the requested database replica exists
var rep *litestream.Replica
for _, crep := range db.Replicas {
if crep.Name() == req.ReplicaName {
rep = crep
break
}
}

if rep == nil {
h.Logger.Info(fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath))
w.WriteHeader(404)
res := SyncResponse{Status: "error", Error: fmt.Sprintf("replica %s for database %s not found", req.ReplicaName, req.DatabasePath)}
json.NewEncoder(w).Encode(res)
return
}

// Check the requested checkpoint mode
mode := litestream.CheckpointModePassive
if req.CheckpointMode == "FULL" {
mode = litestream.CheckpointModeFull
} else if req.CheckpointMode == "RESTART" {
mode = litestream.CheckpointModeRestart
} else if req.CheckpointMode == "TRUNCATE" {
mode = litestream.CheckpointModeTruncate
}

// Issue sync
h.Logger.Info(fmt.Sprintf("issuing sync on replica %s for database %s", req.ReplicaName, req.DatabasePath))
if err := rep.Sync(h.ctx); err != nil {
h.Logger.Info(fmt.Sprintf("error issuing sync on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err))
w.WriteHeader(500)
res := SyncResponse{Status: "error", Error: fmt.Sprintf("error issuing sync on replica %s for database %s: %s", req.ReplicaName, req.DatabasePath, err)}
json.NewEncoder(w).Encode(res)
return
}

// Issue checkpoint
if req.Checkpoint {
h.Logger.Info(fmt.Sprintf("issuing checkpoint on database %s", req.DatabasePath))
if err := db.Checkpoint(h.ctx, mode); err != nil {
h.Logger.Info(fmt.Sprintf("error issuing checkpoint on database %s: %s", req.DatabasePath, err))
w.WriteHeader(500)
res := SyncResponse{Status: "error", Error: fmt.Sprintf("error issuing checkpoint on database %s: %s", req.DatabasePath, err)}
json.NewEncoder(w).Encode(res)
return
}
}

w.WriteHeader(200)
res := SyncResponse{Status: "ok"}
json.NewEncoder(w).Encode(res)
}

0 comments on commit 0dde438

Please sign in to comment.