Skip to content

Commit

Permalink
Feature: Allow multiple stream config (#56)
Browse files Browse the repository at this point in the history
* added multiple stream config feature

* updated README.md
  • Loading branch information
shinnida220 authored Jan 26, 2024
1 parent 02768d3 commit 6e2341c
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 48 deletions.
14 changes: 1 addition & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/nats-listener/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module.exports = {
parserOptions: {
project: 'tsconfig.json',
sourceType: 'module',
tsconfigRootDir: __dirname,
},
plugins: ['@typescript-eslint/eslint-plugin'],
extends: [
Expand Down
2 changes: 1 addition & 1 deletion packages/nats-listener/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"test:e2e": "jest --config ./test/jest-e2e.json"
},
"dependencies": {
"@nestjs-plugins/nestjs-nats-jetstream-transport": "^2.0.0",
"@nestjs-plugins/nestjs-nats-jetstream-transport": "file:../nestjs-nats-jetstream-transport",
"@nestjs/common": "^8.0.0",
"@nestjs/core": "^8.0.0",
"@nestjs/microservices": "^8.2.3",
Expand Down
30 changes: 30 additions & 0 deletions packages/nats-listener/src/app.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,36 @@ export class AppController {
console.log('received: ' + context.message.subject, data);
}

/** Added more listeners for the other subject */
@EventPattern('other.updated')
public async otherUpdatedHandler(
@Payload() data: string,
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}

@EventPattern('other.created')
public async otherCreatedHandler(
@Payload() data: { id: number; name: string },
@Ctx() context: NatsJetStreamContext,
) {
console.log(context.message.headers);
console.log(context.message.info);
context.message.ack();
console.log('received: ' + context.message.subject, data);
}

@EventPattern('other.deleted')
public async otherDeletedHandler(
@Payload() data: any,
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}

// request - response
@MessagePattern({ cmd: 'sum' })
async accumulate(
Expand Down
14 changes: 10 additions & 4 deletions packages/nats-listener/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ import { Logger } from '@nestjs/common';
import { DebugEvents, Events } from 'nats';

async function bootstrap() {
const streamConfig: NatsStreamConfig = {
name: 'mystream',
subjects: ['order.*'],
};
const streamConfig: NatsStreamConfig[] = [
{
name: 'mystream',
subjects: ['order.*'],
},
{
name: 'my-other-stream',
subjects: ['other.*'],
},
];
const logger = new Logger();
const options: CustomStrategy = {
strategy: new NatsJetStreamServer({
Expand Down
1 change: 1 addition & 0 deletions packages/nats-publisher/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module.exports = {
parserOptions: {
project: 'tsconfig.json',
sourceType: 'module',
tsconfigRootDir: __dirname,
},
plugins: ['@typescript-eslint/eslint-plugin'],
extends: [
Expand Down
23 changes: 12 additions & 11 deletions packages/nestjs-nats-jetstream-transport/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
module.exports = {
parser: "@typescript-eslint/parser",
parser: '@typescript-eslint/parser',
parserOptions: {
project: "tsconfig.json",
sourceType: "module",
project: 'tsconfig.json',
sourceType: 'module',
tsconfigRootDir: __dirname,
},
plugins: ["@typescript-eslint/eslint-plugin"],
plugins: ['@typescript-eslint/eslint-plugin'],
extends: [
"plugin:@typescript-eslint/recommended",
"plugin:prettier/recommended",
'plugin:@typescript-eslint/recommended',
'plugin:prettier/recommended',
],
root: true,
env: {
node: true,
jest: true,
},
ignorePatterns: [".eslintrc.js"],
ignorePatterns: ['.eslintrc.js'],
rules: {
"@typescript-eslint/interface-name-prefix": "off",
"@typescript-eslint/explicit-function-return-type": "off",
"@typescript-eslint/explicit-module-boundary-types": "off",
"@typescript-eslint/no-explicit-any": "off",
'@typescript-eslint/interface-name-prefix': 'off',
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/explicit-module-boundary-types': 'off',
'@typescript-eslint/no-explicit-any': 'off',
},
};
7 changes: 7 additions & 0 deletions packages/nestjs-nats-jetstream-transport/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ async function bootstrap() {
name: 'mystream',
subjects: ['order.*'],
},
// streamConfig: [{
// name: 'mystream',
// subjects: ['order.*'],
// },{
// name: 'myOtherStream',
// subjects: ['other.*'],
// }],
}),
};

Expand Down
1 change: 0 additions & 1 deletion packages/nestjs-nats-jetstream-transport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
"author": "Bernt Anker [[email protected]]",
"license": "ISC",
"dependencies": {
"@nestjs-plugins/nestjs-nats-jetstream-transport": "^1.3.15",
"nats": "^2.6.1",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.4.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ export interface NatsJetStreamServerOptions {
Pick<NatsConnectionOptions, 'name'>;
consumerOptions: Partial<ServerConsumerOptions>;
jetStreamOptions?: JetStreamOptions;
streamConfig?: NatsStreamConfig;
streamConfig?: NatsStreamConfig | NatsStreamConfig[];
}
41 changes: 24 additions & 17 deletions packages/nestjs-nats-jetstream-transport/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,32 @@ export class NatsJetStreamServer
private async setupStream() {
const { streamConfig } = this.options;
const streams = await this.jsm.streams.list().next();
const stream = streams.find(
(stream) => stream.config.name === streamConfig.name,
);

if (stream) {
const streamSubjects = new Set([
...stream.config.subjects,
...streamConfig.subjects,
]);
const reqStreamConfigs = !Array.isArray(streamConfig)
? [streamConfig]
: streamConfig;

const streamInfo = await this.jsm.streams.update(stream.config.name, {
...stream.config,
...streamConfig,
subjects: [...streamSubjects.keys()],
});
this.logger.log(`Stream ${streamInfo.config.name} updated`);
} else {
const streamInfo = await this.jsm.streams.add(streamConfig);
this.logger.log(`Stream ${streamInfo.config.name} created`);
for (const requiredStreamConfig of reqStreamConfigs) {
const stream = streams.find(
(stream) => stream.config.name === requiredStreamConfig.name,
);

if (stream) {
const streamSubjects = new Set([
...stream.config.subjects,
...requiredStreamConfig.subjects,
]);

const streamInfo = await this.jsm.streams.update(stream.config.name, {
...stream.config,
...requiredStreamConfig,
subjects: [...streamSubjects.keys()],
});
this.logger.log(`Stream ${streamInfo.config.name} updated`);
} else {
const streamInfo = await this.jsm.streams.add(requiredStreamConfig);
this.logger.log(`Stream ${streamInfo.config.name} created`);
}
}
}
}

0 comments on commit 6e2341c

Please sign in to comment.