Skip to content

Commit

Permalink
fix: metrics key shoulb configurable so it is different on prod and stag
Browse files Browse the repository at this point in the history
  • Loading branch information
kaladivo committed Oct 31, 2024
1 parent b06b1c4 commit b64932e
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {METRICS_QUEUE_NAME} from '@vexl-next/server-utils/src/metrics/domain'
import {Effect, Runtime} from 'effect'
import {type Consumer, type IMessageTransferable} from 'redis-smq'
import {ErrorSettingUpConsumer} from '.'
Expand Down Expand Up @@ -34,6 +33,7 @@ export const silentlyShutdownConsumer = (

export const registerMessageHandler = <E, R>(
consumer: Consumer,
queueName: string,
handler: (message: IMessageTransferable) => Effect.Effect<void, E, R>
): Effect.Effect<void, ErrorSettingUpConsumer, R> =>
Effect.gen(function* (_) {
Expand All @@ -43,7 +43,7 @@ export const registerMessageHandler = <E, R>(
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
Effect.async<void, ErrorSettingUpConsumer>((cb) => {
consumer.consume(
METRICS_QUEUE_NAME,
queueName,
(message, consumedCallback) => {
runFork(
handler(message).pipe(
Expand Down
13 changes: 9 additions & 4 deletions apps/metrics-service/src/utils/MetricsConsumerService/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {Schema} from '@effect/schema'
import {metricsQueueNameConfig} from '@vexl-next/server-utils/src/commonConfigs'
import {
setupRedisSmqConnection,
type SettingUpRedisSmqConnectionError,
Expand Down Expand Up @@ -40,22 +41,26 @@ export class MetricsConsumerService extends Context.Tag(
Layer.scoped(
MetricsConsumerService,
Effect.gen(function* (_) {
const queueName = yield* _(metricsQueueNameConfig)

yield* _(Effect.log('Configuring redis smq'))
yield* _(setupRedisSmqConnection(redisUrl))

yield* _(Effect.log('Ensuring queue exists'))
yield* _(Effect.log(`Ensuring queue exists. Queue name: ${queueName}`))
const queue = new Queue()

yield* _(Effect.addFinalizer(() => silentlyShutdownQueue(queue)))
yield* _(ensureMetricsQueueExists(queue))
yield* _(ensureMetricsQueueExists(queue, queueName))
yield* _(Effect.log('Queue ensured'))

const consumer = new Consumer()
yield* _(Effect.addFinalizer(() => silentlyShutdownConsumer(consumer)))
yield* _(startConsumer(consumer))

yield* _(registerMessageHandler(consumer, messageHandler))
yield* _(Effect.log('Registered message handler'))
yield* _(registerMessageHandler(consumer, queueName, messageHandler))
yield* _(
Effect.log(`Registered message handler: Queue name: ${queueName}`)
)

return {queue, consumer}
})
Expand Down
17 changes: 9 additions & 8 deletions apps/metrics-service/src/utils/MetricsConsumerService/queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {METRICS_QUEUE_NAME} from '@vexl-next/server-utils/src/metrics/domain'
import {Effect} from 'effect'
import {EQueueDeliveryModel, EQueueType, type Queue} from 'redis-smq'
import {ErrorSettingUpConsumer} from './ErrorSettingUpConsumer'
Expand All @@ -23,12 +22,13 @@ const checkQueueExists = (
})

const createMetricsQueue = (
queue: Queue
queue: Queue,
queueName: string
): Effect.Effect<void, ErrorSettingUpConsumer> =>
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
Effect.async<void, ErrorSettingUpConsumer>((cb) => {
queue.save(
METRICS_QUEUE_NAME,
queueName,
EQueueType.FIFO_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
(err) => {
Expand All @@ -49,15 +49,16 @@ const createMetricsQueue = (
})

export const ensureMetricsQueueExists = (
queue: Queue
queue: Queue,
queueName: string
): Effect.Effect<void, ErrorSettingUpConsumer, never> =>
Effect.gen(function* (_) {
if (yield* _(checkQueueExists(METRICS_QUEUE_NAME, queue))) {
yield* _(Effect.log('Queue already exists'))
if (yield* _(checkQueueExists(queueName, queue))) {
yield* _(Effect.log(`Queue (name: ${queueName}) already exists`))
return
}
yield* _(createMetricsQueue(queue))
yield* _(Effect.log('Queue created'))
yield* _(createMetricsQueue(queue, queueName))
yield* _(Effect.log(`Queue (name: ${queueName}) created`))
})

export const silentlyShutdownQueue = (queue: Queue): Effect.Effect<void> =>
Expand Down
2 changes: 2 additions & 0 deletions packages/server-utils/src/commonConfigs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,5 @@ export const memoryDebugIntervalMsConfig = Config.option(
export const disableDevToolsInDevelopmentConfig = Config.option(
Config.boolean('DISABLE_DEV_TOOLS')
)

export const metricsQueueNameConfig = Config.string('METRICS_QUEUE_NAME')
3 changes: 1 addition & 2 deletions packages/server-utils/src/metrics/MetricsClientService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ export class MetricsClientService extends Context.Tag('MetricsClientService')<
message: MetricsMessage
): Effect.Effect<void, ReportingMetricsError> =>
message.toProducibleMessage().pipe(
Effect.catchTag(
'ParseError',
Effect.catchAll(
(error) =>
new ReportingMetricsError({
cause: error,
Expand Down
22 changes: 14 additions & 8 deletions packages/server-utils/src/metrics/domain.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import {Schema} from '@effect/schema'
import {type ParseError} from '@effect/schema/ParseResult'
import {UuidE} from '@vexl-next/domain/src/utility/Uuid.brand'
import {Effect} from 'effect'
import {type ConfigError, Effect} from 'effect'
import {type IMessageTransferable, ProducibleMessage} from 'redis-smq'

export const METRICS_QUEUE_NAME = 'metrics' as const
import {metricsQueueNameConfig} from '../commonConfigs'

export class MetricsMessage extends Schema.Class<MetricsMessage>(
'MetricsMessage'
Expand Down Expand Up @@ -34,16 +33,23 @@ export class MetricsMessage extends Schema.Class<MetricsMessage>(
return MetricsMessage.decodeUnknwon(message.body)
}

toProducibleMessage(): Effect.Effect<ProducibleMessage, ParseError> {
return MetricsMessage.encode(this).pipe(
Effect.flatMap((encodedBody) =>
toProducibleMessage(): Effect.Effect<
ProducibleMessage,
ParseError | ConfigError.ConfigError
> {
const encodeMtricsMessage = MetricsMessage.encode(this)
return Effect.gen(function* (_) {
const encodedBody = yield* _(encodeMtricsMessage)
const queueName = yield* _(metricsQueueNameConfig)

return yield* _(
Effect.sync(() => {
const message = new ProducibleMessage()
message.setBody(encodedBody).setQueue(METRICS_QUEUE_NAME)
message.setBody(encodedBody).setQueue(queueName)
return message
})
)
)
})
}
}
export class ReportingMetricsError extends Schema.TaggedError<ReportingMetricsError>(
Expand Down

0 comments on commit b64932e

Please sign in to comment.