diff --git a/internal/api/api.go b/internal/api/api.go index 87eb8834..46d7c750 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -11,6 +11,7 @@ import ( "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_api" "github.com/resonatehq/resonate/internal/metrics" + "github.com/resonatehq/resonate/internal/util" ) type API interface { @@ -132,23 +133,35 @@ func (a *api) Enqueue(sqe *bus.SQE[t_api.Request, t_api.Response]) { } func (a *api) Dequeue(n int, timeoutCh <-chan time.Time) []*bus.SQE[t_api.Request, t_api.Response] { + util.Assert(n > 0, "submission batch size must be greater than 0") sqes := []*bus.SQE[t_api.Request, t_api.Response]{} if timeoutCh != nil { - // collects n entries or until a timeout occurs, + // collects n entries (if immediately available) or until a timeout occurs, // whichever happens first - for i := 0; i < n; i++ { - select { - case sqe, ok := <-a.sq: - if !ok { + select { + case sqe, ok := <-a.sq: + if !ok { + return sqes + } + slog.Debug("api:dequeue", "sqe", sqe) + sqes = append(sqes, sqe) + + for i := 0; i < n-1; i++ { + select { + case sqe, ok := <-a.sq: + if !ok { + return sqes + } + slog.Debug("api:dequeue", "sqe", sqe) + sqes = append(sqes, sqe) + default: return sqes } - - slog.Debug("api:dequeue", "sqe", sqe) - sqes = append(sqes, sqe) - case <-timeoutCh: - return sqes } + return sqes + case <-timeoutCh: + return sqes } } else { // collects n entries or until the channel is