Skip to content

Commit

Permalink
shared debounce key for person state
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra committed Mar 19, 2024
1 parent 310a98c commit c0d3002
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 19 deletions.
49 changes: 33 additions & 16 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,19 +323,31 @@ export class PersonState {
return undefined
}
if (isDistinctIdIllegal(mergeIntoDistinctId)) {
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
teamId,
'cannot_merge_with_illegal_distinct_id',
{
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
},
(teamId) => `${teamId}:mergeIntoDistinctId`
)
return undefined
}
if (isDistinctIdIllegal(otherPersonDistinctId)) {
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
teamId,
'cannot_merge_with_illegal_distinct_id',
{
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
},
(teamId) => `${teamId}:mergeIntoDistinctId`
)
return undefined
}
return promiseRetry(
Expand Down Expand Up @@ -403,12 +415,17 @@ export class PersonState {

// If merge isn't allowed, we will ignore it, log an ingestion warning and exit
if (!mergeAllowed) {
// TODO: add event UUID to the ingestion warning
await captureIngestionWarning(this.db.kafkaProducer, this.teamId, 'cannot_merge_already_identified', {
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
this.teamId,
'cannot_merge_already_identified',
{
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
},
(teamId) => `${teamId}:mergeIntoDistinctId`
)
status.warn('🤔', 'refused to merge an already identified user via an $identify or $create_alias call')
return mergeInto // We're returning the original person tied to distinct_id used for the event
}
Expand Down
13 changes: 10 additions & 3 deletions plugin-server/src/worker/ingestion/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,17 @@ export async function captureIngestionWarning(
teamId: TeamId,
type: string,
details: Record<string, any>,
debounce_key?: string
/**
* Optional key to debounce the warning. If not provided, the teamId and type will be used as the key.
* If a function is provided, it will be called with the teamId and type and should return a string key.
* If a string is provided it is combined with the teamId and type.
*/
debounceKey?: string | ((teamId: number, type: string) => string)
) {
const limiter_key = `${teamId}:${type}:${debounce_key}`
if (IngestionWarningLimiter.consume(limiter_key, 1)) {
const limiterKey =
typeof debounceKey === 'function' ? debounceKey(teamId, type) : `${teamId}:${type}:${debounceKey}`

if (IngestionWarningLimiter.consume(limiterKey, 1)) {
await kafkaProducer.queueMessage({
topic: KAFKA_INGESTION_WARNINGS,
messages: [
Expand Down

0 comments on commit c0d3002

Please sign in to comment.