Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(postgres): added postgres support for schedules #162

Merged
merged 2 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/notification"
"github.com/resonatehq/resonate/pkg/promise"
"github.com/resonatehq/resonate/pkg/schedule"
"github.com/resonatehq/resonate/pkg/subscription"
"github.com/resonatehq/resonate/pkg/timeout"

Expand Down Expand Up @@ -46,6 +47,23 @@
CREATE INDEX IF NOT EXISTS idx_promises_sort_id ON promises(sort_id);
CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises(invocation);

CREATE TABLE IF NOT EXISTS schedules (
id TEXT,
description TEXT,
guergabo marked this conversation as resolved.
Show resolved Hide resolved
cron TEXT,
promise_id TEXT,
promise_param_headers JSONB,
promise_param_data BYTEA,
promise_timeout BIGINT,
last_run_time BIGINT,
next_run_time BIGINT,
created_on BIGINT,
idempotency_key TEXT,
PRIMARY KEY(id)
);

CREATE INDEX IF NOT EXISTS idx_next_run_time ON schedules (next_run_time);

CREATE TABLE IF NOT EXISTS timeouts (
id TEXT,
time BIGINT,
Expand Down Expand Up @@ -127,6 +145,40 @@
WHERE
state = 1 AND timeout <= $1`

SCHEDULE_SELECT_STATEMENT = `
SELECT
id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key
FROM
schedules
WHERE
id = $1`

SCHEDULE_SELECT_ALL_STATEMENT = `
SELECT
id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key
FROM
schedules
WHERE
next_run_time <= $1`

SCHEDULE_INSERT_STATEMENT = `
INSERT INTO schedules
(id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT(id) DO NOTHING`

SCHEDULE_UPDATE_STATEMENT = `
UPDATE
schedules
SET
last_run_time = next_run_time, next_run_time = $1
WHERE
id = $2 AND next_run_time = $3`

SCHEDULE_DELETE_STATEMENT = `
DELETE FROM schedules WHERE id = $1`

TIMEOUT_SELECT_STATEMENT = `
SELECT
id, time
Expand Down Expand Up @@ -331,6 +383,8 @@
}

func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.Transaction) ([][]*t_aio.Result, error) {
// PROMISES

promiseInsertStmt, err := tx.Prepare(PROMISE_INSERT_STATEMENT)
if err != nil {
return nil, err
Expand All @@ -349,6 +403,28 @@
}
defer promiseUpdateTimeoutStmt.Close()

// SCHEDULES

scheduleInsertStmt, err := tx.Prepare(SCHEDULE_INSERT_STATEMENT)
if err != nil {
return nil, err
}

Check warning on line 411 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L410-L411

Added lines #L410 - L411 were not covered by tests
defer scheduleInsertStmt.Close()

scheduleUpdateStmt, err := tx.Prepare(SCHEDULE_UPDATE_STATEMENT)
if err != nil {
return nil, err
}

Check warning on line 417 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L416-L417

Added lines #L416 - L417 were not covered by tests
defer scheduleUpdateStmt.Close()

scheduleDeleteStmt, err := tx.Prepare(SCHEDULE_DELETE_STATEMENT)
if err != nil {
return nil, err
}

Check warning on line 423 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L422-L423

Added lines #L422 - L423 were not covered by tests
defer scheduleDeleteStmt.Close()

// TIMEOUTS

timeoutInsertStmt, err := tx.Prepare(TIMEOUT_INSERT_STATEMENT)
if err != nil {
return nil, err
Expand All @@ -361,6 +437,8 @@
}
defer timeoutDeleteStmt.Close()

// SUBSCRIPTIONS

subscriptionInsertStmt, err := tx.Prepare(SUBSCRIPTION_INSERT_STATEMENT)
if err != nil {
return nil, err
Expand All @@ -385,6 +463,8 @@
}
defer subscriptionDeleteAllTimeoutStmt.Close()

// NOTIFICATIONS

notificationInsertStmt, err := tx.Prepare(NOTIFICATION_INSERT_STATEMENT)
if err != nil {
return nil, err
Expand Down Expand Up @@ -484,6 +564,23 @@
util.Assert(command.TimeoutCreateNotifications != nil, "command must not be nil")
results[i][j], err = w.timeoutCreateNotifications(tx, notificationInsertTimeoutStmt, command.TimeoutCreateNotifications)

// Schedule
case t_aio.ReadSchedule:
util.Assert(command.ReadSchedule != nil, "command must not be nil")
results[i][j], err = w.readSchedule(tx, command.ReadSchedule)
case t_aio.ReadSchedules:
util.Assert(command.ReadSchedules != nil, "command must not be nil")
results[i][j], err = w.readSchedules(tx, command.ReadSchedules)
case t_aio.CreateSchedule:
util.Assert(command.CreateSchedule != nil, "command must not be nil")
results[i][j], err = w.createSchedule(tx, scheduleInsertStmt, command.CreateSchedule)
case t_aio.UpdateSchedule:
util.Assert(command.UpdateSchedule != nil, "command must not be nil")
results[i][j], err = w.updateSchedule(tx, scheduleUpdateStmt, command.UpdateSchedule)
case t_aio.DeleteSchedule:
util.Assert(command.DeleteSchedule != nil, "command must not be nil")
results[i][j], err = w.deleteSchedule(tx, scheduleDeleteStmt, command.DeleteSchedule)

default:
panic("invalid command")
}
Expand All @@ -497,6 +594,8 @@
return results, nil
}

// PROMISES

func (w *PostgresStoreWorker) readPromise(tx *sql.Tx, cmd *t_aio.ReadPromiseCommand) (*t_aio.Result, error) {
// select
row := tx.QueryRow(PROMISE_SELECT_STATEMENT, cmd.Id)
Expand Down Expand Up @@ -702,6 +801,152 @@
}, nil
}

// SCHEDULES

func (w *PostgresStoreWorker) readSchedule(tx *sql.Tx, cmd *t_aio.ReadScheduleCommand) (*t_aio.Result, error) {
row := tx.QueryRow(SCHEDULE_SELECT_STATEMENT, cmd.Id)
record := &schedule.ScheduleRecord{}
rowsReturned := int64(1)

if err := row.Scan(
&record.Id,
&record.Desc,
&record.Cron,
&record.PromiseId,
&record.PromiseParamHeaders,
&record.PromiseParamData,
&record.PromiseTimeout,
&record.LastRunTime,
&record.NextRunTime,
&record.CreatedOn,
&record.IdempotencyKey,
); err != nil {
if err == sql.ErrNoRows {
rowsReturned = 0
} else {
return nil, err
}

Check warning on line 828 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L827-L828

Added lines #L827 - L828 were not covered by tests
}

var records []*schedule.ScheduleRecord
if rowsReturned == 1 {
records = append(records, record)
}

return &t_aio.Result{
Kind: t_aio.ReadSchedule,
ReadSchedule: &t_aio.QuerySchedulesResult{
RowsReturned: rowsReturned,
Records: records,
},
}, nil
}

func (w *PostgresStoreWorker) readSchedules(tx *sql.Tx, cmd *t_aio.ReadSchedulesCommand) (*t_aio.Result, error) {
rows, err := tx.Query(SCHEDULE_SELECT_ALL_STATEMENT, cmd.NextRunTime)
if err != nil {
return nil, err
}

Check warning on line 849 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L848-L849

Added lines #L848 - L849 were not covered by tests
defer rows.Close()

rowsReturned := int64(0)
var records []*schedule.ScheduleRecord

for rows.Next() {
record := &schedule.ScheduleRecord{}
if err := rows.Scan(
&record.Id,
&record.Desc,
&record.Cron,
&record.PromiseId,
&record.PromiseParamHeaders,
&record.PromiseParamData,
&record.PromiseTimeout,
&record.LastRunTime,
&record.NextRunTime,
&record.CreatedOn,
&record.IdempotencyKey,
); err != nil {
return nil, err
}

Check warning on line 871 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L870-L871

Added lines #L870 - L871 were not covered by tests

records = append(records, record)
rowsReturned++
}

return &t_aio.Result{
Kind: t_aio.ReadSchedules,
ReadSchedules: &t_aio.QuerySchedulesResult{
RowsReturned: rowsReturned,
Records: records,
},
}, nil
}

func (w *PostgresStoreWorker) createSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CreateScheduleCommand) (*t_aio.Result, error) {
headers, err := json.Marshal(cmd.PromiseParam.Headers)
if err != nil {
return nil, err
}

Check warning on line 890 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L889-L890

Added lines #L889 - L890 were not covered by tests

res, err := stmt.Exec(cmd.Id, cmd.Desc, cmd.Cron, cmd.PromiseId, headers, cmd.PromiseParam.Data, cmd.PromiseTimeout, cmd.LastRunTime, cmd.NextRunTime, cmd.CreatedOn, cmd.IdempotencyKey)
if err != nil {
return nil, err
}

Check warning on line 895 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L894-L895

Added lines #L894 - L895 were not covered by tests

rowsAffected, err := res.RowsAffected()
if err != nil {
return nil, err
}

Check warning on line 900 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L899-L900

Added lines #L899 - L900 were not covered by tests

return &t_aio.Result{
Kind: t_aio.CreateSchedule,
CreateSchedule: &t_aio.AlterSchedulesResult{
RowsAffected: rowsAffected,
},
}, nil
}

func (w *PostgresStoreWorker) updateSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.UpdateScheduleCommand) (*t_aio.Result, error) {
res, err := stmt.Exec(cmd.NextRunTime, cmd.Id, cmd.LastRunTime)
if err != nil {
return nil, err
}

Check warning on line 914 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L913-L914

Added lines #L913 - L914 were not covered by tests

rowsAffected, err := res.RowsAffected()
if err != nil {
return nil, err
}

Check warning on line 919 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L918-L919

Added lines #L918 - L919 were not covered by tests

return &t_aio.Result{
Kind: t_aio.UpdateSchedule,
UpdateSchedule: &t_aio.AlterSchedulesResult{
RowsAffected: rowsAffected,
},
}, nil
}

func (w *PostgresStoreWorker) deleteSchedule(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.DeleteScheduleCommand) (*t_aio.Result, error) {
res, err := stmt.Exec(cmd.Id)
if err != nil {
return nil, err
}

Check warning on line 933 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L932-L933

Added lines #L932 - L933 were not covered by tests

rowsAffected, err := res.RowsAffected()
if err != nil {
return nil, err
}

Check warning on line 938 in internal/app/subsystems/aio/store/postgres/postgres.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/postgres/postgres.go#L937-L938

Added lines #L937 - L938 were not covered by tests

return &t_aio.Result{
Kind: t_aio.DeleteSchedule,
DeleteSchedule: &t_aio.AlterSchedulesResult{
RowsAffected: rowsAffected,
},
}, nil
}

// TIMEOUTS

func (w *PostgresStoreWorker) readTimeouts(tx *sql.Tx, cmd *t_aio.ReadTimeoutsCommand) (*t_aio.Result, error) {
// select
rows, err := tx.Query(TIMEOUT_SELECT_STATEMENT, cmd.N)
Expand Down
8 changes: 4 additions & 4 deletions internal/app/subsystems/aio/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (

CREATE TABLE IF NOT EXISTS schedules (
id TEXT PRIMARY KEY,
desc TEXT,
description TEXT,
cron TEXT,
promise_id TEXT,
promise_param_headers BLOB,
Expand Down Expand Up @@ -139,23 +139,23 @@ const (

SCHEDULE_SELECT_STATEMENT = `
SELECT
id, desc, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key
id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key
FROM
schedules
WHERE
id = ?`

SCHEDULE_SELECT_ALL_STATEMENT = `
SELECT
id, desc, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key
id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key
FROM
schedules
WHERE
next_run_time <= ?`

SCHEDULE_INSERT_STATEMENT = `
INSERT INTO schedules
(id, desc, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key)
(id, description, cron, promise_id, promise_param_headers, promise_param_data, promise_timeout, last_run_time, next_run_time, created_on, idempotency_key)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO NOTHING`
Expand Down
Loading