From 39f34753be0e27af8f2a0c651c14099d5538d23f Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 12 Jun 2024 11:48:48 +0100 Subject: [PATCH] [config] Adds invoker segment queue limit to configuration file This adds a new configuration key `in-memory-queue-length-limit` under `[worker.invoker]` to enable configuring the segment queue from the configuration file. The default value matches the currently hard coded value. --- crates/invoker-impl/src/lib.rs | 3 ++- crates/types/src/config/worker.rs | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 4bd3f5cfe..eb0fb5899 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -260,8 +260,9 @@ where let shutdown = cancellation_watcher(); tokio::pin!(shutdown); + let in_memory_limit = updateable_options.load().in_memory_queue_length_limit(); // Prepare the segmented queue - let mut segmented_input_queue = SegmentQueue::init(tmp_dir, 1_056_784) + let mut segmented_input_queue = SegmentQueue::init(tmp_dir, in_memory_limit) .await .expect("Cannot initialize input spillable queue"); diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 85c3fdf8f..877cba64b 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -125,6 +125,13 @@ pub struct InvokerOptions { /// If empty, the system temporary directory will be used instead. tmp_dir: Option, + /// # Spill invocations to disk + /// + /// Defines the threshold after which queues invocations will spill to disk at + /// the path defined in `tmp-dir`. In other words, this is the number of invocations + /// that can be kept in memory before spilling to disk. + in_memory_queue_length_limit: NonZeroUsize, + /// # Limit number of concurrent invocations from this node /// /// Number of concurrent invocations that can be processed by the invoker. @@ -147,6 +154,10 @@ impl InvokerOptions { self.concurrent_invocations_limit.map(Into::into) } + pub fn in_memory_queue_length_limit(&self) -> usize { + self.in_memory_queue_length_limit.into() + } + pub fn message_size_limit(&self) -> Option { self.message_size_limit.map(Into::into) } @@ -162,6 +173,7 @@ impl Default for InvokerOptions { None, Some(Duration::from_secs(10)), ), + in_memory_queue_length_limit: NonZeroUsize::new(1_056_784).unwrap(), inactivity_timeout: Duration::from_secs(60).into(), abort_timeout: Duration::from_secs(60).into(), message_size_warning: NonZeroUsize::new(10_000_000).unwrap(), // 10MB