Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace DurablePromise union type with single type #128

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/core/promises/promises.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IEncoder } from "../encoder";
import { ErrorCodes, ResonateError } from "../errors";
import { IPromiseStore } from "../store";
import { PendingPromise, ResolvedPromise, RejectedPromise, CanceledPromise, TimedoutPromise } from "./types";
import { DurablePromiseRecord } from "./types";

/**
* Durable Promise create options.
Expand Down Expand Up @@ -69,7 +69,7 @@ export class DurablePromise<T> {
constructor(
private store: IPromiseStore,
private encoder: IEncoder<unknown, string | undefined>,
private promise: PendingPromise | ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise,
private promise: DurablePromiseRecord,
) {
this.completed = new Promise((resolve) => {
this.complete = resolve;
Expand Down
115 changes: 17 additions & 98 deletions lib/core/promises/types.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,5 @@
export type DurablePromise = PendingPromise | ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise;

export type PendingPromise = {
state: "PENDING";
id: string;
timeout: number;
param: {
headers: Record<string, string> | undefined;
data: string | undefined;
};
value: {
headers: undefined;
data: undefined;
};
createdOn: number;
completedOn: undefined;
idempotencyKeyForCreate: string | undefined;
idempotencyKeyForComplete: undefined;
tags: Record<string, string> | undefined;
};

export type ResolvedPromise = {
state: "RESOLVED";
id: string;
timeout: number;
param: {
headers: Record<string, string> | undefined;
data: string | undefined;
};
value: {
headers: Record<string, string> | undefined;
data: string | undefined;
};
createdOn: number;
completedOn: number;
idempotencyKeyForCreate: string | undefined;
idempotencyKeyForComplete: string | undefined;
tags: Record<string, string> | undefined;
};

export type RejectedPromise = {
state: "REJECTED";
export type DurablePromiseRecord = {
state: "PENDING" | "RESOLVED" | "REJECTED" | "REJECTED_CANCELED" | "REJECTED_TIMEDOUT";
id: string;
timeout: number;
param: {
Expand All @@ -51,53 +11,14 @@ export type RejectedPromise = {
data: string | undefined;
};
createdOn: number;
completedOn: number;
completedOn: number | undefined;
idempotencyKeyForCreate: string | undefined;
idempotencyKeyForComplete: string | undefined;
tags: Record<string, string> | undefined;
};

export type CanceledPromise = {
state: "REJECTED_CANCELED";
id: string;
timeout: number;
param: {
headers: Record<string, string> | undefined;
data: string | undefined;
};
value: {
headers: Record<string, string> | undefined;
data: string | undefined;
};
createdOn: number;
completedOn: number;
idempotencyKeyForCreate: string | undefined;
idempotencyKeyForComplete: string | undefined;
tags: Record<string, string> | undefined;
};

export type TimedoutPromise = {
state: "REJECTED_TIMEDOUT";
id: string;
timeout: number;
param: {
headers: Record<string, string> | undefined;
data: string | undefined;
};
value: {
headers: undefined;
data: undefined;
};
createdOn: number;
completedOn: number;
idempotencyKeyForCreate: string | undefined;
idempotencyKeyForComplete: undefined;
tags: Record<string, string> | undefined;
};

// Type guards

export function isDurablePromise(p: unknown): p is DurablePromise {
// This is an unsound type guard, we should be more strict in what we call a DurablePromise
export function isDurablePromiseRecord(p: unknown): p is DurablePromiseRecord {
return (
p !== null &&
typeof p === "object" &&
Expand All @@ -107,28 +28,26 @@ export function isDurablePromise(p: unknown): p is DurablePromise {
);
}

export function isPendingPromise(p: unknown): p is PendingPromise {
return isDurablePromise(p) && p.state === "PENDING";
export function isPendingPromise(p: DurablePromiseRecord): boolean {
return p.state === "PENDING";
}

export function isResolvedPromise(p: unknown): p is ResolvedPromise {
return isDurablePromise(p) && p.state === "RESOLVED";
export function isResolvedPromise(p: DurablePromiseRecord): boolean {
return p.state === "RESOLVED";
}

export function isRejectedPromise(p: unknown): p is RejectedPromise {
return isDurablePromise(p) && p.state === "REJECTED";
export function isRejectedPromise(p: DurablePromiseRecord): boolean {
return p.state === "REJECTED";
}

export function isCanceledPromise(p: unknown): p is CanceledPromise {
return isDurablePromise(p) && p.state === "REJECTED_CANCELED";
export function isCanceledPromise(p: DurablePromiseRecord): boolean {
return p.state === "REJECTED_CANCELED";
}

export function isTimedoutPromise(p: unknown): p is TimedoutPromise {
return isDurablePromise(p) && p.state === "REJECTED_TIMEDOUT";
export function isTimedoutPromise(p: DurablePromiseRecord): boolean {
return p.state === "REJECTED_TIMEDOUT";
}

export function isCompletedPromise(
p: unknown,
): p is ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise {
return isDurablePromise(p) && ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state);
export function isCompletedPromise(p: DurablePromiseRecord): boolean {
return ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state);
}
2 changes: 2 additions & 0 deletions lib/core/storages/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { IStorage } from "../storage";
export class MemoryStorage<T> implements IStorage<T> {
private items: Record<string, T> = {};

// read-modify-write
async rmw<X extends T | undefined>(id: string, func: (item: T | undefined) => X): Promise<X> {
const item = func(this.items[id]);
if (item) {
Expand All @@ -12,6 +13,7 @@ export class MemoryStorage<T> implements IStorage<T> {
return item;
}

// read-modify-delete
async rmd(id: string, func: (item: T) => boolean): Promise<boolean> {
const item = this.items[id];
let result = false;
Expand Down
32 changes: 12 additions & 20 deletions lib/core/storages/withTimeout.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
import { DurablePromise, ResolvedPromise, TimedoutPromise, isPendingPromise } from "../promises/types";
import { DurablePromiseRecord, isPendingPromise } from "../promises/types";
import { IStorage } from "../storage";
import { MemoryStorage } from "./memory";

export class WithTimeout implements IStorage<DurablePromise> {
constructor(private storage: IStorage<DurablePromise> = new MemoryStorage<DurablePromise>()) {}
export class WithTimeout implements IStorage<DurablePromiseRecord> {
constructor(private storage: IStorage<DurablePromiseRecord> = new MemoryStorage<DurablePromiseRecord>()) {}

rmw<T extends DurablePromise | undefined>(id: string, func: (item: DurablePromise | undefined) => T): Promise<T> {
rmw<T extends DurablePromiseRecord | undefined>(
id: string,
func: (item: DurablePromiseRecord | undefined) => T,
): Promise<T> {
return this.storage.rmw(id, (p) => func(p ? timeout(p) : undefined));
}

rmd(id: string, func: (item: DurablePromise) => boolean): Promise<boolean> {
rmd(id: string, func: (item: DurablePromiseRecord) => boolean): Promise<boolean> {
return this.storage.rmd(id, (p) => func(timeout(p)));
}

async *all(): AsyncGenerator<DurablePromise[], void> {
async *all(): AsyncGenerator<DurablePromiseRecord[], void> {
for await (const promises of this.storage.all()) {
yield promises.map(timeout);
}
}
}

function timeout<T extends DurablePromise>(promise: T): T | ResolvedPromise | TimedoutPromise {
function timeout<T extends DurablePromiseRecord>(promise: T): DurablePromiseRecord {
if (isPendingPromise(promise) && Date.now() >= promise.timeout) {
const body = {
return {
state: promise.tags?.["resonate:timeout"] === "true" ? "RESOLVED" : "REJECTED_TIMEDOUT",
id: promise.id,
timeout: promise.timeout,
param: promise.param,
Expand All @@ -36,18 +40,6 @@ function timeout<T extends DurablePromise>(promise: T): T | ResolvedPromise | Ti
idempotencyKeyForComplete: undefined,
tags: promise.tags,
};

if (promise.tags?.["resonate:timeout"] === "true") {
return {
state: "RESOLVED",
...body,
};
} else {
return {
state: "REJECTED_TIMEDOUT",
...body,
};
}
}

return promise;
Expand Down
21 changes: 7 additions & 14 deletions lib/core/store.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
import {
DurablePromise,
PendingPromise,
ResolvedPromise,
RejectedPromise,
CanceledPromise,
TimedoutPromise,
} from "./promises/types";
import { DurablePromiseRecord } from "./promises/types";

import { Schedule } from "./schedules/types";

Expand Down Expand Up @@ -42,7 +35,7 @@ export interface IPromiseStore {
data: string | undefined,
timeout: number,
tags: Record<string, string> | undefined,
): Promise<PendingPromise | CanceledPromise | ResolvedPromise | RejectedPromise | TimedoutPromise>;
): Promise<DurablePromiseRecord>;

/**
* Cancels a new promise.
Expand All @@ -60,7 +53,7 @@ export interface IPromiseStore {
strict: boolean,
headers: Record<string, string> | undefined,
data: string | undefined,
): Promise<CanceledPromise | ResolvedPromise | RejectedPromise | TimedoutPromise>;
): Promise<DurablePromiseRecord>;

/**
* Resolves a promise.
Expand All @@ -78,7 +71,7 @@ export interface IPromiseStore {
strict: boolean,
headers: Record<string, string> | undefined,
data: string | undefined,
): Promise<CanceledPromise | ResolvedPromise | RejectedPromise | TimedoutPromise>;
): Promise<DurablePromiseRecord>;

/**
* Rejects a promise
Expand All @@ -96,15 +89,15 @@ export interface IPromiseStore {
strict: boolean,
headers: Record<string, string> | undefined,
data: string | undefined,
): Promise<CanceledPromise | ResolvedPromise | RejectedPromise | TimedoutPromise>;
): Promise<DurablePromiseRecord>;

/**
* Retrieves a promise based on its id.
*
* @param id Unique identifier for the promise to be retrieved.
* @returns A durable promise that is pending, canceled, resolved, or rejected.
*/
get(id: string): Promise<DurablePromise>;
get(id: string): Promise<DurablePromiseRecord>;

/**
* Search for promises.
Expand All @@ -120,7 +113,7 @@ export interface IPromiseStore {
state: string | undefined,
tags: Record<string, string> | undefined,
limit?: number,
): AsyncGenerator<DurablePromise[], void>;
): AsyncGenerator<DurablePromiseRecord[], void>;
}

/**
Expand Down
23 changes: 9 additions & 14 deletions lib/core/stores/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@ import { ILogger } from "../logger";
import { Logger } from "../loggers/logger";
import { StoreOptions } from "../options";
import {
DurablePromise,
PendingPromise,
ResolvedPromise,
RejectedPromise,
CanceledPromise,
TimedoutPromise,
DurablePromiseRecord,
isPendingPromise,
isResolvedPromise,
isRejectedPromise,
Expand All @@ -34,7 +29,7 @@ export class LocalStore implements IStore {

constructor(
opts: Partial<StoreOptions> = {},
promiseStorage: IStorage<DurablePromise> = new WithTimeout(new MemoryStorage<DurablePromise>()),
promiseStorage: IStorage<DurablePromiseRecord> = new WithTimeout(new MemoryStorage<DurablePromiseRecord>()),
scheduleStorage: IStorage<Schedule> = new MemoryStorage<Schedule>(),
lockStorage: IStorage<{ id: string; eid: string }> = new MemoryStorage<{ id: string; eid: string }>(),
) {
Expand Down Expand Up @@ -122,7 +117,7 @@ export class LocalStore implements IStore {
export class LocalPromiseStore implements IPromiseStore {
constructor(
private store: LocalStore,
private storage: IStorage<DurablePromise>,
private storage: IStorage<DurablePromiseRecord>,
) {}

async create(
Expand All @@ -133,7 +128,7 @@ export class LocalPromiseStore implements IPromiseStore {
data: string | undefined,
timeout: number,
tags: Record<string, string> | undefined,
): Promise<PendingPromise | ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise> {
): Promise<DurablePromiseRecord> {
return this.storage.rmw(id, (promise) => {
if (!promise) {
return {
Expand Down Expand Up @@ -174,7 +169,7 @@ export class LocalPromiseStore implements IPromiseStore {
strict: boolean,
headers: Record<string, string> | undefined,
data: string | undefined,
): Promise<ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise> {
): Promise<DurablePromiseRecord> {
return this.storage.rmw(id, (promise) => {
if (!promise) {
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
Expand Down Expand Up @@ -219,7 +214,7 @@ export class LocalPromiseStore implements IPromiseStore {
strict: boolean,
headers: Record<string, string> | undefined,
data: string | undefined,
): Promise<ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise> {
): Promise<DurablePromiseRecord> {
return this.storage.rmw(id, (promise) => {
if (!promise) {
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
Expand Down Expand Up @@ -264,7 +259,7 @@ export class LocalPromiseStore implements IPromiseStore {
strict: boolean,
headers: Record<string, string> | undefined,
data: string | undefined,
): Promise<ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise> {
): Promise<DurablePromiseRecord> {
return this.storage.rmw(id, (promise) => {
if (!promise) {
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
Expand Down Expand Up @@ -303,7 +298,7 @@ export class LocalPromiseStore implements IPromiseStore {
});
}

async get(id: string): Promise<DurablePromise> {
async get(id: string): Promise<DurablePromiseRecord> {
const promise = await this.storage.rmw(id, (p) => p);

if (!promise) {
Expand All @@ -318,7 +313,7 @@ export class LocalPromiseStore implements IPromiseStore {
state?: string,
tags?: Record<string, string>,
limit?: number,
): AsyncGenerator<DurablePromise[], void> {
): AsyncGenerator<DurablePromiseRecord[], void> {
// filter the promises returned from all storage
const regex = new RegExp(id.replaceAll("*", ".*"));
const states = searchStates(state);
Expand Down
Loading
Loading