Skip to content

Commit

Permalink
Pass x- headers to queue driver on message publish.
Browse files Browse the repository at this point in the history
  • Loading branch information
llambeau committed May 10, 2024
1 parent dbf26f6 commit 0bdb8f1
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/config/schema.fio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ ID = String(s | /[a-z]+[a-z_-]+/.test(s))
#### QUEUE
AMQP.Exchange.Type = String :: { "topic", "direct", "fanout" }
AMQP.Exchange.Type = String :: { "topic", "direct", "fanout", "x-delayed-message" }
AMQP.Exchange = {
name : String
type :? AMQP.Exchange.Type
default :? Boolean
options :? {
durable: Boolean
...: .
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/jobs/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ export default class JobDispatcher {
dispatched: new Date(),
})));

const promises = jobs.map(job => Arnavon.queue.push(jobName, job));
delete options['strict'];

const promises = jobs.map(job => Arnavon.queue.push(jobName, job, options));

return Promise.all(promises)
.then(() => jobs);
Expand All @@ -109,7 +111,7 @@ export default class JobDispatcher {
return this.jobs[metadata.jobName as string].validator;
}

dispatch(jobName: string, data: unknown, meta = {}) {
dispatch(jobName: string, data: unknown, meta = {}, extraOptions = {}) {
const jobConfig = this.jobs[jobName];
if (!jobConfig) {
this.#counters.unknown.inc({ jobName });
Expand All @@ -133,7 +135,7 @@ export default class JobDispatcher {
dispatched: new Date(),
}));

return Arnavon.queue.push(jobName, job)
return Arnavon.queue.push(jobName, job, extraOptions)
.then(() => job);
}

Expand Down
6 changes: 4 additions & 2 deletions src/queue/drivers/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export type AMQPQueueConfig = {

export type AQMPPushOptions = {
exchange: string
headers?: Record<string, unknown>
}

class AmqpQueue extends Queue {
Expand Down Expand Up @@ -182,12 +183,13 @@ class AmqpQueue extends Queue {
return this;
}

_push(key: string, data: unknown, { exchange }: AQMPPushOptions) {
_push(key: string, data: unknown, { exchange, headers }: AQMPPushOptions) {
if (!this.#channel) {
throw new Error('Cannot push, no channel found');
}
const payload = Buffer.from(JSON.stringify(data));
const options = { persistent: true };
const options = { persistent: true, headers };

return new Promise((resolve, reject) => {
return (this.#channel as amqplib.ConfirmChannel)
.publish(exchange || this.#exchange, key, payload, options, (err) => {
Expand Down
1 change: 1 addition & 0 deletions src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class Queue extends EventEmitter {
// subclasses should implement _push(key, data)
push(key: string, data: unknown, opts = {}): Promise<unknown> {
logger.info(`${this.constructor.name} - Pushing to queue`, key, data);

return this._push(key, data, opts).then((job) => {
logger.info(`${this.constructor.name} - Pushed`);
return job;
Expand Down
14 changes: 13 additions & 1 deletion src/server/rest/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,21 @@ export default (dispatcher: JobDispatcher) => {
});
}

// Pass all x-* headers (except x-arnavon-*) to the queue
// This allows using special rabbitmq things like delayed messages, ttl, etc
const headers = Object.keys(req.headers)
.filter(h => h.toLowerCase().startsWith('x-') && !h.toLowerCase().startsWith('x-arnavon'))
.reduce((headers, k) => {
headers[k] = req.headers[k];
return headers;
}, {});

// Decide on the dispatch mode
const dispatchFn = pushMode === 'SINGLE' ? dispatcher.dispatch : dispatcher.dispatchBatch;
dispatchFn.bind(dispatcher)(req.params.id, req.body, { id: req.id }, { strict: validationMode === 'ALL-OR-NOTHING' })
dispatchFn.bind(dispatcher)(req.params.id, req.body, { id: req.id }, {
strict: validationMode === 'ALL-OR-NOTHING',
headers,
})
.then((job) => {
return res.status(201).send(job);
})
Expand Down

0 comments on commit 0bdb8f1

Please sign in to comment.