From ed9fdc0e48a8292b0ba5e44d9046cb74add5d780 Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Tue, 19 Dec 2023 19:55:41 -0500 Subject: [PATCH 1/2] feat(postgres): added postgres support for schedules --- .../subsystems/aio/store/postgres/postgres.go | 236 ++++++++++++++++++ .../app/subsystems/aio/store/test/cases.go | 220 ++++++++++++++++ 2 files changed, 456 insertions(+) diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index 0c6d30e1..1303d043 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -17,6 +17,7 @@ import ( "github.com/resonatehq/resonate/internal/util" "github.com/resonatehq/resonate/pkg/notification" "github.com/resonatehq/resonate/pkg/promise" + "github.com/resonatehq/resonate/pkg/schedule" "github.com/resonatehq/resonate/pkg/subscription" "github.com/resonatehq/resonate/pkg/timeout" @@ -46,6 +47,23 @@ const ( CREATE INDEX IF NOT EXISTS idx_promises_sort_id ON promises(sort_id); CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises(invocation); + CREATE TABLE IF NOT EXISTS schedules ( + id TEXT, + description TEXT, + cron TEXT, + promise_id TEXT, + promise_param_headers JSONB, + promise_param_data BYTEA, + promise_timeout BIGINT, + last_run_time BIGINT, + next_run_time BIGINT, + created_on BIGINT, + idempotency_key TEXT, + PRIMARY KEY(id) + ); + + CREATE INDEX IF NOT EXISTS idx_next_run_time ON schedules (next_run_time); + CREATE TABLE IF NOT EXISTS timeouts ( id TEXT, time BIGINT, @@ -224,6 +242,40 @@ const ( NOTIFICATION_DELETE_STATEMENT = ` DELETE FROM notifications WHERE id = $1 AND promise_id = $2` + + SCHEDULE_SELECT_STATEMENT = ` + SELECT + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + FROM + schedules + WHERE + id = $1` + + SCHEDULE_SELECT_ALL_STATEMENT = ` + SELECT + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + FROM + schedules + WHERE + next_run_time <= $1` + + SCHEDULE_INSERT_STATEMENT = ` + INSERT INTO schedules + (id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key) + VALUES + ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT(id) DO NOTHING` + + SCHEDULE_UPDATE_STATEMENT = ` + UPDATE + schedules + SET + last_run_time = next_run_time, next_run_time = $1 + WHERE + id = $2 AND next_run_time = $3` + + SCHEDULE_DELETE_STATEMENT = ` + DELETE FROM schedules WHERE id = $1` ) type Config struct { @@ -331,6 +383,27 @@ func (w *PostgresStoreWorker) Execute(transactions []*t_aio.Transaction) ([][]*t } func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.Transaction) ([][]*t_aio.Result, error) { + // SCHEDULES + scheduleInsertStmt, err := tx.Prepare(SCHEDULE_INSERT_STATEMENT) + if err != nil { + return nil, err + } + defer scheduleInsertStmt.Close() + + scheduleUpdateStmt, err := tx.Prepare(SCHEDULE_UPDATE_STATEMENT) + if err != nil { + return nil, err + } + defer scheduleUpdateStmt.Close() + + scheduleDeleteStmt, err := tx.Prepare(SCHEDULE_DELETE_STATEMENT) + if err != nil { + return nil, err + } + defer scheduleDeleteStmt.Close() + + // PROMISES + promiseInsertStmt, err := tx.Prepare(PROMISE_INSERT_STATEMENT) if err != nil { return nil, err @@ -484,6 +557,23 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. util.Assert(command.TimeoutCreateNotifications != nil, "command must not be nil") results[i][j], err = w.timeoutCreateNotifications(tx, notificationInsertTimeoutStmt, command.TimeoutCreateNotifications) + // Schedule + case t_aio.ReadSchedule: + util.Assert(command.ReadSchedule != nil, "command must not be nil") + results[i][j], err = w.readSchedule(tx, command.ReadSchedule) + case t_aio.ReadSchedules: + util.Assert(command.ReadSchedules != nil, "command must not be nil") + results[i][j], err = w.readSchedules(tx, command.ReadSchedules) + case t_aio.CreateSchedule: + util.Assert(command.CreateSchedule != nil, "command must not be nil") + results[i][j], err = w.createSchedule(tx, scheduleInsertStmt, command.CreateSchedule) + case t_aio.UpdateSchedule: + util.Assert(command.UpdateSchedule != nil, "command must not be nil") + results[i][j], err = w.updateSchedule(tx, scheduleUpdateStmt, command.UpdateSchedule) + case t_aio.DeleteSchedule: + util.Assert(command.DeleteSchedule != nil, "command must not be nil") + results[i][j], err = w.deleteSchedule(tx, scheduleDeleteStmt, command.DeleteSchedule) + default: panic("invalid command") } @@ -497,6 +587,152 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. return results, nil } +// SCHEDULES + +func (w *PostgresStoreWorker) readSchedule(tx *sql.Tx, cmd *t_aio.ReadScheduleCommand) (*t_aio.Result, error) { + row := tx.QueryRow(SCHEDULE_SELECT_STATEMENT, cmd.Id) + record := &schedule.ScheduleRecord{} + rowsReturned := int64(1) + + if err := row.Scan( + &record.Id, + &record.Desc, + &record.Cron, + &record.PromiseId, + &record.PromiseParamHeaders, + &record.PromiseParamData, + &record.PromiseTimeout, + &record.LastRunTime, + &record.NextRunTime, + &record.CreatedOn, + &record.IdempotencyKey, + ); err != nil { + if err == sql.ErrNoRows { + rowsReturned = 0 + } else { + return nil, err + } + } + + var records []*schedule.ScheduleRecord + if rowsReturned == 1 { + records = append(records, record) + } + + return &t_aio.Result{ + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.QuerySchedulesResult{ + RowsReturned: rowsReturned, + Records: records, + }, + }, nil +} + +func (w *PostgresStoreWorker) readSchedules(tx *sql.Tx, cmd *t_aio.ReadSchedulesCommand) (*t_aio.Result, error) { + rows, err := tx.Query(SCHEDULE_SELECT_ALL_STATEMENT, cmd.NextRunTime) + if err != nil { + return nil, err + } + defer rows.Close() + + rowsReturned := int64(0) + var records []*schedule.ScheduleRecord + + for rows.Next() { + record := &schedule.ScheduleRecord{} + if err := rows.Scan( + &record.Id, + &record.Desc, + &record.Cron, + &record.PromiseId, + &record.PromiseParamHeaders, + &record.PromiseParamData, + &record.PromiseTimeout, + &record.LastRunTime, + &record.NextRunTime, + &record.CreatedOn, + &record.IdempotencyKey, + ); err != nil { + return nil, err + } + + records = append(records, record) + rowsReturned++ + } + + return &t_aio.Result{ + Kind: t_aio.ReadSchedules, + ReadSchedules: &t_aio.QuerySchedulesResult{ + RowsReturned: rowsReturned, + Records: records, + }, + }, nil +} + +func (w *PostgresStoreWorker) createSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CreateScheduleCommand) (*t_aio.Result, error) { + headers, err := json.Marshal(cmd.PromiseParam.Headers) + if err != nil { + return nil, err + } + + res, err := stmt.Exec(cmd.Id, cmd.Desc, cmd.Cron, cmd.PromiseId, headers, cmd.PromiseParam.Data, cmd.PromiseTimeout, cmd.LastRunTime, cmd.NextRunTime, cmd.CreatedOn, cmd.IdempotencyKey) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &t_aio.Result{ + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +func (w *PostgresStoreWorker) updateSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.UpdateScheduleCommand) (*t_aio.Result, error) { + res, err := stmt.Exec(cmd.NextRunTime, cmd.Id, cmd.LastRunTime) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &t_aio.Result{ + Kind: t_aio.UpdateSchedule, + UpdateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +func (w *PostgresStoreWorker) deleteSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.DeleteScheduleCommand) (*t_aio.Result, error) { + res, err := stmt.Exec(cmd.Id) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &t_aio.Result{ + Kind: t_aio.DeleteSchedule, + DeleteSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +// PROMISES + func (w *PostgresStoreWorker) readPromise(tx *sql.Tx, cmd *t_aio.ReadPromiseCommand) (*t_aio.Result, error) { // select row := tx.QueryRow(PROMISE_SELECT_STATEMENT, cmd.Id) diff --git a/internal/app/subsystems/aio/store/test/cases.go b/internal/app/subsystems/aio/store/test/cases.go index a3ed64ce..836ae0c8 100644 --- a/internal/app/subsystems/aio/store/test/cases.go +++ b/internal/app/subsystems/aio/store/test/cases.go @@ -6,8 +6,10 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_aio" + "github.com/resonatehq/resonate/internal/util" "github.com/resonatehq/resonate/pkg/notification" "github.com/resonatehq/resonate/pkg/promise" + "github.com/resonatehq/resonate/pkg/schedule" "github.com/resonatehq/resonate/pkg/subscription" "github.com/resonatehq/resonate/pkg/timeout" @@ -65,6 +67,10 @@ func (c *testCase) Run(t *testing.T, subsystem aio.Subsystem) { record.ValueHeaders = normalizeJSON(record.ValueHeaders) record.Tags = normalizeJSON(record.Tags) } + case t_aio.ReadSchedule: + for _, record := range result.ReadSchedule.Records { + record.PromiseParamHeaders = normalizeJSON(record.PromiseParamHeaders) + } } } @@ -78,6 +84,220 @@ func (c *testCase) Panic() bool { } var TestCases = []*testCase{ + { + name: "CreateUpdateDeleteSchedule", + commands: []*t_aio.Command{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 1000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.UpdateSchedule, + UpdateSchedule: &t_aio.UpdateScheduleCommand{ + Id: "foo", + LastRunTime: util.ToPointer[int64](1000), + NextRunTime: 1500, + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.ReadScheduleCommand{ + Id: "foo", + }, + }, + { + Kind: t_aio.DeleteSchedule, + DeleteSchedule: &t_aio.DeleteScheduleCommand{ + Id: "foo", + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.ReadScheduleCommand{ + Id: "foo", + }, + }, + }, + expected: []*t_aio.Result{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.UpdateSchedule, + UpdateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.QuerySchedulesResult{ + RowsReturned: 1, + Records: []*schedule.ScheduleRecord{{ + Id: "foo", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParamHeaders: []byte("{}"), + PromiseParamData: []byte("Created Durable Promise"), + PromiseTimeout: 1000000, + LastRunTime: util.ToPointer[int64](1000), + NextRunTime: 1500, + CreatedOn: 500, + IdempotencyKey: nil, + }}, + }, + }, + { + Kind: t_aio.DeleteSchedule, + DeleteSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.QuerySchedulesResult{ + RowsReturned: 0, + Records: nil, + }, + }, + }, + }, + { + name: "ReadSchedules", + commands: []*t_aio.Command{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo-1", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 1000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo-2", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 2000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo-3", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 3000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.ReadSchedules, + ReadSchedules: &t_aio.ReadSchedulesCommand{ + NextRunTime: 2500, + }, + }, + }, + expected: []*t_aio.Result{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.ReadSchedules, + ReadSchedules: &t_aio.QuerySchedulesResult{ + RowsReturned: 2, + Records: []*schedule.ScheduleRecord{ + { + Id: "foo-1", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParamHeaders: []byte("{}"), + PromiseParamData: []byte("Created Durable Promise"), + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 1000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + { + Id: "foo-2", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParamHeaders: []byte("{}"), + PromiseParamData: []byte("Created Durable Promise"), + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 2000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + }, + }, + }, + }, { name: "CreatePromise", commands: []*t_aio.Command{ From d34652c241be23b0584b8352f4d73c5d9bf44568 Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Tue, 19 Dec 2023 20:17:26 -0500 Subject: [PATCH 2/2] feat(postgres): addressed feedback from David --- .../subsystems/aio/store/postgres/postgres.go | 395 ++++++++-------- .../app/subsystems/aio/store/sqlite/sqlite.go | 8 +- .../app/subsystems/aio/store/test/cases.go | 441 +++++++++--------- 3 files changed, 433 insertions(+), 411 deletions(-) diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index 1303d043..f255fb04 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -145,6 +145,40 @@ const ( WHERE state = 1 AND timeout <= $1` + SCHEDULE_SELECT_STATEMENT = ` + SELECT + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + FROM + schedules + WHERE + id = $1` + + SCHEDULE_SELECT_ALL_STATEMENT = ` + SELECT + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + FROM + schedules + WHERE + next_run_time <= $1` + + SCHEDULE_INSERT_STATEMENT = ` + INSERT INTO schedules + (id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key) + VALUES + ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT(id) DO NOTHING` + + SCHEDULE_UPDATE_STATEMENT = ` + UPDATE + schedules + SET + last_run_time = next_run_time, next_run_time = $1 + WHERE + id = $2 AND next_run_time = $3` + + SCHEDULE_DELETE_STATEMENT = ` + DELETE FROM schedules WHERE id = $1` + TIMEOUT_SELECT_STATEMENT = ` SELECT id, time @@ -242,40 +276,6 @@ const ( NOTIFICATION_DELETE_STATEMENT = ` DELETE FROM notifications WHERE id = $1 AND promise_id = $2` - - SCHEDULE_SELECT_STATEMENT = ` - SELECT - id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key - FROM - schedules - WHERE - id = $1` - - SCHEDULE_SELECT_ALL_STATEMENT = ` - SELECT - id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key - FROM - schedules - WHERE - next_run_time <= $1` - - SCHEDULE_INSERT_STATEMENT = ` - INSERT INTO schedules - (id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key) - VALUES - ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - ON CONFLICT(id) DO NOTHING` - - SCHEDULE_UPDATE_STATEMENT = ` - UPDATE - schedules - SET - last_run_time = next_run_time, next_run_time = $1 - WHERE - id = $2 AND next_run_time = $3` - - SCHEDULE_DELETE_STATEMENT = ` - DELETE FROM schedules WHERE id = $1` ) type Config struct { @@ -383,44 +383,47 @@ func (w *PostgresStoreWorker) Execute(transactions []*t_aio.Transaction) ([][]*t } func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.Transaction) ([][]*t_aio.Result, error) { - // SCHEDULES - scheduleInsertStmt, err := tx.Prepare(SCHEDULE_INSERT_STATEMENT) + // PROMISES + + promiseInsertStmt, err := tx.Prepare(PROMISE_INSERT_STATEMENT) if err != nil { return nil, err } - defer scheduleInsertStmt.Close() + defer promiseInsertStmt.Close() - scheduleUpdateStmt, err := tx.Prepare(SCHEDULE_UPDATE_STATEMENT) + promiseUpdateStmt, err := tx.Prepare(PROMISE_UPDATE_STATEMENT) if err != nil { return nil, err } - defer scheduleUpdateStmt.Close() + defer promiseUpdateStmt.Close() - scheduleDeleteStmt, err := tx.Prepare(SCHEDULE_DELETE_STATEMENT) + promiseUpdateTimeoutStmt, err := tx.Prepare(PROMISE_UPDATE_TIMEOUT_STATEMENT) if err != nil { return nil, err } - defer scheduleDeleteStmt.Close() + defer promiseUpdateTimeoutStmt.Close() - // PROMISES + // SCHEDULES - promiseInsertStmt, err := tx.Prepare(PROMISE_INSERT_STATEMENT) + scheduleInsertStmt, err := tx.Prepare(SCHEDULE_INSERT_STATEMENT) if err != nil { return nil, err } - defer promiseInsertStmt.Close() + defer scheduleInsertStmt.Close() - promiseUpdateStmt, err := tx.Prepare(PROMISE_UPDATE_STATEMENT) + scheduleUpdateStmt, err := tx.Prepare(SCHEDULE_UPDATE_STATEMENT) if err != nil { return nil, err } - defer promiseUpdateStmt.Close() + defer scheduleUpdateStmt.Close() - promiseUpdateTimeoutStmt, err := tx.Prepare(PROMISE_UPDATE_TIMEOUT_STATEMENT) + scheduleDeleteStmt, err := tx.Prepare(SCHEDULE_DELETE_STATEMENT) if err != nil { return nil, err } - defer promiseUpdateTimeoutStmt.Close() + defer scheduleDeleteStmt.Close() + + // TIMEOUTS timeoutInsertStmt, err := tx.Prepare(TIMEOUT_INSERT_STATEMENT) if err != nil { @@ -434,6 +437,8 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. } defer timeoutDeleteStmt.Close() + // SUBSCRIPTIONS + subscriptionInsertStmt, err := tx.Prepare(SUBSCRIPTION_INSERT_STATEMENT) if err != nil { return nil, err @@ -458,6 +463,8 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. } defer subscriptionDeleteAllTimeoutStmt.Close() + // NOTIFICATIONS + notificationInsertStmt, err := tx.Prepare(NOTIFICATION_INSERT_STATEMENT) if err != nil { return nil, err @@ -557,7 +564,7 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. util.Assert(command.TimeoutCreateNotifications != nil, "command must not be nil") results[i][j], err = w.timeoutCreateNotifications(tx, notificationInsertTimeoutStmt, command.TimeoutCreateNotifications) - // Schedule + // Schedule case t_aio.ReadSchedule: util.Assert(command.ReadSchedule != nil, "command must not be nil") results[i][j], err = w.readSchedule(tx, command.ReadSchedule) @@ -587,150 +594,6 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. return results, nil } -// SCHEDULES - -func (w *PostgresStoreWorker) readSchedule(tx *sql.Tx, cmd *t_aio.ReadScheduleCommand) (*t_aio.Result, error) { - row := tx.QueryRow(SCHEDULE_SELECT_STATEMENT, cmd.Id) - record := &schedule.ScheduleRecord{} - rowsReturned := int64(1) - - if err := row.Scan( - &record.Id, - &record.Desc, - &record.Cron, - &record.PromiseId, - &record.PromiseParamHeaders, - &record.PromiseParamData, - &record.PromiseTimeout, - &record.LastRunTime, - &record.NextRunTime, - &record.CreatedOn, - &record.IdempotencyKey, - ); err != nil { - if err == sql.ErrNoRows { - rowsReturned = 0 - } else { - return nil, err - } - } - - var records []*schedule.ScheduleRecord - if rowsReturned == 1 { - records = append(records, record) - } - - return &t_aio.Result{ - Kind: t_aio.ReadSchedule, - ReadSchedule: &t_aio.QuerySchedulesResult{ - RowsReturned: rowsReturned, - Records: records, - }, - }, nil -} - -func (w *PostgresStoreWorker) readSchedules(tx *sql.Tx, cmd *t_aio.ReadSchedulesCommand) (*t_aio.Result, error) { - rows, err := tx.Query(SCHEDULE_SELECT_ALL_STATEMENT, cmd.NextRunTime) - if err != nil { - return nil, err - } - defer rows.Close() - - rowsReturned := int64(0) - var records []*schedule.ScheduleRecord - - for rows.Next() { - record := &schedule.ScheduleRecord{} - if err := rows.Scan( - &record.Id, - &record.Desc, - &record.Cron, - &record.PromiseId, - &record.PromiseParamHeaders, - &record.PromiseParamData, - &record.PromiseTimeout, - &record.LastRunTime, - &record.NextRunTime, - &record.CreatedOn, - &record.IdempotencyKey, - ); err != nil { - return nil, err - } - - records = append(records, record) - rowsReturned++ - } - - return &t_aio.Result{ - Kind: t_aio.ReadSchedules, - ReadSchedules: &t_aio.QuerySchedulesResult{ - RowsReturned: rowsReturned, - Records: records, - }, - }, nil -} - -func (w *PostgresStoreWorker) createSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CreateScheduleCommand) (*t_aio.Result, error) { - headers, err := json.Marshal(cmd.PromiseParam.Headers) - if err != nil { - return nil, err - } - - res, err := stmt.Exec(cmd.Id, cmd.Desc, cmd.Cron, cmd.PromiseId, headers, cmd.PromiseParam.Data, cmd.PromiseTimeout, cmd.LastRunTime, cmd.NextRunTime, cmd.CreatedOn, cmd.IdempotencyKey) - if err != nil { - return nil, err - } - - rowsAffected, err := res.RowsAffected() - if err != nil { - return nil, err - } - - return &t_aio.Result{ - Kind: t_aio.CreateSchedule, - CreateSchedule: &t_aio.AlterSchedulesResult{ - RowsAffected: rowsAffected, - }, - }, nil -} - -func (w *PostgresStoreWorker) updateSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.UpdateScheduleCommand) (*t_aio.Result, error) { - res, err := stmt.Exec(cmd.NextRunTime, cmd.Id, cmd.LastRunTime) - if err != nil { - return nil, err - } - - rowsAffected, err := res.RowsAffected() - if err != nil { - return nil, err - } - - return &t_aio.Result{ - Kind: t_aio.UpdateSchedule, - UpdateSchedule: &t_aio.AlterSchedulesResult{ - RowsAffected: rowsAffected, - }, - }, nil -} - -func (w *PostgresStoreWorker) deleteSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.DeleteScheduleCommand) (*t_aio.Result, error) { - res, err := stmt.Exec(cmd.Id) - if err != nil { - return nil, err - } - - rowsAffected, err := res.RowsAffected() - if err != nil { - return nil, err - } - - return &t_aio.Result{ - Kind: t_aio.DeleteSchedule, - DeleteSchedule: &t_aio.AlterSchedulesResult{ - RowsAffected: rowsAffected, - }, - }, nil -} - // PROMISES func (w *PostgresStoreWorker) readPromise(tx *sql.Tx, cmd *t_aio.ReadPromiseCommand) (*t_aio.Result, error) { @@ -938,6 +801,152 @@ func (w *PostgresStoreWorker) timeoutPromises(tx *sql.Tx, stmt *sql.Stmt, cmd *t }, nil } +// SCHEDULES + +func (w *PostgresStoreWorker) readSchedule(tx *sql.Tx, cmd *t_aio.ReadScheduleCommand) (*t_aio.Result, error) { + row := tx.QueryRow(SCHEDULE_SELECT_STATEMENT, cmd.Id) + record := &schedule.ScheduleRecord{} + rowsReturned := int64(1) + + if err := row.Scan( + &record.Id, + &record.Desc, + &record.Cron, + &record.PromiseId, + &record.PromiseParamHeaders, + &record.PromiseParamData, + &record.PromiseTimeout, + &record.LastRunTime, + &record.NextRunTime, + &record.CreatedOn, + &record.IdempotencyKey, + ); err != nil { + if err == sql.ErrNoRows { + rowsReturned = 0 + } else { + return nil, err + } + } + + var records []*schedule.ScheduleRecord + if rowsReturned == 1 { + records = append(records, record) + } + + return &t_aio.Result{ + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.QuerySchedulesResult{ + RowsReturned: rowsReturned, + Records: records, + }, + }, nil +} + +func (w *PostgresStoreWorker) readSchedules(tx *sql.Tx, cmd *t_aio.ReadSchedulesCommand) (*t_aio.Result, error) { + rows, err := tx.Query(SCHEDULE_SELECT_ALL_STATEMENT, cmd.NextRunTime) + if err != nil { + return nil, err + } + defer rows.Close() + + rowsReturned := int64(0) + var records []*schedule.ScheduleRecord + + for rows.Next() { + record := &schedule.ScheduleRecord{} + if err := rows.Scan( + &record.Id, + &record.Desc, + &record.Cron, + &record.PromiseId, + &record.PromiseParamHeaders, + &record.PromiseParamData, + &record.PromiseTimeout, + &record.LastRunTime, + &record.NextRunTime, + &record.CreatedOn, + &record.IdempotencyKey, + ); err != nil { + return nil, err + } + + records = append(records, record) + rowsReturned++ + } + + return &t_aio.Result{ + Kind: t_aio.ReadSchedules, + ReadSchedules: &t_aio.QuerySchedulesResult{ + RowsReturned: rowsReturned, + Records: records, + }, + }, nil +} + +func (w *PostgresStoreWorker) createSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CreateScheduleCommand) (*t_aio.Result, error) { + headers, err := json.Marshal(cmd.PromiseParam.Headers) + if err != nil { + return nil, err + } + + res, err := stmt.Exec(cmd.Id, cmd.Desc, cmd.Cron, cmd.PromiseId, headers, cmd.PromiseParam.Data, cmd.PromiseTimeout, cmd.LastRunTime, cmd.NextRunTime, cmd.CreatedOn, cmd.IdempotencyKey) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &t_aio.Result{ + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +func (w *PostgresStoreWorker) updateSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.UpdateScheduleCommand) (*t_aio.Result, error) { + res, err := stmt.Exec(cmd.NextRunTime, cmd.Id, cmd.LastRunTime) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &t_aio.Result{ + Kind: t_aio.UpdateSchedule, + UpdateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +func (w *PostgresStoreWorker) deleteSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.DeleteScheduleCommand) (*t_aio.Result, error) { + res, err := stmt.Exec(cmd.Id) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &t_aio.Result{ + Kind: t_aio.DeleteSchedule, + DeleteSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +// TIMEOUTS + func (w *PostgresStoreWorker) readTimeouts(tx *sql.Tx, cmd *t_aio.ReadTimeoutsCommand) (*t_aio.Result, error) { // select rows, err := tx.Query(TIMEOUT_SELECT_STATEMENT, cmd.N) diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 7d42b449..82073333 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -48,7 +48,7 @@ const ( CREATE TABLE IF NOT EXISTS schedules ( id TEXT PRIMARY KEY, - desc TEXT, + description TEXT, cron TEXT, promise_id TEXT, promise_param_headers BLOB, @@ -139,7 +139,7 @@ const ( SCHEDULE_SELECT_STATEMENT = ` SELECT - id, desc, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key FROM schedules WHERE @@ -147,7 +147,7 @@ const ( SCHEDULE_SELECT_ALL_STATEMENT = ` SELECT - id, desc, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key FROM schedules WHERE @@ -155,7 +155,7 @@ const ( SCHEDULE_INSERT_STATEMENT = ` INSERT INTO schedules - (id, desc, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key) + (id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO NOTHING` diff --git a/internal/app/subsystems/aio/store/test/cases.go b/internal/app/subsystems/aio/store/test/cases.go index 836ae0c8..1cb4de6d 100644 --- a/internal/app/subsystems/aio/store/test/cases.go +++ b/internal/app/subsystems/aio/store/test/cases.go @@ -71,6 +71,10 @@ func (c *testCase) Run(t *testing.T, subsystem aio.Subsystem) { for _, record := range result.ReadSchedule.Records { record.PromiseParamHeaders = normalizeJSON(record.PromiseParamHeaders) } + case t_aio.ReadSchedules: + for _, record := range result.ReadSchedules.Records { + record.PromiseParamHeaders = normalizeJSON(record.PromiseParamHeaders) + } } } @@ -84,220 +88,9 @@ func (c *testCase) Panic() bool { } var TestCases = []*testCase{ - { - name: "CreateUpdateDeleteSchedule", - commands: []*t_aio.Command{ - { - Kind: t_aio.CreateSchedule, - CreateSchedule: &t_aio.CreateScheduleCommand{ - Id: "foo", - Desc: "this is a test schedule", - Cron: "* * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseParam: promise.Value{ - Headers: map[string]string{}, - Data: []byte("Created Durable Promise"), - }, - PromiseTimeout: 1000000, - LastRunTime: nil, - NextRunTime: 1000, - CreatedOn: 500, - IdempotencyKey: nil, - }, - }, - { - Kind: t_aio.UpdateSchedule, - UpdateSchedule: &t_aio.UpdateScheduleCommand{ - Id: "foo", - LastRunTime: util.ToPointer[int64](1000), - NextRunTime: 1500, - }, - }, - { - Kind: t_aio.ReadSchedule, - ReadSchedule: &t_aio.ReadScheduleCommand{ - Id: "foo", - }, - }, - { - Kind: t_aio.DeleteSchedule, - DeleteSchedule: &t_aio.DeleteScheduleCommand{ - Id: "foo", - }, - }, - { - Kind: t_aio.ReadSchedule, - ReadSchedule: &t_aio.ReadScheduleCommand{ - Id: "foo", - }, - }, - }, - expected: []*t_aio.Result{ - { - Kind: t_aio.CreateSchedule, - CreateSchedule: &t_aio.AlterSchedulesResult{ - RowsAffected: 1, - }, - }, - { - Kind: t_aio.UpdateSchedule, - UpdateSchedule: &t_aio.AlterSchedulesResult{ - RowsAffected: 1, - }, - }, - { - Kind: t_aio.ReadSchedule, - ReadSchedule: &t_aio.QuerySchedulesResult{ - RowsReturned: 1, - Records: []*schedule.ScheduleRecord{{ - Id: "foo", - Desc: "this is a test schedule", - Cron: "* * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseParamHeaders: []byte("{}"), - PromiseParamData: []byte("Created Durable Promise"), - PromiseTimeout: 1000000, - LastRunTime: util.ToPointer[int64](1000), - NextRunTime: 1500, - CreatedOn: 500, - IdempotencyKey: nil, - }}, - }, - }, - { - Kind: t_aio.DeleteSchedule, - DeleteSchedule: &t_aio.AlterSchedulesResult{ - RowsAffected: 1, - }, - }, - { - Kind: t_aio.ReadSchedule, - ReadSchedule: &t_aio.QuerySchedulesResult{ - RowsReturned: 0, - Records: nil, - }, - }, - }, - }, - { - name: "ReadSchedules", - commands: []*t_aio.Command{ - { - Kind: t_aio.CreateSchedule, - CreateSchedule: &t_aio.CreateScheduleCommand{ - Id: "foo-1", - Desc: "this is a test schedule", - Cron: "* * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseParam: promise.Value{ - Headers: map[string]string{}, - Data: []byte("Created Durable Promise"), - }, - PromiseTimeout: 1000000, - LastRunTime: nil, - NextRunTime: 1000, - CreatedOn: 500, - IdempotencyKey: nil, - }, - }, - { - Kind: t_aio.CreateSchedule, - CreateSchedule: &t_aio.CreateScheduleCommand{ - Id: "foo-2", - Desc: "this is a test schedule", - Cron: "* * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseParam: promise.Value{ - Headers: map[string]string{}, - Data: []byte("Created Durable Promise"), - }, - PromiseTimeout: 1000000, - LastRunTime: nil, - NextRunTime: 2000, - CreatedOn: 500, - IdempotencyKey: nil, - }, - }, - { - Kind: t_aio.CreateSchedule, - CreateSchedule: &t_aio.CreateScheduleCommand{ - Id: "foo-3", - Desc: "this is a test schedule", - Cron: "* * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseParam: promise.Value{ - Headers: map[string]string{}, - Data: []byte("Created Durable Promise"), - }, - PromiseTimeout: 1000000, - LastRunTime: nil, - NextRunTime: 3000, - CreatedOn: 500, - IdempotencyKey: nil, - }, - }, - { - Kind: t_aio.ReadSchedules, - ReadSchedules: &t_aio.ReadSchedulesCommand{ - NextRunTime: 2500, - }, - }, - }, - expected: []*t_aio.Result{ - { - Kind: t_aio.CreateSchedule, - CreateSchedule: &t_aio.AlterSchedulesResult{ - RowsAffected: 1, - }, - }, - { - Kind: t_aio.CreateSchedule, - CreateSchedule: &t_aio.AlterSchedulesResult{ - RowsAffected: 1, - }, - }, - { - Kind: t_aio.CreateSchedule, - CreateSchedule: &t_aio.AlterSchedulesResult{ - RowsAffected: 1, - }, - }, - { - Kind: t_aio.ReadSchedules, - ReadSchedules: &t_aio.QuerySchedulesResult{ - RowsReturned: 2, - Records: []*schedule.ScheduleRecord{ - { - Id: "foo-1", - Desc: "this is a test schedule", - Cron: "* * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseParamHeaders: []byte("{}"), - PromiseParamData: []byte("Created Durable Promise"), - PromiseTimeout: 1000000, - LastRunTime: nil, - NextRunTime: 1000, - CreatedOn: 500, - IdempotencyKey: nil, - }, - { - Id: "foo-2", - Desc: "this is a test schedule", - Cron: "* * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseParamHeaders: []byte("{}"), - PromiseParamData: []byte("Created Durable Promise"), - PromiseTimeout: 1000000, - LastRunTime: nil, - NextRunTime: 2000, - CreatedOn: 500, - IdempotencyKey: nil, - }, - }, - }, - }, - }, - }, + + // PROMISES + { name: "CreatePromise", commands: []*t_aio.Command{ @@ -2176,6 +1969,226 @@ var TestCases = []*testCase{ }, }, }, + + // SCHEDULES + + { + name: "CreateUpdateDeleteSchedule", + commands: []*t_aio.Command{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 1000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.UpdateSchedule, + UpdateSchedule: &t_aio.UpdateScheduleCommand{ + Id: "foo", + LastRunTime: util.ToPointer[int64](1000), + NextRunTime: 1500, + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.ReadScheduleCommand{ + Id: "foo", + }, + }, + { + Kind: t_aio.DeleteSchedule, + DeleteSchedule: &t_aio.DeleteScheduleCommand{ + Id: "foo", + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.ReadScheduleCommand{ + Id: "foo", + }, + }, + }, + expected: []*t_aio.Result{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.UpdateSchedule, + UpdateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.QuerySchedulesResult{ + RowsReturned: 1, + Records: []*schedule.ScheduleRecord{{ + Id: "foo", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParamHeaders: []byte("{}"), + PromiseParamData: []byte("Created Durable Promise"), + PromiseTimeout: 1000000, + LastRunTime: util.ToPointer[int64](1000), + NextRunTime: 1500, + CreatedOn: 500, + IdempotencyKey: nil, + }}, + }, + }, + { + Kind: t_aio.DeleteSchedule, + DeleteSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.QuerySchedulesResult{ + RowsReturned: 0, + Records: nil, + }, + }, + }, + }, + { + name: "ReadSchedules", + commands: []*t_aio.Command{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo-1", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 1000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo-2", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 2000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo-3", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 3000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.ReadSchedules, + ReadSchedules: &t_aio.ReadSchedulesCommand{ + NextRunTime: 2500, + }, + }, + }, + expected: []*t_aio.Result{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.ReadSchedules, + ReadSchedules: &t_aio.QuerySchedulesResult{ + RowsReturned: 2, + Records: []*schedule.ScheduleRecord{ + { + Id: "foo-1", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParamHeaders: []byte("{}"), + PromiseParamData: []byte("Created Durable Promise"), + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 1000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + { + Id: "foo-2", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParamHeaders: []byte("{}"), + PromiseParamData: []byte("Created Durable Promise"), + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 2000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + }, + }, + }, + }, + + // TIMEOUTS + { name: "CreateTimeout", commands: []*t_aio.Command{