Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Lock Interface #29

Merged
merged 10 commits into from
Jan 25, 2024
113 changes: 110 additions & 3 deletions lib/core/stores/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ import { IStore, IPromiseStore, IScheduleStore } from "../store";
import { IEncoder } from "../encoder";
import { Base64Encoder } from "../encoders/base64";
import { ErrorCodes, ResonateError } from "../error";
import { ILockStore } from "../store";
import { ILogger } from "../logger";
import { Schedule, isSchedule } from "../schedule";
import { Logger } from "../loggers/logger";
import { LocalLockStore } from "./local";

export class RemoteStore implements IStore {
public promises: RemotePromiseStore;
public schedules: RemoteScheduleStore;
public locks: LocalLockStore;
public locks: RemoteLockStore;

constructor(url: string, logger: ILogger, encoder: IEncoder<string, string> = new Base64Encoder()) {
this.promises = new RemotePromiseStore(url, logger, encoder);
this.schedules = new RemoteScheduleStore(url, logger, encoder);

// temp
vaibhawvipul marked this conversation as resolved.
Show resolved Hide resolved
this.locks = new LocalLockStore();
this.locks = new RemoteLockStore(url, logger);
}
}

Expand Down Expand Up @@ -496,3 +496,110 @@ function isSearchSchedulesResult(obj: any): obj is { cursor: string; schedules:
obj.schedules.every(isSchedule)
);
}

export class RemoteLockStore implements ILockStore {
private url: string;
// simple lock counter - tryAcquire increments, release decrements
// counter > 0 we call heartbeat api in a loop, setInterval
private lockCounter: number = 0;
private heartbeatInterval: NodeJS.Timeout | number | null = null;
vaibhawvipul marked this conversation as resolved.
Show resolved Hide resolved

constructor(
url: string,
private logger: ILogger = new Logger(),
) {
this.url = url;
this.heartbeatInterval = 100;
}

async tryAcquire(resourceId: string, processId: string, executionId: string): Promise<boolean> {
const lockRequest = {
resourceId: resourceId,
processId: processId,
executionId: executionId,
};

const acquired = call<boolean>(
`${this.url}/locks/acquire`,
(b: unknown): b is boolean => typeof b === "boolean",
{
method: "post",
body: JSON.stringify(lockRequest),
},
this.logger,
);

if (await acquired) {
this.lockCounter++;
// Start the heartbeat if it's not already running
if (this.lockCounter === 1) {
this.startHeartbeat(processId);
}
return true;
}
return false;
}

async release(resourceId: string, executionId: string): Promise<void> {
const releaseLockRequest = {
resource_id: resourceId,
execution_id: executionId,
};

call<void>(
`${this.url}/locks/release`,
(response: unknown): response is void => response === undefined,
{
method: "post",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(releaseLockRequest),
},
this.logger,
);

this.lockCounter--;
// Stop the heartbeat if there are no more locks
if (this.lockCounter === 0) {
this.stopHeartbeat();
}
}

private startHeartbeat(processId: string): void {
this.heartbeatInterval = setInterval(async () => {
const locksAffected = await this.heartbeat(processId);
this.lockCounter = locksAffected; // Set lockCounter based on the response
if (this.lockCounter === 0) {
this.stopHeartbeat();
}
}, 5000); // desired heartbeat interval (in ms)
}

private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = 0;
vaibhawvipul marked this conversation as resolved.
Show resolved Hide resolved
}
}

private async heartbeat(processId: string): Promise<number> {
const heartbeatRequest = {
process_id: processId,
timeout: 0,
};

return call<number>(
`${this.url}/locks/heartbeat`,
(locksAffected: unknown): locksAffected is number => typeof locksAffected === "number",
{
method: "post",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(heartbeatRequest),
},
this.logger,
);
}
}
2 changes: 1 addition & 1 deletion test/lock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe("Lock", () => {
expect(sharedResource.length).toBe(1);

// release lock so p2 can run
store.locks.release("write/id", sharedResource[0]);
await store.locks.release("write/id", sharedResource[0]);

const r = await p2;

Expand Down
Loading