From 8948af7173069e124bb0f5bb68fa1b26b2834713 Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Tue, 23 Jan 2024 13:55:27 +0530 Subject: [PATCH 1/9] initial implementation of lock interface in remote and local mode --- lib/core/lock.ts | 8 +++ lib/core/locks/local.ts | 20 ++++++ lib/core/locks/remote.ts | 136 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 164 insertions(+) create mode 100644 lib/core/lock.ts create mode 100644 lib/core/locks/local.ts create mode 100644 lib/core/locks/remote.ts diff --git a/lib/core/lock.ts b/lib/core/lock.ts new file mode 100644 index 0000000..092a28b --- /dev/null +++ b/lib/core/lock.ts @@ -0,0 +1,8 @@ +export interface ILock { + tryAcquire(resourceId: string, processId: string, executionId: string, timeout: number | undefined): Promise; + releaseLock(resourceId: string, executionId: string): Promise; +} + +type WithHeartbeat = T extends { heartbeat(processId: string, timeout: number): Promise } ? T : never; + +export type ILockRemote = ILock & WithHeartbeat<{ heartbeat(processId: string, timeout: number): Promise }>; diff --git a/lib/core/locks/local.ts b/lib/core/locks/local.ts new file mode 100644 index 0000000..bd5ab40 --- /dev/null +++ b/lib/core/locks/local.ts @@ -0,0 +1,20 @@ +import { ILock } from "../lock"; + +export class LocalLock implements ILock { + private locks: Map = new Map(); + + async tryAcquire(resourceId: string, processId: string, executionId: string, timeout?: number): Promise { + const lockKey = `${resourceId}-${executionId}`; + if (!this.locks.has(lockKey)) { + this.locks.set(lockKey, { processId, executionId }); + return true; + } else { + return false; + } + } + + async releaseLock(resourceId: string, executionId: string): Promise { + const lockKey = `${resourceId}-${executionId}`; + this.locks.delete(lockKey); + } +} diff --git a/lib/core/locks/remote.ts b/lib/core/locks/remote.ts new file mode 100644 index 0000000..04ce1ca --- /dev/null +++ b/lib/core/locks/remote.ts @@ -0,0 +1,136 @@ +import { ErrorCodes, ResonateError } from "../error"; +import { ILockRemote } from "../lock"; +import { ILogger } from "../logger"; +import { Logger } from "../loggers/logger"; + +export class RemoteLock implements ILockRemote { + private url: string; + + constructor( + url: string, + private logger: ILogger = new Logger(), + ) { + this.url = url; + } + + async tryAcquire(resourceId: string, processId: string, executionId: string, timeout?: number): Promise { + const lockRequest = { + resource_id: resourceId, + process_id: processId, + execution_id: executionId, + timeout: timeout || 0, + }; + + return call( + `${this.url}/locks/acquire`, + (b: unknown): b is boolean => typeof b === "boolean", + { + method: "post", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(lockRequest), + }, + this.logger, + ); + } + + async releaseLock(resourceId: string, executionId: string): Promise { + const releaseLockRequest = { + resource_id: resourceId, + execution_id: executionId, + }; + + await call( + `${this.url}/locks/release`, + (response: unknown): response is void => response === undefined, + { + method: "post", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(releaseLockRequest), + }, + this.logger, + ); + } + + async heartbeat(processId: string, timeout: number): Promise { + const heartbeatRequest = { + process_id: processId, + timeout: timeout, + }; + + return call( + `${this.url}/locks/heartbeat`, + (locksAffected: unknown): locksAffected is number => typeof locksAffected === "number", + { + method: "post", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(heartbeatRequest), + }, + this.logger, + ); + } +} + +async function call( + url: string, + guard: (b: unknown) => b is T, + options: RequestInit, + logger: ILogger, + retries: number = 3, +): Promise { + let error: unknown; + + for (let i = 0; i < retries; i++) { + try { + const r = await fetch(url, options); + const body: unknown = await r.json(); + + logger.debug("store", { + req: { + method: options.method, + url: url, + headers: options.headers, + body: options.body, + }, + res: { + status: r.status, + body: body, + }, + }); + + if (!r.ok) { + switch (r.status) { + case 400: + throw new ResonateError(ErrorCodes.PAYLOAD, "Invalid request", body); + case 403: + throw new ResonateError(ErrorCodes.FORBIDDEN, "Forbidden request", body); + case 404: + throw new ResonateError(ErrorCodes.NOT_FOUND, "Not found", body); + case 409: + throw new ResonateError(ErrorCodes.ALREADY_EXISTS, "Already exists", body); + default: + throw new ResonateError(ErrorCodes.SERVER, "Server error", body, true); + } + } + + if (!guard(body)) { + throw new ResonateError(ErrorCodes.PAYLOAD, "Invalid response", body); + } + + return body; + } catch (e: unknown) { + if (e instanceof ResonateError && !e.retryable) { + throw e; + } else { + error = e; + } + } + } + + throw ResonateError.fromError(error); +} From e77c88e7317e2e4669622a950b322b042a634e29 Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Wed, 24 Jan 2024 11:26:19 +0530 Subject: [PATCH 2/9] making the interface async to support remote lock class --- lib/core/lock.ts | 8 ----- lib/core/locks/local.ts | 20 ------------ lib/core/locks/remote.ts | 62 ++++++++++++++++++++++++++++--------- lib/core/storages/memory.ts | 4 +-- lib/core/store.ts | 4 +-- lib/resonate.ts | 4 +-- test/lock.test.ts | 2 +- 7 files changed, 54 insertions(+), 50 deletions(-) delete mode 100644 lib/core/lock.ts delete mode 100644 lib/core/locks/local.ts diff --git a/lib/core/lock.ts b/lib/core/lock.ts deleted file mode 100644 index 092a28b..0000000 --- a/lib/core/lock.ts +++ /dev/null @@ -1,8 +0,0 @@ -export interface ILock { - tryAcquire(resourceId: string, processId: string, executionId: string, timeout: number | undefined): Promise; - releaseLock(resourceId: string, executionId: string): Promise; -} - -type WithHeartbeat = T extends { heartbeat(processId: string, timeout: number): Promise } ? T : never; - -export type ILockRemote = ILock & WithHeartbeat<{ heartbeat(processId: string, timeout: number): Promise }>; diff --git a/lib/core/locks/local.ts b/lib/core/locks/local.ts deleted file mode 100644 index bd5ab40..0000000 --- a/lib/core/locks/local.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { ILock } from "../lock"; - -export class LocalLock implements ILock { - private locks: Map = new Map(); - - async tryAcquire(resourceId: string, processId: string, executionId: string, timeout?: number): Promise { - const lockKey = `${resourceId}-${executionId}`; - if (!this.locks.has(lockKey)) { - this.locks.set(lockKey, { processId, executionId }); - return true; - } else { - return false; - } - } - - async releaseLock(resourceId: string, executionId: string): Promise { - const lockKey = `${resourceId}-${executionId}`; - this.locks.delete(lockKey); - } -} diff --git a/lib/core/locks/remote.ts b/lib/core/locks/remote.ts index 04ce1ca..93f44f7 100644 --- a/lib/core/locks/remote.ts +++ b/lib/core/locks/remote.ts @@ -1,10 +1,14 @@ import { ErrorCodes, ResonateError } from "../error"; -import { ILockRemote } from "../lock"; import { ILogger } from "../logger"; import { Logger } from "../loggers/logger"; +import { ILockStore } from "../store"; -export class RemoteLock implements ILockRemote { +export class RemoteLock implements ILockStore { private url: string; + // simple lock counter - tryAcquire increments, release decrements + // counter > 0 we call heartbeat api in a loop, setInterval + private lockCounter: number = 0; + private heartbeatInterval: NodeJS.Timer | null = null; constructor( url: string, @@ -13,35 +17,41 @@ export class RemoteLock implements ILockRemote { this.url = url; } - async tryAcquire(resourceId: string, processId: string, executionId: string, timeout?: number): Promise { + async tryAcquire(resourceId: string, processId: string, executionId: string): Promise { const lockRequest = { - resource_id: resourceId, - process_id: processId, - execution_id: executionId, - timeout: timeout || 0, + resourceId: resourceId, + processId: processId, + executionId: executionId, }; - return call( + const acquired = call( `${this.url}/locks/acquire`, (b: unknown): b is boolean => typeof b === "boolean", { method: "post", - headers: { - "Content-Type": "application/json", - }, body: JSON.stringify(lockRequest), }, this.logger, ); + + if (await acquired) { + this.lockCounter++; + // Start the heartbeat if it's not already running + if (this.lockCounter === 1) { + this.startHeartbeat(processId); + } + return true; + } + return false; } - async releaseLock(resourceId: string, executionId: string): Promise { + async release(resourceId: string, executionId: string): Promise { const releaseLockRequest = { resource_id: resourceId, execution_id: executionId, }; - await call( + call( `${this.url}/locks/release`, (response: unknown): response is void => response === undefined, { @@ -53,12 +63,34 @@ export class RemoteLock implements ILockRemote { }, this.logger, ); + + this.lockCounter--; + // Stop the heartbeat if there are no more locks + if (this.lockCounter === 0) { + this.stopHeartbeat(); + } + } + + private startHeartbeat(processId: string): void { + this.heartbeatInterval = setInterval(async () => { + const locksAffected = await this.heartbeat(processId); + if (locksAffected === 0) { + this.stopHeartbeat(); + } + }, 5000); // desired heartbeat interval (in ms) + } + + private stopHeartbeat(): void { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval as any); + this.heartbeatInterval = null; + } } - async heartbeat(processId: string, timeout: number): Promise { + private async heartbeat(processId: string): Promise { const heartbeatRequest = { process_id: processId, - timeout: timeout, + timeout: 0, }; return call( diff --git a/lib/core/storages/memory.ts b/lib/core/storages/memory.ts index efff90b..7f073f6 100644 --- a/lib/core/storages/memory.ts +++ b/lib/core/storages/memory.ts @@ -76,7 +76,7 @@ export class MemoryScheduleStorage implements IScheduleStorage { export class MemoryLockStore implements ILockStore { private locks: Record = {}; - tryAcquire(id: string, pid: string, eid: string): boolean { + async tryAcquire(id: string, pid: string, eid: string): Promise { if (!this.locks[id] || (this.locks[id] && this.locks[id].eid === eid)) { // Lock is available, acquire it this.locks[id] = { pid, eid }; @@ -87,7 +87,7 @@ export class MemoryLockStore implements ILockStore { } } - release(id: string, eid: string): void { + async release(id: string, eid: string): Promise { if (this.locks[id] && this.locks[id].eid === eid) { // Release the lock delete this.locks[id]; diff --git a/lib/core/store.ts b/lib/core/store.ts index dc56211..e823cd6 100644 --- a/lib/core/store.ts +++ b/lib/core/store.ts @@ -182,6 +182,6 @@ export interface IScheduleStore { * Lock Store API */ export interface ILockStore { - tryAcquire(id: string, pid: string, eid: string): boolean; - release(id: string, eid: string): void; + tryAcquire(id: string, pid: string, eid: string): Promise; + release(id: string, eid: string): Promise; } diff --git a/lib/resonate.ts b/lib/resonate.ts index 74db1ae..0bd736f 100644 --- a/lib/resonate.ts +++ b/lib/resonate.ts @@ -213,7 +213,7 @@ export class Resonate { if (!this.cache.has(id)) { const promise = new Promise(async (resolve, reject) => { // lock - while (!locks.tryAcquire(id, this.pid, opts.eid)) { + while (!(await locks.tryAcquire(id, this.pid, opts.eid))) { // sleep await new Promise((r) => setTimeout(r, 1000)); } @@ -225,7 +225,7 @@ export class Resonate { } catch (e) { reject(e); } finally { - locks.release(id, opts.eid); + await locks.release(id, opts.eid); } }); diff --git a/test/lock.test.ts b/test/lock.test.ts index 13d0117..fac0a20 100644 --- a/test/lock.test.ts +++ b/test/lock.test.ts @@ -36,7 +36,7 @@ describe("Lock", () => { expect(sharedResource.length).toBe(1); // release lock so p2 can run - store.locks.release("write/id", sharedResource[0]); + await store.locks.release("write/id", sharedResource[0]); const r = await p2; From 6a43c3d36d81082f70c24f06090704b0571014ff Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Thu, 25 Jan 2024 07:57:36 +0530 Subject: [PATCH 3/9] moving remotelockstore to stores class --- lib/core/locks/remote.ts | 168 -------------------------------------- lib/core/stores/remote.ts | 111 ++++++++++++++++++++++++- 2 files changed, 109 insertions(+), 170 deletions(-) delete mode 100644 lib/core/locks/remote.ts diff --git a/lib/core/locks/remote.ts b/lib/core/locks/remote.ts deleted file mode 100644 index 93f44f7..0000000 --- a/lib/core/locks/remote.ts +++ /dev/null @@ -1,168 +0,0 @@ -import { ErrorCodes, ResonateError } from "../error"; -import { ILogger } from "../logger"; -import { Logger } from "../loggers/logger"; -import { ILockStore } from "../store"; - -export class RemoteLock implements ILockStore { - private url: string; - // simple lock counter - tryAcquire increments, release decrements - // counter > 0 we call heartbeat api in a loop, setInterval - private lockCounter: number = 0; - private heartbeatInterval: NodeJS.Timer | null = null; - - constructor( - url: string, - private logger: ILogger = new Logger(), - ) { - this.url = url; - } - - async tryAcquire(resourceId: string, processId: string, executionId: string): Promise { - const lockRequest = { - resourceId: resourceId, - processId: processId, - executionId: executionId, - }; - - const acquired = call( - `${this.url}/locks/acquire`, - (b: unknown): b is boolean => typeof b === "boolean", - { - method: "post", - body: JSON.stringify(lockRequest), - }, - this.logger, - ); - - if (await acquired) { - this.lockCounter++; - // Start the heartbeat if it's not already running - if (this.lockCounter === 1) { - this.startHeartbeat(processId); - } - return true; - } - return false; - } - - async release(resourceId: string, executionId: string): Promise { - const releaseLockRequest = { - resource_id: resourceId, - execution_id: executionId, - }; - - call( - `${this.url}/locks/release`, - (response: unknown): response is void => response === undefined, - { - method: "post", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(releaseLockRequest), - }, - this.logger, - ); - - this.lockCounter--; - // Stop the heartbeat if there are no more locks - if (this.lockCounter === 0) { - this.stopHeartbeat(); - } - } - - private startHeartbeat(processId: string): void { - this.heartbeatInterval = setInterval(async () => { - const locksAffected = await this.heartbeat(processId); - if (locksAffected === 0) { - this.stopHeartbeat(); - } - }, 5000); // desired heartbeat interval (in ms) - } - - private stopHeartbeat(): void { - if (this.heartbeatInterval) { - clearInterval(this.heartbeatInterval as any); - this.heartbeatInterval = null; - } - } - - private async heartbeat(processId: string): Promise { - const heartbeatRequest = { - process_id: processId, - timeout: 0, - }; - - return call( - `${this.url}/locks/heartbeat`, - (locksAffected: unknown): locksAffected is number => typeof locksAffected === "number", - { - method: "post", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(heartbeatRequest), - }, - this.logger, - ); - } -} - -async function call( - url: string, - guard: (b: unknown) => b is T, - options: RequestInit, - logger: ILogger, - retries: number = 3, -): Promise { - let error: unknown; - - for (let i = 0; i < retries; i++) { - try { - const r = await fetch(url, options); - const body: unknown = await r.json(); - - logger.debug("store", { - req: { - method: options.method, - url: url, - headers: options.headers, - body: options.body, - }, - res: { - status: r.status, - body: body, - }, - }); - - if (!r.ok) { - switch (r.status) { - case 400: - throw new ResonateError(ErrorCodes.PAYLOAD, "Invalid request", body); - case 403: - throw new ResonateError(ErrorCodes.FORBIDDEN, "Forbidden request", body); - case 404: - throw new ResonateError(ErrorCodes.NOT_FOUND, "Not found", body); - case 409: - throw new ResonateError(ErrorCodes.ALREADY_EXISTS, "Already exists", body); - default: - throw new ResonateError(ErrorCodes.SERVER, "Server error", body, true); - } - } - - if (!guard(body)) { - throw new ResonateError(ErrorCodes.PAYLOAD, "Invalid response", body); - } - - return body; - } catch (e: unknown) { - if (e instanceof ResonateError && !e.retryable) { - throw e; - } else { - error = e; - } - } - } - - throw ResonateError.fromError(error); -} diff --git a/lib/core/stores/remote.ts b/lib/core/stores/remote.ts index b2c8f23..0aada35 100644 --- a/lib/core/stores/remote.ts +++ b/lib/core/stores/remote.ts @@ -12,22 +12,24 @@ import { IStore, IPromiseStore, IScheduleStore } from "../store"; import { IEncoder } from "../encoder"; import { Base64Encoder } from "../encoders/base64"; import { ErrorCodes, ResonateError } from "../error"; +import { ILockStore } from "../store"; import { ILogger } from "../logger"; import { Schedule, isSchedule } from "../schedule"; import { Logger } from "../loggers/logger"; import { LocalLockStore } from "./local"; + export class RemoteStore implements IStore { public promises: RemotePromiseStore; public schedules: RemoteScheduleStore; - public locks: LocalLockStore; + public locks: RemoteLockStore; constructor(url: string, logger: ILogger, encoder: IEncoder = new Base64Encoder()) { this.promises = new RemotePromiseStore(url, logger, encoder); this.schedules = new RemoteScheduleStore(url, logger, encoder); // temp - this.locks = new LocalLockStore(); + this.locks = new RemoteLockStore(url, logger); } } @@ -496,3 +498,108 @@ function isSearchSchedulesResult(obj: any): obj is { cursor: string; schedules: obj.schedules.every(isSchedule) ); } + +export class RemoteLockStore implements ILockStore { + private url: string; + // simple lock counter - tryAcquire increments, release decrements + // counter > 0 we call heartbeat api in a loop, setInterval + private lockCounter: number = 0; + private heartbeatInterval: NodeJS.Timer | null = null; + + constructor( + url: string, + private logger: ILogger = new Logger(), + ) { + this.url = url; + } + + async tryAcquire(resourceId: string, processId: string, executionId: string): Promise { + const lockRequest = { + resourceId: resourceId, + processId: processId, + executionId: executionId, + }; + + const acquired = call( + `${this.url}/locks/acquire`, + (b: unknown): b is boolean => typeof b === "boolean", + { + method: "post", + body: JSON.stringify(lockRequest), + }, + this.logger, + ); + + if (await acquired) { + this.lockCounter++; + // Start the heartbeat if it's not already running + if (this.lockCounter === 1) { + this.startHeartbeat(processId); + } + return true; + } + return false; + } + + async release(resourceId: string, executionId: string): Promise { + const releaseLockRequest = { + resource_id: resourceId, + execution_id: executionId, + }; + + call( + `${this.url}/locks/release`, + (response: unknown): response is void => response === undefined, + { + method: "post", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(releaseLockRequest), + }, + this.logger, + ); + + this.lockCounter--; + // Stop the heartbeat if there are no more locks + if (this.lockCounter === 0) { + this.stopHeartbeat(); + } + } + + private startHeartbeat(processId: string): void { + this.heartbeatInterval = setInterval(async () => { + const locksAffected = await this.heartbeat(processId); + if (locksAffected === 0) { + this.stopHeartbeat(); + } + }, 5000); // desired heartbeat interval (in ms) + } + + private stopHeartbeat(): void { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval as any); + this.heartbeatInterval = null; + } + } + + private async heartbeat(processId: string): Promise { + const heartbeatRequest = { + process_id: processId, + timeout: 0, + }; + + return call( + `${this.url}/locks/heartbeat`, + (locksAffected: unknown): locksAffected is number => typeof locksAffected === "number", + { + method: "post", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(heartbeatRequest), + }, + this.logger, + ); + } +} \ No newline at end of file From 8d3732b680e5eb315399422df1b924178d8cc7a5 Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Thu, 25 Jan 2024 07:58:01 +0530 Subject: [PATCH 4/9] lint fixes --- lib/core/stores/remote.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/core/stores/remote.ts b/lib/core/stores/remote.ts index 0aada35..10aa6cd 100644 --- a/lib/core/stores/remote.ts +++ b/lib/core/stores/remote.ts @@ -18,7 +18,6 @@ import { Schedule, isSchedule } from "../schedule"; import { Logger } from "../loggers/logger"; import { LocalLockStore } from "./local"; - export class RemoteStore implements IStore { public promises: RemotePromiseStore; public schedules: RemoteScheduleStore; @@ -602,4 +601,4 @@ export class RemoteLockStore implements ILockStore { this.logger, ); } -} \ No newline at end of file +} From dae25d3be14f30e1ec465fb1b99384655b0d4e85 Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Thu, 25 Jan 2024 08:25:29 +0530 Subject: [PATCH 5/9] removing unused imports --- lib/core/stores/remote.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/core/stores/remote.ts b/lib/core/stores/remote.ts index 10aa6cd..a11bc78 100644 --- a/lib/core/stores/remote.ts +++ b/lib/core/stores/remote.ts @@ -16,7 +16,6 @@ import { ILockStore } from "../store"; import { ILogger } from "../logger"; import { Schedule, isSchedule } from "../schedule"; import { Logger } from "../loggers/logger"; -import { LocalLockStore } from "./local"; export class RemoteStore implements IStore { public promises: RemotePromiseStore; From 7146e4328b9b84035fbc5bfb2f246255314c565a Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Thu, 25 Jan 2024 14:30:44 +0530 Subject: [PATCH 6/9] using remotelockstore in remote store --- lib/core/stores/remote.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/core/stores/remote.ts b/lib/core/stores/remote.ts index a11bc78..88d96df 100644 --- a/lib/core/stores/remote.ts +++ b/lib/core/stores/remote.ts @@ -502,13 +502,14 @@ export class RemoteLockStore implements ILockStore { // simple lock counter - tryAcquire increments, release decrements // counter > 0 we call heartbeat api in a loop, setInterval private lockCounter: number = 0; - private heartbeatInterval: NodeJS.Timer | null = null; + private heartbeatInterval: NodeJS.Timeout | number | null = null; constructor( url: string, private logger: ILogger = new Logger(), ) { this.url = url; + this.heartbeatInterval = 100; } async tryAcquire(resourceId: string, processId: string, executionId: string): Promise { @@ -568,7 +569,8 @@ export class RemoteLockStore implements ILockStore { private startHeartbeat(processId: string): void { this.heartbeatInterval = setInterval(async () => { const locksAffected = await this.heartbeat(processId); - if (locksAffected === 0) { + this.lockCounter = locksAffected; // Set lockCounter based on the response + if (this.lockCounter === 0) { this.stopHeartbeat(); } }, 5000); // desired heartbeat interval (in ms) @@ -576,8 +578,8 @@ export class RemoteLockStore implements ILockStore { private stopHeartbeat(): void { if (this.heartbeatInterval) { - clearInterval(this.heartbeatInterval as any); - this.heartbeatInterval = null; + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = 0; } } From 5982bb2f2877f36a36c93cd3fe0a030bba17912d Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Thu, 25 Jan 2024 22:31:36 +0530 Subject: [PATCH 7/9] removing comments --- lib/core/stores/remote.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/core/stores/remote.ts b/lib/core/stores/remote.ts index 88d96df..ad31718 100644 --- a/lib/core/stores/remote.ts +++ b/lib/core/stores/remote.ts @@ -25,8 +25,6 @@ export class RemoteStore implements IStore { constructor(url: string, logger: ILogger, encoder: IEncoder = new Base64Encoder()) { this.promises = new RemotePromiseStore(url, logger, encoder); this.schedules = new RemoteScheduleStore(url, logger, encoder); - - // temp this.locks = new RemoteLockStore(url, logger); } } @@ -499,8 +497,6 @@ function isSearchSchedulesResult(obj: any): obj is { cursor: string; schedules: export class RemoteLockStore implements ILockStore { private url: string; - // simple lock counter - tryAcquire increments, release decrements - // counter > 0 we call heartbeat api in a loop, setInterval private lockCounter: number = 0; private heartbeatInterval: NodeJS.Timeout | number | null = null; From 737fdb63d294727eace2acb577beae20cb9f8041 Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Thu, 25 Jan 2024 23:08:30 +0530 Subject: [PATCH 8/9] removing lock counter from memory and using the val from api response --- lib/core/stores/remote.ts | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/lib/core/stores/remote.ts b/lib/core/stores/remote.ts index ad31718..9ad4830 100644 --- a/lib/core/stores/remote.ts +++ b/lib/core/stores/remote.ts @@ -497,15 +497,13 @@ function isSearchSchedulesResult(obj: any): obj is { cursor: string; schedules: export class RemoteLockStore implements ILockStore { private url: string; - private lockCounter: number = 0; - private heartbeatInterval: NodeJS.Timeout | number | null = null; + private heartbeatInterval: number | null = null; constructor( url: string, private logger: ILogger = new Logger(), ) { this.url = url; - this.heartbeatInterval = 100; } async tryAcquire(resourceId: string, processId: string, executionId: string): Promise { @@ -526,9 +524,7 @@ export class RemoteLockStore implements ILockStore { ); if (await acquired) { - this.lockCounter++; - // Start the heartbeat if it's not already running - if (this.lockCounter === 1) { + if (this.heartbeatInterval === null) { this.startHeartbeat(processId); } return true; @@ -554,19 +550,12 @@ export class RemoteLockStore implements ILockStore { }, this.logger, ); - - this.lockCounter--; - // Stop the heartbeat if there are no more locks - if (this.lockCounter === 0) { - this.stopHeartbeat(); - } } private startHeartbeat(processId: string): void { - this.heartbeatInterval = setInterval(async () => { + this.heartbeatInterval = +setInterval(async () => { const locksAffected = await this.heartbeat(processId); - this.lockCounter = locksAffected; // Set lockCounter based on the response - if (this.lockCounter === 0) { + if (locksAffected === 0) { this.stopHeartbeat(); } }, 5000); // desired heartbeat interval (in ms) From d2e2b525853368fca2144a4b45050c644d6934c6 Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Thu, 25 Jan 2024 23:18:57 +0530 Subject: [PATCH 9/9] nit fix --- lib/core/stores/remote.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/core/stores/remote.ts b/lib/core/stores/remote.ts index 9ad4830..ab253a2 100644 --- a/lib/core/stores/remote.ts +++ b/lib/core/stores/remote.ts @@ -564,7 +564,7 @@ export class RemoteLockStore implements ILockStore { private stopHeartbeat(): void { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); - this.heartbeatInterval = 0; + this.heartbeatInterval = null; } }