Skip to content

Commit

Permalink
fix: rare deadlock in GetOperationEvents (#319)
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge authored Jun 6, 2024
1 parent 2d0c29e commit f42df20
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions internal/api/backresthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ func (s *BackrestHandler) ListSnapshotFiles(ctx context.Context, req *connect.Re

// GetOperationEvents implements GET /v1/events/operations
func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.Request[emptypb.Empty], resp *connect.ServerStream[v1.OperationEvent]) error {

errChan := make(chan error, 1)
events := make(chan *v1.OperationEvent, 100)

Expand Down Expand Up @@ -221,23 +220,36 @@ func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.R
select {
case events <- event:
default:
errChan <- errors.New("event buffer overflow, closing stream for client retry and catchup")
select {
case errChan <- errors.New("event buffer overflow, closing stream for client retry and catchup"):
default:
}
}
}
s.oplog.Subscribe(&callback)
defer s.oplog.Unsubscribe(&callback)

for {
select {
case err := <-errChan:
return err
case <-ctx.Done():
return nil
case event := <-events:
if err := resp.Send(event); err != nil {
return fmt.Errorf("failed to write event: %w", err)
go func() {
for {
select {
case <-ctx.Done():
return
case event := <-events:
if err := resp.Send(event); err != nil {
select {
case errChan <- errors.New("failed to send event"):
default:
}
}
}
}
}()

select {
case err := <-errChan:
return err
case <-ctx.Done():
return nil
}
}

Expand Down

0 comments on commit f42df20

Please sign in to comment.