Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

preliminary support for a job queue #1212

Merged
merged 5 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions app/server/lib/FlexServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {DocWorkerInfo, IDocWorkerMap} from 'app/server/lib/DocWorkerMap';
import {expressWrap, jsonErrorHandler, secureJsonErrorHandler} from 'app/server/lib/expressWrap';
import {Hosts, RequestWithOrg} from 'app/server/lib/extractOrg';
import {addGoogleAuthEndpoint} from "app/server/lib/GoogleAuth";
import {GristBullMQJobs, GristJobs} from 'app/server/lib/GristJobs';
import {DocTemplate, GristLoginMiddleware, GristLoginSystem, GristServer,
RequestWithGrist} from 'app/server/lib/GristServer';
import {initGristSessions, SessionStore} from 'app/server/lib/gristSessions';
Expand Down Expand Up @@ -186,6 +187,7 @@ export class FlexServer implements GristServer {
private _isReady: boolean = false;
private _updateManager: UpdateManager;
private _sandboxInfo: SandboxInfo;
private _jobs?: GristJobs;

constructor(public port: number, public name: string = 'flexServer',
public readonly options: FlexServerOptions = {}) {
Expand Down Expand Up @@ -339,6 +341,14 @@ export class FlexServer implements GristServer {
return this.server ? (this.server.address() as AddressInfo).port : this.port;
}

/**
* Get interface to job queues.
*/
public getJobs(): GristJobs {
const jobs = this._jobs || new GristBullMQJobs();
return jobs;
}

/**
* Get a url to an org that should be accessible by all signed-in users. For now, this
* returns the base URL of the personal org (typically docs[-s]).
Expand Down Expand Up @@ -943,6 +953,7 @@ export class FlexServer implements GristServer {
if (this.server) { this.server.close(); }
if (this.httpsServer) { this.httpsServer.close(); }
if (this.housekeeper) { await this.housekeeper.stop(); }
if (this._jobs) { await this._jobs.stop(); }
await this._shutdown();
if (this._accessTokens) { await this._accessTokens.close(); }
// Do this after _shutdown, since DocWorkerMap is used during shutdown.
Expand Down
335 changes: 335 additions & 0 deletions app/server/lib/GristJobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
import { makeId } from 'app/server/lib/idUtils';
import log from 'app/server/lib/log';
import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';

/**
*
* Support for queues.
*
* We use BullMQ for queuing, since it seems currently the best the
* node ecosystem has to offer. BullMQ relies on Redis. Since queuing
* is so handy, but we'd like most of Grist to be usable without Redis,
* we make some effort to support queuing without BullMQ. This
* may not be sustainable, we'll see.
*
* Important: if you put a job in a queue, it can outlast your process.
* That has implications for testing and deployment, so be careful.
*
*/
export interface GristJobs {
/**
* All workers and jobs are scoped to individual named queues,
* with the real interfaces operating at that level.
*/
queue(queueName?: string): GristQueueScope;

/**
* Shut everything down that we're responsible for.
* Set obliterate flag to destroy jobs even if they are
* stored externally (useful for testing).
*/
stop(options?: {
obliterate?: boolean,
}): Promise<void>;
}

/**
* For a given queue, we can add jobs, or methods to process jobs,
*/
export interface GristQueueScope {
/**
* Add a job.
*/
add(name: string, data: any, options?: JobAddOptions): Promise<void>;
berhalak marked this conversation as resolved.
Show resolved Hide resolved


/**
* Add a job handler for all jobs regardless of name.
* Handlers given by handleName take priority, but no
* job handling will happen until handleDefault has been
* called.
*/
handleDefault(defaultCallback: JobHandler): void;

/**
* Add a job handler for jobs with a specific name.
* Handler will only be effective once handleAll is called
* to specify what happens to jobs not matching expected
* names.
*/
handleName(name: string,
callback: (job: GristJob) => Promise<any>): void;

/**
* Shut everything down that we're responsible for.
* Set obliterate flag to destroy jobs even if they are
* stored externally (useful for testing).
*/
stop(options?: {
obliterate?: boolean,
}): Promise<void>;
}

/**
* The type of a function for handling jobs on a queue.
*/
export type JobHandler = (job: GristJob) => Promise<any>;

/**
* The name used for a queue if no specific name is given.
*/
export const DEFAULT_QUEUE_NAME = 'default';

/**
* BullMQ jobs are a string name, and then a data object.
*/
interface GristJob {
name: string;
data: any;
}

/**
* Options when adding a job. BullMQ has many more.
*/
interface JobAddOptions {
delay?: number;
jobId?: string;
repeat?: {
every: number;
}
}

/**
* Implementation for job functionality across the application.
* Will use BullMQ, with an in-memory fallback if Redis is
* unavailable.
*/
export class GristBullMQJobs implements GristJobs {
private _connection?: IORedis;
private _checkedForConnection: boolean = false;
private _queues = new Map<string, GristQueueScope>();

/**
* Get BullMQ-compatible options for the queue.
*/
public getQueueOptions() {
// Following BullMQ, queue options contain the connection
// to redis, if any.
if (!this._checkedForConnection) {
this._connect();
this._checkedForConnection = true;
}
if (!this._connection) {
return {};
}
return {
connection: this._connection,
maxRetriesPerRequest: null,
};
}

/**
* Get an interface scoped to a particular queue by name.
*/
public queue(queueName: string = DEFAULT_QUEUE_NAME): GristQueueScope {
if (!this._queues.get(queueName)) {
this._queues.set(
queueName,
new GristBullMQQueueScope(queueName, this),
);
}
return this._queues.get(queueName)!;
}

public async stop(options: {
paulfitz marked this conversation as resolved.
Show resolved Hide resolved
obliterate?: boolean,
} = {}) {
for (const q of this._queues.values()) {
await q.stop(options);
}
this._queues.clear();
this._connection?.disconnect();
}

/**
* Connect to Redis if available.
*/
private _connect() {
// Connect to Redis for use with BullMQ, if REDIS_URL is set.
const urlTxt = process.env.REDIS_URL || process.env.TEST_REDIS_URL;
if (!urlTxt) {
this._connection = undefined;
log.warn('Using in-memory queues, Redis is unavailable');
return;
}
const url = new URL(urlTxt);
const conn = new IORedis({
host: url.hostname,
port: url.port ? parseInt(url.port, 10) : undefined,
db: (url.pathname.charAt(0) === '/') ?
parseInt(url.pathname.substring(1), 10) : undefined,
maxRetriesPerRequest: null,
});
this._connection = conn;
log.info('Storing queues externally in Redis');
}
}

/**
* Work with a particular named queue.
*/
export class GristBullMQQueueScope implements GristQueueScope {
private _queue: Queue|GristWorker|undefined;
private _worker: Worker|GristWorker|undefined;
private _namedProcessors: Record<string, JobHandler> = {};

public constructor(public readonly queueName: string,
private _owner: GristBullMQJobs) {}

public handleDefault(defaultCallback: JobHandler) {
// The default callback passes any recognized named jobs to
// processors added with handleName(), then, if there is no
// specific processor, calls the defaultCallback.
const callback = async (job: GristJob) => {
const processor = this._namedProcessors[job.name] || defaultCallback;
return processor(job);
};
const options = this._owner.getQueueOptions();
if (!options.connection) {
// If Redis isn't available, we go our own way, not
// using BullMQ.
const worker = new GristWorker(this.queueName, callback);
this._worker = worker;
return worker;
}
const worker = new Worker(this.queueName, callback, options);
this._worker = worker;
return worker;
}

public handleName(name: string,
callback: (job: GristJob) => Promise<any>) {
this._namedProcessors[name] = callback;
}

public async stop(options: {
obliterate?: boolean,
} = {}) {
await this._worker?.close();
if (options.obliterate) {
await this._queue?.obliterate();
}
}

public async add(name: string, data: any, options?: JobAddOptions) {
await this._getQueue().add(name, data, {
...options,
// These settings are quite arbitrary, and should be
// revised when it matters, or made controllable.
removeOnComplete: {
age: 3600, // keep up to 1 hour
count: 1000, // keep up to 1000 jobs
},
removeOnFail: {
age: 24 * 3600, // keep up to 24 hours
},
});
}

private _getQueue(): Queue|GristWorker {
if (this._queue) { return this._queue; }
const queue = this._pickQueueImplementation();
this._queue = queue;
return queue;
}

private _pickQueueImplementation() {
const name = this.queueName;
const queueOptions = this._owner.getQueueOptions();
// If we have Redis, get a proper BullMQ interface.
// Otherwise, make do.
if (queueOptions.connection) {
return new Queue(name, queueOptions);
}
// If in memory, we hand a job directly to the single worker for their
// queue. This is very crude.
const worker = this._worker;
if (!worker) {
throw new Error(`no handler yet for ${this.queueName}`);
}
// We only access workers directly when working in-memory, to
// hand jobs directly to them.
if (isBullMQWorker(worker)) {
// Not expected! Somehow we have a BullMQ worker.
throw new Error(`wrong kind of worker for ${this.queueName}`);
}
return worker;
}
}

/**
* If running in memory without Redis, all jobs need to be
* created and served by the the same process. This class
* pretends to be a BullMQ worker, but accepts jobs directly
* without any intermediate queue. This could be elaborated
* in future if needed.
*/
class GristWorker {
private _jobs: Map<string, NodeJS.Timeout> = new Map();

public constructor(public queueName: string,
private _callback: (job: GristJob) => Promise<void>) {
}

public async close() {
for (const job of this._jobs.keys()) {
// Key deletion is safe with the keys() iterator.
this._clearJob(job);
}
}

public async add(name: string, data: any, options?: JobAddOptions) {
if (options?.delay) {
if (options.repeat) {
// Unexpected combination.
throw new Error('cannot delay and repeat');
}
const jobId = options.jobId || makeId();
this._clearJob(jobId);
this._jobs.set(jobId, setTimeout(() => this._callback({name, data}),
options.delay));
return;
}
if (options?.repeat) {
const jobId = options.jobId || makeId();
this._clearJob(jobId);
this._jobs.set(jobId, setInterval(() => this._callback({name, data}),
options.repeat.every));
return;
}
await this._callback({name, data});
}

public async obliterate() {
await this.close();
}

private _clearJob(id: string) {
const job = this._jobs.get(id);
if (!job) { return; }
// We don't know if the job is a once-off or repeating,
// so we call both clearInterval and clearTimeout, which
// apparently works.
clearInterval(job);
clearTimeout(job);
this._jobs.delete(id);
}
}

/**
* Check if a worker is a real BullMQ worker, or just pretend.
*/
function isBullMQWorker(worker: Worker|GristWorker): worker is Worker {
return 'isNextJob' in worker;
}
Loading
Loading