
Commit

feat: improve log formatting
garethgeorge committed Jun 25, 2024
1 parent 9067027 commit 232cd57
Showing 5 changed files with 130 additions and 55 deletions.
2 changes: 1 addition & 1 deletion internal/orchestrator/logging/logging.go
@@ -35,7 +35,7 @@ func Logger(ctx context.Context) *zap.Logger {
 	}
 	p := zap.NewProductionEncoderConfig()
 	p.EncodeTime = zapcore.ISO8601TimeEncoder
-	fe := zapcore.NewJSONEncoder(p)
+	fe := zapcore.NewConsoleEncoder(p)
 	l := zap.New(zapcore.NewTee(
 		zap.L().Core(),
 		zapcore.NewCore(fe, zapcore.AddSync(writer), zapcore.DebugLevel),
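This hunk swaps zapcore.NewJSONEncoder for zapcore.NewConsoleEncoder on the core that writes to the context's log writer, so those logs come out as plain, human-readable lines rather than JSON objects. A standalone sketch of the same tee setup, for reference only (os.Stdout stands in for the context writer, and the final log call is illustrative):

```go
package main

import (
	"os"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func main() {
	p := zap.NewProductionEncoderConfig()
	p.EncodeTime = zapcore.ISO8601TimeEncoder
	fe := zapcore.NewConsoleEncoder(p) // plain text lines instead of JSON objects

	// Tee the global logger's core with a debug-level core that writes
	// console-formatted entries to the task's log writer (os.Stdout here).
	l := zap.New(zapcore.NewTee(
		zap.L().Core(),
		zapcore.NewCore(fe, zapcore.AddSync(os.Stdout), zapcore.DebugLevel),
	))
	l.Info("task starting", zap.String("task", "example"))
}
```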
5 changes: 5 additions & 0 deletions internal/orchestrator/orchestrator.go
@@ -325,6 +325,11 @@ func (o *Orchestrator) Run(ctx context.Context) {
 	}
 
 	err := t.Task.Run(taskCtx, t.ScheduledTask, runner)
+	if err != nil {
+		fmt.Fprintf(logs, "\ntask %q returned error: %v\n", t.Task.Name(), err)
+	} else {
+		fmt.Fprintf(logs, "\ntask %q completed successfully\n", t.Task.Name())
+	}
 
 	if op != nil {
 		// write logs to log storage for this task.
78 changes: 75 additions & 3 deletions internal/orchestrator/repo/logging.go
@@ -1,9 +1,11 @@
 package repo
 
 import (
+	"bytes"
 	"context"
+	"fmt"
+	"io"
 
-	"github.com/garethgeorge/backrest/internal/ioutil"
 	"github.com/garethgeorge/backrest/internal/orchestrator/logging"
 	"github.com/garethgeorge/backrest/pkg/restic"
 )
@@ -15,8 +17,78 @@ func forwardResticLogs(ctx context.Context) (context.Context, func()) {
 	if writer == nil {
 		return ctx, func() {}
 	}
-	capture := ioutil.NewOutputCapturer(64 * 1024)
+	limit := &limitWriter{w: writer, n: 64 * 1024}
+	capture := &linePrefixer{w: limit, prefix: []byte("[restic] ")}
 	return restic.ContextWithLogger(ctx, capture), func() {
-		_, _ = writer.Write(capture.Bytes())
+		if limit.d > 0 {
+			fmt.Fprintf(writer, "Output truncated, %d bytes dropped\n", limit.d)
+		}
+		capture.Close()
 	}
 }
+
+// limitWriter is a writer that limits the number of bytes written to it.
+type limitWriter struct {
+	w io.Writer
+	n int
+	d int
+}
+
+func (l *limitWriter) Write(p []byte) (rnw int, err error) {
+	rnw = len(p)
+	if l.n <= 0 {
+		l.d += len(p)
+		return 0, nil
+	}
+	if len(p) > l.n {
+		l.d += len(p) - l.n
+		p = p[:l.n]
+	}
+	_, err = l.w.Write(p)
+	l.n -= len(p)
+	return
+}
+
+type linePrefixer struct {
+	w      io.Writer
+	buf    []byte
+	prefix []byte
+}
+
+func (l *linePrefixer) Write(p []byte) (n int, err error) {
+	n = len(p)
+	l.buf = append(l.buf, p...)
+
+	bufOrig := l.buf
+	wroteLines := false
+	for {
+		i := bytes.IndexByte(l.buf, '\n')
+		if i < 0 {
+			break
+		}
+		wroteLines = true
+		if _, err := l.w.Write(l.prefix); err != nil {
+			return 0, err
+		}
+		if _, err := l.w.Write(l.buf[:i+1]); err != nil {
+			return 0, err
+		}
+		l.buf = l.buf[i+1:]
+	}
+	if wroteLines {
+		l.buf = append(bufOrig[:0], l.buf...)
+	}
+	return
+}
+
+func (l *linePrefixer) Close() error {
+	if len(l.buf) > 0 {
+		if _, err := l.w.Write(l.prefix); err != nil {
+			return err
+		}
+		if _, err := l.w.Write(l.buf); err != nil {
+			return err
+		}
+	}
+	return nil
+}
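A minimal usage sketch of the two helpers added above (hypothetical in-package code, not part of the commit): linePrefixer tags each completed line with "[restic] ", and limitWriter caps how many bytes are forwarded while counting what it drops.

```go
package repo

import (
	"bytes"
	"fmt"
	"io"
)

// sketchResticLogWriters is a hypothetical helper illustrating the composition
// used by forwardResticLogs: restic output -> linePrefixer -> limitWriter -> task log.
func sketchResticLogWriters() {
	var out bytes.Buffer
	limit := &limitWriter{w: &out, n: 1024}                         // forward at most 1KB; limit.d counts dropped bytes
	capture := &linePrefixer{w: limit, prefix: []byte("[restic] ")} // prefix each completed line

	io.WriteString(capture, "scan finished in 0.2s\nprocessed 3 files")
	capture.Close() // flushes the trailing partial line, still prefixed

	fmt.Print(out.String())
	// [restic] scan finished in 0.2s
	// [restic] processed 3 files
	if limit.d > 0 {
		fmt.Printf("Output truncated, %d bytes dropped\n", limit.d)
	}
}
```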
11 changes: 6 additions & 5 deletions pkg/restic/error.go
@@ -29,13 +29,14 @@ func (e *CmdError) Is(target error) bool {
 
 // newCmdError creates a new error indicating that running a command failed.
 func newCmdError(ctx context.Context, cmd *exec.Cmd, err error) *CmdError {
-	cerr := &CmdError{
-		Command: cmd.String(),
-		Err:     err,
+	shortCmd := cmd.String()
+	if len(shortCmd) > 100 {
+		shortCmd = shortCmd[:100] + "..."
 	}
 
-	if logger := LoggerFromContext(ctx); logger != nil {
-		logger.Write([]byte(cerr.Error()))
+	cerr := &CmdError{
+		Command: shortCmd,
+		Err:     err,
 	}
 	return cerr
 }
89 changes: 43 additions & 46 deletions pkg/restic/restic.go
@@ -66,9 +66,6 @@ func (r *Repo) commandWithContext(ctx context.Context, args []string, opts ...Ge
 		sw := &ioutil.SynchronizedWriter{W: logger}
 		cmd.Stderr = sw
 		cmd.Stdout = sw
-	}
-
-	if logger := LoggerFromContext(ctx); logger != nil {
 		fmt.Fprintf(logger, "\ncommand: %v %v\n", fullCmd[0], strings.Join(fullCmd[1:], " "))
 	}
 
@@ -185,7 +182,49 @@ func (r *Repo) Backup(ctx context.Context, paths []string, progressCallback func
 				}
 			}
 		}
-		return summary, newCmdError(ctx, cmd, newErrorWithOutput(errors.Join(cmdErr, readErr), outputForErr.String()))
+		return summary, newCmdError(ctx, cmd, errors.Join(cmdErr, readErr))
 	}
 	return summary, nil
 }
+
+func (r *Repo) Restore(ctx context.Context, snapshot string, callback func(*RestoreProgressEntry), opts ...GenericOption) (*RestoreProgressEntry, error) {
+	opts = append(slices.Clone(opts), WithEnv("RESTIC_PROGRESS_FPS=2"))
+	cmdCtx, cancel := context.WithCancel(ctx)
+	cmd := r.commandWithContext(cmdCtx, []string{"restore", "--json", snapshot}, opts...)
+	buf := buffer.New(32 * 1024) // 32KB IO buffer for the realtime event parsing
+	reader, writer := nio.Pipe(buf)
+	r.pipeCmdOutputToWriter(cmd, writer)
+
+	var readErr error
+	var summary *RestoreProgressEntry
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer cancel()
+		var err error
+		summary, err = readRestoreProgressEntries(reader, callback)
+		if err != nil {
+			readErr = fmt.Errorf("processing command output: %w", err)
+		}
+	}()
+
+	cmdErr := cmd.Run()
+	writer.Close()
+	wg.Wait()
+	if cmdErr != nil || readErr != nil {
+		if cmdErr != nil {
+			var exitErr *exec.ExitError
+			if errors.As(cmdErr, &exitErr) {
+				if exitErr.ExitCode() == 3 {
+					cmdErr = ErrPartialBackup
+				} else {
+					cmdErr = fmt.Errorf("exit code %d: %w", exitErr.ExitCode(), ErrBackupFailed)
+				}
+			}
+		}
+
+		return summary, newCmdError(ctx, cmd, errors.Join(cmdErr, readErr))
+	}
+	return summary, nil
+}
@@ -275,48 +314,6 @@ func (r *Repo) Check(ctx context.Context, checkOutput io.Writer, opts ...Generic
 	return nil
 }
 
-func (r *Repo) Restore(ctx context.Context, snapshot string, callback func(*RestoreProgressEntry), opts ...GenericOption) (*RestoreProgressEntry, error) {
-	opts = append(slices.Clone(opts), WithEnv("RESTIC_PROGRESS_FPS=2"))
-	cmdCtx, cancel := context.WithCancel(ctx)
-	cmd := r.commandWithContext(cmdCtx, []string{"restore", "--json", snapshot}, opts...)
-	buf := buffer.New(32 * 1024) // 32KB IO buffer for the realtime event parsing
-	reader, writer := nio.Pipe(buf)
-	r.pipeCmdOutputToWriter(cmd, writer)
-
-	var readErr error
-	var summary *RestoreProgressEntry
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		defer cancel()
-		var err error
-		summary, err = readRestoreProgressEntries(reader, callback)
-		if err != nil {
-			readErr = fmt.Errorf("processing command output: %w", err)
-		}
-	}()
-
-	cmdErr := cmd.Run()
-	writer.Close()
-	wg.Wait()
-	if cmdErr != nil || readErr != nil {
-		if cmdErr != nil {
-			var exitErr *exec.ExitError
-			if errors.As(cmdErr, &exitErr) {
-				if exitErr.ExitCode() == 3 {
-					cmdErr = ErrPartialBackup
-				} else {
-					cmdErr = fmt.Errorf("exit code %d: %w", exitErr.ExitCode(), ErrBackupFailed)
-				}
-			}
-		}
-
-		return summary, newCmdError(ctx, cmd, errors.Join(cmdErr, readErr))
-	}
-	return summary, nil
-}
-
 func (r *Repo) ListDirectory(ctx context.Context, snapshot string, path string, opts ...GenericOption) (*Snapshot, []*LsEntry, error) {
 	if path == "" {
 		// an empty path can trigger very expensive operations (e.g. iterates all files in the snapshot)
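Since this hunk only relocates Restore within the file, its signature is unchanged. A hypothetical call site, for illustration only (the repo value, snapshot ID, and log-only progress callback are assumptions, not part of the commit):

```go
package example

import (
	"context"
	"log"

	"github.com/garethgeorge/backrest/pkg/restic"
)

// restoreLatest sketches how a caller drives Restore with a progress callback.
func restoreLatest(ctx context.Context, r *restic.Repo) error {
	summary, err := r.Restore(ctx, "latest", func(entry *restic.RestoreProgressEntry) {
		log.Printf("restore progress: %+v", entry) // entries arrive as restic emits --json events
	})
	if err != nil {
		return err
	}
	log.Printf("restore summary: %+v", summary)
	return nil
}
```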
