Skip to content

Commit

Permalink
fix: improve concurrency handling in RunCommand
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed May 8, 2024
1 parent a821580 commit 07b0950
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 15 deletions.
46 changes: 34 additions & 12 deletions internal/api/backresthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,34 +420,56 @@ func (s *BackrestHandler) RunCommand(ctx context.Context, req *connect.Request[v
ctx, cancel := context.WithCancel(ctx)

errChan := make(chan error, 1)
var outputBuf []byte

outputs := make(chan []byte, 100)
go func() {
if err := repo.RunCommand(ctx, req.Msg.Command, func(output []byte) {
outputBuf = append(outputBuf, output...)
outputs <- output
}); err != nil {
errChan <- err
}
cancel()
}()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

bufSize := 32 * 1024
buf := make([]byte, 0, bufSize)

flush := func() error {
if len(buf) > 0 {
if err := resp.Send(&types.BytesValue{Value: buf}); err != nil {
return fmt.Errorf("failed to write output: %w", err)
}
buf = buf[:0]
}
return nil
}

for {
select {
case err := <-errChan:
if err := resp.Send(&types.BytesValue{Value: outputBuf}); err != nil {
return fmt.Errorf("failed to write output: %w", err)
if err := flush(); err != nil {
return err
}
return err
case <-ctx.Done():
if err := resp.Send(&types.BytesValue{Value: outputBuf}); err != nil {
return fmt.Errorf("failed to write output: %w", err)
return flush()
case output := <-outputs:
if len(output)+len(buf) > bufSize {
flush()
}
return nil
case <-time.After(100 * time.Millisecond):
if err := resp.Send(&types.BytesValue{Value: outputBuf}); err != nil {
return fmt.Errorf("failed to write output: %w", err)
if len(output) > bufSize {
if err := resp.Send(&types.BytesValue{Value: output}); err != nil {
return fmt.Errorf("failed to write output: %w", err)
}
continue
}
buf = append(buf, output...)
case <-ticker.C:
if len(buf) > 0 {
flush()
}
outputBuf = outputBuf[:0] // clear the buffer and continue
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/tasks/taskforget.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner)
func useLegacyCompatMode(oplog *oplog.OpLog, planID string) (bool, error) {
instanceIDs := make(map[string]struct{})
if err := oplog.ForEachByPlan(planID, indexutil.CollectAll(), func(op *v1.Operation) error {
if snapshotOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
if snapshotOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok && !snapshotOp.OperationIndexSnapshot.GetForgot() {
tags := snapshotOp.OperationIndexSnapshot.GetSnapshot().GetTags()
instanceIDs[repo.InstanceIDFromTags(tags)] = struct{}{}
}
Expand Down
4 changes: 2 additions & 2 deletions webui/src/views/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export const App: React.FC = () => {
) {
alertApi.error(
"Failed to fetch initial config, typically this means the UI could not connect to the backend",
0,
0
);
return;
}
Expand All @@ -80,7 +80,7 @@ export const App: React.FC = () => {
alertApi.error(err.message, 0);
alertApi.error(
"Failed to fetch initial config, typically this means the UI could not connect to the backend",
0,
0
);
});
}, []);
Expand Down

0 comments on commit 07b0950

Please sign in to comment.