Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pagination to subscriptions #71

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions internal/app/coroutines/readSubscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func ReadSubscriptions(t int64, req *types.Request, res func(int64, *types.Respo
{
Kind: types.StoreReadSubscriptions,
ReadSubscriptions: &types.ReadSubscriptionsCommand{
PromiseIds: []string{req.ReadSubscriptions.PromiseId},
PromiseId: req.ReadSubscriptions.PromiseId,
Limit: req.ReadSubscriptions.Limit,
SortId: req.ReadSubscriptions.SortId,
},
},
},
Expand All @@ -37,10 +39,10 @@ func ReadSubscriptions(t int64, req *types.Request, res func(int64, *types.Respo

util.Assert(completion.Store != nil, "completion must not be nil")

records := completion.Store.Results[0].ReadSubscriptions.Records
result := completion.Store.Results[0].ReadSubscriptions
subscriptions := []*subscription.Subscription{}

for _, record := range records {
for _, record := range result.Records {
subscription, err := record.Subscription()
if err != nil {
slog.Warn("failed to parse subscription record", "record", record, "err", err)
Expand All @@ -50,10 +52,23 @@ func ReadSubscriptions(t int64, req *types.Request, res func(int64, *types.Respo
subscriptions = append(subscriptions, subscription)
}

// set cursor only if there are more results
var cursor *types.Cursor[types.ReadSubscriptionsRequest]
if result.RowsReturned == int64(req.ReadSubscriptions.Limit) {
cursor = &types.Cursor[types.ReadSubscriptionsRequest]{
Next: &types.ReadSubscriptionsRequest{
PromiseId: req.ReadSubscriptions.PromiseId,
Limit: req.ReadSubscriptions.Limit,
SortId: &result.LastSortId,
},
}
}

res(t, &types.Response{
Kind: types.ReadSubscriptions,
ReadSubscriptions: &types.ReadSubscriptionsResponse{
Status: types.ResponseOK,
Cursor: cursor,
Subscriptions: subscriptions,
},
}, nil)
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/searchPromises.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func SearchPromises(t int64, req *types.Request, res func(int64, *types.Response
Kind: types.SearchPromises,
SearchPromises: &types.SearchPromisesResponse{
Status: types.ResponseOK,
Promises: promises,
Cursor: cursor,
Promises: promises,
},
}, nil)
})
Expand Down
35 changes: 21 additions & 14 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/resonatehq/resonate/pkg/subscription"
"github.com/resonatehq/resonate/pkg/timeout"

"github.com/lib/pq"
_ "github.com/lib/pq"
)

const (
Expand All @@ -41,7 +41,7 @@ const (
PRIMARY KEY(id)
);

CREATE INDEX IF NOT EXISTS idx_sort_id ON promises(sort_id);
CREATE INDEX IF NOT EXISTS idx_promises_sort_id ON promises(sort_id);

CREATE TABLE IF NOT EXISTS timeouts (
id TEXT,
Expand All @@ -51,13 +51,16 @@ const (

CREATE TABLE IF NOT EXISTS subscriptions (
id TEXT,
sort_id SERIAL,
promise_id TEXT,
url TEXT,
retry_policy BYTEA,
created_on BIGINT,
PRIMARY KEY(id, promise_id)
);

CREATE INDEX IF NOT EXISTS idx_subscriptions_sort_id ON subscriptions(sort_id);

CREATE TABLE IF NOT EXISTS notifications (
id TEXT,
promise_id TEXT,
Expand Down Expand Up @@ -147,13 +150,16 @@ const (

SUBSCRIPTION_SELECT_ALL_STATEMENT = `
SELECT
id, promise_id, url, retry_policy, created_on
FROM
subscriptions
WHERE
promise_id = ANY($1)
id, promise_id, url, retry_policy, created_on, sort_id
FROM
subscriptions
WHERE
($1::int IS NULL OR sort_id < $1) AND
promise_id = $2
ORDER BY
id, promise_id`
sort_id DESC
LIMIT
$3`

SUBSCRIPTION_INSERT_STATEMENT = `
INSERT INTO subscriptions
Expand All @@ -180,7 +186,7 @@ const (
FROM
notifications
ORDER BY
time ASC, id, promise_id
time ASC, promise_id, id
LIMIT $1`

NOTIFICATION_INSERT_STATEMENT = `
Expand Down Expand Up @@ -765,32 +771,33 @@ func (w *PostgresStoreWorker) readSubscription(tx *sql.Tx, cmd *types.ReadSubscr
}

func (w *PostgresStoreWorker) readSubscriptions(tx *sql.Tx, cmd *types.ReadSubscriptionsCommand) (*types.Result, error) {
util.Assert(len(cmd.PromiseIds) > 0, "expected a promise id")

// select
rows, err := tx.Query(SUBSCRIPTION_SELECT_ALL_STATEMENT, pq.Array(cmd.PromiseIds))
rows, err := tx.Query(SUBSCRIPTION_SELECT_ALL_STATEMENT, cmd.SortId, cmd.PromiseId, cmd.Limit)
if err != nil {
return nil, err
}
defer rows.Close()

rowsReturned := int64(0)
var records []*subscription.SubscriptionRecord
var lastSortId int64

for rows.Next() {
record := &subscription.SubscriptionRecord{}
if err := rows.Scan(&record.Id, &record.PromiseId, &record.Url, &record.RetryPolicy, &record.CreatedOn); err != nil {
if err := rows.Scan(&record.Id, &record.PromiseId, &record.Url, &record.RetryPolicy, &record.CreatedOn, &record.SortId); err != nil {
return nil, err
}

rowsReturned++
records = append(records, record)
lastSortId = record.SortId
rowsReturned++
}

return &types.Result{
Kind: types.StoreReadSubscriptions,
ReadSubscriptions: &types.QuerySubscriptionsResult{
RowsReturned: rowsReturned,
LastSortId: lastSortId,
Records: records,
},
}, nil
Expand Down
44 changes: 18 additions & 26 deletions internal/app/subsystems/aio/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
"time"

Expand Down Expand Up @@ -40,7 +39,7 @@ const (
completed_on INTEGER
);

CREATE INDEX IF NOT EXISTS idx_id ON promises(id);
CREATE INDEX IF NOT EXISTS idx_promises_id ON promises(id);

CREATE TABLE IF NOT EXISTS timeouts (
id TEXT,
Expand All @@ -51,12 +50,15 @@ const (
CREATE TABLE IF NOT EXISTS subscriptions (
id TEXT,
promise_id TEXT,
sort_id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT,
retry_policy BLOB,
created_on INTEGER,
PRIMARY KEY(id, promise_id)
UNIQUE(id, promise_id)
);

CREATE INDEX IF NOT EXISTS idx_subscriptions_id ON subscriptions(id);

CREATE TABLE IF NOT EXISTS notifications (
id TEXT,
promise_id TEXT,
Expand Down Expand Up @@ -140,13 +142,16 @@ const (

SUBSCRIPTION_SELECT_ALL_STATEMENT = `
SELECT
id, promise_id, url, retry_policy, created_on
id, promise_id, url, retry_policy, created_on, sort_id
FROM
subscriptions
WHERE
promise_id IN (%s)
(? IS NULL OR sort_id < ?) AND
promise_id = ?
ORDER BY
id, promise_id`
sort_id DESC
LIMIT
?`

SUBSCRIPTION_INSERT_STATEMENT = `
INSERT INTO subscriptions
Expand All @@ -173,7 +178,7 @@ const (
FROM
notifications
ORDER BY
time ASC, id, promise_id
time ASC, promise_id, id
LIMIT ?`

NOTIFICATION_INSERT_STATEMENT = `
Expand Down Expand Up @@ -741,46 +746,33 @@ func (w *SqliteStoreWorker) readSubscription(tx *sql.Tx, cmd *types.ReadSubscrip
}

func (w *SqliteStoreWorker) readSubscriptions(tx *sql.Tx, cmd *types.ReadSubscriptionsCommand) (*types.Result, error) {
util.Assert(len(cmd.PromiseIds) > 0, "expected a promise id")

// select
var placeholders string
promiseIds := make([]interface{}, len(cmd.PromiseIds))

for i, promiseId := range cmd.PromiseIds {
if i == len(cmd.PromiseIds)-1 {
placeholders += "?"
} else {
placeholders += "?,"
}

promiseIds[i] = promiseId
}

stmt := fmt.Sprintf(SUBSCRIPTION_SELECT_ALL_STATEMENT, placeholders)
rows, err := tx.Query(stmt, promiseIds...)
rows, err := tx.Query(SUBSCRIPTION_SELECT_ALL_STATEMENT, cmd.SortId, cmd.SortId, cmd.PromiseId, cmd.Limit)
if err != nil {
return nil, err
}
defer rows.Close()

rowsReturned := int64(0)
var records []*subscription.SubscriptionRecord
var lastSortId int64

for rows.Next() {
record := &subscription.SubscriptionRecord{}
if err := rows.Scan(&record.Id, &record.PromiseId, &record.Url, &record.RetryPolicy, &record.CreatedOn); err != nil {
if err := rows.Scan(&record.Id, &record.PromiseId, &record.Url, &record.RetryPolicy, &record.CreatedOn, &record.SortId); err != nil {
return nil, err
}

rowsReturned++
records = append(records, record)
lastSortId = record.SortId
rowsReturned++
}

return &types.Result{
Kind: types.StoreReadSubscriptions,
ReadSubscriptions: &types.QuerySubscriptionsResult{
RowsReturned: rowsReturned,
LastSortId: lastSortId,
Records: records,
},
}, nil
Expand Down
Loading