Skip to content

Commit

Permalink
fix(decorators): add decorator to tag a consumer service
Browse files Browse the repository at this point in the history
  • Loading branch information
akshatdubeysf committed Oct 31, 2022
1 parent beb9f17 commit 0c69402
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 18 deletions.
23 changes: 13 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# loopback4-kafka-client


[![LoopBack](<https://github.com/strongloop/loopback-next/raw/master/docs/site/imgs/branding/Powered-by-LoopBack-Badge-(blue)[email protected]>)](http://loopback.io/)

[![Node version](https://img.shields.io/node/v/loopback4-kafka-client.svg?style=flat-square)](https://nodejs.org/en/download/)
Expand All @@ -12,6 +11,7 @@
[![License](https://img.shields.io/github/license/sourcefuse/loopback4-kafka-client.svg?color=blue&label=License&style=flat-square)](https://github.com/sourcefuse/loopback4-kafka-client/blob/master/LICENSE)
[![Downloads](https://img.shields.io/npm/dw/loopback4-kafka-client.svg?label=Downloads&style=flat-square&color=blue)](https://www.npmjs.com/package/loopback4-kafka-client)
[![Total Downloads](https://img.shields.io/npm/dt/loopback4-kafka-client.svg?label=Total%20Downloads&style=flat-square&color=blue)](https://www.npmjs.com/package/loopback4-kafka-client)

A Kakfa Client for Loopback4 built on top of [KafkaJS](https://kafka.js.org/).

## Installation
Expand Down Expand Up @@ -83,11 +83,11 @@ export class TestStream implements IStreamDefinition {

### Consumer

A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement the `IConsumer` interface and should be using the `asConsumer` binding template. If you want the consumers to start at the start of your application, you should pass the `initObservers` config to the Component configuration.
A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement the `IConsumer` interface and should be using the `@consumer()` decorator. If you want the consumers to start at the start of your application, you should pass the `initObservers` config to the Component configuration.

##### Example

```
```ts
// application.ts
this.configure(KafkaConnectorComponentBindings.COMPONENT).to({
...
Expand All @@ -96,8 +96,9 @@ this.configure(KafkaConnectorComponentBindings.COMPONENT).to({
});
```

```
```ts
// start.consumer.ts
@consumer<TestStream, Events.start>()
export class StartConsumer implements IConsumer<TestStream, Events.start> {
constructor(
@inject('test.handler.start')
Expand All @@ -114,19 +115,21 @@ export class StartConsumer implements IConsumer<TestStream, Events.start> {

If you want to write a shared handler for different events, you can use the `eventHandlerKey` to bind a handler in the application -

```
```ts
// application.ts
this.bind(eventHandlerKey(Events.Start)).to((payload: StartEvent) => {
console.log(payload);
})
this.bind(eventHandlerKey<TestStream, Events.Stop>(Events.Stop)).toProvider(CustomEventHandlerProvider);
});
this.bind(eventHandlerKey<TestStream, Events.Stop>(Events.Stop)).toProvider(
CustomEventHandlerProvider,
);
```

and then you can use the handler using the `@eventHandler` decorator -

```
```ts
// start.consumer.ts
@injectable(asConsumer)
@consumer<TestStream, Events.start>()
export class StartConsumer implements IConsumer<TestStream, Events.start> {
constructor(
@eventHandler<TestStream>(Events.Start)
Expand All @@ -144,7 +147,7 @@ Note: The topic name passed to decorator must be first configured in the Compone

#### Example

```
```ts
// application.ts
...
this.configure(KafkaConnectorComponentBindings.COMPONENT).to({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import {injectable} from '@loopback/core';
import {consumer} from '../../../../decorators';
import {eventHandler} from '../../../../decorators/handler.decorator';
import {asConsumer} from '../../../../keys';
import {IConsumer, StreamHandler} from '../../../../types';
import {StreamHandler} from '../../../../types';
import {Events} from '../events.enum';
import {TestStream} from '../stream';
import {Topics} from '../topics.enum';

@injectable(asConsumer)
export class StartConsumer implements IConsumer<TestStream, Events.start> {
@consumer<TestStream, Events.start>()
export class StartConsumer {
constructor(
@eventHandler<TestStream>(Events.start)
public handler: StreamHandler<TestStream, Events.start>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import {injectable} from '@loopback/core';
import {consumer} from '../../../../decorators';
import {eventHandler} from '../../../../decorators/handler.decorator';
import {asConsumer} from '../../../../keys';
import {IConsumer, StreamHandler} from '../../../../types';
import {Events} from '../events.enum';
import {TestStream} from '../stream';
import {Topics} from '../topics.enum';

@injectable(asConsumer)
@consumer<TestStream, Events.stop>()
export class StopConsumer implements IConsumer<TestStream, Events.stop> {
constructor(
@eventHandler<TestStream>(Events.stop)
Expand Down
11 changes: 11 additions & 0 deletions src/__tests__/unit/consumer-decorator.unit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import {Reflector} from '@loopback/core';
import {expect} from '@loopback/testlab';
import {asConsumer} from '../../keys';
import {StartConsumer} from '../acceptance/fixtures/consumer/start-consumer.extension';

describe('unit: Consumer Decorator', () => {
it('should mark a service as a consumer extension', () => {
const key = Reflector.getOwnMetadata('binding.metadata', StartConsumer);
expect(key.templates[1]).to.equal(asConsumer);
});
});
12 changes: 12 additions & 0 deletions src/decorators/consumer.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {Constructor, injectable} from '@loopback/core';
import {asConsumer} from '../keys';
import {IConsumer, IStreamDefinition} from '../types';

export function consumer<
T extends IStreamDefinition,
E extends keyof T['messages'],
>() {
return injectable(asConsumer) as (
target: Constructor<IConsumer<T, E>>,
) => void;
}
1 change: 1 addition & 0 deletions src/decorators/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './producer.decorator';
export * from './handler.decorator';
export * from './consumer.decorator';

0 comments on commit 0c69402

Please sign in to comment.