Skip to content

Commit

Permalink
fix: concurrency issues in run command handler
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed May 12, 2024
1 parent 9d6c1ba commit 411a4fb
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 9 deletions.
7 changes: 5 additions & 2 deletions internal/api/backresthandler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"bytes"
"context"
"encoding/hex"
"errors"
Expand Down Expand Up @@ -419,14 +420,16 @@ func (s *BackrestHandler) RunCommand(ctx context.Context, req *connect.Request[v

ctx, cancel := context.WithCancel(ctx)

errChan := make(chan error, 1)
outputs := make(chan []byte, 100)
errChan := make(chan error, 1)
go func() {
start := time.Now()
if err := repo.RunCommand(ctx, req.Msg.Command, func(output []byte) {
outputs <- output
outputs <- bytes.Clone(output)
}); err != nil {
errChan <- err
}
outputs <- []byte("took " + time.Since(start).String())
cancel()
}()

Expand Down
2 changes: 1 addition & 1 deletion internal/config/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func validatePlan(plan *v1.Plan, repos map[string]*v1.Repo) error {

if plan.Retention != nil && plan.Retention.Policy == nil {
err = multierror.Append(err, errors.New("retention policy must be nil or must specify a policy"))
} else if policyTimeBucketed, ok := plan.Retention.Policy.(*v1.RetentionPolicy_PolicyTimeBucketed); ok {
} else if policyTimeBucketed, ok := plan.Retention.GetPolicy().(*v1.RetentionPolicy_PolicyTimeBucketed); ok {
if proto.Equal(policyTimeBucketed.PolicyTimeBucketed, &v1.RetentionPolicy_TimeBucketedCounts{}) {
err = multierror.Append(err, errors.New("time bucketed policy must specify a non-empty bucket"))
}
Expand Down
8 changes: 5 additions & 3 deletions internal/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,16 @@ func NewOpLog(databasePath string) (*OpLog, error) {
// Scan checks the log for incomplete operations. Should only be called at startup.
func (o *OpLog) Scan(onIncomplete func(op *v1.Operation)) error {
zap.L().Debug("scanning oplog for incomplete operations")
t := time.Now()
err := o.db.Update(func(tx *bolt.Tx) error {
sysBucket := tx.Bucket(SystemBucket)
opLogBucket := tx.Bucket(OpLogBucket)
c := opLogBucket.Cursor()
var k, v []byte
if lastValidated := sysBucket.Get([]byte("last_validated")); lastValidated != nil {
c.Seek(lastValidated)
k, v = c.Seek(lastValidated)
}
for k, v := c.Prev(); k != nil; k, v = c.Next() {
for ; k != nil; k, v = c.Next() {
op := &v1.Operation{}
if err := proto.Unmarshal(v, op); err != nil {
zap.L().Error("error unmarshalling operation, there may be corruption in the oplog", zap.Error(err))
Expand Down Expand Up @@ -124,7 +126,7 @@ func (o *OpLog) Scan(onIncomplete func(op *v1.Operation)) error {
if err != nil {
return fmt.Errorf("scanning log: %v", err)
}
zap.L().Debug("scan complete")
zap.L().Debug("scan complete", zap.Duration("duration", time.Since(t)))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func chunkBy[T any](items []T, chunkSize int) (chunks [][]T) {
}

type callbackWriter struct {
callback func([]byte)
callback func([]byte) // note: callback must not retain the byte slice
}

func (w *callbackWriter) Write(p []byte) (n int, err error) {
Expand Down
4 changes: 2 additions & 2 deletions webui/src/state/oplog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ export const colorForStatus = (status: OperationStatus) => {
case OperationStatus.STATUS_SUCCESS:
return "green";
case OperationStatus.STATUS_USER_CANCELLED:
return "orange";
return "yellow";
default:
return "grey";
}
Expand Down Expand Up @@ -432,7 +432,7 @@ export const detailsForOperation = (
break;
case OperationStatus.STATUS_USER_CANCELLED:
state = "cancelled";
color = "orange";
color = "yellow";
break;
default:
state = "";
Expand Down

0 comments on commit 411a4fb

Please sign in to comment.