Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Lock Interface #29

Merged
merged 10 commits into from
Jan 25, 2024
102 changes: 97 additions & 5 deletions lib/core/stores/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,20 @@ import { IStore, IPromiseStore, IScheduleStore } from "../store";
import { IEncoder } from "../encoder";
import { Base64Encoder } from "../encoders/base64";
import { ErrorCodes, ResonateError } from "../error";
import { ILockStore } from "../store";
import { ILogger } from "../logger";
import { Schedule, isSchedule } from "../schedule";
import { Logger } from "../loggers/logger";
import { LocalLockStore } from "./local";

export class RemoteStore implements IStore {
public promises: RemotePromiseStore;
public schedules: RemoteScheduleStore;
public locks: LocalLockStore;
public locks: RemoteLockStore;

constructor(url: string, logger: ILogger, encoder: IEncoder<string, string> = new Base64Encoder()) {
this.promises = new RemotePromiseStore(url, logger, encoder);
this.schedules = new RemoteScheduleStore(url, logger, encoder);

// temp
this.locks = new LocalLockStore();
this.locks = new RemoteLockStore(url, logger);
}
}

Expand Down Expand Up @@ -496,3 +494,97 @@ function isSearchSchedulesResult(obj: any): obj is { cursor: string; schedules:
obj.schedules.every(isSchedule)
);
}

export class RemoteLockStore implements ILockStore {
private url: string;
private heartbeatInterval: number | null = null;

constructor(
url: string,
private logger: ILogger = new Logger(),
) {
this.url = url;
}

async tryAcquire(resourceId: string, processId: string, executionId: string): Promise<boolean> {
const lockRequest = {
resourceId: resourceId,
processId: processId,
executionId: executionId,
};

const acquired = call<boolean>(
`${this.url}/locks/acquire`,
(b: unknown): b is boolean => typeof b === "boolean",
{
method: "post",
body: JSON.stringify(lockRequest),
},
this.logger,
);

if (await acquired) {
if (this.heartbeatInterval === null) {
this.startHeartbeat(processId);
}
return true;
}
return false;
}

async release(resourceId: string, executionId: string): Promise<void> {
const releaseLockRequest = {
resource_id: resourceId,
execution_id: executionId,
};

call<void>(
`${this.url}/locks/release`,
(response: unknown): response is void => response === undefined,
{
method: "post",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(releaseLockRequest),
},
this.logger,
);
}

private startHeartbeat(processId: string): void {
this.heartbeatInterval = +setInterval(async () => {
const locksAffected = await this.heartbeat(processId);
if (locksAffected === 0) {
this.stopHeartbeat();
}
}, 5000); // desired heartbeat interval (in ms)
}

private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = 0;
vaibhawvipul marked this conversation as resolved.
Show resolved Hide resolved
}
}

private async heartbeat(processId: string): Promise<number> {
const heartbeatRequest = {
process_id: processId,
timeout: 0,
};

return call<number>(
`${this.url}/locks/heartbeat`,
(locksAffected: unknown): locksAffected is number => typeof locksAffected === "number",
{
method: "post",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(heartbeatRequest),
},
this.logger,
);
}
}
2 changes: 1 addition & 1 deletion test/lock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe("Lock", () => {
expect(sharedResource.length).toBe(1);

// release lock so p2 can run
store.locks.release("write/id", sharedResource[0]);
await store.locks.release("write/id", sharedResource[0]);

const r = await p2;

Expand Down
Loading