diff --git a/apps/metrics-service/src/utils/MetricsConsumerService/consumer.ts b/apps/metrics-service/src/utils/MetricsConsumerService/consumer.ts index 36245e65a..122ddf6b1 100644 --- a/apps/metrics-service/src/utils/MetricsConsumerService/consumer.ts +++ b/apps/metrics-service/src/utils/MetricsConsumerService/consumer.ts @@ -1,4 +1,3 @@ -import {METRICS_QUEUE_NAME} from '@vexl-next/server-utils/src/metrics/domain' import {Effect, Runtime} from 'effect' import {type Consumer, type IMessageTransferable} from 'redis-smq' import {ErrorSettingUpConsumer} from '.' @@ -34,6 +33,7 @@ export const silentlyShutdownConsumer = ( export const registerMessageHandler = ( consumer: Consumer, + queueName: string, handler: (message: IMessageTransferable) => Effect.Effect ): Effect.Effect => Effect.gen(function* (_) { @@ -43,7 +43,7 @@ export const registerMessageHandler = ( // eslint-disable-next-line @typescript-eslint/no-invalid-void-type Effect.async((cb) => { consumer.consume( - METRICS_QUEUE_NAME, + queueName, (message, consumedCallback) => { runFork( handler(message).pipe( diff --git a/apps/metrics-service/src/utils/MetricsConsumerService/index.ts b/apps/metrics-service/src/utils/MetricsConsumerService/index.ts index 1f327d91b..8bd7c4b91 100644 --- a/apps/metrics-service/src/utils/MetricsConsumerService/index.ts +++ b/apps/metrics-service/src/utils/MetricsConsumerService/index.ts @@ -1,4 +1,5 @@ import {Schema} from '@effect/schema' +import {metricsQueueNameConfig} from '@vexl-next/server-utils/src/commonConfigs' import { setupRedisSmqConnection, type SettingUpRedisSmqConnectionError, @@ -40,22 +41,26 @@ export class MetricsConsumerService extends Context.Tag( Layer.scoped( MetricsConsumerService, Effect.gen(function* (_) { + const queueName = yield* _(metricsQueueNameConfig) + yield* _(Effect.log('Configuring redis smq')) yield* _(setupRedisSmqConnection(redisUrl)) - yield* _(Effect.log('Ensuring queue exists')) + yield* _(Effect.log(`Ensuring queue exists. Queue name: ${queueName}`)) const queue = new Queue() yield* _(Effect.addFinalizer(() => silentlyShutdownQueue(queue))) - yield* _(ensureMetricsQueueExists(queue)) + yield* _(ensureMetricsQueueExists(queue, queueName)) yield* _(Effect.log('Queue ensured')) const consumer = new Consumer() yield* _(Effect.addFinalizer(() => silentlyShutdownConsumer(consumer))) yield* _(startConsumer(consumer)) - yield* _(registerMessageHandler(consumer, messageHandler)) - yield* _(Effect.log('Registered message handler')) + yield* _(registerMessageHandler(consumer, queueName, messageHandler)) + yield* _( + Effect.log(`Registered message handler: Queue name: ${queueName}`) + ) return {queue, consumer} }) diff --git a/apps/metrics-service/src/utils/MetricsConsumerService/queue.ts b/apps/metrics-service/src/utils/MetricsConsumerService/queue.ts index 1270c6a23..5ae87a0fd 100644 --- a/apps/metrics-service/src/utils/MetricsConsumerService/queue.ts +++ b/apps/metrics-service/src/utils/MetricsConsumerService/queue.ts @@ -1,4 +1,3 @@ -import {METRICS_QUEUE_NAME} from '@vexl-next/server-utils/src/metrics/domain' import {Effect} from 'effect' import {EQueueDeliveryModel, EQueueType, type Queue} from 'redis-smq' import {ErrorSettingUpConsumer} from './ErrorSettingUpConsumer' @@ -23,12 +22,13 @@ const checkQueueExists = ( }) const createMetricsQueue = ( - queue: Queue + queue: Queue, + queueName: string ): Effect.Effect => // eslint-disable-next-line @typescript-eslint/no-invalid-void-type Effect.async((cb) => { queue.save( - METRICS_QUEUE_NAME, + queueName, EQueueType.FIFO_QUEUE, EQueueDeliveryModel.POINT_TO_POINT, (err) => { @@ -49,15 +49,16 @@ const createMetricsQueue = ( }) export const ensureMetricsQueueExists = ( - queue: Queue + queue: Queue, + queueName: string ): Effect.Effect => Effect.gen(function* (_) { - if (yield* _(checkQueueExists(METRICS_QUEUE_NAME, queue))) { - yield* _(Effect.log('Queue already exists')) + if (yield* _(checkQueueExists(queueName, queue))) { + yield* _(Effect.log(`Queue (name: ${queueName}) already exists`)) return } - yield* _(createMetricsQueue(queue)) - yield* _(Effect.log('Queue created')) + yield* _(createMetricsQueue(queue, queueName)) + yield* _(Effect.log(`Queue (name: ${queueName}) created`)) }) export const silentlyShutdownQueue = (queue: Queue): Effect.Effect => diff --git a/packages/server-utils/src/commonConfigs.ts b/packages/server-utils/src/commonConfigs.ts index ce4eb757f..a0fd761bb 100644 --- a/packages/server-utils/src/commonConfigs.ts +++ b/packages/server-utils/src/commonConfigs.ts @@ -90,3 +90,5 @@ export const memoryDebugIntervalMsConfig = Config.option( export const disableDevToolsInDevelopmentConfig = Config.option( Config.boolean('DISABLE_DEV_TOOLS') ) + +export const metricsQueueNameConfig = Config.string('METRICS_QUEUE_NAME') diff --git a/packages/server-utils/src/metrics/MetricsClientService.ts b/packages/server-utils/src/metrics/MetricsClientService.ts index 036dc3c22..c0b9cb298 100644 --- a/packages/server-utils/src/metrics/MetricsClientService.ts +++ b/packages/server-utils/src/metrics/MetricsClientService.ts @@ -68,8 +68,7 @@ export class MetricsClientService extends Context.Tag('MetricsClientService')< message: MetricsMessage ): Effect.Effect => message.toProducibleMessage().pipe( - Effect.catchTag( - 'ParseError', + Effect.catchAll( (error) => new ReportingMetricsError({ cause: error, diff --git a/packages/server-utils/src/metrics/domain.ts b/packages/server-utils/src/metrics/domain.ts index ed69edbbc..d30bc2892 100644 --- a/packages/server-utils/src/metrics/domain.ts +++ b/packages/server-utils/src/metrics/domain.ts @@ -1,10 +1,9 @@ import {Schema} from '@effect/schema' import {type ParseError} from '@effect/schema/ParseResult' import {UuidE} from '@vexl-next/domain/src/utility/Uuid.brand' -import {Effect} from 'effect' +import {type ConfigError, Effect} from 'effect' import {type IMessageTransferable, ProducibleMessage} from 'redis-smq' - -export const METRICS_QUEUE_NAME = 'metrics' as const +import {metricsQueueNameConfig} from '../commonConfigs' export class MetricsMessage extends Schema.Class( 'MetricsMessage' @@ -34,16 +33,23 @@ export class MetricsMessage extends Schema.Class( return MetricsMessage.decodeUnknwon(message.body) } - toProducibleMessage(): Effect.Effect { - return MetricsMessage.encode(this).pipe( - Effect.flatMap((encodedBody) => + toProducibleMessage(): Effect.Effect< + ProducibleMessage, + ParseError | ConfigError.ConfigError + > { + const encodeMtricsMessage = MetricsMessage.encode(this) + return Effect.gen(function* (_) { + const encodedBody = yield* _(encodeMtricsMessage) + const queueName = yield* _(metricsQueueNameConfig) + + return yield* _( Effect.sync(() => { const message = new ProducibleMessage() - message.setBody(encodedBody).setQueue(METRICS_QUEUE_NAME) + message.setBody(encodedBody).setQueue(queueName) return message }) ) - ) + }) } } export class ReportingMetricsError extends Schema.TaggedError(