Skip to content

Commit

Permalink
Task: Implement the resume event for tasks.
Browse files Browse the repository at this point in the history
This commits include most of the initial implementation
for the task framework. At the time of this commit, only
the Resume event is being handled.
  • Loading branch information
avillega committed Aug 27, 2024
1 parent 3e49fc1 commit 8a97b4d
Show file tree
Hide file tree
Showing 16 changed files with 762 additions and 61 deletions.
1 change: 1 addition & 0 deletions .github/workflows/cicd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ jobs:
- name: Run tests
env:
RESONATE_STORE_URL: http://localhost:8001
RESONATE_TASK_SOURCE_URL: http://localhost:3000/recv
run: npm test -- --verbose

- name: Upload coverage report to Codecov
Expand Down
7 changes: 7 additions & 0 deletions lib/core/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,15 @@ export type ResonateOptions = {
/**
* The remote promise store url. If not provided, an in-memory
* promise store will be used.
* Must be set if `tasksUrl` is set.
*/
url: string;

/**
* Tasks Url to listen for tasks from the server, must be a valid http url.
* Default port 3000. Must be set if `url` is set.
*/
tasksUrl: string;
};

/**
Expand Down
13 changes: 9 additions & 4 deletions lib/core/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export function isRetryPolicy(value: unknown): value is RetryPolicy {
export function exponential(
initialDelayMs: number = 100,
backoffFactor: number = 2,
maxAttempts: number = Infinity,
maxAttempts: number = -1,
maxDelayMs: number = 60000,
): Exponential {
return {
Expand All @@ -71,7 +71,7 @@ export function exponential(
};
}

export function linear(delayMs: number = 1000, maxAttempts: number = Infinity): Linear {
export function linear(delayMs: number = 1000, maxAttempts: number = -1): Linear {
return {
kind: "linear",
delayMs,
Expand Down Expand Up @@ -157,25 +157,30 @@ export async function runWithRetry<T>(
throw error;
}

// Maps every of the supported retry policies to have the same
// fields so we can reuse the same function for retries
function retryDefaults(retryPolicy: RetryPolicy): {
initialDelay: number;
backoffFactor: number;
maxAttempts: number;
maxDelay: number;
} {
let maxAttemps;
switch (retryPolicy.kind) {
case "exponential":
maxAttemps = retryPolicy.maxAttempts === -1 ? Infinity : retryPolicy.maxAttempts
return {
initialDelay: retryPolicy.initialDelayMs,
backoffFactor: retryPolicy.backoffFactor,
maxAttempts: retryPolicy.maxAttempts,
maxAttempts: maxAttemps,
maxDelay: retryPolicy.maxDelayMs,
};
case "linear":
maxAttemps = retryPolicy.maxAttempts === -1 ? Infinity : retryPolicy.maxAttempts
return {
initialDelay: retryPolicy.delayMs,
backoffFactor: 1,
maxAttempts: retryPolicy.maxAttempts,
maxAttempts: maxAttemps,
maxDelay: retryPolicy.delayMs,
};
case "never":
Expand Down
39 changes: 39 additions & 0 deletions lib/core/store.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { DurablePromiseRecord } from "./promises/types";

import { Schedule } from "./schedules/types";
import { ResumeBody } from "./tasks";

/**
* Store Interface
Expand All @@ -9,6 +10,13 @@ export interface IStore {
readonly promises: IPromiseStore;
readonly schedules: IScheduleStore;
readonly locks: ILockStore;
readonly callbacks: ICallbackStore;
readonly tasks: ITaskStore;

/**
* Do neccesary clean up and free of resources on stop
*/
stop(): void;
}

/**
Expand Down Expand Up @@ -195,3 +203,34 @@ export interface ILockStore {
*/
release(id: string, eid: string): Promise<boolean>;
}

/**
* Task Store API
*/
export interface ITaskStore {
/**
* Claims a task.
*
* @param tasId Id of the task to claim.
* @param count Count of the task to claim.
* @returns A boolean indicating whether or not the task was claim.
*
*/
claim(taskId: string, count: number): Promise<ResumeBody>;

/**
* Completes the task.
*
* @param taskId Id of the task to complete.
* @param count Count of the task to claim.
*
*/
complete(taskId: string, count: number): Promise<boolean>;
}

/**
* Callback Store API
*/
export interface ICallbackStore {
create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<boolean>;
}
121 changes: 119 additions & 2 deletions lib/core/stores/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,64 @@ import {
isRejectedPromise,
isCanceledPromise,
isTimedoutPromise,
isCompletedPromise,
} from "../promises/types";
import { Schedule } from "../schedules/types";
import { IStorage } from "../storage";
import { MemoryStorage } from "../storages/memory";
import { WithTimeout } from "../storages/withTimeout";
import { IStore, IPromiseStore, IScheduleStore, ILockStore } from "../store";
import { IStore, IPromiseStore, IScheduleStore, ILockStore, ICallbackStore, ITaskStore } from "../store";
import { ResumeBody, isResumeBody } from "../tasks";
import { LocalTasksSource } from "../tasksSources/local";

export class LocalStore implements IStore {
public promises: LocalPromiseStore;
public schedules: LocalScheduleStore;
public locks: LocalLockStore;
public callbacks: LocalCallbackStore;
public tasks: LocalTaskStore;

public readonly logger: ILogger;

private toSchedule: Schedule[] = [];
private next: number | undefined = undefined;
private tasksSource: LocalTasksSource;
private callbacksTimeout: NodeJS.Timeout | undefined;

constructor(
tasksSource: LocalTasksSource,
opts: Partial<StoreOptions> = {},
promiseStorage: IStorage<DurablePromiseRecord> = new WithTimeout(new MemoryStorage<DurablePromiseRecord>()),
scheduleStorage: IStorage<Schedule> = new MemoryStorage<Schedule>(),
lockStorage: IStorage<{ id: string; eid: string }> = new MemoryStorage<{ id: string; eid: string }>(),
callbacksStorage: IStorage<{ id: string; data: string }> = new MemoryStorage<{
id: string;
data: string;
}>(),
taskStorage: IStorage<{ id: string; counter: number; data: string }> = new MemoryStorage<{
id: string;
counter: number;
data: string;
}>(),
) {
this.callbacks = new LocalCallbackStore(this, callbacksStorage);
this.promises = new LocalPromiseStore(this, promiseStorage);
this.schedules = new LocalScheduleStore(this, scheduleStorage);
this.locks = new LocalLockStore(this, lockStorage);
this.tasks = new LocalTaskStore(this, taskStorage);

this.logger = opts.logger ?? new Logger();
this.tasksSource = tasksSource;

this.init();
this.handleCallbacks();
}

stop(): void {
clearTimeout(this.next);
this.next = undefined;
clearTimeout(this.callbacksTimeout);
this.callbacksTimeout = undefined;
}

// handler the schedule store can call
Expand Down Expand Up @@ -112,6 +140,28 @@ export class LocalStore implements IStore {
.replace("{{.id}}", schedule.id)
.replace("{{.timestamp}}", schedule.nextRunTime.toString());
}

// Handles all the callbacks
async handleCallbacks() {
clearTimeout(this.callbacksTimeout);

this.callbacksTimeout = setInterval(async () => {
const callbacksToDelete = [];
for await (const callbacks of this.callbacks.getAll()) {
for (const callback of callbacks) {
const promise = await this.promises.get(callback.id);
if (isCompletedPromise(promise)) {
const task = await this.tasks.create(promise.id, callback.data);
this.tasksSource.emitTask(task);
callbacksToDelete.push(callback);
}
}
}
for (const callback of callbacksToDelete) {
await this.callbacks.delete(callback.id);
}
}, 500);
}
}

export class LocalPromiseStore implements IPromiseStore {
Expand Down Expand Up @@ -170,7 +220,7 @@ export class LocalPromiseStore implements IPromiseStore {
headers: Record<string, string> | undefined,
data: string | undefined,
): Promise<DurablePromiseRecord> {
return this.storage.rmw(id, (promise) => {
return await this.storage.rmw(id, (promise) => {
if (!promise) {
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
}
Expand Down Expand Up @@ -492,6 +542,73 @@ export class LocalLockStore implements ILockStore {
}
}

export class LocalCallbackStore implements ICallbackStore {
constructor(
private store: LocalStore,
private storage: IStorage<{ id: string; data: string }>,
) {}

async create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<boolean> {
await this.storage.rmw(promiseId, (callback) => {
if (!callback) {
return {
id: promiseId,
data: data ?? "",
};
}
});
return true;
}

async get(promiseId: string): Promise<{ id: string; data: string } | undefined> {
return this.storage.rmw(promiseId, (callback) => {
if (callback) return callback;
});
}

getAll(): AsyncGenerator<{ id: string; data: string }[], void, unknown> {
return this.storage.all();
}

async delete(callbackId: string): Promise<boolean> {
return await this.storage.rmd(callbackId, (callback) => callback.id === callbackId);
}
}

export class LocalTaskStore implements ITaskStore {
constructor(
private store: LocalStore,
private storage: IStorage<{ id: string; counter: number; data: string }>,
) {}

create(taskId: string, data: string): Promise<{ id: string; counter: number }> {
return this.storage.rmw(taskId, (task) => {
return task ? task : { id: taskId, counter: 0, data: data };
});
}

async claim(taskId: string, count: number): Promise<ResumeBody> {
const task = await this.storage.rmw(taskId, (task) => {
if (task) {
return task;
}
});
if (!task) {
throw new ResonateError("Task not found", ErrorCodes.STORE_NOT_FOUND);
}

const resumeBody = JSON.parse(task.data);
if (!isResumeBody(resumeBody)) {
throw new ResonateError("Invalid response", ErrorCodes.STORE_PAYLOAD, resumeBody);
}
return resumeBody;
}

complete(taskId: string, count: number): Promise<boolean> {
return this.storage.rmd(taskId, (task) => task.id === taskId);
}
}

// Utils

function searchStates(state: string | undefined): string[] {
Expand Down
Loading

0 comments on commit 8a97b4d

Please sign in to comment.