Developer Notes

Timers in Kafka Streams

We use the Kafka Streams Processor API to schedule punctuation callbacks. We only want each callback to run once, not repeat indefinitely, so we attempt to cancel the punctuator as soon as the callback fires. Cancellation is not guaranteed to take effect immediately, so the punctuator may run more than once. Writing null to the shelved-alarms topic multiple times instead of once is idempotent as long as nothing occurs in between: when a new message is consumed from the shelved-alarms topic, the first action is to cancel any timer associated with that alarm (the alarm name is the key). This means that if a new (modified) shelve expiration is produced, the old timer is cancelled before the new timer is created.

However, there will always be some timing concerns with multiple distributed clients writing messages to Kafka: the broker imposes the order, but that order may not be apparent until after a user produces a message (so clients would need to consume from the shelved-alarms topic and observe that the messages they produce may be interleaved with messages from others).
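The pattern might look something like the following sketch (class name, field names, and the delay value are illustrative, not the project's actual code), assuming the Kafka Streams 3.x Processor API:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class ShelveExpirationProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private final Map<String, Cancellable> timers = new HashMap<>();

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        String alarmName = record.key();

        // First action on any new message: cancel the existing timer for
        // this alarm, if any (this handles modified shelve expirations).
        Cancellable existing = timers.remove(alarmName);
        if (existing != null) {
            existing.cancel();
        }

        if (record.value() == null) {
            return; // tombstone: nothing to schedule
        }

        long expirationMillis = 5000; // illustrative; parse from record.value() in practice

        // Schedule the punctuator, then cancel it inside its own callback so
        // it runs (ideally) once.  Cancellation isn't guaranteed to be
        // immediate, but writing the null tombstone twice is idempotent.
        Cancellable timer = context.schedule(
                Duration.ofMillis(expirationMillis),
                PunctuationType.WALL_CLOCK_TIME,
                timestamp -> {
                    Cancellable self = timers.remove(alarmName);
                    if (self != null) {
                        self.cancel();
                    }
                    // Null value clears the shelved override downstream.
                    context.forward(new Record<>(alarmName, null, timestamp));
                });

        timers.put(alarmName, timer);
    }
}
```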

Happens-Before Constraints

Originally, all override rules ran concurrently as separate stream processing apps in parallel, and the state processor also ran in parallel at the same time. Processing is now done serially in a pipeline, one stage after the other, to ensure synchronization and predictable timing. We experimented with other strategies, such as:

Windowing: Configure a session window with a 1 second duration while consuming the "alarm-activations" topic in the state-processor, as that would likely allow enough time for all the various overrides to be computed (many overrides are triggered by messages on "alarm-activations"). The guarantees here don't seem as straightforward to reason about as with the next option (pipelining).

Pipelining (the option we ultimately went with): On-delays, off-delays, and masking are particularly sensitive to override computation timing. The alarm-state processor could be updated to consume from the last topic in a chain of topics such as:

merged-topics -> intermediate-ondelay-processed -> intermediate-offdelay-processed -> intermediate-mask-processed -> fully-processed-alarms

This would ensure all override processors have had a chance to run before effective state is computed. The first processor in the chain would merge all source topics into one monolog topic, as sketched below.
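A minimal topology sketch of this chain might look like the following (topic names come from the chain above; the real stages would use the Processor API with state stores, so the pass-through stage shown is only a placeholder):

```java
import org.apache.kafka.streams.StreamsBuilder;

// Merge all source topics into the single "monolog" topic heading the chain.
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("alarm-activations")
        .merge(builder.stream("alarm-overrides"))
        .merge(builder.stream("alarm-registrations"))
        .merge(builder.stream("alarm-classes"))
        .to("merged-topics");

// One stage of the chain (the on-delay stage), shown as a pass-through
// placeholder; the real stage would apply its override logic here.  Each
// stage consumes the previous stage's output topic, so a record cannot
// reach the state processor until every override stage has seen it.
builder.<String, String>stream("merged-topics")
        .mapValues(value -> value) // placeholder for on-delay processing
        .to("intermediate-ondelay-processed");
```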

Sync Topic: A much more complicated, but potentially higher-throughput (more parallel), approach would be to have a sync topic that each override processor writes to, using the active-message id (index?) plus override type as the key; the state processor could then wait until it has confirmation from each override processor as the trigger to end the current window. In other words, maybe there is a way to let everything run in parallel and then sync up to merge results at the end?

App Consolidation: If a single monolithic streams app did everything, then threading tools and logic ordering could be used to minimize intermediate results. This has the disadvantage of one big hunk of complex code. It would likely be easiest to build using the regular Consumer and Producer APIs instead of the Kafka Streams API.

Pipelining Details

Topic Joining

We consolidate the alarm-activations, alarm-overrides, alarm-registrations, and alarm-classes topics into a single topic on a single partition to gain stronger ordering constraints. The topic key could consist of two fields: alarm name plus message type, where message type would be one of (active, register, class, disabled, filtered, masked, ondelayed, shelved, offdelayed, latched). Alternatively, we could use the alarm name alone as the key, with a value containing ALL related data. This second approach has the advantage of simplicity and also allows us to enrich the monolog record with processing hints for re-use by later processors.
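Under the second approach, the consolidated value might be shaped something like the sketch below (field names and payload types are illustrative, not the project's actual schema):

```java
import java.util.Map;

// Record key: alarm name (String).  The value carries ALL related data for
// the alarm, plus hints set by earlier processors in the pipeline.
public record MonologValue(
        String registration,                  // latest alarm-registrations message
        String alarmClass,                    // latest alarm-classes message
        String activation,                    // latest alarm-activations message
        Map<String, String> overrides,        // override type -> latest override message
        Map<String, Boolean> processingHints  // flags for re-use by later processors
) {}
```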

The override processor would be consolidated into the state processor and might look like:

[admin app(s)] -> registered and class msgs ------
                                                 |
                                                 V         
[annunciator app(s)] -> active msgs -> single-consolidated-topic -> [pipeline-of-processors] -> effective-state
                                                 A
                                                 |
[operator app(s)] -> manual override msgs --------

The single consolidated app would handle auto-overrides (timers, latching), effective registrations (merging registrations with classes), override merging and precedence, and translating batch filter msgs into individual alarm overrides.
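For example, override merging and precedence might reduce to picking the highest-precedence override present, along the lines of this sketch (the precedence order shown is an assumption, not necessarily the project's actual ordering):

```java
import java.util.List;
import java.util.Map;

public final class OverridePrecedence {

    // Assumed ordering, highest precedence first; the real order may differ.
    private static final List<String> PRECEDENCE = List.of(
            "Disabled", "Filtered", "Masked", "OnDelayed",
            "Shelved", "OffDelayed", "Latched");

    // Returns the winning override type, or null if none apply (in which
    // case effective state comes from the activation alone).
    public static String effectiveOverride(Map<String, String> overrides) {
        for (String type : PRECEDENCE) {
            if (overrides.containsKey(type)) {
                return type;
            }
        }
        return null;
    }
}
```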

The single-consolidated topic must be a single-partition "monolog", but the state-processor could actually produce to the effective-state topic using multiple partitions, as long as all the data for any given alarm lands on the same partition.
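Since effective-state records are keyed by alarm name, Kafka's default partitioner (a hash of the key) already satisfies that constraint. A minimal producer sketch, assuming a local broker and string-serialized payloads:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Keying by alarm name means every record for a given alarm hashes to
    // the same partition, so effective-state can safely be multi-partition.
    producer.send(new ProducerRecord<>("effective-state", "alarm1", "{\"state\":\"Active\"}"));
}
```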

See Also

Software Design
