diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index 0c6d30e1..f255fb04 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -17,6 +17,7 @@ import ( "github.com/resonatehq/resonate/internal/util" "github.com/resonatehq/resonate/pkg/notification" "github.com/resonatehq/resonate/pkg/promise" + "github.com/resonatehq/resonate/pkg/schedule" "github.com/resonatehq/resonate/pkg/subscription" "github.com/resonatehq/resonate/pkg/timeout" @@ -46,6 +47,23 @@ const ( CREATE INDEX IF NOT EXISTS idx_promises_sort_id ON promises(sort_id); CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises(invocation); + CREATE TABLE IF NOT EXISTS schedules ( + id TEXT, + description TEXT, + cron TEXT, + promise_id TEXT, + promise_param_headers JSONB, + promise_param_data BYTEA, + promise_timeout BIGINT, + last_run_time BIGINT, + next_run_time BIGINT, + created_on BIGINT, + idempotency_key TEXT, + PRIMARY KEY(id) + ); + + CREATE INDEX IF NOT EXISTS idx_next_run_time ON schedules (next_run_time); + CREATE TABLE IF NOT EXISTS timeouts ( id TEXT, time BIGINT, @@ -127,6 +145,40 @@ const ( WHERE state = 1 AND timeout <= $1` + SCHEDULE_SELECT_STATEMENT = ` + SELECT + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + FROM + schedules + WHERE + id = $1` + + SCHEDULE_SELECT_ALL_STATEMENT = ` + SELECT + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + FROM + schedules + WHERE + next_run_time <= $1` + + SCHEDULE_INSERT_STATEMENT = ` + INSERT INTO schedules + (id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key) + VALUES + ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT(id) DO NOTHING` + + SCHEDULE_UPDATE_STATEMENT = ` + UPDATE + schedules + SET + last_run_time = next_run_time, next_run_time = $1 + WHERE + id = $2 AND next_run_time = $3` + + SCHEDULE_DELETE_STATEMENT = ` + DELETE FROM schedules WHERE id = $1` + TIMEOUT_SELECT_STATEMENT = ` SELECT id, time @@ -331,6 +383,8 @@ func (w *PostgresStoreWorker) Execute(transactions []*t_aio.Transaction) ([][]*t } func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.Transaction) ([][]*t_aio.Result, error) { + // PROMISES + promiseInsertStmt, err := tx.Prepare(PROMISE_INSERT_STATEMENT) if err != nil { return nil, err @@ -349,6 +403,28 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. } defer promiseUpdateTimeoutStmt.Close() + // SCHEDULES + + scheduleInsertStmt, err := tx.Prepare(SCHEDULE_INSERT_STATEMENT) + if err != nil { + return nil, err + } + defer scheduleInsertStmt.Close() + + scheduleUpdateStmt, err := tx.Prepare(SCHEDULE_UPDATE_STATEMENT) + if err != nil { + return nil, err + } + defer scheduleUpdateStmt.Close() + + scheduleDeleteStmt, err := tx.Prepare(SCHEDULE_DELETE_STATEMENT) + if err != nil { + return nil, err + } + defer scheduleDeleteStmt.Close() + + // TIMEOUTS + timeoutInsertStmt, err := tx.Prepare(TIMEOUT_INSERT_STATEMENT) if err != nil { return nil, err @@ -361,6 +437,8 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. } defer timeoutDeleteStmt.Close() + // SUBSCRIPTIONS + subscriptionInsertStmt, err := tx.Prepare(SUBSCRIPTION_INSERT_STATEMENT) if err != nil { return nil, err @@ -385,6 +463,8 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. } defer subscriptionDeleteAllTimeoutStmt.Close() + // NOTIFICATIONS + notificationInsertStmt, err := tx.Prepare(NOTIFICATION_INSERT_STATEMENT) if err != nil { return nil, err @@ -484,6 +564,23 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. util.Assert(command.TimeoutCreateNotifications != nil, "command must not be nil") results[i][j], err = w.timeoutCreateNotifications(tx, notificationInsertTimeoutStmt, command.TimeoutCreateNotifications) + // Schedule + case t_aio.ReadSchedule: + util.Assert(command.ReadSchedule != nil, "command must not be nil") + results[i][j], err = w.readSchedule(tx, command.ReadSchedule) + case t_aio.ReadSchedules: + util.Assert(command.ReadSchedules != nil, "command must not be nil") + results[i][j], err = w.readSchedules(tx, command.ReadSchedules) + case t_aio.CreateSchedule: + util.Assert(command.CreateSchedule != nil, "command must not be nil") + results[i][j], err = w.createSchedule(tx, scheduleInsertStmt, command.CreateSchedule) + case t_aio.UpdateSchedule: + util.Assert(command.UpdateSchedule != nil, "command must not be nil") + results[i][j], err = w.updateSchedule(tx, scheduleUpdateStmt, command.UpdateSchedule) + case t_aio.DeleteSchedule: + util.Assert(command.DeleteSchedule != nil, "command must not be nil") + results[i][j], err = w.deleteSchedule(tx, scheduleDeleteStmt, command.DeleteSchedule) + default: panic("invalid command") } @@ -497,6 +594,8 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. return results, nil } +// PROMISES + func (w *PostgresStoreWorker) readPromise(tx *sql.Tx, cmd *t_aio.ReadPromiseCommand) (*t_aio.Result, error) { // select row := tx.QueryRow(PROMISE_SELECT_STATEMENT, cmd.Id) @@ -702,6 +801,152 @@ func (w *PostgresStoreWorker) timeoutPromises(tx *sql.Tx, stmt *sql.Stmt, cmd *t }, nil } +// SCHEDULES + +func (w *PostgresStoreWorker) readSchedule(tx *sql.Tx, cmd *t_aio.ReadScheduleCommand) (*t_aio.Result, error) { + row := tx.QueryRow(SCHEDULE_SELECT_STATEMENT, cmd.Id) + record := &schedule.ScheduleRecord{} + rowsReturned := int64(1) + + if err := row.Scan( + &record.Id, + &record.Desc, + &record.Cron, + &record.PromiseId, + &record.PromiseParamHeaders, + &record.PromiseParamData, + &record.PromiseTimeout, + &record.LastRunTime, + &record.NextRunTime, + &record.CreatedOn, + &record.IdempotencyKey, + ); err != nil { + if err == sql.ErrNoRows { + rowsReturned = 0 + } else { + return nil, err + } + } + + var records []*schedule.ScheduleRecord + if rowsReturned == 1 { + records = append(records, record) + } + + return &t_aio.Result{ + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.QuerySchedulesResult{ + RowsReturned: rowsReturned, + Records: records, + }, + }, nil +} + +func (w *PostgresStoreWorker) readSchedules(tx *sql.Tx, cmd *t_aio.ReadSchedulesCommand) (*t_aio.Result, error) { + rows, err := tx.Query(SCHEDULE_SELECT_ALL_STATEMENT, cmd.NextRunTime) + if err != nil { + return nil, err + } + defer rows.Close() + + rowsReturned := int64(0) + var records []*schedule.ScheduleRecord + + for rows.Next() { + record := &schedule.ScheduleRecord{} + if err := rows.Scan( + &record.Id, + &record.Desc, + &record.Cron, + &record.PromiseId, + &record.PromiseParamHeaders, + &record.PromiseParamData, + &record.PromiseTimeout, + &record.LastRunTime, + &record.NextRunTime, + &record.CreatedOn, + &record.IdempotencyKey, + ); err != nil { + return nil, err + } + + records = append(records, record) + rowsReturned++ + } + + return &t_aio.Result{ + Kind: t_aio.ReadSchedules, + ReadSchedules: &t_aio.QuerySchedulesResult{ + RowsReturned: rowsReturned, + Records: records, + }, + }, nil +} + +func (w *PostgresStoreWorker) createSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CreateScheduleCommand) (*t_aio.Result, error) { + headers, err := json.Marshal(cmd.PromiseParam.Headers) + if err != nil { + return nil, err + } + + res, err := stmt.Exec(cmd.Id, cmd.Desc, cmd.Cron, cmd.PromiseId, headers, cmd.PromiseParam.Data, cmd.PromiseTimeout, cmd.LastRunTime, cmd.NextRunTime, cmd.CreatedOn, cmd.IdempotencyKey) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &t_aio.Result{ + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +func (w *PostgresStoreWorker) updateSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.UpdateScheduleCommand) (*t_aio.Result, error) { + res, err := stmt.Exec(cmd.NextRunTime, cmd.Id, cmd.LastRunTime) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &t_aio.Result{ + Kind: t_aio.UpdateSchedule, + UpdateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +func (w *PostgresStoreWorker) deleteSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.DeleteScheduleCommand) (*t_aio.Result, error) { + res, err := stmt.Exec(cmd.Id) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &t_aio.Result{ + Kind: t_aio.DeleteSchedule, + DeleteSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +// TIMEOUTS + func (w *PostgresStoreWorker) readTimeouts(tx *sql.Tx, cmd *t_aio.ReadTimeoutsCommand) (*t_aio.Result, error) { // select rows, err := tx.Query(TIMEOUT_SELECT_STATEMENT, cmd.N) diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 7d42b449..82073333 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -48,7 +48,7 @@ const ( CREATE TABLE IF NOT EXISTS schedules ( id TEXT PRIMARY KEY, - desc TEXT, + description TEXT, cron TEXT, promise_id TEXT, promise_param_headers BLOB, @@ -139,7 +139,7 @@ const ( SCHEDULE_SELECT_STATEMENT = ` SELECT - id, desc, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key FROM schedules WHERE @@ -147,7 +147,7 @@ const ( SCHEDULE_SELECT_ALL_STATEMENT = ` SELECT - id, desc, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key + id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key FROM schedules WHERE @@ -155,7 +155,7 @@ const ( SCHEDULE_INSERT_STATEMENT = ` INSERT INTO schedules - (id, desc, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key) + (id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO NOTHING` diff --git a/internal/app/subsystems/aio/store/test/cases.go b/internal/app/subsystems/aio/store/test/cases.go index a3ed64ce..1cb4de6d 100644 --- a/internal/app/subsystems/aio/store/test/cases.go +++ b/internal/app/subsystems/aio/store/test/cases.go @@ -6,8 +6,10 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_aio" + "github.com/resonatehq/resonate/internal/util" "github.com/resonatehq/resonate/pkg/notification" "github.com/resonatehq/resonate/pkg/promise" + "github.com/resonatehq/resonate/pkg/schedule" "github.com/resonatehq/resonate/pkg/subscription" "github.com/resonatehq/resonate/pkg/timeout" @@ -65,6 +67,14 @@ func (c *testCase) Run(t *testing.T, subsystem aio.Subsystem) { record.ValueHeaders = normalizeJSON(record.ValueHeaders) record.Tags = normalizeJSON(record.Tags) } + case t_aio.ReadSchedule: + for _, record := range result.ReadSchedule.Records { + record.PromiseParamHeaders = normalizeJSON(record.PromiseParamHeaders) + } + case t_aio.ReadSchedules: + for _, record := range result.ReadSchedules.Records { + record.PromiseParamHeaders = normalizeJSON(record.PromiseParamHeaders) + } } } @@ -78,6 +88,9 @@ func (c *testCase) Panic() bool { } var TestCases = []*testCase{ + + // PROMISES + { name: "CreatePromise", commands: []*t_aio.Command{ @@ -1956,6 +1969,226 @@ var TestCases = []*testCase{ }, }, }, + + // SCHEDULES + + { + name: "CreateUpdateDeleteSchedule", + commands: []*t_aio.Command{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 1000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.UpdateSchedule, + UpdateSchedule: &t_aio.UpdateScheduleCommand{ + Id: "foo", + LastRunTime: util.ToPointer[int64](1000), + NextRunTime: 1500, + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.ReadScheduleCommand{ + Id: "foo", + }, + }, + { + Kind: t_aio.DeleteSchedule, + DeleteSchedule: &t_aio.DeleteScheduleCommand{ + Id: "foo", + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.ReadScheduleCommand{ + Id: "foo", + }, + }, + }, + expected: []*t_aio.Result{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.UpdateSchedule, + UpdateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.QuerySchedulesResult{ + RowsReturned: 1, + Records: []*schedule.ScheduleRecord{{ + Id: "foo", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParamHeaders: []byte("{}"), + PromiseParamData: []byte("Created Durable Promise"), + PromiseTimeout: 1000000, + LastRunTime: util.ToPointer[int64](1000), + NextRunTime: 1500, + CreatedOn: 500, + IdempotencyKey: nil, + }}, + }, + }, + { + Kind: t_aio.DeleteSchedule, + DeleteSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.ReadSchedule, + ReadSchedule: &t_aio.QuerySchedulesResult{ + RowsReturned: 0, + Records: nil, + }, + }, + }, + }, + { + name: "ReadSchedules", + commands: []*t_aio.Command{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo-1", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 1000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo-2", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 2000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.CreateScheduleCommand{ + Id: "foo-3", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParam: promise.Value{ + Headers: map[string]string{}, + Data: []byte("Created Durable Promise"), + }, + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 3000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + { + Kind: t_aio.ReadSchedules, + ReadSchedules: &t_aio.ReadSchedulesCommand{ + NextRunTime: 2500, + }, + }, + }, + expected: []*t_aio.Result{ + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.CreateSchedule, + CreateSchedule: &t_aio.AlterSchedulesResult{ + RowsAffected: 1, + }, + }, + { + Kind: t_aio.ReadSchedules, + ReadSchedules: &t_aio.QuerySchedulesResult{ + RowsReturned: 2, + Records: []*schedule.ScheduleRecord{ + { + Id: "foo-1", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParamHeaders: []byte("{}"), + PromiseParamData: []byte("Created Durable Promise"), + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 1000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + { + Id: "foo-2", + Desc: "this is a test schedule", + Cron: "* * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseParamHeaders: []byte("{}"), + PromiseParamData: []byte("Created Durable Promise"), + PromiseTimeout: 1000000, + LastRunTime: nil, + NextRunTime: 2000, + CreatedOn: 500, + IdempotencyKey: nil, + }, + }, + }, + }, + }, + }, + + // TIMEOUTS + { name: "CreateTimeout", commands: []*t_aio.Command{