Skip to content

Commit

Permalink
preliminary support for a job queue (#1212)
Browse files Browse the repository at this point in the history
Grist has needed a job queue for some time. This adds one, using
BullMQ. BullMQ however requires Redis, meaning we couldn't use
jobs for the large subset of Grist that needs to be runnable without
Redis (e.g. for use on desktop, or on simple self-hosted sites).
So simple immediate, delayed, and repeated jobs are supported also
in a crude single-process form when Redis is not available.

This code isn't ready for actual use since an important issue
remains to be worked out, specifically how to handle draining
the queue during deployments to avoid mixing versions (or - if
allowing mixed versions - thinking through any extra support needed
for the developer to avoid introducing hard-to-test code paths).
  • Loading branch information
paulfitz authored Sep 25, 2024
1 parent 74248ef commit 36722c1
Show file tree
Hide file tree
Showing 6 changed files with 638 additions and 1 deletion.
11 changes: 11 additions & 0 deletions app/server/lib/FlexServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {DocWorkerInfo, IDocWorkerMap} from 'app/server/lib/DocWorkerMap';
import {expressWrap, jsonErrorHandler, secureJsonErrorHandler} from 'app/server/lib/expressWrap';
import {Hosts, RequestWithOrg} from 'app/server/lib/extractOrg';
import {addGoogleAuthEndpoint} from "app/server/lib/GoogleAuth";
import {GristBullMQJobs, GristJobs} from 'app/server/lib/GristJobs';
import {DocTemplate, GristLoginMiddleware, GristLoginSystem, GristServer,
RequestWithGrist} from 'app/server/lib/GristServer';
import {initGristSessions, SessionStore} from 'app/server/lib/gristSessions';
Expand Down Expand Up @@ -186,6 +187,7 @@ export class FlexServer implements GristServer {
private _isReady: boolean = false;
private _updateManager: UpdateManager;
private _sandboxInfo: SandboxInfo;
private _jobs?: GristJobs;

constructor(public port: number, public name: string = 'flexServer',
public readonly options: FlexServerOptions = {}) {
Expand Down Expand Up @@ -339,6 +341,14 @@ export class FlexServer implements GristServer {
return this.server ? (this.server.address() as AddressInfo).port : this.port;
}

/**
* Get interface to job queues.
*/
public getJobs(): GristJobs {
const jobs = this._jobs || new GristBullMQJobs();
return jobs;
}

/**
* Get a url to an org that should be accessible by all signed-in users. For now, this
* returns the base URL of the personal org (typically docs[-s]).
Expand Down Expand Up @@ -943,6 +953,7 @@ export class FlexServer implements GristServer {
if (this.server) { this.server.close(); }
if (this.httpsServer) { this.httpsServer.close(); }
if (this.housekeeper) { await this.housekeeper.stop(); }
if (this._jobs) { await this._jobs.stop(); }
await this._shutdown();
if (this._accessTokens) { await this._accessTokens.close(); }
// Do this after _shutdown, since DocWorkerMap is used during shutdown.
Expand Down
339 changes: 339 additions & 0 deletions app/server/lib/GristJobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
import { makeId } from 'app/server/lib/idUtils';
import log from 'app/server/lib/log';
import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';

/**
*
* Support for queues.
*
* We use BullMQ for queuing, since it seems currently the best the
* node ecosystem has to offer. BullMQ relies on Redis. Since queuing
* is so handy, but we'd like most of Grist to be usable without Redis,
* we make some effort to support queuing without BullMQ. This
* may not be sustainable, we'll see.
*
* Important: if you put a job in a queue, it can outlast your process.
* That has implications for testing and deployment, so be careful.
*
* Long running jobs may be a challenge. BullMQ cancelation
* relies on non-open source features:
* https://docs.bullmq.io/bullmq-pro/observables/cancelation
*
*/
export interface GristJobs {
/**
* All workers and jobs are scoped to individual named queues,
* with the real interfaces operating at that level.
*/
queue(queueName?: string): GristQueueScope;

/**
* Shut everything down that we're responsible for.
* Set obliterate flag to destroy jobs even if they are
* stored externally (useful for testing).
*/
stop(options?: {
obliterate?: boolean,
}): Promise<void>;
}

/**
* For a given queue, we can add jobs, or methods to process jobs,
*/
export interface GristQueueScope {
/**
* Add a job.
*/
add(name: string, data: any, options?: JobAddOptions): Promise<void>;


/**
* Add a job handler for all jobs regardless of name.
* Handlers given by handleName take priority, but no
* job handling will happen until handleDefault has been
* called.
*/
handleDefault(defaultCallback: JobHandler): void;

/**
* Add a job handler for jobs with a specific name.
* Handler will only be effective once handleAll is called
* to specify what happens to jobs not matching expected
* names.
*/
handleName(name: string,
callback: (job: GristJob) => Promise<any>): void;

/**
* Shut everything down that we're responsible for.
* Set obliterate flag to destroy jobs even if they are
* stored externally (useful for testing).
*/
stop(options?: {
obliterate?: boolean,
}): Promise<void>;
}

/**
* The type of a function for handling jobs on a queue.
*/
export type JobHandler = (job: GristJob) => Promise<any>;

/**
* The name used for a queue if no specific name is given.
*/
export const DEFAULT_QUEUE_NAME = 'default';

/**
* BullMQ jobs are a string name, and then a data object.
*/
interface GristJob {
name: string;
data: any;
}

/**
* Options when adding a job. BullMQ has many more.
*/
interface JobAddOptions {
delay?: number;
jobId?: string;
repeat?: {
every: number;
}
}

/**
* Implementation for job functionality across the application.
* Will use BullMQ, with an in-memory fallback if Redis is
* unavailable.
*/
export class GristBullMQJobs implements GristJobs {
private _connection?: IORedis;
private _checkedForConnection: boolean = false;
private _queues = new Map<string, GristQueueScope>();

/**
* Get BullMQ-compatible options for the queue.
*/
public getQueueOptions() {
// Following BullMQ, queue options contain the connection
// to redis, if any.
if (!this._checkedForConnection) {
this._connect();
this._checkedForConnection = true;
}
if (!this._connection) {
return {};
}
return {
connection: this._connection,
maxRetriesPerRequest: null,
};
}

/**
* Get an interface scoped to a particular queue by name.
*/
public queue(queueName: string = DEFAULT_QUEUE_NAME): GristQueueScope {
if (!this._queues.get(queueName)) {
this._queues.set(
queueName,
new GristBullMQQueueScope(queueName, this),
);
}
return this._queues.get(queueName)!;
}

public async stop(options: {
obliterate?: boolean,
} = {}) {
for (const q of this._queues.values()) {
await q.stop(options);
}
this._queues.clear();
this._connection?.disconnect();
}

/**
* Connect to Redis if available.
*/
private _connect() {
// Connect to Redis for use with BullMQ, if REDIS_URL is set.
const urlTxt = process.env.REDIS_URL || process.env.TEST_REDIS_URL;
if (!urlTxt) {
this._connection = undefined;
log.warn('Using in-memory queues, Redis is unavailable');
return;
}
const url = new URL(urlTxt);
const conn = new IORedis({
host: url.hostname,
port: url.port ? parseInt(url.port, 10) : undefined,
db: (url.pathname.charAt(0) === '/') ?
parseInt(url.pathname.substring(1), 10) : undefined,
maxRetriesPerRequest: null,
});
this._connection = conn;
log.info('Storing queues externally in Redis');
}
}

/**
* Work with a particular named queue.
*/
export class GristBullMQQueueScope implements GristQueueScope {
private _queue: Queue|GristWorker|undefined;
private _worker: Worker|GristWorker|undefined;
private _namedProcessors: Record<string, JobHandler> = {};

public constructor(public readonly queueName: string,
private _owner: GristBullMQJobs) {}

public handleDefault(defaultCallback: JobHandler) {
// The default callback passes any recognized named jobs to
// processors added with handleName(), then, if there is no
// specific processor, calls the defaultCallback.
const callback = async (job: GristJob) => {
const processor = this._namedProcessors[job.name] || defaultCallback;
return processor(job);
};
const options = this._owner.getQueueOptions();
if (!options.connection) {
// If Redis isn't available, we go our own way, not
// using BullMQ.
const worker = new GristWorker(this.queueName, callback);
this._worker = worker;
return worker;
}
const worker = new Worker(this.queueName, callback, options);
this._worker = worker;
return worker;
}

public handleName(name: string,
callback: (job: GristJob) => Promise<any>) {
this._namedProcessors[name] = callback;
}

public async stop(options: {
obliterate?: boolean,
} = {}) {
await this._worker?.close();
if (options.obliterate) {
await this._queue?.obliterate();
}
}

public async add(name: string, data: any, options?: JobAddOptions) {
await this._getQueue().add(name, data, {
...options,
// These settings are quite arbitrary, and should be
// revised when it matters, or made controllable.
removeOnComplete: {
age: 3600, // keep up to 1 hour
count: 1000, // keep up to 1000 jobs
},
removeOnFail: {
age: 24 * 3600, // keep up to 24 hours
},
});
}

private _getQueue(): Queue|GristWorker {
if (this._queue) { return this._queue; }
const queue = this._pickQueueImplementation();
this._queue = queue;
return queue;
}

private _pickQueueImplementation() {
const name = this.queueName;
const queueOptions = this._owner.getQueueOptions();
// If we have Redis, get a proper BullMQ interface.
// Otherwise, make do.
if (queueOptions.connection) {
return new Queue(name, queueOptions);
}
// If in memory, we hand a job directly to the single worker for their
// queue. This is very crude.
const worker = this._worker;
if (!worker) {
throw new Error(`no handler yet for ${this.queueName}`);
}
// We only access workers directly when working in-memory, to
// hand jobs directly to them.
if (isBullMQWorker(worker)) {
// Not expected! Somehow we have a BullMQ worker.
throw new Error(`wrong kind of worker for ${this.queueName}`);
}
return worker;
}
}

/**
* If running in memory without Redis, all jobs need to be
* created and served by the the same process. This class
* pretends to be a BullMQ worker, but accepts jobs directly
* without any intermediate queue. This could be elaborated
* in future if needed.
*/
class GristWorker {
private _jobs: Map<string, NodeJS.Timeout> = new Map();

public constructor(public queueName: string,
private _callback: (job: GristJob) => Promise<void>) {
}

public async close() {
for (const job of this._jobs.keys()) {
// Key deletion is safe with the keys() iterator.
this._clearJob(job);
}
}

public async add(name: string, data: any, options?: JobAddOptions) {
if (options?.delay) {
if (options.repeat) {
// Unexpected combination.
throw new Error('cannot delay and repeat');
}
const jobId = options.jobId || makeId();
this._clearJob(jobId);
this._jobs.set(jobId, setTimeout(() => this._callback({name, data}),
options.delay));
return;
}
if (options?.repeat) {
const jobId = options.jobId || makeId();
this._clearJob(jobId);
this._jobs.set(jobId, setInterval(() => this._callback({name, data}),
options.repeat.every));
return;
}
await this._callback({name, data});
}

public async obliterate() {
await this.close();
}

private _clearJob(id: string) {
const job = this._jobs.get(id);
if (!job) { return; }
// We don't know if the job is a once-off or repeating,
// so we call both clearInterval and clearTimeout, which
// apparently works.
clearInterval(job);
clearTimeout(job);
this._jobs.delete(id);
}
}

/**
* Check if a worker is a real BullMQ worker, or just pretend.
*/
function isBullMQWorker(worker: Worker|GristWorker): worker is Worker {
return 'isNextJob' in worker;
}
Loading

0 comments on commit 36722c1

Please sign in to comment.