Skip to content

Commit

Permalink
Remote Lock Interface (#29)
Browse files Browse the repository at this point in the history
* initial implementation of lock interface in remote and local mode

* making the interface async to support remote lock class

* moving remotelockstore to stores class

* lint fixes

* removing unused imports

* using remotelockstore in remote store

* removing comments

* removing lock counter from memory and using the val from api response

* nit fix
  • Loading branch information
vaibhawvipul authored Jan 25, 2024
1 parent 4997eea commit a4bc908
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 6 deletions.
102 changes: 97 additions & 5 deletions lib/core/stores/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,20 @@ import { IStore, IPromiseStore, IScheduleStore } from "../store";
import { IEncoder } from "../encoder";
import { Base64Encoder } from "../encoders/base64";
import { ErrorCodes, ResonateError } from "../error";
import { ILockStore } from "../store";
import { ILogger } from "../logger";
import { Schedule, isSchedule } from "../schedule";
import { Logger } from "../loggers/logger";
import { LocalLockStore } from "./local";

export class RemoteStore implements IStore {
public promises: RemotePromiseStore;
public schedules: RemoteScheduleStore;
public locks: LocalLockStore;
public locks: RemoteLockStore;

constructor(url: string, logger: ILogger, encoder: IEncoder<string, string> = new Base64Encoder()) {
this.promises = new RemotePromiseStore(url, logger, encoder);
this.schedules = new RemoteScheduleStore(url, logger, encoder);

// temp
this.locks = new LocalLockStore();
this.locks = new RemoteLockStore(url, logger);
}
}

Expand Down Expand Up @@ -496,3 +494,97 @@ function isSearchSchedulesResult(obj: any): obj is { cursor: string; schedules:
obj.schedules.every(isSchedule)
);
}

export class RemoteLockStore implements ILockStore {
private url: string;
private heartbeatInterval: number | null = null;

constructor(
url: string,
private logger: ILogger = new Logger(),
) {
this.url = url;
}

async tryAcquire(resourceId: string, processId: string, executionId: string): Promise<boolean> {
const lockRequest = {
resourceId: resourceId,
processId: processId,
executionId: executionId,
};

const acquired = call<boolean>(
`${this.url}/locks/acquire`,
(b: unknown): b is boolean => typeof b === "boolean",
{
method: "post",
body: JSON.stringify(lockRequest),
},
this.logger,
);

if (await acquired) {
if (this.heartbeatInterval === null) {
this.startHeartbeat(processId);
}
return true;
}
return false;
}

async release(resourceId: string, executionId: string): Promise<void> {
const releaseLockRequest = {
resource_id: resourceId,
execution_id: executionId,
};

call<void>(
`${this.url}/locks/release`,
(response: unknown): response is void => response === undefined,
{
method: "post",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(releaseLockRequest),
},
this.logger,
);
}

private startHeartbeat(processId: string): void {
this.heartbeatInterval = +setInterval(async () => {
const locksAffected = await this.heartbeat(processId);
if (locksAffected === 0) {
this.stopHeartbeat();
}
}, 5000); // desired heartbeat interval (in ms)
}

private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
}

private async heartbeat(processId: string): Promise<number> {
const heartbeatRequest = {
process_id: processId,
timeout: 0,
};

return call<number>(
`${this.url}/locks/heartbeat`,
(locksAffected: unknown): locksAffected is number => typeof locksAffected === "number",
{
method: "post",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(heartbeatRequest),
},
this.logger,
);
}
}
2 changes: 1 addition & 1 deletion test/lock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe("Lock", () => {
expect(sharedResource.length).toBe(1);

// release lock so p2 can run
store.locks.release("write/id", sharedResource[0]);
await store.locks.release("write/id", sharedResource[0]);

const r = await p2;

Expand Down

0 comments on commit a4bc908

Please sign in to comment.