Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(locks): improve usability by accepting delta #216

Merged
merged 4 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions api/locks-openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ paths:
/locks/heartbeat:
post:
summary: heartbeat
description: update heartbeat for all locks that match the process_id
description: update heartbeat for all locks that match the processId
operationId: heartbeat
requestBody:
content:
Expand Down Expand Up @@ -62,32 +62,32 @@ components:
Lock:
type: object
properties:
resource_id:
resourceId:
type: string
process_id:
processId:
type: string
execution_id:
executionId:
type: string
timeout:
expiryInSeconds:
type: integer
format: int64
expiresAt:
type: integer
format: int64
readOnly: true
required:
- resource_id
- process_id
- execution_id
- timeout
- resourceId
- processId
- executionId
- expiryInSeconds

HeartbeatRequest:
type: object
properties:
process_id:
processId:
type: string
timeout:
type: integer
format: int64
required:
- process_id
- timeout
- processId

HeartbeatResponse:
type: object
Expand All @@ -99,10 +99,10 @@ components:
ReleaseLockRequest:
type: object
properties:
resource_id:
resourceId:
type: string
execution_id:
executionId:
type: string
required:
- resource_id
- execution_id
- resourceId
- executionId
10 changes: 5 additions & 5 deletions internal/app/coroutines/acquireLock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func AcquireLock(metadata *metadata.Metadata, req *t_api.Request, res CallBackFn
ResourceId: req.AcquireLock.ResourceId,
ProcessId: req.AcquireLock.ProcessId,
ExecutionId: req.AcquireLock.ExecutionId,
Timeout: req.AcquireLock.Timeout,
Timeout: c.Time() + (req.AcquireLock.ExpiryInSeconds * 1000), // from s to ms
},
},
},
Expand Down Expand Up @@ -62,10 +62,10 @@ func AcquireLock(metadata *metadata.Metadata, req *t_api.Request, res CallBackFn
AcquireLock: &t_api.AcquireLockResponse{
Status: t_api.StatusCreated,
Lock: &lock.Lock{
ResourceId: req.AcquireLock.ResourceId,
ProcessId: req.AcquireLock.ProcessId,
ExecutionId: req.AcquireLock.ExecutionId,
Timeout: req.AcquireLock.Timeout,
ResourceId: req.AcquireLock.ResourceId,
ProcessId: req.AcquireLock.ProcessId,
ExecutionId: req.AcquireLock.ExecutionId,
ExpiryInSeconds: req.AcquireLock.ExpiryInSeconds,
},
},
}, nil)
Expand Down
1 change: 0 additions & 1 deletion internal/app/coroutines/heartbeatLocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func HeartbeatLocks(metadata *metadata.Metadata, req *t_api.Request, res CallBac
Kind: t_aio.HeartbeatLocks,
HeartbeatLocks: &t_aio.HeartbeatLocksCommand{
ProcessId: req.HeartbeatLocks.ProcessId,
Timeout: req.HeartbeatLocks.Timeout,
},
},
},
Expand Down
25 changes: 14 additions & 11 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
const (
CREATE_TABLE_STATEMENT = `
CREATE TABLE IF NOT EXISTS locks (
resource_id TEXT,
process_id TEXT,
execution_id TEXT,
timeout BIGINT,
resource_id TEXT,
process_id TEXT,
execution_id TEXT,
expiry_in_seconds BIGINT,
timeout BIGINT,
PRIMARY KEY(resource_id)
);

Expand Down Expand Up @@ -119,30 +120,31 @@ const (

LOCK_READ_STATEMENT = `
SELECT
resource_id, process_id, execution_id, timeout
resource_id, process_id, execution_id, expiry_in_seconds, timeout
FROM
locks
WHERE
resource_id = $1`

LOCK_ACQUIRE_STATEMENT = `
INSERT INTO locks
(resource_id, process_id, execution_id, timeout)
(resource_id, process_id, execution_id, expiry_in_seconds, timeout)
VALUES
($1, $2, $3, $4)
($1, $2, $3, $4, $5)
ON CONFLICT(resource_id)
DO UPDATE SET
process_id = EXCLUDED.process_id,
expiry_in_seconds = EXCLUDED.expiry_in_seconds,
timeout = EXCLUDED.timeout
WHERE locks.execution_id = EXCLUDED.execution_id`

LOCK_HEARTBEAT_STATEMENT = `
UPDATE
locks
SET
timeout = $1
timeout = timeout + (expiry_in_seconds * 1000)
WHERE
process_id = $2`
process_id = $1`

LOCK_RELEASE_STATEMENT = `
DELETE FROM locks WHERE resource_id = $1 AND execution_id = $2`
Expand Down Expand Up @@ -730,6 +732,7 @@ func (w *PostgresStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (
&record.ResourceId,
&record.ProcessId,
&record.ExecutionId,
&record.ExpiryInSeconds,
&record.Timeout,
); err != nil {
if err == sql.ErrNoRows {
Expand All @@ -755,7 +758,7 @@ func (w *PostgresStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (

func (w *PostgresStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.AcquireLockCommand) (*t_aio.Result, error) {
// insert
res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.Timeout)
res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.ExpiryInSeconds, cmd.Timeout)
if err != nil {
return nil, err
}
Expand All @@ -775,7 +778,7 @@ func (w *PostgresStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio

func (w *PostgresStoreWorker) hearbeatLocks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.HeartbeatLocksCommand) (*t_aio.Result, error) {
// update
res, err := stmt.Exec(cmd.Timeout, cmd.ProcessId)
res, err := stmt.Exec(cmd.ProcessId)
if err != nil {
return nil, err
}
Expand Down
23 changes: 13 additions & 10 deletions internal/app/subsystems/aio/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
const (
CREATE_TABLE_STATEMENT = `
CREATE TABLE IF NOT EXISTS locks (
resource_id TEXT UNIQUE,
process_id TEXT,
execution_id TEXT,
timeout INTEGER
resource_id TEXT UNIQUE,
process_id TEXT,
execution_id TEXT,
expiry_in_seconds INTEGER,
timeout INTEGER
);

CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id);
Expand Down Expand Up @@ -108,20 +109,21 @@ const (

LOCK_READ_STATEMENT = `
SELECT
resource_id, process_id, execution_id, timeout
resource_id, process_id, execution_id, expiry_in_seconds, timeout
FROM
locks
WHERE
resource_id = ?`

LOCK_ACQUIRE_STATEMENT = `
INSERT INTO locks
(resource_id, process_id, execution_id, timeout)
(resource_id, process_id, execution_id, expiry_in_seconds, timeout)
VALUES
(?, ?, ?, ?)
(?, ?, ?, ?, ?)
ON CONFLICT(resource_id)
DO UPDATE SET
process_id = excluded.process_id,
expiry_in_seconds = excluded.expiry_in_seconds,
timeout = excluded.timeout
WHERE
execution_id = excluded.execution_id`
Expand All @@ -130,7 +132,7 @@ const (
UPDATE
locks
SET
timeout = ?
timeout = timeout + (expiry_in_seconds * 1000)
WHERE
process_id = ?`

Expand Down Expand Up @@ -692,6 +694,7 @@ func (w *SqliteStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (*t
&record.ResourceId,
&record.ProcessId,
&record.ExecutionId,
&record.ExpiryInSeconds,
&record.Timeout,
); err != nil {
if err == sql.ErrNoRows {
Expand All @@ -717,7 +720,7 @@ func (w *SqliteStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (*t

func (w *SqliteStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.AcquireLockCommand) (*t_aio.Result, error) {
// insert
res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.Timeout)
res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.ExpiryInSeconds, cmd.Timeout)
if err != nil {
return nil, err
}
Expand All @@ -737,7 +740,7 @@ func (w *SqliteStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.A

func (w *SqliteStoreWorker) hearbeatLocks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.HeartbeatLocksCommand) (*t_aio.Result, error) {
// update
res, err := stmt.Exec(cmd.Timeout, cmd.ProcessId)
res, err := stmt.Exec(cmd.ProcessId)
if err != nil {
return nil, err
}
Expand Down
Loading