diff --git a/lib/core/stores/remote.ts b/lib/core/stores/remote.ts index b2c8f23..ab253a2 100644 --- a/lib/core/stores/remote.ts +++ b/lib/core/stores/remote.ts @@ -12,22 +12,20 @@ import { IStore, IPromiseStore, IScheduleStore } from "../store"; import { IEncoder } from "../encoder"; import { Base64Encoder } from "../encoders/base64"; import { ErrorCodes, ResonateError } from "../error"; +import { ILockStore } from "../store"; import { ILogger } from "../logger"; import { Schedule, isSchedule } from "../schedule"; import { Logger } from "../loggers/logger"; -import { LocalLockStore } from "./local"; export class RemoteStore implements IStore { public promises: RemotePromiseStore; public schedules: RemoteScheduleStore; - public locks: LocalLockStore; + public locks: RemoteLockStore; constructor(url: string, logger: ILogger, encoder: IEncoder = new Base64Encoder()) { this.promises = new RemotePromiseStore(url, logger, encoder); this.schedules = new RemoteScheduleStore(url, logger, encoder); - - // temp - this.locks = new LocalLockStore(); + this.locks = new RemoteLockStore(url, logger); } } @@ -496,3 +494,97 @@ function isSearchSchedulesResult(obj: any): obj is { cursor: string; schedules: obj.schedules.every(isSchedule) ); } + +export class RemoteLockStore implements ILockStore { + private url: string; + private heartbeatInterval: number | null = null; + + constructor( + url: string, + private logger: ILogger = new Logger(), + ) { + this.url = url; + } + + async tryAcquire(resourceId: string, processId: string, executionId: string): Promise { + const lockRequest = { + resourceId: resourceId, + processId: processId, + executionId: executionId, + }; + + const acquired = call( + `${this.url}/locks/acquire`, + (b: unknown): b is boolean => typeof b === "boolean", + { + method: "post", + body: JSON.stringify(lockRequest), + }, + this.logger, + ); + + if (await acquired) { + if (this.heartbeatInterval === null) { + this.startHeartbeat(processId); + } + return true; + } + return false; + } + + async release(resourceId: string, executionId: string): Promise { + const releaseLockRequest = { + resource_id: resourceId, + execution_id: executionId, + }; + + call( + `${this.url}/locks/release`, + (response: unknown): response is void => response === undefined, + { + method: "post", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(releaseLockRequest), + }, + this.logger, + ); + } + + private startHeartbeat(processId: string): void { + this.heartbeatInterval = +setInterval(async () => { + const locksAffected = await this.heartbeat(processId); + if (locksAffected === 0) { + this.stopHeartbeat(); + } + }, 5000); // desired heartbeat interval (in ms) + } + + private stopHeartbeat(): void { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + } + + private async heartbeat(processId: string): Promise { + const heartbeatRequest = { + process_id: processId, + timeout: 0, + }; + + return call( + `${this.url}/locks/heartbeat`, + (locksAffected: unknown): locksAffected is number => typeof locksAffected === "number", + { + method: "post", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(heartbeatRequest), + }, + this.logger, + ); + } +} diff --git a/test/lock.test.ts b/test/lock.test.ts index 60fd9e4..9f65b91 100644 --- a/test/lock.test.ts +++ b/test/lock.test.ts @@ -35,7 +35,7 @@ describe("Lock", () => { expect(sharedResource.length).toBe(1); // release lock so p2 can run - store.locks.release("write/id", sharedResource[0]); + await store.locks.release("write/id", sharedResource[0]); const r = await p2;