Optimize signal ingestion pipeline and create perf-test cli util #3337
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR introduces a blocking thread-safe FIFO queue in the signal ingestion pipeline. This ensures signals are processed in the order they were received and prevents race conditions where a signal instance can override a shared value (e.g. case associations). The queue has a max size of 500 that is also used to limit the number of instances retrieved by the query in
process_signals
. We also create ascoped_session
for each run of the scheduler.A boundary of
.filter(SignalInstance.created_at >= one_hour_ago)
was added to the ingestion query to prevent cases where we are unable to keep up for an hour, which should never happen, and if it does is an acceptable scenario to prevent performance issues and outages.Caching logic was introduced in
_should_update_signal_message
to prevent the Slackchat.update
method from being ratelimited.Finally, a stress test CLI utility was created that attempts to replicate signal ingestion via the
/instances
API endpoint.Usage:
I tested a push of 5000 instances from my M1 MBP.
timer results:
Non-deduplicated signal instance (calls many external API's) and results in new case.
Deduplicated signal instance.
We can play around with these configurable options to see what makes the most sense:
@scheduler.add(every(1).minutes, name="signal-process")
MAX_SIGNAL_INSTANCES = 500