Skip to content

Commit

Permalink
feat(plugin-server): Support preserving distinct ID locality on overf…
Browse files Browse the repository at this point in the history
…low rerouting (#20945)

Turned off by default for backwards compatibility for now.
  • Loading branch information
tkaemming authored Apr 2, 2024
1 parent 39050a9 commit 85ef237
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 43 deletions.
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ export function getDefaultConfig(): PluginsServerConfig {
TASKS_PER_WORKER: 10,
INGESTION_CONCURRENCY: 10,
INGESTION_BATCH_SIZE: 500,
INGESTION_OVERFLOW_ENABLED: false,
INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false,
PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log,
LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info,
SENTRY_DSN: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Counter } from 'prom-client'
import { buildStringMatcher } from '../../config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { isIngestionOverflowEnabled } from '../../utils/env-utils'
import { status } from '../../utils/status'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { IngestionConsumer } from './kafka-queue'
Expand All @@ -24,7 +23,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({
Consumes analytics events from the Kafka topic `events_plugin_ingestion`
and processes them for ingestion into ClickHouse.
Before processing, if isIngestionOverflowEnabled and an event has
Before processing, if overflow rerouting is enabled and an event has
overflowed the capacity for its (team_id, distinct_id) pair, it will not
be processed here but instead re-produced into the
`events_plugin_ingestion_overflow` topic for later processing.
Expand All @@ -47,7 +46,11 @@ export const startAnalyticsEventsIngestionConsumer = async ({
// deployment, we require an env variable to be set to confirm this before
// enabling re-production of events to the OVERFLOW topic.

const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled
const overflowMode = hub.INGESTION_OVERFLOW_ENABLED
? hub.INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY
? IngestionOverflowMode.Reroute
: IngestionOverflowMode.RerouteRandomly
: IngestionOverflowMode.Disabled

const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false)
const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ require('@sentry/tracing')

export enum IngestionOverflowMode {
Disabled,
Reroute,
Reroute, // preserves partition locality
RerouteRandomly, // discards partition locality
ConsumeSplitByDistinctId,
ConsumeSplitEvenly,
}
Expand Down Expand Up @@ -217,7 +218,9 @@ export async function eachBatchParallelIngestion(
op: 'emitToOverflow',
data: { eventCount: splitBatch.toOverflow.length },
})
processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow))
processingPromises.push(
emitToOverflow(queue, splitBatch.toOverflow, overflowMode === IngestionOverflowMode.RerouteRandomly)
)
overflowSpan.finish()
}

Expand Down Expand Up @@ -257,14 +260,14 @@ function computeKey(pluginEvent: PipelineEvent): string {
return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}`
}

async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]) {
async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[], useRandomPartitioner: boolean) {
ingestionOverflowingMessagesTotal.inc(kafkaMessages.length)
await Promise.all(
kafkaMessages.map((message) =>
queue.pluginsServer.kafkaProducer.produce({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: null, // No locality guarantees in overflow
key: useRandomPartitioner ? undefined : message.key,
headers: message.headers,
waitForAck: true,
})
Expand All @@ -286,6 +289,9 @@ export function splitIngestionBatch(
toProcess: [],
toOverflow: [],
}
const shouldRerouteToOverflow = [IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly].includes(
overflowMode
)

if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) {
/**
Expand Down Expand Up @@ -314,7 +320,7 @@ export function splitIngestionBatch(

const batches: Map<string, { message: Message; pluginEvent: PipelineEvent }[]> = new Map()
for (const message of kafkaMessages) {
if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) {
if (shouldRerouteToOverflow && message.key == null) {
// Overflow detected by capture, reroute to overflow topic
// Not applying tokenBlockList to save CPU. TODO: do so once token is in the message headers
output.toOverflow.push(message)
Expand All @@ -334,12 +340,8 @@ export function splitIngestionBatch(
}

const eventKey = computeKey(pluginEvent)
if (
overflowMode === IngestionOverflowMode.Reroute &&
!ConfiguredLimiter.consume(eventKey, 1, message.timestamp)
) {
if (shouldRerouteToOverflow && !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)) {
// Local overflow detection triggering, reroute to overflow topic too
message.key = null
ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc()
if (LoggingLimiter.consume(eventKey, 1)) {
status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`)
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ export interface PluginsServerConfig {
TASKS_PER_WORKER: number // number of parallel tasks per worker thread
INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch
INGESTION_BATCH_SIZE: number // kafka consumer batch size
INGESTION_OVERFLOW_ENABLED: boolean // whether or not overflow rerouting is enabled (only used by analytics-ingestion)
INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: boolean // whether or not Kafka message keys should be preserved or discarded when messages are rerouted to overflow
TASK_TIMEOUT: number // how many seconds until tasks are timed out
DATABASE_URL: string // Postgres database URL
DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database
Expand Down
5 changes: 0 additions & 5 deletions plugin-server/src/utils/env-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Productio

export const isCloud = (): boolean => !!process.env.CLOUD_DEPLOYMENT

export function isIngestionOverflowEnabled(): boolean {
const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED
return stringToBoolean(ingestionOverflowEnabled)
}

export function isOverflowBatchByDistinctId(): boolean {
const overflowBatchByDistinctId = process.env.INGESTION_OVERFLOW_BATCH_BY_DISTINCT_ID
return stringToBoolean(overflowBatchByDistinctId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,32 +107,35 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
expect(runEventPipeline).not.toHaveBeenCalled()
})

it('reroutes excess events to OVERFLOW topic', async () => {
const now = Date.now()
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)

const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)

expect(consume).toHaveBeenCalledWith(
captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
1,
now
)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: JSON.stringify(captureEndpointEvent1),
timestamp: captureEndpointEvent1['timestamp'],
offset: captureEndpointEvent1['offset'],
key: null,
waitForAck: true,
})
it.each([IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly])(
'reroutes excess events to OVERFLOW topic (mode=%p)',
async (overflowMode) => {
const now = Date.now()
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)

const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, overflowMode)

expect(consume).toHaveBeenCalledWith(
captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
1,
now
)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: JSON.stringify(captureEndpointEvent1),
timestamp: captureEndpointEvent1['timestamp'],
offset: captureEndpointEvent1['offset'],
key: overflowMode === IngestionOverflowMode.Reroute ? batch[0].key : undefined,
waitForAck: true,
})

// Event is not processed here
expect(runEventPipeline).not.toHaveBeenCalled()
})
// Event is not processed here
expect(runEventPipeline).not.toHaveBeenCalled()
}
)

it('does not reroute if not over capacity limit', async () => {
const now = Date.now()
Expand Down

0 comments on commit 85ef237

Please sign in to comment.