Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dequeue from api immediately if sqes are available #88

Merged
merged 1 commit into from
Oct 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/metrics"
"github.com/resonatehq/resonate/internal/util"
)

type API interface {
Expand Down Expand Up @@ -132,23 +133,35 @@
}

func (a *api) Dequeue(n int, timeoutCh <-chan time.Time) []*bus.SQE[t_api.Request, t_api.Response] {
util.Assert(n > 0, "submission batch size must be greater than 0")
sqes := []*bus.SQE[t_api.Request, t_api.Response]{}

if timeoutCh != nil {
// collects n entries or until a timeout occurs,
// collects n entries (if immediately available) or until a timeout occurs,
// whichever happens first
for i := 0; i < n; i++ {
select {
case sqe, ok := <-a.sq:
if !ok {
select {
case sqe, ok := <-a.sq:
if !ok {
return sqes
}

Check warning on line 146 in internal/api/api.go

View check run for this annotation

Codecov / codecov/patch

internal/api/api.go#L145-L146

Added lines #L145 - L146 were not covered by tests
slog.Debug("api:dequeue", "sqe", sqe)
sqes = append(sqes, sqe)

for i := 0; i < n-1; i++ {
select {
case sqe, ok := <-a.sq:
if !ok {
return sqes
}
slog.Debug("api:dequeue", "sqe", sqe)
sqes = append(sqes, sqe)
default:

Check warning on line 158 in internal/api/api.go

View check run for this annotation

Codecov / codecov/patch

internal/api/api.go#L151-L158

Added lines #L151 - L158 were not covered by tests
return sqes
}

slog.Debug("api:dequeue", "sqe", sqe)
sqes = append(sqes, sqe)
case <-timeoutCh:
return sqes
}
return sqes
case <-timeoutCh:
return sqes

Check warning on line 164 in internal/api/api.go

View check run for this annotation

Codecov / codecov/patch

internal/api/api.go#L163-L164

Added lines #L163 - L164 were not covered by tests
}
} else {
// collects n entries or until the channel is
Expand Down