Skip to content

Commit

Permalink
Finish pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
dfarr committed Sep 26, 2023
1 parent f44c1d1 commit f164256
Show file tree
Hide file tree
Showing 14 changed files with 834 additions and 779 deletions.
5 changes: 5 additions & 0 deletions cmd/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
netHttp "net/http"
"os"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/resonatehq/resonate/internal/aio"
Expand Down Expand Up @@ -229,21 +230,25 @@ func init() {
dstRunCmd.Flags().Var(&rangeIntFlag{Min: 1, Max: 1000000}, "aio-size", "size of the completion queue buffered channel")
dstRunCmd.Flags().String("aio-store", "sqlite", "promise store type")
dstRunCmd.Flags().String("aio-store-sqlite-path", ":memory:", "sqlite database path")
dstRunCmd.Flags().Duration("aio-store-sqlite-tx-timeout", 1000*time.Millisecond, "sqlite transaction timeout")
dstRunCmd.Flags().String("aio-store-postgres-host", "localhost", "postgres host")
dstRunCmd.Flags().String("aio-store-postgres-port", "5432", "postgres port")
dstRunCmd.Flags().String("aio-store-postgres-username", "", "postgres username")
dstRunCmd.Flags().String("aio-store-postgres-password", "", "postgres password")
dstRunCmd.Flags().String("aio-store-postgres-database", "resonate_dst", "postgres database name")
dstRunCmd.Flags().Duration("aio-store-postgres-tx-timeout", 1000*time.Millisecond, "postgres transaction timeout")
dstRunCmd.Flags().Float32("aio-network-success-rate", 0.5, "simulated success rate of http requests")

_ = viper.BindPFlag("dst.aio.size", dstRunCmd.Flags().Lookup("aio-size"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.kind", dstRunCmd.Flags().Lookup("aio-store"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.sqlite.path", dstRunCmd.Flags().Lookup("aio-store-sqlite-path"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.sqlite.txTimeout", dstRunCmd.Flags().Lookup("aio-store-sqlite-tx-timeout"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.host", dstRunCmd.Flags().Lookup("aio-store-postgres-host"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.port", dstRunCmd.Flags().Lookup("aio-store-postgres-port"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.username", dstRunCmd.Flags().Lookup("aio-store-postgres-username"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.password", dstRunCmd.Flags().Lookup("aio-store-postgres-password"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.database", dstRunCmd.Flags().Lookup("aio-store-postgres-database"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.txTimeout", dstRunCmd.Flags().Lookup("aio-store-postgres-tx-timeout"))
_ = viper.BindPFlag("dst.aio.subsystems.networkDST.config.p", dstRunCmd.Flags().Lookup("aio-network-success-rate"))

// system
Expand Down
4 changes: 4 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,13 @@ func init() {
serveCmd.Flags().Int("aio-store-workers", 1, "number of concurrent connections to the store")
serveCmd.Flags().Int("aio-store-batch-size", 100, "max submissions processed each tick by a store worker")
serveCmd.Flags().String("aio-store-sqlite-path", "resonate.db", "sqlite database path")
serveCmd.Flags().Duration("aio-store-sqlite-tx-timeout", 250*time.Millisecond, "sqlite transaction timeout")
serveCmd.Flags().String("aio-store-postgres-host", "localhost", "postgres host")
serveCmd.Flags().String("aio-store-postgres-port", "5432", "postgres port")
serveCmd.Flags().String("aio-store-postgres-username", "", "postgres username")
serveCmd.Flags().String("aio-store-postgres-password", "", "postgres password")
serveCmd.Flags().String("aio-store-postgres-database", "resonate", "postgres database name")
serveCmd.Flags().Duration("aio-store-postgres-tx-timeout", 250*time.Millisecond, "postgres transaction timeout")
serveCmd.Flags().Int("aio-network-size", 100, "size of network submission queue buffered channel")
serveCmd.Flags().Int("aio-network-workers", 3, "number of concurrent http requests")
serveCmd.Flags().Int("aio-network-batch-size", 100, "max submissions processed each tick by a network worker")
Expand All @@ -196,12 +198,14 @@ func init() {
_ = viper.BindPFlag("aio.subsystems.store.workers", serveCmd.Flags().Lookup("aio-store-workers"))
_ = viper.BindPFlag("aio.subsystems.store.batchSize", serveCmd.Flags().Lookup("aio-store-batch-size"))
_ = viper.BindPFlag("aio.subsystems.store.config.sqlite.path", serveCmd.Flags().Lookup("aio-store-sqlite-path"))
_ = viper.BindPFlag("aio.subsystems.store.config.sqlite.txTimeout", serveCmd.Flags().Lookup("aio-store-sqlite-tx-timeout"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.host", serveCmd.Flags().Lookup("aio-store-postgres-host"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.port", serveCmd.Flags().Lookup("aio-store-postgres-port"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.username", serveCmd.Flags().Lookup("aio-store-postgres-username"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.password", serveCmd.Flags().Lookup("aio-store-postgres-password"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.database", serveCmd.Flags().Lookup("aio-store-postgres-database"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.database", serveCmd.Flags().Lookup("aio-store-postgres-database"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.txTimeout", serveCmd.Flags().Lookup("aio-store-postgres-tx-timeout"))
_ = viper.BindPFlag("aio.subsystems.network.size", serveCmd.Flags().Lookup("aio-network-size"))
_ = viper.BindPFlag("aio.subsystems.network.workers", serveCmd.Flags().Lookup("aio-network-workers"))
_ = viper.BindPFlag("aio.subsystems.network.batchSize", serveCmd.Flags().Lookup("aio-network-batch-size"))
Expand Down
22 changes: 14 additions & 8 deletions internal/app/coroutines/searchPromises.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,25 @@ func SearchPromises(t1 int64, req *types.Request, res func(int64, *types.Respons
promises = append(promises, p)
}

// set cursor only if there are more results
var cursor *types.Cursor[types.SearchPromisesRequest]
if result.RowsReturned == int64(req.SearchPromises.Limit) {
cursor = &types.Cursor[types.SearchPromisesRequest]{
Next: &types.SearchPromisesRequest{
Q: req.SearchPromises.Q,
States: req.SearchPromises.States,
Limit: req.SearchPromises.Limit,
SortId: &result.LastSortId,
},
}
}

res(t2, &types.Response{
Kind: types.SearchPromises,
SearchPromises: &types.SearchPromisesResponse{
Status: types.ResponseOK,
Promises: promises,
Cursor: &types.Cursor[types.SearchPromisesRequest]{
Next: &types.SearchPromisesRequest{
Q: req.SearchPromises.Q,
States: req.SearchPromises.States,
Limit: req.SearchPromises.Limit,
SortId: &result.LastSortId,
},
},
Cursor: cursor,
},
}, nil)
})
Expand Down
50 changes: 23 additions & 27 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package postgres

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/url"
"time"

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store"
Expand Down Expand Up @@ -66,15 +68,11 @@ const (
PRIMARY KEY(id, promise_id)
);`

// DROP_TABLE_STATEMENT = `
// DROP TABLE notifications;
// DROP TABLE subscriptions;
// DROP TABLE timeouts;
// DROP TABLE promises;`

DROP_TABLE_STATEMENT = `
DROP DATABASE resonate_test;
CREATE DATABASE resonate_test;`
DROP TABLE notifications;
DROP TABLE subscriptions;
DROP TABLE timeouts;
DROP TABLE promises;`

PROMISE_SELECT_STATEMENT = `
SELECT
Expand Down Expand Up @@ -188,12 +186,12 @@ const (
)

type Config struct {
Host string
Port string
Username string
Password string
Database string
schema string
Host string
Port string
Username string
Password string
Database string
TxTimeout time.Duration
}

type PostgresStore struct {
Expand Down Expand Up @@ -235,16 +233,6 @@ func (s *PostgresStore) String() string {
}

func (s *PostgresStore) Start() error {
// schema for unit tests
if s.config.schema != "" {
q := `
CREATE SCHEMA %s;
ALTER USER %s SET search_path TO %s;`
if _, err := s.db.Exec(fmt.Sprintf(q, pq.QuoteIdentifier(s.config.schema), pq.QuoteIdentifier(s.config.Username), pq.QuoteIdentifier(s.config.schema))); err != nil {
return err
}
}

if _, err := s.db.Exec(CREATE_TABLE_STATEMENT); err != nil {
return err
}
Expand All @@ -257,6 +245,10 @@ func (s *PostgresStore) Stop() error {
}

func (s *PostgresStore) Reset() error {
if _, err := s.db.Exec(DROP_TABLE_STATEMENT); err != nil {
return err
}

return nil
}

Expand All @@ -274,7 +266,10 @@ func (w *PostgresStoreWorker) Process(sqes []*bus.SQE[types.Submission, types.Co
func (w *PostgresStoreWorker) Execute(transactions []*types.Transaction) ([][]*types.Result, error) {
util.Assert(len(transactions) > 0, "expected a transaction")

tx, err := w.db.Begin()
ctx, cancel := context.WithTimeout(context.Background(), w.config.TxTimeout)
defer cancel()

tx, err := w.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -507,13 +502,14 @@ func (w *PostgresStoreWorker) searchPromises(tx *sql.Tx, cmd *types.SearchPromis
&record.Tags,
&record.CreatedOn,
&record.CompletedOn,
&lastSortId, // keep track of the last sort id
&record.SortId,
); err != nil {
return nil, err
}

rowsReturned++
records = append(records, record)
lastSortId = record.SortId
rowsReturned++
}

return &types.Result{
Expand Down
34 changes: 13 additions & 21 deletions internal/app/subsystems/aio/store/postgres/postgres_test.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
package postgres

import (
"fmt"
"os"
"testing"
"time"

"github.com/resonatehq/resonate/internal/app/subsystems/aio/store/test"
)

func TestPostgresStore(t *testing.T) {
host := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_HOST", "")
port := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_PORT", "localhost")
username := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_USERNAME", "username")
password := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_PASSWORD", "password")
database := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_DATABASE", "resonate_test")
host := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_HOST")
port := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_PORT")
username := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_USERNAME")
password := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_PASSWORD")
database := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_DATABASE")

if host == "" {
t.Skip("Postgres is not configured, skipping")
}

for i, tc := range test.TestCases {
for _, tc := range test.TestCases {
store, err := New(&Config{
Host: host,
Port: port,
Username: username,
Password: password,
Database: database,
schema: fmt.Sprintf("test_%d", i),
Host: host,
Port: port,
Username: username,
Password: password,
Database: database,
TxTimeout: 250 * time.Millisecond,
}, 1)
if err != nil {
t.Fatal(err)
Expand All @@ -47,11 +47,3 @@ func TestPostgresStore(t *testing.T) {
}
}
}

func withDefault(key, defaultValue string) string {
value, exists := os.LookupEnv(key)
if !exists {
return defaultValue
}
return value
}
15 changes: 11 additions & 4 deletions internal/app/subsystems/aio/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package sqlite

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
"time"

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store"
Expand Down Expand Up @@ -177,7 +179,8 @@ const (
)

type Config struct {
Path string
Path string
TxTimeout time.Duration
}

type SqliteStore struct {
Expand Down Expand Up @@ -236,7 +239,10 @@ func (w *SqliteStoreWorker) Process(sqes []*bus.SQE[types.Submission, types.Comp
func (w *SqliteStoreWorker) Execute(transactions []*types.Transaction) ([][]*types.Result, error) {
util.Assert(len(transactions) > 0, "expected a transaction")

tx, err := w.db.Begin()
ctx, cancel := context.WithTimeout(context.Background(), w.config.TxTimeout)
defer cancel()

tx, err := w.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -469,13 +475,14 @@ func (w *SqliteStoreWorker) searchPromises(tx *sql.Tx, cmd *types.SearchPromises
&record.Tags,
&record.CreatedOn,
&record.CompletedOn,
&lastSortId, // keep track of the last sort id
&record.SortId,
); err != nil {
return nil, err
}

rowsReturned++
records = append(records, record)
lastSortId = record.SortId
rowsReturned++
}

return &types.Result{
Expand Down
6 changes: 5 additions & 1 deletion internal/app/subsystems/aio/store/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package sqlite

import (
"testing"
"time"

"github.com/resonatehq/resonate/internal/app/subsystems/aio/store/test"
)

func TestSqliteStore(t *testing.T) {
for _, tc := range test.TestCases {
store, err := New(&Config{Path: ":memory:"})
store, err := New(&Config{
Path: ":memory:",
TxTimeout: 250 * time.Millisecond,
})
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit f164256

Please sign in to comment.