diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 4bd3f5cfe..eb0fb5899 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -260,8 +260,9 @@ where let shutdown = cancellation_watcher(); tokio::pin!(shutdown); + let in_memory_limit = updateable_options.load().in_memory_queue_length_limit(); // Prepare the segmented queue - let mut segmented_input_queue = SegmentQueue::init(tmp_dir, 1_056_784) + let mut segmented_input_queue = SegmentQueue::init(tmp_dir, in_memory_limit) .await .expect("Cannot initialize input spillable queue"); diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 85c3fdf8f..877cba64b 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -125,6 +125,13 @@ pub struct InvokerOptions { /// If empty, the system temporary directory will be used instead. tmp_dir: Option, + /// # Spill invocations to disk + /// + /// Defines the threshold after which queues invocations will spill to disk at + /// the path defined in `tmp-dir`. In other words, this is the number of invocations + /// that can be kept in memory before spilling to disk. + in_memory_queue_length_limit: NonZeroUsize, + /// # Limit number of concurrent invocations from this node /// /// Number of concurrent invocations that can be processed by the invoker. @@ -147,6 +154,10 @@ impl InvokerOptions { self.concurrent_invocations_limit.map(Into::into) } + pub fn in_memory_queue_length_limit(&self) -> usize { + self.in_memory_queue_length_limit.into() + } + pub fn message_size_limit(&self) -> Option { self.message_size_limit.map(Into::into) } @@ -162,6 +173,7 @@ impl Default for InvokerOptions { None, Some(Duration::from_secs(10)), ), + in_memory_queue_length_limit: NonZeroUsize::new(1_056_784).unwrap(), inactivity_timeout: Duration::from_secs(60).into(), abort_timeout: Duration::from_secs(60).into(), message_size_warning: NonZeroUsize::new(10_000_000).unwrap(), // 10MB