Skip to content

Commit

Permalink
fix(race): move double writes
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel Guerra committed Nov 9, 2023
1 parent 9b7740e commit 41048d7
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions internal/app/subsystems/api/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (s *Service) metadata(id string, name string) *metadata.Metadata {

func (s *Service) ReadPromise(id string, header *Header) (*t_api.ReadPromiseResponse, error) {
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1)
defer close(cq)

s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{
Metadata: s.metadata(header.RequestId, "read-promise"),
Expand Down Expand Up @@ -122,7 +121,6 @@ func (s *Service) SearchPromises(header *Header, params *SearchPromiseParams) (*
}

cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1)
defer close(cq)

s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{
Metadata: s.metadata(header.RequestId, "search-promises"),
Expand All @@ -146,7 +144,6 @@ func (s *Service) SearchPromises(header *Header, params *SearchPromiseParams) (*

func (s *Service) CreatePromise(id string, header *CreatePromiseHeader, body *CreatePromiseBody) (*t_api.CreatePromiseResponse, error) {
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1)
defer close(cq)

s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{
Metadata: s.metadata(header.RequestId, "create-promise"),
Expand Down Expand Up @@ -177,7 +174,6 @@ func (s *Service) CreatePromise(id string, header *CreatePromiseHeader, body *Cr

func (s *Service) CancelPromise(id string, header *CancelPromiseHeader, body *CancelPromiseBody) (*t_api.CancelPromiseResponse, error) {
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1)
defer close(cq)

s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{
Metadata: s.metadata(header.RequestId, "cancel-promise"),
Expand Down Expand Up @@ -206,7 +202,6 @@ func (s *Service) CancelPromise(id string, header *CancelPromiseHeader, body *Ca

func (s *Service) ResolvePromise(id string, header *ResolvePromiseHeader, body *ResolvePromiseBody) (*t_api.ResolvePromiseResponse, error) {
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1)
defer close(cq)

s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{
Metadata: s.metadata(header.RequestId, "resolve-promise"),
Expand Down Expand Up @@ -235,7 +230,6 @@ func (s *Service) ResolvePromise(id string, header *ResolvePromiseHeader, body *

func (s *Service) RejectPromise(id string, header *RejectPromiseHeader, body *RejectPromiseBody) (*t_api.RejectPromiseResponse, error) {
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1)
defer close(cq)

s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{
Metadata: s.metadata(header.RequestId, "reject-promise"),
Expand Down Expand Up @@ -268,6 +262,11 @@ func (s *Service) sendOrPanic(cq chan *bus.CQE[t_api.Request, t_api.Response]) f
Error: err,
}

cq <- cqe // writing to a closed channel panics
select {
case cq <- cqe:
close(cq) // prevent further writes
default:
panic("response channel must not block")
}
}
}

0 comments on commit 41048d7

Please sign in to comment.