From f164256b469123b06b5b415740d92346eb977c58 Mon Sep 17 00:00:00 2001 From: David Farr Date: Tue, 26 Sep 2023 12:07:59 -0700 Subject: [PATCH] Finish pagination --- cmd/dst.go | 5 + cmd/serve.go | 4 + internal/app/coroutines/searchPromises.go | 22 +- .../subsystems/aio/store/postgres/postgres.go | 50 +- .../aio/store/postgres/postgres_test.go | 34 +- .../app/subsystems/aio/store/sqlite/sqlite.go | 15 +- .../aio/store/sqlite/sqlite_test.go | 6 +- .../app/subsystems/aio/store/test/util.go | 1436 +++++++++-------- pkg/promise/promise.go | 1 + pkg/promise/record.go | 2 + test/dst/dst.go | 2 +- test/dst/dst_test.go | 3 +- test/dst/generator.go | 17 +- test/dst/model.go | 16 + 14 files changed, 834 insertions(+), 779 deletions(-) diff --git a/cmd/dst.go b/cmd/dst.go index 4c672717..15f8fa6b 100644 --- a/cmd/dst.go +++ b/cmd/dst.go @@ -10,6 +10,7 @@ import ( netHttp "net/http" "os" "strings" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/resonatehq/resonate/internal/aio" @@ -229,21 +230,25 @@ func init() { dstRunCmd.Flags().Var(&rangeIntFlag{Min: 1, Max: 1000000}, "aio-size", "size of the completion queue buffered channel") dstRunCmd.Flags().String("aio-store", "sqlite", "promise store type") dstRunCmd.Flags().String("aio-store-sqlite-path", ":memory:", "sqlite database path") + dstRunCmd.Flags().Duration("aio-store-sqlite-tx-timeout", 1000*time.Millisecond, "sqlite transaction timeout") dstRunCmd.Flags().String("aio-store-postgres-host", "localhost", "postgres host") dstRunCmd.Flags().String("aio-store-postgres-port", "5432", "postgres port") dstRunCmd.Flags().String("aio-store-postgres-username", "", "postgres username") dstRunCmd.Flags().String("aio-store-postgres-password", "", "postgres password") dstRunCmd.Flags().String("aio-store-postgres-database", "resonate_dst", "postgres database name") + dstRunCmd.Flags().Duration("aio-store-postgres-tx-timeout", 1000*time.Millisecond, "postgres transaction timeout") dstRunCmd.Flags().Float32("aio-network-success-rate", 0.5, "simulated success rate of http requests") _ = viper.BindPFlag("dst.aio.size", dstRunCmd.Flags().Lookup("aio-size")) _ = viper.BindPFlag("dst.aio.subsystems.store.config.kind", dstRunCmd.Flags().Lookup("aio-store")) _ = viper.BindPFlag("dst.aio.subsystems.store.config.sqlite.path", dstRunCmd.Flags().Lookup("aio-store-sqlite-path")) + _ = viper.BindPFlag("dst.aio.subsystems.store.config.sqlite.txTimeout", dstRunCmd.Flags().Lookup("aio-store-sqlite-tx-timeout")) _ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.host", dstRunCmd.Flags().Lookup("aio-store-postgres-host")) _ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.port", dstRunCmd.Flags().Lookup("aio-store-postgres-port")) _ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.username", dstRunCmd.Flags().Lookup("aio-store-postgres-username")) _ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.password", dstRunCmd.Flags().Lookup("aio-store-postgres-password")) _ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.database", dstRunCmd.Flags().Lookup("aio-store-postgres-database")) + _ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.txTimeout", dstRunCmd.Flags().Lookup("aio-store-postgres-tx-timeout")) _ = viper.BindPFlag("dst.aio.subsystems.networkDST.config.p", dstRunCmd.Flags().Lookup("aio-network-success-rate")) // system diff --git a/cmd/serve.go b/cmd/serve.go index c57a59ef..f0fa9676 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -180,11 +180,13 @@ func init() { serveCmd.Flags().Int("aio-store-workers", 1, "number of concurrent connections to the store") serveCmd.Flags().Int("aio-store-batch-size", 100, "max submissions processed each tick by a store worker") serveCmd.Flags().String("aio-store-sqlite-path", "resonate.db", "sqlite database path") + serveCmd.Flags().Duration("aio-store-sqlite-tx-timeout", 250*time.Millisecond, "sqlite transaction timeout") serveCmd.Flags().String("aio-store-postgres-host", "localhost", "postgres host") serveCmd.Flags().String("aio-store-postgres-port", "5432", "postgres port") serveCmd.Flags().String("aio-store-postgres-username", "", "postgres username") serveCmd.Flags().String("aio-store-postgres-password", "", "postgres password") serveCmd.Flags().String("aio-store-postgres-database", "resonate", "postgres database name") + serveCmd.Flags().Duration("aio-store-postgres-tx-timeout", 250*time.Millisecond, "postgres transaction timeout") serveCmd.Flags().Int("aio-network-size", 100, "size of network submission queue buffered channel") serveCmd.Flags().Int("aio-network-workers", 3, "number of concurrent http requests") serveCmd.Flags().Int("aio-network-batch-size", 100, "max submissions processed each tick by a network worker") @@ -196,12 +198,14 @@ func init() { _ = viper.BindPFlag("aio.subsystems.store.workers", serveCmd.Flags().Lookup("aio-store-workers")) _ = viper.BindPFlag("aio.subsystems.store.batchSize", serveCmd.Flags().Lookup("aio-store-batch-size")) _ = viper.BindPFlag("aio.subsystems.store.config.sqlite.path", serveCmd.Flags().Lookup("aio-store-sqlite-path")) + _ = viper.BindPFlag("aio.subsystems.store.config.sqlite.txTimeout", serveCmd.Flags().Lookup("aio-store-sqlite-tx-timeout")) _ = viper.BindPFlag("aio.subsystems.store.config.postgres.host", serveCmd.Flags().Lookup("aio-store-postgres-host")) _ = viper.BindPFlag("aio.subsystems.store.config.postgres.port", serveCmd.Flags().Lookup("aio-store-postgres-port")) _ = viper.BindPFlag("aio.subsystems.store.config.postgres.username", serveCmd.Flags().Lookup("aio-store-postgres-username")) _ = viper.BindPFlag("aio.subsystems.store.config.postgres.password", serveCmd.Flags().Lookup("aio-store-postgres-password")) _ = viper.BindPFlag("aio.subsystems.store.config.postgres.database", serveCmd.Flags().Lookup("aio-store-postgres-database")) _ = viper.BindPFlag("aio.subsystems.store.config.postgres.database", serveCmd.Flags().Lookup("aio-store-postgres-database")) + _ = viper.BindPFlag("aio.subsystems.store.config.postgres.txTimeout", serveCmd.Flags().Lookup("aio-store-postgres-tx-timeout")) _ = viper.BindPFlag("aio.subsystems.network.size", serveCmd.Flags().Lookup("aio-network-size")) _ = viper.BindPFlag("aio.subsystems.network.workers", serveCmd.Flags().Lookup("aio-network-workers")) _ = viper.BindPFlag("aio.subsystems.network.batchSize", serveCmd.Flags().Lookup("aio-network-batch-size")) diff --git a/internal/app/coroutines/searchPromises.go b/internal/app/coroutines/searchPromises.go index dac6a8ba..bc612889 100644 --- a/internal/app/coroutines/searchPromises.go +++ b/internal/app/coroutines/searchPromises.go @@ -77,19 +77,25 @@ func SearchPromises(t1 int64, req *types.Request, res func(int64, *types.Respons promises = append(promises, p) } + // set cursor only if there are more results + var cursor *types.Cursor[types.SearchPromisesRequest] + if result.RowsReturned == int64(req.SearchPromises.Limit) { + cursor = &types.Cursor[types.SearchPromisesRequest]{ + Next: &types.SearchPromisesRequest{ + Q: req.SearchPromises.Q, + States: req.SearchPromises.States, + Limit: req.SearchPromises.Limit, + SortId: &result.LastSortId, + }, + } + } + res(t2, &types.Response{ Kind: types.SearchPromises, SearchPromises: &types.SearchPromisesResponse{ Status: types.ResponseOK, Promises: promises, - Cursor: &types.Cursor[types.SearchPromisesRequest]{ - Next: &types.SearchPromisesRequest{ - Q: req.SearchPromises.Q, - States: req.SearchPromises.States, - Limit: req.SearchPromises.Limit, - SortId: &result.LastSortId, - }, - }, + Cursor: cursor, }, }, nil) }) diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index d3ceb44f..fe7c0f3d 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -1,10 +1,12 @@ package postgres import ( + "context" "database/sql" "encoding/json" "fmt" "net/url" + "time" "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" @@ -66,15 +68,11 @@ const ( PRIMARY KEY(id, promise_id) );` - // DROP_TABLE_STATEMENT = ` - // DROP TABLE notifications; - // DROP TABLE subscriptions; - // DROP TABLE timeouts; - // DROP TABLE promises;` - DROP_TABLE_STATEMENT = ` - DROP DATABASE resonate_test; - CREATE DATABASE resonate_test;` + DROP TABLE notifications; + DROP TABLE subscriptions; + DROP TABLE timeouts; + DROP TABLE promises;` PROMISE_SELECT_STATEMENT = ` SELECT @@ -188,12 +186,12 @@ const ( ) type Config struct { - Host string - Port string - Username string - Password string - Database string - schema string + Host string + Port string + Username string + Password string + Database string + TxTimeout time.Duration } type PostgresStore struct { @@ -235,16 +233,6 @@ func (s *PostgresStore) String() string { } func (s *PostgresStore) Start() error { - // schema for unit tests - if s.config.schema != "" { - q := ` - CREATE SCHEMA %s; - ALTER USER %s SET search_path TO %s;` - if _, err := s.db.Exec(fmt.Sprintf(q, pq.QuoteIdentifier(s.config.schema), pq.QuoteIdentifier(s.config.Username), pq.QuoteIdentifier(s.config.schema))); err != nil { - return err - } - } - if _, err := s.db.Exec(CREATE_TABLE_STATEMENT); err != nil { return err } @@ -257,6 +245,10 @@ func (s *PostgresStore) Stop() error { } func (s *PostgresStore) Reset() error { + if _, err := s.db.Exec(DROP_TABLE_STATEMENT); err != nil { + return err + } + return nil } @@ -274,7 +266,10 @@ func (w *PostgresStoreWorker) Process(sqes []*bus.SQE[types.Submission, types.Co func (w *PostgresStoreWorker) Execute(transactions []*types.Transaction) ([][]*types.Result, error) { util.Assert(len(transactions) > 0, "expected a transaction") - tx, err := w.db.Begin() + ctx, cancel := context.WithTimeout(context.Background(), w.config.TxTimeout) + defer cancel() + + tx, err := w.db.BeginTx(ctx, nil) if err != nil { return nil, err } @@ -507,13 +502,14 @@ func (w *PostgresStoreWorker) searchPromises(tx *sql.Tx, cmd *types.SearchPromis &record.Tags, &record.CreatedOn, &record.CompletedOn, - &lastSortId, // keep track of the last sort id + &record.SortId, ); err != nil { return nil, err } - rowsReturned++ records = append(records, record) + lastSortId = record.SortId + rowsReturned++ } return &types.Result{ diff --git a/internal/app/subsystems/aio/store/postgres/postgres_test.go b/internal/app/subsystems/aio/store/postgres/postgres_test.go index c8841418..02a9519f 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres_test.go +++ b/internal/app/subsystems/aio/store/postgres/postgres_test.go @@ -1,32 +1,32 @@ package postgres import ( - "fmt" "os" "testing" + "time" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/test" ) func TestPostgresStore(t *testing.T) { - host := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_HOST", "") - port := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_PORT", "localhost") - username := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_USERNAME", "username") - password := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_PASSWORD", "password") - database := withDefault("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_DATABASE", "resonate_test") + host := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_HOST") + port := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_PORT") + username := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_USERNAME") + password := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_PASSWORD") + database := os.Getenv("TEST_AIO_SUBSYSTEMS_STORE_CONFIG_POSTGRES_DATABASE") if host == "" { t.Skip("Postgres is not configured, skipping") } - for i, tc := range test.TestCases { + for _, tc := range test.TestCases { store, err := New(&Config{ - Host: host, - Port: port, - Username: username, - Password: password, - Database: database, - schema: fmt.Sprintf("test_%d", i), + Host: host, + Port: port, + Username: username, + Password: password, + Database: database, + TxTimeout: 250 * time.Millisecond, }, 1) if err != nil { t.Fatal(err) @@ -47,11 +47,3 @@ func TestPostgresStore(t *testing.T) { } } } - -func withDefault(key, defaultValue string) string { - value, exists := os.LookupEnv(key) - if !exists { - return defaultValue - } - return value -} diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 438319e5..31df778f 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -1,10 +1,12 @@ package sqlite import ( + "context" "database/sql" "encoding/json" "fmt" "os" + "time" "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" @@ -177,7 +179,8 @@ const ( ) type Config struct { - Path string + Path string + TxTimeout time.Duration } type SqliteStore struct { @@ -236,7 +239,10 @@ func (w *SqliteStoreWorker) Process(sqes []*bus.SQE[types.Submission, types.Comp func (w *SqliteStoreWorker) Execute(transactions []*types.Transaction) ([][]*types.Result, error) { util.Assert(len(transactions) > 0, "expected a transaction") - tx, err := w.db.Begin() + ctx, cancel := context.WithTimeout(context.Background(), w.config.TxTimeout) + defer cancel() + + tx, err := w.db.BeginTx(ctx, nil) if err != nil { return nil, err } @@ -469,13 +475,14 @@ func (w *SqliteStoreWorker) searchPromises(tx *sql.Tx, cmd *types.SearchPromises &record.Tags, &record.CreatedOn, &record.CompletedOn, - &lastSortId, // keep track of the last sort id + &record.SortId, ); err != nil { return nil, err } - rowsReturned++ records = append(records, record) + lastSortId = record.SortId + rowsReturned++ } return &types.Result{ diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite_test.go b/internal/app/subsystems/aio/store/sqlite/sqlite_test.go index 41355ee3..50fec54a 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite_test.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite_test.go @@ -2,13 +2,17 @@ package sqlite import ( "testing" + "time" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/test" ) func TestSqliteStore(t *testing.T) { for _, tc := range test.TestCases { - store, err := New(&Config{Path: ":memory:"}) + store, err := New(&Config{ + Path: ":memory:", + TxTimeout: 250 * time.Millisecond, + }) if err != nil { t.Fatal(err) } diff --git a/internal/app/subsystems/aio/store/test/util.go b/internal/app/subsystems/aio/store/test/util.go index 2e5b6c8d..ad410d1f 100644 --- a/internal/app/subsystems/aio/store/test/util.go +++ b/internal/app/subsystems/aio/store/test/util.go @@ -61,13 +61,13 @@ func (c *testCase) Panic() bool { var TestCases = []*testCase{ { - name: "SearchPromises", + name: "CreatePromise", commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ Id: "foo", - Timeout: 3, + Timeout: 1, Param: promise.Value{ Headers: map[string]string{}, Data: []byte{}, @@ -76,13 +76,48 @@ var TestCases = []*testCase{ CreatedOn: 1, }, }, + { + Kind: types.StoreReadPromise, + ReadPromise: &types.ReadPromiseCommand{ + Id: "foo", + }, + }, + }, + expected: []*types.Result{ + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreReadPromise, + ReadPromise: &types.QueryPromisesResult{ + RowsReturned: 1, + Records: []*promise.PromiseRecord{{ + Id: "foo", + State: 1, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + Timeout: 1, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + }}, + }, + }, + }, + }, + { + name: "CreatePromiseWithIdKey", + commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ Id: "bar", - Timeout: 3, + Timeout: 2, Param: promise.Value{ Headers: map[string]string{}, + Ikey: ikey("bar"), Data: []byte{}, }, Tags: map[string]string{}, @@ -90,17 +125,40 @@ var TestCases = []*testCase{ }, }, { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "bar", - State: 2, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, - }, - CompletedOn: 2, + Kind: types.StoreReadPromise, + ReadPromise: &types.ReadPromiseCommand{ + Id: "bar", + }, + }, + }, + expected: []*types.Result{ + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreReadPromise, + ReadPromise: &types.QueryPromisesResult{ + RowsReturned: 1, + Records: []*promise.PromiseRecord{{ + Id: "bar", + State: 1, + ParamHeaders: []byte("{}"), + ParamIkey: ikey("bar"), + ParamData: []byte{}, + Timeout: 2, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + }}, }, }, + }, + }, + { + name: "CreatePromiseWithIdKeyAndParam", + commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ @@ -108,133 +166,127 @@ var TestCases = []*testCase{ Timeout: 3, Param: promise.Value{ Headers: map[string]string{}, - Data: []byte{}, + Ikey: ikey("baz"), + Data: []byte("baz"), }, Tags: map[string]string{}, CreatedOn: 1, }, }, { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "baz", - State: 4, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, - }, - CompletedOn: 2, + Kind: types.StoreReadPromise, + ReadPromise: &types.ReadPromiseCommand{ + Id: "baz", }, }, + }, + expected: []*types.Result{ { Kind: types.StoreCreatePromise, - CreatePromise: &types.CreatePromiseCommand{ - Id: "qux", - Timeout: 3, - Param: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, - }, - Tags: map[string]string{}, - CreatedOn: 1, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, }, }, { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "qux", - State: 8, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, - }, - CompletedOn: 2, + Kind: types.StoreReadPromise, + ReadPromise: &types.QueryPromisesResult{ + RowsReturned: 1, + Records: []*promise.PromiseRecord{{ + Id: "baz", + State: 1, + ParamHeaders: []byte("{}"), + ParamIkey: ikey("baz"), + ParamData: []byte("baz"), + Timeout: 3, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + }}, }, }, + }, + }, + { + name: "CreatePromiseWithIdKeyAndParamAndHeaders", + commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "quy", + Id: "baz", Timeout: 3, Param: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, + Headers: map[string]string{ + "a": "a", + "b": "b", + "c": "c", + }, + Ikey: ikey("baz"), + Data: []byte("baz"), }, Tags: map[string]string{}, CreatedOn: 1, }, }, { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "quy", - State: 16, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, - }, - CompletedOn: 3, + Kind: types.StoreReadPromise, + ReadPromise: &types.ReadPromiseCommand{ + Id: "baz", }, }, + }, + expected: []*types.Result{ { - Kind: types.StoreSearchPromises, - SearchPromises: &types.SearchPromisesCommand{ - Q: "*", - States: []promise.State{ - promise.Pending, - }, - Limit: 3, + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, }, }, { - Kind: types.StoreSearchPromises, - SearchPromises: &types.SearchPromisesCommand{ - Q: "*", - States: []promise.State{ - promise.Resolved, - }, - Limit: 3, + Kind: types.StoreReadPromise, + ReadPromise: &types.QueryPromisesResult{ + RowsReturned: 1, + Records: []*promise.PromiseRecord{{ + Id: "baz", + State: 1, + ParamHeaders: []byte(`{"a":"a","b":"b","c":"c"}`), + ParamIkey: ikey("baz"), + ParamData: []byte("baz"), + Timeout: 3, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + }}, }, }, + }, + }, + { + name: "CreatePromiseWithIdKeyAndParamAndHeadersAndTags", + commands: []*types.Command{ { - Kind: types.StoreSearchPromises, - SearchPromises: &types.SearchPromisesCommand{ - Q: "*", - States: []promise.State{ - promise.Rejected, - promise.Timedout, - promise.Canceled, + Kind: types.StoreCreatePromise, + CreatePromise: &types.CreatePromiseCommand{ + Id: "baz", + Timeout: 3, + Param: promise.Value{ + Headers: map[string]string{ + "a": "a", + "b": "b", + "c": "c", + }, + Ikey: ikey("baz"), + Data: []byte("baz"), }, - Limit: 3, - }, - }, - { - Kind: types.StoreSearchPromises, - SearchPromises: &types.SearchPromisesCommand{ - Q: "*", - States: []promise.State{ - promise.Pending, - promise.Resolved, - promise.Rejected, - promise.Timedout, - promise.Canceled, + Tags: map[string]string{ + "x": "x", + "y": "y", + "z": "z", }, - Limit: 3, + CreatedOn: 1, }, }, { - Kind: types.StoreSearchPromises, - SearchPromises: &types.SearchPromisesCommand{ - Q: "*", - States: []promise.State{ - promise.Pending, - promise.Resolved, - promise.Rejected, - promise.Timedout, - promise.Canceled, - }, - SortId: int64ToPointer(3), - Limit: 3, + Kind: types.StoreReadPromise, + ReadPromise: &types.ReadPromiseCommand{ + Id: "baz", }, }, }, @@ -246,222 +298,30 @@ var TestCases = []*testCase{ }, }, { - Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreSearchPromises, - SearchPromises: &types.QueryPromisesResult{ - RowsReturned: 1, - LastSortId: 1, - Records: []*promise.PromiseRecord{ - { - Id: "foo", - State: 1, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - Tags: []byte("{}"), - }, - }, - }, - }, - { - Kind: types.StoreSearchPromises, - SearchPromises: &types.QueryPromisesResult{ + Kind: types.StoreReadPromise, + ReadPromise: &types.QueryPromisesResult{ RowsReturned: 1, - LastSortId: 2, - Records: []*promise.PromiseRecord{ - { - Id: "bar", - State: 2, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - Tags: []byte("{}"), - }, - }, - }, - }, - { - Kind: types.StoreSearchPromises, - SearchPromises: &types.QueryPromisesResult{ - RowsReturned: 3, - LastSortId: 3, - Records: []*promise.PromiseRecord{ - { - Id: "quy", - State: 16, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(3), - Tags: []byte("{}"), - }, - { - Id: "qux", - State: 8, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - Tags: []byte("{}"), - }, - { - Id: "baz", - State: 4, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - Tags: []byte("{}"), - }, - }, - }, - }, - { - Kind: types.StoreSearchPromises, - SearchPromises: &types.QueryPromisesResult{ - RowsReturned: 3, - LastSortId: 3, - Records: []*promise.PromiseRecord{ - { - Id: "quy", - State: 16, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(3), - Tags: []byte("{}"), - }, - { - Id: "qux", - State: 8, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - Tags: []byte("{}"), - }, - { - Id: "baz", - State: 4, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - Tags: []byte("{}"), - }, - }, - }, - }, - { - Kind: types.StoreSearchPromises, - SearchPromises: &types.QueryPromisesResult{ - RowsReturned: 2, - LastSortId: 1, - Records: []*promise.PromiseRecord{ - { - Id: "bar", - State: 2, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - Tags: []byte("{}"), - }, - { - Id: "foo", - State: 1, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - Timeout: 3, - CreatedOn: int64ToPointer(1), - Tags: []byte("{}"), - }, - }, + Records: []*promise.PromiseRecord{{ + Id: "baz", + State: 1, + ParamHeaders: []byte(`{"a":"a","b":"b","c":"c"}`), + ParamIkey: ikey("baz"), + ParamData: []byte("baz"), + Timeout: 3, + Tags: []byte(`{"x":"x","y":"y","z":"z"}`), + CreatedOn: int64ToPointer(1), + }}, }, }, }, }, { - name: "CreatePromise", + name: "CreatePromiseTwice", commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "foo", - Timeout: 1, + Id: "foo", Param: promise.Value{ Headers: map[string]string{}, Data: []byte{}, @@ -470,60 +330,18 @@ var TestCases = []*testCase{ CreatedOn: 1, }, }, - { - Kind: types.StoreReadPromise, - ReadPromise: &types.ReadPromiseCommand{ - Id: "foo", - }, - }, - }, - expected: []*types.Result{ - { - Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreReadPromise, - ReadPromise: &types.QueryPromisesResult{ - RowsReturned: 1, - Records: []*promise.PromiseRecord{{ - Id: "foo", - State: 1, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - Timeout: 1, - Tags: []byte("{}"), - CreatedOn: int64ToPointer(1), - }}, - }, - }, - }, - }, - { - name: "CreatePromiseWithIdKey", - commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "bar", - Timeout: 2, + Id: "foo", Param: promise.Value{ Headers: map[string]string{}, - Ikey: ikey("bar"), Data: []byte{}, }, Tags: map[string]string{}, CreatedOn: 1, }, }, - { - Kind: types.StoreReadPromise, - ReadPromise: &types.ReadPromiseCommand{ - Id: "bar", - }, - }, }, expected: []*types.Result{ { @@ -533,97 +351,76 @@ var TestCases = []*testCase{ }, }, { - Kind: types.StoreReadPromise, - ReadPromise: &types.QueryPromisesResult{ - RowsReturned: 1, - Records: []*promise.PromiseRecord{{ - Id: "bar", - State: 1, - ParamHeaders: []byte("{}"), - ParamIkey: ikey("bar"), - ParamData: []byte{}, - Timeout: 2, - Tags: []byte("{}"), - CreatedOn: int64ToPointer(1), - }}, + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 0, }, }, }, }, { - name: "CreatePromiseWithIdKeyAndParam", + name: "UpdatePromise", commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "baz", - Timeout: 3, + Id: "foo", + Timeout: 1, Param: promise.Value{ Headers: map[string]string{}, - Ikey: ikey("baz"), - Data: []byte("baz"), + Data: []byte{}, }, Tags: map[string]string{}, CreatedOn: 1, }, }, { - Kind: types.StoreReadPromise, - ReadPromise: &types.ReadPromiseCommand{ - Id: "baz", - }, - }, - }, - expected: []*types.Result{ - { - Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "foo", + State: 2, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, }, }, { Kind: types.StoreReadPromise, - ReadPromise: &types.QueryPromisesResult{ - RowsReturned: 1, - Records: []*promise.PromiseRecord{{ - Id: "baz", - State: 1, - ParamHeaders: []byte("{}"), - ParamIkey: ikey("baz"), - ParamData: []byte("baz"), - Timeout: 3, - Tags: []byte("{}"), - CreatedOn: int64ToPointer(1), - }}, + ReadPromise: &types.ReadPromiseCommand{ + Id: "foo", }, }, - }, - }, - { - name: "CreatePromiseWithIdKeyAndParamAndHeaders", - commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "baz", - Timeout: 3, + Id: "bar", + Timeout: 2, Param: promise.Value{ - Headers: map[string]string{ - "a": "a", - "b": "b", - "c": "c", - }, - Ikey: ikey("baz"), - Data: []byte("baz"), + Headers: map[string]string{}, + Data: []byte{}, }, Tags: map[string]string{}, CreatedOn: 1, }, }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "bar", + State: 4, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, + }, + }, { Kind: types.StoreReadPromise, ReadPromise: &types.ReadPromiseCommand{ - Id: "baz", + Id: "bar", }, }, }, @@ -634,60 +431,39 @@ var TestCases = []*testCase{ RowsAffected: 1, }, }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, { Kind: types.StoreReadPromise, ReadPromise: &types.QueryPromisesResult{ RowsReturned: 1, Records: []*promise.PromiseRecord{{ - Id: "baz", - State: 1, - ParamHeaders: []byte(`{"a":"a","b":"b","c":"c"}`), - ParamIkey: ikey("baz"), - ParamData: []byte("baz"), - Timeout: 3, + Id: "foo", + State: 2, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 1, Tags: []byte("{}"), CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, - }, - }, - { - name: "CreatePromiseWithIdKeyAndParamAndHeadersAndTags", - commands: []*types.Command{ { Kind: types.StoreCreatePromise, - CreatePromise: &types.CreatePromiseCommand{ - Id: "baz", - Timeout: 3, - Param: promise.Value{ - Headers: map[string]string{ - "a": "a", - "b": "b", - "c": "c", - }, - Ikey: ikey("baz"), - Data: []byte("baz"), - }, - Tags: map[string]string{ - "x": "x", - "y": "y", - "z": "z", - }, - CreatedOn: 1, - }, - }, - { - Kind: types.StoreReadPromise, - ReadPromise: &types.ReadPromiseCommand{ - Id: "baz", + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, }, }, - }, - expected: []*types.Result{ { - Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ RowsAffected: 1, }, }, @@ -696,26 +472,29 @@ var TestCases = []*testCase{ ReadPromise: &types.QueryPromisesResult{ RowsReturned: 1, Records: []*promise.PromiseRecord{{ - Id: "baz", - State: 1, - ParamHeaders: []byte(`{"a":"a","b":"b","c":"c"}`), - ParamIkey: ikey("baz"), - ParamData: []byte("baz"), - Timeout: 3, - Tags: []byte(`{"x":"x","y":"y","z":"z"}`), + Id: "bar", + State: 4, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 2, + Tags: []byte("{}"), CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, }, }, { - name: "CreatePromiseTwice", + name: "UpdatePromiseWithIdKey", commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "foo", + Id: "foo", + Timeout: 1, Param: promise.Value{ Headers: map[string]string{}, Data: []byte{}, @@ -724,10 +503,30 @@ var TestCases = []*testCase{ CreatedOn: 1, }, }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "foo", + State: 2, + Value: promise.Value{ + Headers: map[string]string{}, + Ikey: ikey("foo"), + Data: []byte{}, + }, + CompletedOn: 2, + }, + }, + { + Kind: types.StoreReadPromise, + ReadPromise: &types.ReadPromiseCommand{ + Id: "foo", + }, + }, { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "foo", + Id: "bar", + Timeout: 2, Param: promise.Value{ Headers: map[string]string{}, Data: []byte{}, @@ -736,6 +535,25 @@ var TestCases = []*testCase{ CreatedOn: 1, }, }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "bar", + State: 4, + Value: promise.Value{ + Headers: map[string]string{}, + Ikey: ikey("bar"), + Data: []byte{}, + }, + CompletedOn: 2, + }, + }, + { + Kind: types.StoreReadPromise, + ReadPromise: &types.ReadPromiseCommand{ + Id: "bar", + }, + }, }, expected: []*types.Result{ { @@ -744,16 +562,66 @@ var TestCases = []*testCase{ RowsAffected: 1, }, }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreReadPromise, + ReadPromise: &types.QueryPromisesResult{ + RowsReturned: 1, + Records: []*promise.PromiseRecord{{ + Id: "foo", + State: 2, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + ValueIkey: ikey("foo"), + Timeout: 1, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + }}, + }, + }, { Kind: types.StoreCreatePromise, CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 0, + RowsAffected: 1, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreReadPromise, + ReadPromise: &types.QueryPromisesResult{ + RowsReturned: 1, + Records: []*promise.PromiseRecord{{ + Id: "bar", + State: 4, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + ValueIkey: ikey("bar"), + Timeout: 2, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + }}, }, }, }, }, { - name: "UpdatePromise", + name: "UpdatePromiseWithIdKeyAndValue", commands: []*types.Command{ { Kind: types.StoreCreatePromise, @@ -775,7 +643,8 @@ var TestCases = []*testCase{ State: 2, Value: promise.Value{ Headers: map[string]string{}, - Data: []byte{}, + Ikey: ikey("foo"), + Data: []byte("foo"), }, CompletedOn: 2, }, @@ -806,7 +675,8 @@ var TestCases = []*testCase{ State: 4, Value: promise.Value{ Headers: map[string]string{}, - Data: []byte{}, + Ikey: ikey("bar"), + Data: []byte("bar"), }, CompletedOn: 2, }, @@ -841,7 +711,8 @@ var TestCases = []*testCase{ ParamHeaders: []byte("{}"), ParamData: []byte{}, ValueHeaders: []byte("{}"), - ValueData: []byte{}, + ValueIkey: ikey("foo"), + ValueData: []byte("foo"), Timeout: 1, Tags: []byte("{}"), CreatedOn: int64ToPointer(1), @@ -871,7 +742,8 @@ var TestCases = []*testCase{ ParamHeaders: []byte("{}"), ParamData: []byte{}, ValueHeaders: []byte("{}"), - ValueData: []byte{}, + ValueIkey: ikey("bar"), + ValueData: []byte("bar"), Timeout: 2, Tags: []byte("{}"), CreatedOn: int64ToPointer(1), @@ -882,7 +754,7 @@ var TestCases = []*testCase{ }, }, { - name: "UpdatePromiseWithIdKey", + name: "UpdatePromiseWithIdKeyAndValueAndHeaders", commands: []*types.Command{ { Kind: types.StoreCreatePromise, @@ -903,9 +775,13 @@ var TestCases = []*testCase{ Id: "foo", State: 2, Value: promise.Value{ - Headers: map[string]string{}, - Ikey: ikey("foo"), - Data: []byte{}, + Headers: map[string]string{ + "a": "a", + "b": "b", + "c": "c", + }, + Ikey: ikey("foo"), + Data: []byte("foo"), }, CompletedOn: 2, }, @@ -935,9 +811,13 @@ var TestCases = []*testCase{ Id: "bar", State: 4, Value: promise.Value{ - Headers: map[string]string{}, - Ikey: ikey("bar"), - Data: []byte{}, + Headers: map[string]string{ + "a": "a", + "b": "b", + "c": "c", + }, + Ikey: ikey("bar"), + Data: []byte("bar"), }, CompletedOn: 2, }, @@ -971,9 +851,9 @@ var TestCases = []*testCase{ State: 2, ParamHeaders: []byte("{}"), ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, + ValueHeaders: []byte(`{"a":"a","b":"b","c":"c"}`), ValueIkey: ikey("foo"), + ValueData: []byte("foo"), Timeout: 1, Tags: []byte("{}"), CreatedOn: int64ToPointer(1), @@ -1002,9 +882,9 @@ var TestCases = []*testCase{ State: 4, ParamHeaders: []byte("{}"), ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueData: []byte{}, + ValueHeaders: []byte(`{"a":"a","b":"b","c":"c"}`), ValueIkey: ikey("bar"), + ValueData: []byte("bar"), Timeout: 2, Tags: []byte("{}"), CreatedOn: int64ToPointer(1), @@ -1015,13 +895,12 @@ var TestCases = []*testCase{ }, }, { - name: "UpdatePromiseWithIdKeyAndValue", + name: "UpdatePromiseTwice", commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "foo", - Timeout: 1, + Id: "foo", Param: promise.Value{ Headers: map[string]string{}, Data: []byte{}, @@ -1037,29 +916,112 @@ var TestCases = []*testCase{ State: 2, Value: promise.Value{ Headers: map[string]string{}, - Ikey: ikey("foo"), - Data: []byte("foo"), + Data: []byte{}, }, CompletedOn: 2, }, }, { - Kind: types.StoreReadPromise, - ReadPromise: &types.ReadPromiseCommand{ - Id: "foo", + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "foo", + State: 2, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, }, }, { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "bar", - Timeout: 2, + Id: "bar", Param: promise.Value{ Headers: map[string]string{}, Data: []byte{}, }, - Tags: map[string]string{}, - CreatedOn: 1, + Tags: map[string]string{}, + CreatedOn: 1, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "bar", + State: 4, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "bar", + State: 4, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, + }, + }, + }, + expected: []*types.Result{ + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 0, + }, + }, + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 0, + }, + }, + }, + }, + { + name: "UpdatePromiseBeforeCreatePromise", + commands: []*types.Command{ + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "foo", + State: 2, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, }, }, { @@ -1069,92 +1031,54 @@ var TestCases = []*testCase{ State: 4, Value: promise.Value{ Headers: map[string]string{}, - Ikey: ikey("bar"), - Data: []byte("bar"), + Data: []byte{}, }, CompletedOn: 2, }, }, - { - Kind: types.StoreReadPromise, - ReadPromise: &types.ReadPromiseCommand{ - Id: "bar", - }, - }, }, expected: []*types.Result{ { - Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 0, }, }, { Kind: types.StoreUpdatePromise, UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, + RowsAffected: 0, }, }, + }, + }, + { + name: "ReadPromiseThatDoesNotExist", + commands: []*types.Command{ { Kind: types.StoreReadPromise, - ReadPromise: &types.QueryPromisesResult{ - RowsReturned: 1, - Records: []*promise.PromiseRecord{{ - Id: "foo", - State: 2, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueIkey: ikey("foo"), - ValueData: []byte("foo"), - Timeout: 1, - Tags: []byte("{}"), - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - }}, - }, - }, - { - Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, + ReadPromise: &types.ReadPromiseCommand{ + Id: "foo", }, }, + }, + expected: []*types.Result{ { Kind: types.StoreReadPromise, ReadPromise: &types.QueryPromisesResult{ - RowsReturned: 1, - Records: []*promise.PromiseRecord{{ - Id: "bar", - State: 4, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte("{}"), - ValueIkey: ikey("bar"), - ValueData: []byte("bar"), - Timeout: 2, - Tags: []byte("{}"), - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - }}, + RowsReturned: 0, }, }, }, }, { - name: "UpdatePromiseWithIdKeyAndValueAndHeaders", + name: "SearchPromises", commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ Id: "foo", - Timeout: 1, + Timeout: 3, Param: promise.Value{ Headers: map[string]string{}, Data: []byte{}, @@ -1163,34 +1087,11 @@ var TestCases = []*testCase{ CreatedOn: 1, }, }, - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "foo", - State: 2, - Value: promise.Value{ - Headers: map[string]string{ - "a": "a", - "b": "b", - "c": "c", - }, - Ikey: ikey("foo"), - Data: []byte("foo"), - }, - CompletedOn: 2, - }, - }, - { - Kind: types.StoreReadPromise, - ReadPromise: &types.ReadPromiseCommand{ - Id: "foo", - }, - }, { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ Id: "bar", - Timeout: 2, + Timeout: 3, Param: promise.Value{ Headers: map[string]string{}, Data: []byte{}, @@ -1203,98 +1104,69 @@ var TestCases = []*testCase{ Kind: types.StoreUpdatePromise, UpdatePromise: &types.UpdatePromiseCommand{ Id: "bar", - State: 4, + State: 2, Value: promise.Value{ - Headers: map[string]string{ - "a": "a", - "b": "b", - "c": "c", - }, - Ikey: ikey("bar"), - Data: []byte("bar"), + Headers: map[string]string{}, + Data: []byte{}, }, CompletedOn: 2, }, }, - { - Kind: types.StoreReadPromise, - ReadPromise: &types.ReadPromiseCommand{ - Id: "bar", - }, - }, - }, - expected: []*types.Result{ { Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, + CreatePromise: &types.CreatePromiseCommand{ + Id: "baz", + Timeout: 3, + Param: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreReadPromise, - ReadPromise: &types.QueryPromisesResult{ - RowsReturned: 1, - Records: []*promise.PromiseRecord{{ - Id: "foo", - State: 2, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte(`{"a":"a","b":"b","c":"c"}`), - ValueIkey: ikey("foo"), - ValueData: []byte("foo"), - Timeout: 1, - Tags: []byte("{}"), - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - }}, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "baz", + State: 4, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, }, }, { Kind: types.StoreCreatePromise, - CreatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, + CreatePromise: &types.CreatePromiseCommand{ + Id: "qux", + Timeout: 3, + Param: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 1, - }, - }, - { - Kind: types.StoreReadPromise, - ReadPromise: &types.QueryPromisesResult{ - RowsReturned: 1, - Records: []*promise.PromiseRecord{{ - Id: "bar", - State: 4, - ParamHeaders: []byte("{}"), - ParamData: []byte{}, - ValueHeaders: []byte(`{"a":"a","b":"b","c":"c"}`), - ValueIkey: ikey("bar"), - ValueData: []byte("bar"), - Timeout: 2, - Tags: []byte("{}"), - CreatedOn: int64ToPointer(1), - CompletedOn: int64ToPointer(2), - }}, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "qux", + State: 8, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, }, }, - }, - }, - { - name: "UpdatePromiseTwice", - commands: []*types.Command{ { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: "foo", + Id: "quy", + Timeout: 3, Param: promise.Value{ Headers: map[string]string{}, Data: []byte{}, @@ -1306,65 +1178,84 @@ var TestCases = []*testCase{ { Kind: types.StoreUpdatePromise, UpdatePromise: &types.UpdatePromiseCommand{ - Id: "foo", - State: 2, + Id: "quy", + State: 16, Value: promise.Value{ Headers: map[string]string{}, Data: []byte{}, }, - CompletedOn: 2, + CompletedOn: 3, }, }, { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "foo", - State: 2, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, + Kind: types.StoreSearchPromises, + SearchPromises: &types.SearchPromisesCommand{ + Q: "*", + States: []promise.State{ + promise.Pending, + }, + Limit: 3, + }, + }, + { + Kind: types.StoreSearchPromises, + SearchPromises: &types.SearchPromisesCommand{ + Q: "*", + States: []promise.State{ + promise.Resolved, + }, + Limit: 3, + }, + }, + { + Kind: types.StoreSearchPromises, + SearchPromises: &types.SearchPromisesCommand{ + Q: "*", + States: []promise.State{ + promise.Rejected, + promise.Timedout, + promise.Canceled, }, - CompletedOn: 2, + Limit: 3, }, }, { - Kind: types.StoreCreatePromise, - CreatePromise: &types.CreatePromiseCommand{ - Id: "bar", - Param: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, + Kind: types.StoreSearchPromises, + SearchPromises: &types.SearchPromisesCommand{ + Q: "*", + States: []promise.State{ + promise.Pending, + promise.Resolved, + promise.Rejected, + promise.Timedout, + promise.Canceled, }, - Tags: map[string]string{}, - CreatedOn: 1, + Limit: 3, }, }, { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "bar", - State: 4, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, + Kind: types.StoreSearchPromises, + SearchPromises: &types.SearchPromisesCommand{ + Q: "*", + States: []promise.State{ + promise.Pending, + promise.Resolved, + promise.Rejected, + promise.Timedout, + promise.Canceled, }, - CompletedOn: 2, + SortId: int64ToPointer(3), + Limit: 3, }, }, + }, + expected: []*types.Result{ { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "bar", - State: 4, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, - }, - CompletedOn: 2, + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, }, }, - }, - expected: []*types.Result{ { Kind: types.StoreCreatePromise, CreatePromise: &types.AlterPromisesResult{ @@ -1377,10 +1268,16 @@ var TestCases = []*testCase{ RowsAffected: 1, }, }, + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, { Kind: types.StoreUpdatePromise, UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 0, + RowsAffected: 1, }, }, { @@ -1396,71 +1293,184 @@ var TestCases = []*testCase{ }, }, { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 0, + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, }, }, - }, - }, - { - name: "UpdatePromiseBeforeCreatePromise", - commands: []*types.Command{ { Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "foo", - State: 2, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, - }, - CompletedOn: 2, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, }, }, { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: "bar", - State: 4, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, + Kind: types.StoreSearchPromises, + SearchPromises: &types.QueryPromisesResult{ + RowsReturned: 1, + LastSortId: 1, + Records: []*promise.PromiseRecord{ + { + Id: "foo", + State: 1, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + Tags: []byte("{}"), + SortId: 1, + }, }, - CompletedOn: 2, }, }, - }, - expected: []*types.Result{ { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 0, + Kind: types.StoreSearchPromises, + SearchPromises: &types.QueryPromisesResult{ + RowsReturned: 1, + LastSortId: 2, + Records: []*promise.PromiseRecord{ + { + Id: "bar", + State: 2, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + Tags: []byte("{}"), + SortId: 2, + }, + }, }, }, { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.AlterPromisesResult{ - RowsAffected: 0, + Kind: types.StoreSearchPromises, + SearchPromises: &types.QueryPromisesResult{ + RowsReturned: 3, + LastSortId: 3, + Records: []*promise.PromiseRecord{ + { + Id: "quy", + State: 16, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(3), + Tags: []byte("{}"), + SortId: 5, + }, + { + Id: "qux", + State: 8, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + Tags: []byte("{}"), + SortId: 4, + }, + { + Id: "baz", + State: 4, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + Tags: []byte("{}"), + SortId: 3, + }, + }, }, }, - }, - }, - { - name: "ReadPromiseThatDoesNotExist", - commands: []*types.Command{ { - Kind: types.StoreReadPromise, - ReadPromise: &types.ReadPromiseCommand{ - Id: "foo", + Kind: types.StoreSearchPromises, + SearchPromises: &types.QueryPromisesResult{ + RowsReturned: 3, + LastSortId: 3, + Records: []*promise.PromiseRecord{ + { + Id: "quy", + State: 16, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(3), + Tags: []byte("{}"), + SortId: 5, + }, + { + Id: "qux", + State: 8, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + Tags: []byte("{}"), + SortId: 4, + }, + { + Id: "baz", + State: 4, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + Tags: []byte("{}"), + SortId: 3, + }, + }, }, }, - }, - expected: []*types.Result{ { - Kind: types.StoreReadPromise, - ReadPromise: &types.QueryPromisesResult{ - RowsReturned: 0, + Kind: types.StoreSearchPromises, + SearchPromises: &types.QueryPromisesResult{ + RowsReturned: 2, + LastSortId: 1, + Records: []*promise.PromiseRecord{ + { + Id: "bar", + State: 2, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + ValueHeaders: []byte("{}"), + ValueData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + Tags: []byte("{}"), + SortId: 2, + }, + { + Id: "foo", + State: 1, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + Timeout: 3, + CreatedOn: int64ToPointer(1), + Tags: []byte("{}"), + SortId: 1, + }, + }, }, }, }, diff --git a/pkg/promise/promise.go b/pkg/promise/promise.go index eb7587a8..4b1aa173 100644 --- a/pkg/promise/promise.go +++ b/pkg/promise/promise.go @@ -15,6 +15,7 @@ type Promise struct { CreatedOn *int64 `json:"createdOn,omitempty"` CompletedOn *int64 `json:"completedOn,omitempty"` Tags map[string]string `json:"tags"` + SortId int64 `json:"-"` // unexported } func (p *Promise) String() string { diff --git a/pkg/promise/record.go b/pkg/promise/record.go index 4dc9309a..50923dc7 100644 --- a/pkg/promise/record.go +++ b/pkg/promise/record.go @@ -17,6 +17,7 @@ type PromiseRecord struct { CreatedOn *int64 CompletedOn *int64 Tags []byte + SortId int64 } func (r *PromiseRecord) Promise() (*Promise, error) { @@ -44,6 +45,7 @@ func (r *PromiseRecord) Promise() (*Promise, error) { CreatedOn: r.CreatedOn, CompletedOn: r.CompletedOn, Tags: tags, + SortId: r.SortId, }, nil } diff --git a/test/dst/dst.go b/test/dst/dst.go index 94a9fc38..a362031b 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -81,7 +81,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System) // test loop for t := int64(0); t < d.config.Ticks; t++ { - for _, req := range generator.Generate(r, t, d.config.Reqs()) { + for _, req := range generator.Generate(r, t, d.config.Reqs(), model.cursors) { req := req reqTime := t api.Enqueue(&bus.SQE[types.Request, types.Response]{ diff --git a/test/dst/dst_test.go b/test/dst/dst_test.go index f5943782..7a29a67e 100644 --- a/test/dst/dst_test.go +++ b/test/dst/dst_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/rand" // nosemgrep "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/resonatehq/resonate/internal/aio" @@ -37,7 +38,7 @@ func TestDST(t *testing.T) { // instatiate aio subsystems network := network.NewDST(&network.ConfigDST{P: 0.5}, r) - store, err := sqlite.New(&sqlite.Config{Path: ":memory:"}) + store, err := sqlite.New(&sqlite.Config{Path: ":memory:", TxTimeout: 250 * time.Millisecond}) if err != nil { t.Fatal(err) } diff --git a/test/dst/generator.go b/test/dst/generator.go index 19411746..e7b32a25 100644 --- a/test/dst/generator.go +++ b/test/dst/generator.go @@ -88,12 +88,23 @@ func (g *Generator) AddRequest(request RequestGenerator) { g.requests = append(g.requests, request) } -func (g *Generator) Generate(r *rand.Rand, t int64, n int) []*types.Request { +func (g *Generator) Generate(r *rand.Rand, t int64, n int, cursors []*types.Request) []*types.Request { reqs := []*types.Request{} for i := 0; i < n; i++ { - f := g.requests[r.Intn(len(g.requests))] - reqs = append(reqs, f(r, t)) + bound := len(g.requests) + if len(cursors) > 0 { + bound = bound + 1 + } + + switch j := r.Intn(bound); j { + case len(g.requests): + reqs = append(reqs, cursors[r.Intn(len(cursors))]) + default: + f := g.requests[j] + reqs = append(reqs, f(r, t)) + } + } return reqs diff --git a/test/dst/model.go b/test/dst/model.go index e76bc53b..514fe0dd 100644 --- a/test/dst/model.go +++ b/test/dst/model.go @@ -10,6 +10,7 @@ import ( type Model struct { promises Promises responses map[types.APIKind]ResponseValidator + cursors []*types.Request } func NewModel() *Model { @@ -23,6 +24,10 @@ func (m *Model) AddResponse(kind types.APIKind, response ResponseValidator) { m.responses[kind] = response } +func (m *Model) addCursor(next *types.Request) { + m.cursors = append(m.cursors, next) +} + func (m *Model) Step(req *types.Request, res *types.Response, err error) error { if err != nil { switch err.Error() { @@ -59,11 +64,22 @@ func (m *Model) ValidateSearchPromises(req *types.Request, res *types.Response) states[state] = true } + if res.SearchPromises.Cursor != nil { + m.addCursor(&types.Request{ + Kind: types.SearchPromises, + SearchPromises: res.SearchPromises.Cursor.Next, + }) + } + for _, p := range res.SearchPromises.Promises { if _, ok := states[p.State]; !ok { return fmt.Errorf("unexpected state %s, searched for %s", p.State, req.SearchPromises.States) } + if req.SearchPromises.SortId != nil && *req.SearchPromises.SortId <= p.SortId { + return fmt.Errorf("unexpected sortId, promise sortId %d is greater than the request sortId %d", *req.SearchPromises.SortId, p.SortId) + } + pm := m.promises.Get(p.Id) if err := pm.searchPromise(req.SearchPromises, res.SearchPromises, p); err != nil { return err