From 7992c6fbf48bfe2a11682f0d9254aae3201cd24e Mon Sep 17 00:00:00 2001 From: David Farr Date: Wed, 18 Oct 2023 18:12:56 -0700 Subject: [PATCH] Dequeue from api immediately if sqes are available --- internal/api/api.go | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/internal/api/api.go b/internal/api/api.go index 87eb8834..46d7c750 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -11,6 +11,7 @@ import ( "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_api" "github.com/resonatehq/resonate/internal/metrics" + "github.com/resonatehq/resonate/internal/util" ) type API interface { @@ -132,23 +133,35 @@ func (a *api) Enqueue(sqe *bus.SQE[t_api.Request, t_api.Response]) { } func (a *api) Dequeue(n int, timeoutCh <-chan time.Time) []*bus.SQE[t_api.Request, t_api.Response] { + util.Assert(n > 0, "submission batch size must be greater than 0") sqes := []*bus.SQE[t_api.Request, t_api.Response]{} if timeoutCh != nil { - // collects n entries or until a timeout occurs, + // collects n entries (if immediately available) or until a timeout occurs, // whichever happens first - for i := 0; i < n; i++ { - select { - case sqe, ok := <-a.sq: - if !ok { + select { + case sqe, ok := <-a.sq: + if !ok { + return sqes + } + slog.Debug("api:dequeue", "sqe", sqe) + sqes = append(sqes, sqe) + + for i := 0; i < n-1; i++ { + select { + case sqe, ok := <-a.sq: + if !ok { + return sqes + } + slog.Debug("api:dequeue", "sqe", sqe) + sqes = append(sqes, sqe) + default: return sqes } - - slog.Debug("api:dequeue", "sqe", sqe) - sqes = append(sqes, sqe) - case <-timeoutCh: - return sqes } + return sqes + case <-timeoutCh: + return sqes } } else { // collects n entries or until the channel is