Skip to content

Commit

Permalink
[FEAT] queue subs (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart authored May 8, 2024
1 parent 48aaad6 commit fc410a5
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 8 deletions.
5 changes: 3 additions & 2 deletions nhgc/examples/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ await nc.publish("hello", "world", {
},
});

// Implementing a service is simply a subscription that replies
// Implementing a service is simply a subscription that replies - services are usually
// in a queue group so they distribute the load
const svc = await nc.subscribe("q", async (err, msg) => {
if (err) {
console.error(err.message);
Expand All @@ -63,7 +64,7 @@ const svc = await nc.subscribe("q", async (err, msg) => {
} else {
console.log("message doesn't have a reply - ignoring");
}
});
}, { queue: "a" });

// to trigger a request:
const r = await nc.request("q", "question", {
Expand Down
18 changes: 13 additions & 5 deletions nhgc/nats.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { HttpImpl } from "./nhgc.ts";
import {
HeartbeatOpts,
Msg,
MsgCallback,
Nats,
ReviverFn,
Sub,
SubOpts,
Value,
} from "./types.ts";
import { addEventSource, deferred } from "./util.ts";
Expand Down Expand Up @@ -131,21 +131,29 @@ export class NatsImpl extends HttpImpl implements Nats {
return new MsgImpl(await r.json());
}

sub(subject: string, opts?: HeartbeatOpts): Promise<EventSource> {
sub(subject: string, opts: Partial<SubOpts> = {}): Promise<EventSource> {
const args = [];
args.push(`authorization=${this.apiKey}`);

if (opts && opts.idleHeartbeat && opts.idleHeartbeat > 0) {
if (opts.idleHeartbeat && opts.idleHeartbeat > 0) {
args.push(`idleHeartbeat=${opts.idleHeartbeat}`);
}
if (opts.queue) {
args.push(`queue=${encodeURIComponent(opts.queue)}`);
}

const qs = args.length ? args.join("&") : "";
const path = `/v1/nats/subjects/${subject}?${qs}`;

return addEventSource(new URL(path, this.url));
}

async subscribe(subject: string, cb: MsgCallback): Promise<Sub> {
const es = await this.sub(subject);
async subscribe(
subject: string,
cb: MsgCallback,
opts: Partial<SubOpts> = {},
): Promise<Sub> {
const es = await this.sub(subject, opts);
return Promise.resolve(new SubImpl(es, cb));
}

Expand Down
35 changes: 35 additions & 0 deletions nhgc/nats_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,38 @@ Deno.test("nats - request headers", async () => {
await nc.request("q", undefined, { headers: { "NatsH-Hello-World": "Hi" } });
sub.unsubscribe();
});

Deno.test("nats - queue subs", async () => {
const nhg = newNHG(getConnectionDetails());
const nc = nhg.nats;

const a = [];
const sub1 = await nc.subscribe("q.*", (err, msg) => {
if (err) {
fail(err.message);
}
a.push(msg?.subject);
}, { queue: "hello" });

const b = [];
const sub2 = await nc.subscribe("q.*", (err, msg) => {
if (err) {
fail(err.message);
}
b.push(msg?.subject);
}, { queue: "hello" });

const proms = [];
for (let i = 0; i < 100; i++) {
proms.push(nc.publish(`q.${i}`));
}
proms.push(nc.flush());
proms.push(await delay(1000));

await Promise.all(proms);

assertEquals(a.length + b.length, 100);

sub1.unsubscribe();
sub2.unsubscribe();
});
14 changes: 13 additions & 1 deletion nhgc/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,13 @@ export type HeartbeatOpts = {
idleHeartbeat?: number;
};

export type SubOpts = {
/**
* NATS queue group to place the subscription under
*/
queue: string;
} & HeartbeatOpts;

export type WatchOpts = HeartbeatOpts & {
/**
* Values to include in the updates
Expand Down Expand Up @@ -440,8 +447,13 @@ export interface Nats {
* there's an error, the reason for the error is not available.
* @param subject
* @param cb (err?: Error, msg?: Msg) => void;
* @param opts subscription options
*/
subscribe(subject: string, cb: MsgCallback): Promise<Sub>;
subscribe(
subject: string,
cb: MsgCallback,
opts?: Partial<SubOpts>,
): Promise<Sub>;

/**
* Performs a round trip to the server.
Expand Down

0 comments on commit fc410a5

Please sign in to comment.