Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Microservice Consumer Healthcheck #2550

Open
1 task done
edeesis opened this issue Jun 13, 2024 · 1 comment
Open
1 task done

Kafka Microservice Consumer Healthcheck #2550

edeesis opened this issue Jun 13, 2024 · 1 comment

Comments

@edeesis
Copy link

edeesis commented Jun 13, 2024

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe it

The Microservice Health Indicator only checks whether a producer can connect to Kafka, but I’m also interested in the state of the consumer. If the consumer has crashed, then I want the health indicator to fail.

unfortunately, implementing this myself I ran into a few issues:

  1. accessing the ServerKafka instance, which contains the consumer, doesn’t seem possible in the context of a health indicator
  2. there’s no easy way to subscribe to instrumentation events, see Add support for Kafka Instrumentation events nest#11616

Describe the solution you'd like

I’d like the Microservice Health Indicator to also check the status of the Server side, not just the client side.

Teachability, documentation, adoption, migration strategy

The usage should be the same as the existing MicroserviceHealthIndicator, though it shouldn’t require instantiating a new instance of ServerKafka, it should use the existing one.

What is the motivation / use case for changing the behavior?

I need to be able to know if the consumer has crashed and include that information in a liveness check.

@edeesis
Copy link
Author

edeesis commented Jun 20, 2024

My workaround I've found is to move the custom microservice implementation into a module in the application context, add a public method to expose the consumer and then use app.resolve to fetch it to run connectMicroservice:

export class KafkaMicroserviceServer extends ServerKafka {

    getConsumer(): Consumer {
      return this.consumer;
    }
export class KafkaConsumerHealthIndicator extends HealthIndicator implements OnModuleInit {
  private readonly crashEvents: { [groupId: string]: ConsumerCrashEvent | undefined } = {};

  constructor(
    private readonly kafkaMicroserviceServer: KafkaMicroserviceServer
  ) {
    super();
  }

  async onModuleInit(): Promise<void> {
    const consumer = this.kafkaMicroserviceServer.getConsumer();
    const { groupId } = await consumer.describeGroup();
    this.crashEvents[groupId] = undefined;
    consumer.on(consumer.events.CRASH, (event) => {
      this.crashEvents[event.payload.groupId] = event;
    });
  }
    const kafkaMicroserviceServer = await app.resolve(KafkaMicroserviceServer);
    app.connectMicroservice({
      strategy: kafkaMicroserviceServer,
    });

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant