diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 13d8761598a83..21844a660ceb9 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -253,7 +253,7 @@ export async function eachBatchParallelIngestion( } } -function computeKey(pluginEvent: PipelineEvent): string { +export function computeKey(pluginEvent: PipelineEvent): string { return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}` } diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index e042cf5c1ac34..e036f7c6b0016 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -1,9 +1,14 @@ +import { Message } from 'node-rdkafka' + import { buildStringMatcher } from '../../../src/config/config' import { KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../src/config/kafka-topics' import { + computeKey, eachBatchParallelIngestion, IngestionOverflowMode, } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion' +import { Hub } from '../../../src/types' +import { createHub } from '../../../src/utils/db/hub' import { ConfiguredLimiter } from '../../../src/utils/token-bucket' import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' @@ -47,58 +52,55 @@ const captureEndpointEvent2 = { } describe('eachBatchParallelIngestion with overflow reroute', () => { + let hub: Hub + let closeServer: () => Promise<void> let queue: any - function createBatchWithMultipleEventsWithKeys(events: any[], timestamp?: any): 
any { - return events.map((event) => ({ + function createBatchWithMultipleEvents(events: any[], timestamp?: any, withKey: boolean = true): Message[] { + return events.map((event, i) => ({ partition: 0, topic: KAFKA_EVENTS_PLUGIN_INGESTION, - value: JSON.stringify(event), + value: Buffer.from(JSON.stringify(event)), timestamp, - offset: event.offset, - key: event.team_id + ':' + event.distinct_id, + offset: i, + key: withKey ? computeKey(event) : null, + size: 0, // irrelevant, but needed for type checking })) } - beforeEach(() => { + beforeEach(async () => { + ;[hub, closeServer] = await createHub() queue = { bufferSleep: jest.fn(), - pluginsServer: { - INGESTION_CONCURRENCY: 4, - kafkaProducer: { - produce: jest.fn(), - }, - db: 'database', - }, + pluginsServer: hub, } jest.mock('./../../../src/worker/ingestion/event-pipeline/runner') }) + afterEach(async () => { + await closeServer() + jest.clearAllMocks() + }) + it('reroutes events with no key to OVERFLOW topic', async () => { - const batch = [ - { - partition: 0, - topic: KAFKA_EVENTS_PLUGIN_INGESTION, - value: JSON.stringify(captureEndpointEvent1), - timestamp: captureEndpointEvent1['timestamp'], - offset: captureEndpointEvent1['offset'], - key: null, - token: 'ok', - }, - ] + const now = Date.now() + const [message] = createBatchWithMultipleEvents( + [captureEndpointEvent1], + now, + false // act as if this message was intended to be routed to overflow by capture + ) const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) + const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce') const tokenBlockList = buildStringMatcher('another_token,more_token', false) - await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute) + await eachBatchParallelIngestion(tokenBlockList, [message], queue, IngestionOverflowMode.Reroute) expect(consume).not.toHaveBeenCalled() expect(captureIngestionWarning).not.toHaveBeenCalled() - 
expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ + expect(produce).toHaveBeenCalledWith({ topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - value: JSON.stringify(captureEndpointEvent1), - timestamp: captureEndpointEvent1['timestamp'], - offset: captureEndpointEvent1['offset'], + value: message.value, key: null, waitForAck: true, }) @@ -109,23 +111,23 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { it('reroutes excess events to OVERFLOW topic', async () => { const now = Date.now() - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now) + const event = captureEndpointEvent1 + const [message] = createBatchWithMultipleEvents([event], now) const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) + const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce') const tokenBlockList = buildStringMatcher('another_token,more_token', false) - await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute) + await eachBatchParallelIngestion(tokenBlockList, [message], queue, IngestionOverflowMode.Reroute) expect(consume).toHaveBeenCalledWith( - captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'], + computeKey(event), // NOTE: can't use ``message.key`` here as it will already have been mutated 1, now ) expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ + expect(produce).toHaveBeenCalledWith({ topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - value: JSON.stringify(captureEndpointEvent1), - timestamp: captureEndpointEvent1['timestamp'], - offset: captureEndpointEvent1['offset'], + value: message.value, key: null, waitForAck: true, }) @@ -136,8 +138,9 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { it('does not reroute if not over capacity limit', async () => { const now = Date.now() - const batch = 
createBatchWithMultipleEventsWithKeys([captureEndpointEvent1, captureEndpointEvent2], now) + const batch = createBatchWithMultipleEvents([captureEndpointEvent1, captureEndpointEvent2], now) const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true) + const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce') const tokenBlockList = buildStringMatcher('another_token,more_token', false) await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute) @@ -153,17 +156,18 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { now ) expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled() + expect(produce).not.toHaveBeenCalled() // Event is processed expect(runEventPipeline).toHaveBeenCalledTimes(2) }) it('does drop events from blocked tokens', async () => { const now = Date.now() - const batch = createBatchWithMultipleEventsWithKeys( + const batch = createBatchWithMultipleEvents( [captureEndpointEvent1, captureEndpointEvent2, captureEndpointEvent1], now ) + const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce') const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true) const tokenBlockList = buildStringMatcher('mytoken,another_token', false) @@ -176,7 +180,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { now ) expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled() + expect(produce).not.toHaveBeenCalled() expect(runEventPipeline).toHaveBeenCalledTimes(1) }) })