From f047326efc6bb72281b4144286bdff9f317449a1 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 15 Aug 2024 13:58:37 +0200 Subject: [PATCH 1/2] Fix #1678 --- .../src/invocation_status_table/mod.rs | 17 +- .../src/invocation_status_table/mod.rs | 147 ++-- crates/storage-api/src/storage.rs | 169 ++-- crates/storage-api/src/timer_table/mod.rs | 1 + .../src/invocation_status/row.rs | 4 +- .../worker/src/partition/state_machine/mod.rs | 826 +++++++++++------- .../state_machine/tests/delayed_send.rs | 319 +++++++ .../partition/state_machine/tests/matchers.rs | 44 + .../src/partition/state_machine/tests/mod.rs | 135 ++- 9 files changed, 1163 insertions(+), 499 deletions(-) create mode 100644 crates/worker/src/partition/state_machine/tests/delayed_send.rs create mode 100644 crates/worker/src/partition/state_machine/tests/matchers.rs diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index e7b86eb2c..8d1b9e949 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -18,8 +18,8 @@ use futures_util::stream; use restate_rocksdb::RocksDbPerfGuard; use restate_storage_api::invocation_status_table::{ CompletedInvocation, InFlightInvocationMetadata, InboxedInvocation, InvocationStatus, - InvocationStatusTable, NeoInvocationStatus, ReadOnlyInvocationStatusTable, ScheduledInvocation, - SourceTable, + InvocationStatusTable, NeoInvocationStatus, PreFlightInvocationMetadata, + ReadOnlyInvocationStatusTable, SourceTable, }; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey}; @@ -94,8 +94,10 @@ fn put_invocation_status( status: InvocationStatus, ) { match &status { - InvocationStatus::Scheduled(ScheduledInvocation { source_table, .. }) - | InvocationStatus::Inboxed(InboxedInvocation { source_table, .. }) + InvocationStatus::Inboxed(InboxedInvocation { + metadata: PreFlightInvocationMetadata { source_table, .. }, + .. + }) | InvocationStatus::Invoked(InFlightInvocationMetadata { source_table, .. }) | InvocationStatus::Suspended { metadata: InFlightInvocationMetadata { source_table, .. }, @@ -115,6 +117,13 @@ fn put_invocation_status( } } } + InvocationStatus::Scheduled { .. } => { + // The scheduled variant is only on the NeoInvocationStatus + storage.put_kv( + create_neo_invocation_status_key(invocation_id), + NeoInvocationStatus(status), + ); + } InvocationStatus::Free => { // TODO remove this once we remove the old InvocationStatus storage.delete_key(&create_invocation_status_key(invocation_id)); diff --git a/crates/storage-api/src/invocation_status_table/mod.rs b/crates/storage-api/src/invocation_status_table/mod.rs index 16ab4349a..5de1b31b5 100644 --- a/crates/storage-api/src/invocation_status_table/mod.rs +++ b/crates/storage-api/src/invocation_status_table/mod.rs @@ -91,8 +91,8 @@ impl InvocationStatus { #[inline] pub fn invocation_target(&self) -> Option<&InvocationTarget> { match self { - InvocationStatus::Scheduled(metadata) => Some(&metadata.invocation_target), - InvocationStatus::Inboxed(metadata) => Some(&metadata.invocation_target), + InvocationStatus::Scheduled(metadata) => Some(&metadata.metadata.invocation_target), + InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.invocation_target), InvocationStatus::Invoked(metadata) => Some(&metadata.invocation_target), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.invocation_target), InvocationStatus::Completed(completed) => Some(&completed.invocation_target), @@ -103,8 +103,8 @@ impl InvocationStatus { #[inline] pub fn idempotency_key(&self) -> Option<&ByteString> { match self { - InvocationStatus::Scheduled(metadata) => metadata.idempotency_key.as_ref(), - InvocationStatus::Inboxed(metadata) => metadata.idempotency_key.as_ref(), + InvocationStatus::Scheduled(metadata) => metadata.metadata.idempotency_key.as_ref(), + InvocationStatus::Inboxed(metadata) => metadata.metadata.idempotency_key.as_ref(), InvocationStatus::Invoked(metadata) => metadata.idempotency_key.as_ref(), InvocationStatus::Suspended { metadata, .. } => metadata.idempotency_key.as_ref(), InvocationStatus::Completed(completed) => completed.idempotency_key.as_ref(), @@ -171,8 +171,8 @@ impl InvocationStatus { &mut self, ) -> Option<&mut HashSet> { match self { - InvocationStatus::Scheduled(metadata) => Some(&mut metadata.response_sinks), - InvocationStatus::Inboxed(metadata) => Some(&mut metadata.response_sinks), + InvocationStatus::Scheduled(metadata) => Some(&mut metadata.metadata.response_sinks), + InvocationStatus::Inboxed(metadata) => Some(&mut metadata.metadata.response_sinks), InvocationStatus::Invoked(metadata) => Some(&mut metadata.response_sinks), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.response_sinks), _ => None, @@ -182,8 +182,8 @@ impl InvocationStatus { #[inline] pub fn get_response_sinks(&self) -> Option<&HashSet> { match self { - InvocationStatus::Scheduled(metadata) => Some(&metadata.response_sinks), - InvocationStatus::Inboxed(metadata) => Some(&metadata.response_sinks), + InvocationStatus::Scheduled(metadata) => Some(&metadata.metadata.response_sinks), + InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.response_sinks), InvocationStatus::Invoked(metadata) => Some(&metadata.response_sinks), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.response_sinks), _ => None, @@ -193,8 +193,8 @@ impl InvocationStatus { #[inline] pub fn get_timestamps(&self) -> Option<&StatusTimestamps> { match self { - InvocationStatus::Scheduled(metadata) => Some(&metadata.timestamps), - InvocationStatus::Inboxed(metadata) => Some(&metadata.timestamps), + InvocationStatus::Scheduled(metadata) => Some(&metadata.metadata.timestamps), + InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.timestamps), InvocationStatus::Invoked(metadata) => Some(&metadata.timestamps), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.timestamps), InvocationStatus::Completed(completed) => Some(&completed.timestamps), @@ -204,8 +204,8 @@ impl InvocationStatus { pub fn update_timestamps(&mut self) { match self { - InvocationStatus::Scheduled(metadata) => metadata.timestamps.update(), - InvocationStatus::Inboxed(metadata) => metadata.timestamps.update(), + InvocationStatus::Scheduled(metadata) => metadata.metadata.timestamps.update(), + InvocationStatus::Inboxed(metadata) => metadata.metadata.timestamps.update(), InvocationStatus::Invoked(metadata) => metadata.timestamps.update(), InvocationStatus::Suspended { metadata, .. } => metadata.timestamps.update(), _ => {} @@ -243,7 +243,7 @@ impl JournalMetadata { /// This is similar to [ServiceInvocation]. #[derive(Debug, Clone, PartialEq)] -pub struct ScheduledInvocation { +pub struct PreFlightInvocationMetadata { pub response_sinks: HashSet, pub timestamps: StatusTimestamps, @@ -256,7 +256,7 @@ pub struct ScheduledInvocation { pub span_context: ServiceInvocationSpanContext, pub headers: Vec
, /// Time when the request should be executed - pub execution_time: MillisSinceEpoch, + pub execution_time: Option, /// If zero, the invocation completion will not be retained. pub completion_retention_time: Duration, pub idempotency_key: Option, @@ -265,7 +265,18 @@ pub struct ScheduledInvocation { pub source_table: SourceTable, } +#[derive(Debug, Clone, PartialEq)] +pub struct ScheduledInvocation { + pub metadata: PreFlightInvocationMetadata, +} + impl ScheduledInvocation { + pub fn from_pre_flight_invocation_metadata(metadata: PreFlightInvocationMetadata) -> Self { + Self { metadata } + } +} + +impl PreFlightInvocationMetadata { pub fn from_service_invocation( service_invocation: ServiceInvocation, source_table: SourceTable, @@ -278,9 +289,7 @@ impl ScheduledInvocation { source: service_invocation.source, span_context: service_invocation.span_context, headers: service_invocation.headers, - execution_time: service_invocation - .execution_time - .expect("Scheduled invocations must have an execution time set"), + execution_time: service_invocation.execution_time, completion_retention_time: service_invocation .completion_retention_time .unwrap_or_default(), @@ -295,68 +304,29 @@ impl ScheduledInvocation { #[derive(Debug, Clone, PartialEq)] pub struct InboxedInvocation { pub inbox_sequence_number: u64, - pub response_sinks: HashSet, - pub timestamps: StatusTimestamps, - - // --- From ServiceInvocation - pub invocation_target: InvocationTarget, - - // Could be split out of ServiceInvocation, e.g. InvocationContent or similar. - pub argument: Bytes, - pub source: Source, - pub span_context: ServiceInvocationSpanContext, - pub headers: Vec
, - /// Time when the request should be executed - pub execution_time: Option, - /// If zero, the invocation completion will not be retained. - pub completion_retention_time: Duration, - pub idempotency_key: Option, - - /// Used by the Table implementation to pick where to write - pub source_table: SourceTable, + pub metadata: PreFlightInvocationMetadata, } impl InboxedInvocation { - pub fn from_service_invocation( - service_invocation: ServiceInvocation, + pub fn from_pre_flight_invocation_metadata( + metadata: PreFlightInvocationMetadata, inbox_sequence_number: u64, - source_table: SourceTable, ) -> Self { Self { inbox_sequence_number, - response_sinks: service_invocation.response_sink.into_iter().collect(), - timestamps: StatusTimestamps::now(), - invocation_target: service_invocation.invocation_target, - argument: service_invocation.argument, - source: service_invocation.source, - span_context: service_invocation.span_context, - headers: service_invocation.headers, - execution_time: service_invocation.execution_time, - completion_retention_time: service_invocation - .completion_retention_time - .unwrap_or_default(), - idempotency_key: service_invocation.idempotency_key, - source_table, + metadata, } } pub fn from_scheduled_invocation( - scheduled_invocation: ScheduledInvocation, + mut scheduled_invocation: ScheduledInvocation, inbox_sequence_number: u64, ) -> Self { + scheduled_invocation.metadata.timestamps.update(); + Self { inbox_sequence_number, - response_sinks: scheduled_invocation.response_sinks, - timestamps: scheduled_invocation.timestamps, - invocation_target: scheduled_invocation.invocation_target, - argument: scheduled_invocation.argument, - source: scheduled_invocation.source, - span_context: scheduled_invocation.span_context, - headers: scheduled_invocation.headers, - execution_time: Some(scheduled_invocation.execution_time), - completion_retention_time: scheduled_invocation.completion_retention_time, - idempotency_key: scheduled_invocation.idempotency_key, - source_table: scheduled_invocation.source_table, + metadata: scheduled_invocation.metadata, } } } @@ -378,27 +348,26 @@ pub struct InFlightInvocationMetadata { } impl InFlightInvocationMetadata { - pub fn from_service_invocation( - service_invocation: ServiceInvocation, - source_table: SourceTable, + pub fn from_pre_flight_invocation_metadata( + pre_flight_invocation_metadata: PreFlightInvocationMetadata, ) -> (Self, InvocationInput) { ( Self { - invocation_target: service_invocation.invocation_target, - journal_metadata: JournalMetadata::initialize(service_invocation.span_context), + invocation_target: pre_flight_invocation_metadata.invocation_target, + journal_metadata: JournalMetadata::initialize( + pre_flight_invocation_metadata.span_context, + ), pinned_deployment: None, - response_sinks: service_invocation.response_sink.into_iter().collect(), - timestamps: StatusTimestamps::now(), - source: service_invocation.source, - completion_retention_time: service_invocation - .completion_retention_time - .unwrap_or_default(), - idempotency_key: service_invocation.idempotency_key, - source_table, + response_sinks: pre_flight_invocation_metadata.response_sinks, + timestamps: pre_flight_invocation_metadata.timestamps, + source: pre_flight_invocation_metadata.source, + completion_retention_time: pre_flight_invocation_metadata.completion_retention_time, + idempotency_key: pre_flight_invocation_metadata.idempotency_key, + source_table: pre_flight_invocation_metadata.source_table, }, InvocationInput { - argument: service_invocation.argument, - headers: service_invocation.headers, + argument: pre_flight_invocation_metadata.argument, + headers: pre_flight_invocation_metadata.headers, }, ) } @@ -406,25 +375,9 @@ impl InFlightInvocationMetadata { pub fn from_inboxed_invocation( mut inboxed_invocation: InboxedInvocation, ) -> (Self, InvocationInput) { - inboxed_invocation.timestamps.update(); + inboxed_invocation.metadata.timestamps.update(); - ( - Self { - invocation_target: inboxed_invocation.invocation_target, - journal_metadata: JournalMetadata::initialize(inboxed_invocation.span_context), - pinned_deployment: None, - response_sinks: inboxed_invocation.response_sinks, - timestamps: inboxed_invocation.timestamps, - source: inboxed_invocation.source, - completion_retention_time: inboxed_invocation.completion_retention_time, - idempotency_key: inboxed_invocation.idempotency_key, - source_table: inboxed_invocation.source_table, - }, - InvocationInput { - argument: inboxed_invocation.argument, - headers: inboxed_invocation.headers, - }, - ) + Self::from_pre_flight_invocation_metadata(inboxed_invocation.metadata) } pub fn set_pinned_deployment(&mut self, pinned_deployment: PinnedDeployment) { diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index 83c760887..156c1b64e 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -348,21 +348,23 @@ pub mod v1 { neo_invocation_status::Status::Scheduled => { Ok(crate::invocation_status_table::InvocationStatus::Scheduled( crate::invocation_status_table::ScheduledInvocation { - response_sinks, - timestamps, - invocation_target, - argument: expect_or_fail!(argument)?, - source, - span_context: expect_or_fail!(span_context)?.try_into()?, - headers, - execution_time: MillisSinceEpoch::new(expect_or_fail!( - execution_time - )?), - completion_retention_time: completion_retention_time - .unwrap_or_default() - .try_into()?, - idempotency_key: idempotency_key.map(ByteString::from), - source_table: crate::invocation_status_table::SourceTable::New, + metadata: + crate::invocation_status_table::PreFlightInvocationMetadata { + response_sinks, + timestamps, + invocation_target, + argument: expect_or_fail!(argument)?, + source, + span_context: expect_or_fail!(span_context)?.try_into()?, + headers, + execution_time: execution_time.map(MillisSinceEpoch::new), + completion_retention_time: completion_retention_time + .unwrap_or_default() + .try_into()?, + idempotency_key: idempotency_key.map(ByteString::from), + source_table: + crate::invocation_status_table::SourceTable::New, + }, }, )) } @@ -370,19 +372,23 @@ pub mod v1 { Ok(crate::invocation_status_table::InvocationStatus::Inboxed( crate::invocation_status_table::InboxedInvocation { inbox_sequence_number: expect_or_fail!(inbox_sequence_number)?, - response_sinks, - timestamps, - invocation_target, - argument: expect_or_fail!(argument)?, - source, - span_context: expect_or_fail!(span_context)?.try_into()?, - headers, - execution_time: execution_time.map(MillisSinceEpoch::new), - completion_retention_time: completion_retention_time - .unwrap_or_default() - .try_into()?, - idempotency_key: idempotency_key.map(ByteString::from), - source_table: crate::invocation_status_table::SourceTable::New, + metadata: + crate::invocation_status_table::PreFlightInvocationMetadata { + response_sinks, + timestamps, + invocation_target, + argument: expect_or_fail!(argument)?, + source, + span_context: expect_or_fail!(span_context)?.try_into()?, + headers, + execution_time: execution_time.map(MillisSinceEpoch::new), + completion_retention_time: completion_retention_time + .unwrap_or_default() + .try_into()?, + idempotency_key: idempotency_key.map(ByteString::from), + source_table: + crate::invocation_status_table::SourceTable::New, + }, }, )) } @@ -463,17 +469,20 @@ pub mod v1 { match value { crate::invocation_status_table::InvocationStatus::Scheduled( crate::invocation_status_table::ScheduledInvocation { - response_sinks, - timestamps, - invocation_target, - argument, - source, - span_context, - headers, - execution_time, - completion_retention_time, - idempotency_key, - source_table: _, + metadata: + crate::invocation_status_table::PreFlightInvocationMetadata { + response_sinks, + timestamps, + invocation_target, + argument, + source, + span_context, + headers, + execution_time, + completion_retention_time, + idempotency_key, + source_table: _, + }, }, ) => NeoInvocationStatus { status: neo_invocation_status::Status::Scheduled.into(), @@ -488,7 +497,7 @@ pub mod v1 { .collect(), argument: Some(argument), headers: headers.into_iter().map(Into::into).collect(), - execution_time: Some(execution_time.as_u64()), + execution_time: execution_time.map(|t| t.as_u64()), completion_retention_time: Some(completion_retention_time.into()), idempotency_key: idempotency_key.map(|key| key.to_string()), inbox_sequence_number: None, @@ -500,18 +509,21 @@ pub mod v1 { }, crate::invocation_status_table::InvocationStatus::Inboxed( crate::invocation_status_table::InboxedInvocation { + metadata: + crate::invocation_status_table::PreFlightInvocationMetadata { + response_sinks, + timestamps, + invocation_target, + argument, + source, + span_context, + headers, + execution_time, + completion_retention_time, + idempotency_key, + source_table: _, + }, inbox_sequence_number, - response_sinks, - timestamps, - invocation_target, - argument, - source, - span_context, - headers, - execution_time, - completion_retention_time, - idempotency_key, - source_table: _, }, ) => NeoInvocationStatus { status: neo_invocation_status::Status::Inboxed.into(), @@ -1041,20 +1053,22 @@ pub mod v1 { Ok(crate::invocation_status_table::InboxedInvocation { inbox_sequence_number: value.inbox_sequence_number, - response_sinks, - timestamps: crate::invocation_status_table::StatusTimestamps::new( - MillisSinceEpoch::new(value.creation_time), - MillisSinceEpoch::new(value.modification_time), - ), - source, - span_context, - headers, - argument: value.argument, - execution_time, - idempotency_key, - completion_retention_time, - invocation_target, - source_table: crate::invocation_status_table::SourceTable::Old, + metadata: crate::invocation_status_table::PreFlightInvocationMetadata { + response_sinks, + timestamps: crate::invocation_status_table::StatusTimestamps::new( + MillisSinceEpoch::new(value.creation_time), + MillisSinceEpoch::new(value.modification_time), + ), + source, + span_context, + headers, + argument: value.argument, + execution_time, + idempotency_key, + completion_retention_time, + invocation_target, + source_table: crate::invocation_status_table::SourceTable::Old, + }, }) } } @@ -1062,18 +1076,21 @@ pub mod v1 { impl From for Inboxed { fn from(value: crate::invocation_status_table::InboxedInvocation) -> Self { let crate::invocation_status_table::InboxedInvocation { - invocation_target, + metadata: + crate::invocation_status_table::PreFlightInvocationMetadata { + response_sinks, + timestamps, + invocation_target, + argument, + source, + span_context, + headers, + execution_time, + completion_retention_time, + idempotency_key, + source_table: _, + }, inbox_sequence_number, - response_sinks, - timestamps, - argument, - source, - span_context, - headers, - execution_time, - completion_retention_time, - idempotency_key, - source_table: _, } = value; let headers = headers.into_iter().map(Into::into).collect(); diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index f865961a3..5de76fc12 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -180,6 +180,7 @@ impl restate_types::timer::TimerKey for TimerKey { #[derive(Clone, Debug, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum Timer { + // TODO remove this variant when removing the old invocation status table Invoke(ServiceInvocation), CompleteJournalEntry(InvocationId, u32), CleanInvocationStatus(InvocationId), diff --git a/crates/storage-query-datafusion/src/invocation_status/row.rs b/crates/storage-query-datafusion/src/invocation_status/row.rs index 723819c64..d6fffe9f7 100644 --- a/crates/storage-query-datafusion/src/invocation_status/row.rs +++ b/crates/storage-query-datafusion/src/invocation_status/row.rs @@ -61,11 +61,11 @@ pub(crate) fn append_invocation_status_row( match invocation_status { InvocationStatus::Scheduled(scheduled) => { row.status("scheduled"); - fill_invoked_by(&mut row, output, scheduled.source); + fill_invoked_by(&mut row, output, scheduled.metadata.source); } InvocationStatus::Inboxed(inboxed) => { row.status("inboxed"); - fill_invoked_by(&mut row, output, inboxed.source); + fill_invoked_by(&mut row, output, inboxed.metadata.source); } InvocationStatus::Invoked(metadata) => { row.status("invoked"); diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index a52fc233f..e9708905b 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -29,10 +29,12 @@ use restate_storage_api::idempotency_table::IdempotencyMetadata; use restate_storage_api::idempotency_table::{IdempotencyTable, ReadOnlyIdempotencyTable}; use restate_storage_api::inbox_table::{InboxEntry, SequenceNumberInboxEntry}; use restate_storage_api::invocation_status_table; -use restate_storage_api::invocation_status_table::InvocationStatus; use restate_storage_api::invocation_status_table::{ CompletedInvocation, InFlightInvocationMetadata, InboxedInvocation, - ReadOnlyInvocationStatusTable, + PreFlightInvocationMetadata, ReadOnlyInvocationStatusTable, +}; +use restate_storage_api::invocation_status_table::{ + InvocationStatus, ScheduledInvocation, SourceTable, }; use restate_storage_api::journal_table::JournalEntry; use restate_storage_api::journal_table::ReadOnlyJournalTable; @@ -65,7 +67,6 @@ use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::enriched::{ AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, }; -use restate_types::journal::raw::PlainRawEntry; use restate_types::journal::raw::{RawEntryCodec, RawEntryCodecError}; use restate_types::journal::Completion; use restate_types::journal::CompletionResult; @@ -396,7 +397,8 @@ impl StateMachine { ) -> Result<(), Error> { match command { Command::Invoke(service_invocation) => { - self.handle_invoke(&mut ctx, service_invocation).await + self.on_service_invocation(&mut ctx, service_invocation) + .await } Command::InvocationResponse(InvocationResponse { id, @@ -450,31 +452,133 @@ impl StateMachine { Ok(()) } Command::ScheduleTimer(timer) => { - Self::do_register_timer(&mut ctx, timer, Default::default()).await?; + Self::register_timer(&mut ctx, timer, Default::default()).await?; Ok(()) } } } - async fn handle_invoke( + async fn on_service_invocation( &mut self, ctx: &mut StateMachineApplyContext<'_, State>, - mut service_invocation: ServiceInvocation, + service_invocation: ServiceInvocation, ) -> Result<(), Error> { + let invocation_id = service_invocation.invocation_id; debug_assert!( self.partition_key_range.contains(&service_invocation.partition_key()), "Service invocation with partition key '{}' has been delivered to a partition processor with key range '{:?}'. This indicates a bug.", service_invocation.partition_key(), self.partition_key_range); - Span::record_invocation_id(&service_invocation.invocation_id); + Span::record_invocation_id(&invocation_id); Span::record_invocation_target(&service_invocation.invocation_target); service_invocation .span_context .as_parent() .attach_to_span(&Span::current()); - // If an idempotency key is set, handle idempotency + // Phases of an invocation + // 1. Try deduplicate it first + // 1.1. Deduplicate using idempotency id + // 1.2. Deduplicate for run once workflow semantics (only for workflow method of workflows) + // 2. Check if we need to schedule it + // 3. Check if we need to inbox it (only for exclusive methods of virtual objects) + // 4. Execute it + + // 1.1. Handle deduplication for idempotency id + let Some(service_invocation) = self + .handle_service_invocation_idempotency_id(ctx, service_invocation) + .await? + else { + // Invocation was deduplicated, nothing else to do here + return Ok(()); + }; + + // 1.2. Handle deduplication for workflows + let Some(mut service_invocation) = self + .handle_service_invocation_workflow_run(ctx, service_invocation) + .await? + else { + // Invocation was deduplicated, nothing else to do here + return Ok(()); + }; + + // Prepare PreFlightInvocationMetadata structure + let submit_notification_sink = service_invocation.submit_notification_sink.take(); + let pre_flight_invocation_metadata = PreFlightInvocationMetadata::from_service_invocation( + service_invocation, + self.default_invocation_status_source_table, + ); + + // 2. Check if we need to schedule it + let Some(pre_flight_invocation_metadata) = self + .handle_service_invocation_execution_time( + ctx, + invocation_id, + pre_flight_invocation_metadata, + ) + .await? + else { + // Invocation was scheduled, send back the ingress attach notification and return + Self::send_submit_notification_if_needed( + ctx, + invocation_id, + None, + submit_notification_sink, + ); + return Ok(()); + }; + + // 3. Check if we need to inbox it (only for exclusive methods of virtual objects) + let Some(pre_flight_invocation_metadata) = self + .handle_service_invocation_exclusive_handler( + ctx, + invocation_id, + pre_flight_invocation_metadata, + ) + .await? + else { + // Invocation was scheduled, send back the ingress attach notification and return + Self::send_submit_notification_if_needed( + ctx, + invocation_id, + None, + submit_notification_sink, + ); + // Invocation was inboxed, nothing else to do here + return Ok(()); + }; + + // 4. Execute it + Self::send_submit_notification_if_needed( + ctx, + invocation_id, + None, + submit_notification_sink, + ); + + let (in_flight_invocation_metadata, invocation_input) = + InFlightInvocationMetadata::from_pre_flight_invocation_metadata( + pre_flight_invocation_metadata, + ); + + Self::init_journal_and_invoke( + ctx, + invocation_id, + in_flight_invocation_metadata, + invocation_input, + ) + .await + } + + /// Returns the invocation in case the invocation was not deduplicated + async fn handle_service_invocation_idempotency_id< + State: StateReader + StateStorage + IdempotencyTable, + >( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + service_invocation: ServiceInvocation, + ) -> Result, Error> { if let Some(idempotency_id) = service_invocation.compute_idempotency_id() { if service_invocation.invocation_target.invocation_target_ty() == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) @@ -496,11 +600,21 @@ impl StateMachine { Some(attached_invocation_id), service_invocation.submit_notification_sink, ); + debug_if_leader!( + ctx.is_leader, + restate.idempotency.id = ?idempotency_id, + "Invocation is deduplicated" + ); // Invocation was either resolved, or the sink was enqueued. Nothing else to do here. - return Ok(()); + return Ok(None); } + debug_if_leader!( + ctx.is_leader, + restate.idempotency.id = ?idempotency_id, + "First time we see this idempotency id, invocation will be processed"); + // Idempotent invocation needs to be processed for the first time, let's roll! Self::do_store_idempotency_id( ctx, @@ -510,64 +624,15 @@ impl StateMachine { .await?; } } + Ok(Some(service_invocation)) + } - // If an execution_time is set, we schedule the invocation to be processed later - if let Some(execution_time) = service_invocation.execution_time { - let span_context = service_invocation.span_context.clone(); - Self::do_register_timer( - ctx, - TimerKeyValue::invoke(execution_time, service_invocation), - span_context, - ) - .await?; - // The span will be created later on invocation - return Ok(()); - } - - // If it's exclusive, we need to acquire the exclusive lock - if service_invocation.invocation_target.invocation_target_ty() - == InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) - { - let keyed_service_id = service_invocation - .invocation_target - .as_keyed_service_id() - .expect( - "When the handler type is Exclusive, the invocation target must have a key", - ); - - let service_status = ctx - .storage - .get_virtual_object_status(&keyed_service_id) - .await?; - - // If locked, enqueue in inbox and be done with it - if let VirtualObjectStatus::Locked(_) = service_status { - let inbox_seq_number = self - .enqueue_into_inbox( - ctx, - InboxEntry::Invocation(keyed_service_id, service_invocation.invocation_id), - ) - .await?; - Self::send_submit_notification_if_needed( - ctx, - service_invocation.invocation_id, - None, - service_invocation.submit_notification_sink.take(), - ); - Self::do_store_inboxed_invocation( - ctx, - service_invocation.invocation_id, - InboxedInvocation::from_service_invocation( - service_invocation, - inbox_seq_number, - self.default_invocation_status_source_table, - ), - ) - .await?; - return Ok(()); - } - } - + /// Returns the invocation in case the invocation was not deduplicated + async fn handle_service_invocation_workflow_run( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + mut service_invocation: ServiceInvocation, + ) -> Result, Error> { if service_invocation.invocation_target.invocation_target_ty() == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) { @@ -625,24 +690,229 @@ impl StateMachine { Some(original_invocation_id), service_invocation.submit_notification_sink.take(), ); - return Ok(()); + + debug_if_leader!( + ctx.is_leader, + "Invocation to workflow method is deduplicated" + ); + + return Ok(None); } + debug_if_leader!( + ctx.is_leader, + "First time we see this workflow id, invocation will be processed" + ); + + ctx.storage + .store_service_status( + &keyed_service_id, + VirtualObjectStatus::Locked(service_invocation.invocation_id), + ) + .await?; } + Ok(Some(service_invocation)) + } - Self::send_submit_notification_if_needed( - ctx, - service_invocation.invocation_id, - None, - service_invocation.submit_notification_sink.take(), - ); + /// Returns the invocation in case the invocation was not scheduled + async fn handle_service_invocation_execution_time( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + metadata: PreFlightInvocationMetadata, + ) -> Result, Error> { + if let Some(execution_time) = metadata.execution_time { + let span_context = metadata.span_context.clone(); + match self.default_invocation_status_source_table { + SourceTable::Old => { + // TODO remove this code once we remove the Old table + Self::register_timer( + ctx, + TimerKeyValue::invoke( + execution_time, + ServiceInvocation { + invocation_id, + invocation_target: metadata.invocation_target, + argument: metadata.argument, + source: metadata.source, + span_context: span_context.clone(), + headers: metadata.headers, + execution_time: metadata.execution_time, + completion_retention_time: Some(metadata.completion_retention_time), + idempotency_key: metadata.idempotency_key, + response_sink: metadata.response_sinks.into_iter().next(), + submit_notification_sink: None, + }, + ), + span_context, + ) + .await?; + } + SourceTable::New => { + debug_if_leader!(ctx.is_leader, "Store scheduled invocation"); + + Self::register_timer( + ctx, + TimerKeyValue::neo_invoke(execution_time, invocation_id), + span_context, + ) + .await?; - // We're ready to invoke the service! - Self::do_invoke_service( + ctx.storage + .store_invocation_status( + &invocation_id.clone(), + InvocationStatus::Scheduled( + ScheduledInvocation::from_pre_flight_invocation_metadata(metadata), + ), + ) + .await?; + } + } + // The span will be created later on invocation + return Ok(None); + } + + Ok(Some(metadata)) + } + + /// Returns the invocation in case the invocation was not inboxed + async fn handle_service_invocation_exclusive_handler( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + metadata: PreFlightInvocationMetadata, + ) -> Result, Error> { + if metadata.invocation_target.invocation_target_ty() + == InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) + { + let keyed_service_id = metadata.invocation_target.as_keyed_service_id().expect( + "When the handler type is Exclusive, the invocation target must have a key", + ); + + let service_status = ctx + .storage + .get_virtual_object_status(&keyed_service_id) + .await?; + + if let VirtualObjectStatus::Locked(_) = service_status { + // If locked, enqueue in inbox and be done with it + let inbox_seq_number = self + .enqueue_into_inbox( + ctx, + InboxEntry::Invocation(keyed_service_id, invocation_id), + ) + .await?; + + debug_if_leader!( + ctx.is_leader, + restate.outbox.seq = inbox_seq_number, + "Store inboxed invocation" + ); + ctx.storage + .store_invocation_status( + &invocation_id, + InvocationStatus::Inboxed( + InboxedInvocation::from_pre_flight_invocation_metadata( + metadata, + inbox_seq_number, + ), + ), + ) + .await?; + + return Ok(None); + } else { + // If unlocked, lock it + debug_if_leader!( + ctx.is_leader, + restate.service.id = %keyed_service_id, + "Locking service" + ); + + ctx.storage + .store_service_status( + &keyed_service_id, + VirtualObjectStatus::Locked(invocation_id), + ) + .await?; + } + } + Ok(Some(metadata)) + } + + async fn init_journal_and_invoke( + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + mut in_flight_invocation_metadata: InFlightInvocationMetadata, + invocation_input: InvocationInput, + ) -> Result<(), Error> { + let invoke_input_journal = Self::init_journal( ctx, - service_invocation, - self.default_invocation_status_source_table, + invocation_id, + &mut in_flight_invocation_metadata, + invocation_input, ) .await?; + + Self::invoke( + ctx, + invocation_id, + in_flight_invocation_metadata, + invoke_input_journal, + ) + .await + } + + async fn init_journal( + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + in_flight_invocation_metadata: &mut InFlightInvocationMetadata, + invocation_input: InvocationInput, + ) -> Result { + debug_if_leader!(ctx.is_leader, "Init journal with input entry"); + + // In our current data model, ServiceInvocation has always an input, so initial length is 1 + in_flight_invocation_metadata.journal_metadata.length = 1; + + let input_entry = + Codec::serialize_as_input_entry(invocation_input.headers, invocation_input.argument); + + ctx.storage + .store_journal_entry(&invocation_id, 0, input_entry.clone()) + .await?; + + Ok(InvokeInputJournal::CachedJournal( + restate_invoker_api::JournalMetadata::new( + in_flight_invocation_metadata.journal_metadata.length, + in_flight_invocation_metadata + .journal_metadata + .span_context + .clone(), + None, + ), + vec![input_entry.erase_enrichment()], + )) + } + + async fn invoke( + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + in_flight_invocation_metadata: InFlightInvocationMetadata, + invoke_input_journal: InvokeInputJournal, + ) -> Result<(), Error> { + debug_if_leader!(ctx.is_leader, "Invoke"); + + ctx.action_collector.push(Action::Invoke { + invocation_id, + invocation_target: in_flight_invocation_metadata.invocation_target.clone(), + invoke_input_journal, + }); + ctx.storage + .store_invocation_status( + &invocation_id, + InvocationStatus::Invoked(in_flight_invocation_metadata), + ) + .await?; + Ok(()) } @@ -651,10 +921,20 @@ impl StateMachine { ctx: &mut StateMachineApplyContext<'_, State>, inbox_entry: InboxEntry, ) -> Result { - let inbox_seq_number = self.inbox_seq_number; - Self::do_enqueue_into_inbox(ctx, self.inbox_seq_number, inbox_entry).await?; + let seq_number = self.inbox_seq_number; + debug_if_leader!( + ctx.is_leader, + restate.inbox.seq = seq_number, + "Enqueue inbox entry" + ); + + ctx.storage + .enqueue_into_inbox(seq_number, inbox_entry) + .await?; + // need to store the next inbox sequence number + ctx.storage.store_inbox_seq_number(seq_number + 1).await?; self.inbox_seq_number += 1; - Ok(inbox_seq_number) + Ok(seq_number) } /// If an invocation id is returned, the request has been resolved and no further processing is needed @@ -867,10 +1147,13 @@ impl StateMachine { let InboxedInvocation { inbox_sequence_number, - response_sinks, - span_context, - invocation_target, - .. + metadata: + PreFlightInvocationMetadata { + response_sinks, + span_context, + invocation_target, + .. + }, } = inboxed_invocation; // Reply back to callers with error, and publish end trace @@ -1170,17 +1453,67 @@ impl StateMachine { // ServiceInvocations scheduled with a timer are always owned by the same partition processor // where the invocation should be executed - self.handle_invoke(ctx, service_invocation).await + self.on_service_invocation(ctx, service_invocation).await } Timer::CleanInvocationStatus(invocation_id) => { self.try_purge_invocation(ctx, invocation_id).await } - _ => { - todo!("Unimplemented") - } + Timer::NeoInvoke(invocation_id) => self.on_neo_invoke_timer(ctx, invocation_id).await, } } + async fn on_neo_invoke_timer( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + ) -> Result<(), Error> { + debug_if_leader!( + ctx.is_leader, + "Handle scheduled invocation timer with invocation id {invocation_id}" + ); + let invocation_status = Self::get_invocation_status_and_trace(ctx, &invocation_id).await?; + + if let InvocationStatus::Free = &invocation_status { + warn!("Fired a timer for an unknown invocation. The invocation might have been deleted/purged previously."); + return Ok(()); + } + + let_assert!( + InvocationStatus::Scheduled(scheduled_invocation) = invocation_status, + "Invocation {} should be in scheduled status", + invocation_id + ); + + // Scheduled invocations have been deduplicated already in on_service_invocation, and they already sent back the submit notification. + + // 3. Check if we need to inbox it (only for exclusive methods of virtual objects) + let Some(pre_flight_invocation_metadata) = self + .handle_service_invocation_exclusive_handler( + ctx, + invocation_id, + scheduled_invocation.metadata, + ) + .await? + else { + // Invocation was inboxed, nothing else to do here + return Ok(()); + }; + + // 4. Execute it + let (in_flight_invocation_metadata, invocation_input) = + InFlightInvocationMetadata::from_pre_flight_invocation_metadata( + pre_flight_invocation_metadata, + ); + + Self::init_journal_and_invoke( + ctx, + invocation_id, + in_flight_invocation_metadata, + invocation_input, + ) + .await + } + async fn try_invoker_effect< State: StateReader + StateStorage @@ -1314,7 +1647,7 @@ impl StateMachine { ); // Pop from inbox - Self::try_pop_inbox(ctx, &invocation_metadata.invocation_target).await?; + Self::consume_inbox(ctx, &invocation_metadata.invocation_target).await?; // If there are any response sinks, or we need to store back the completed status, // we need to find the latest output entry @@ -1366,7 +1699,7 @@ impl StateMachine { Ok(()) } - async fn fail_invocation( + async fn fail_invocation( &mut self, ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, @@ -1397,7 +1730,7 @@ impl StateMachine { .await?; // Pop from inbox - Self::try_pop_inbox(ctx, &invocation_metadata.invocation_target).await?; + Self::consume_inbox(ctx, &invocation_metadata.invocation_target).await?; // Store the completed status or free it if !invocation_metadata.completion_retention_time.is_zero() { @@ -1461,7 +1794,7 @@ impl StateMachine { Ok(()) } - async fn try_pop_inbox( + async fn consume_inbox( ctx: &mut StateMachineApplyContext<'_, State>, invocation_target: &InvocationTarget, ) -> Result<(), Error> { @@ -1469,14 +1802,69 @@ impl StateMachine { if invocation_target.invocation_target_ty() == InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) { - Self::do_pop_inbox( - ctx, - invocation_target.as_keyed_service_id().expect( - "When the handler type is Exclusive, the invocation target must have a key", - ), - ) - .await? + let keyed_service_id = invocation_target.as_keyed_service_id().expect( + "When the handler type is Exclusive, the invocation target must have a key", + ); + + debug_if_leader!( + ctx.is_leader, + rpc.service = %keyed_service_id, + "Consume inbox" + ); + + // Pop until we find the first inbox entry. + // Note: the inbox seq numbers can have gaps. + while let Some(inbox_entry) = ctx.storage.pop_inbox(&keyed_service_id).await? { + match inbox_entry.inbox_entry { + InboxEntry::Invocation(_, invocation_id) => { + let inboxed_status = + Self::get_invocation_status_and_trace(ctx, &invocation_id).await?; + + let_assert!( + InvocationStatus::Inboxed(inboxed_invocation) = inboxed_status, + "InvocationStatus must contain an Inboxed invocation for the id {}", + invocation_id + ); + + debug_if_leader!( + ctx.is_leader, + rpc.service = %keyed_service_id, + "Invoke inboxed" + ); + + // Lock the service + ctx.storage + .store_service_status( + &keyed_service_id, + VirtualObjectStatus::Locked(invocation_id), + ) + .await?; + + let (in_flight_invocation_meta, invocation_input) = + InFlightInvocationMetadata::from_inboxed_invocation(inboxed_invocation); + Self::init_journal_and_invoke( + ctx, + invocation_id, + in_flight_invocation_meta, + invocation_input, + ) + .await?; + + // Started a new invocation + return Ok(()); + } + InboxEntry::StateMutation(state_mutation) => { + Self::mutate_state(ctx.storage, state_mutation).await?; + } + } + } + + // We consumed the inbox, nothing else to do here + ctx.storage + .store_service_status(&keyed_service_id, VirtualObjectStatus::Unlocked) + .await?; } + Ok(()) } @@ -1833,7 +2221,7 @@ impl StateMachine { Entry::Sleep(SleepEntry { wake_up_time, .. }) = journal_entry.deserialize_entry_ref::()? ); - Self::do_register_timer( + Self::register_timer( ctx, TimerKeyValue::complete_journal_entry( MillisSinceEpoch::new(wake_up_time), @@ -2279,42 +2667,27 @@ impl StateMachine { request_id, }) = submit_notification_sink { - Self::do_ingress_submit_notification( - ctx, - IngressResponseEnvelope { + let attached_invocation_id = attached_invocation_id.unwrap_or(original_invocation_id); + + debug_if_leader!( + ctx.is_leader, + "Sending ingress attach invocation {} to {}", + original_invocation_id, + attached_invocation_id, + ); + + ctx.action_collector + .push(Action::IngressSubmitNotification(IngressResponseEnvelope { target_node: node_id, inner: ingress::SubmittedInvocationNotification { request_id, original_invocation_id, - attached_invocation_id: attached_invocation_id - .unwrap_or(original_invocation_id), + attached_invocation_id, }, - }, - ); + })); } } - pub(super) async fn do_invoke_service( - ctx: &mut StateMachineApplyContext<'_, S>, - service_invocation: ServiceInvocation, - source_table: invocation_status_table::SourceTable, - ) -> Result<(), Error> { - debug_if_leader!(ctx.is_leader, "Effect: Invoke service"); - - let invocation_id = service_invocation.invocation_id; - let (in_flight_invocation_meta, invocation_input) = - InFlightInvocationMetadata::from_service_invocation(service_invocation, source_table); - Self::invoke_service( - ctx.storage, - ctx.action_collector, - invocation_id, - in_flight_invocation_meta, - invocation_input, - ) - .await?; - Ok(()) - } - async fn do_resume_service( ctx: &mut StateMachineApplyContext<'_, S>, invocation_id: InvocationId, @@ -2376,28 +2749,6 @@ impl StateMachine { Ok(()) } - async fn do_store_inboxed_invocation( - ctx: &mut StateMachineApplyContext<'_, S>, - invocation_id: InvocationId, - inboxed_invocation: InboxedInvocation, - ) -> Result<(), Error> { - debug_if_leader!( - ctx.is_leader, - restate.invocation.id = %invocation_id, - restate.outbox.seq = inboxed_invocation.inbox_sequence_number, - "Effect: Store inboxed invocation" - ); - - ctx.storage - .store_invocation_status( - &invocation_id, - InvocationStatus::Inboxed(inboxed_invocation), - ) - .await?; - - Ok(()) - } - async fn do_store_completed_invocation( ctx: &mut StateMachineApplyContext<'_, S>, invocation_id: InvocationId, @@ -2442,41 +2793,6 @@ impl StateMachine { Ok(()) } - async fn do_enqueue_into_inbox( - ctx: &mut StateMachineApplyContext<'_, S>, - seq_number: MessageIndex, - inbox_entry: InboxEntry, - ) -> Result<(), Error> { - debug_if_leader!( - ctx.is_leader, - restate.inbox.seq = seq_number, - "Effect: Enqueue invocation in inbox" - ); - - ctx.storage - .enqueue_into_inbox(seq_number, inbox_entry) - .await?; - // need to store the next inbox sequence number - ctx.storage.store_inbox_seq_number(seq_number + 1).await?; - - Ok(()) - } - - async fn do_pop_inbox( - ctx: &mut StateMachineApplyContext<'_, S>, - service_id: ServiceId, - ) -> Result<(), Error> { - debug_if_leader!( - ctx.is_leader, - rpc.service = %service_id.service_name, - "Effect: Pop inbox" - ); - - Self::pop_from_inbox(ctx.storage, ctx.action_collector, service_id).await?; - - Ok(()) - } - async fn do_delete_inbox_entry( ctx: &mut StateMachineApplyContext<'_, S>, service_id: ServiceId, @@ -2657,7 +2973,7 @@ impl StateMachine { Ok(()) } - async fn do_register_timer( + async fn register_timer( ctx: &mut StateMachineApplyContext<'_, S>, timer_value: TimerKeyValue, span_context: ServiceInvocationSpanContext, @@ -2682,7 +2998,7 @@ impl StateMachine { restate.journal.index = entry_index, restate.timer.wake_up_time = %timer_value.wake_up_time(), restate.timer.key = %TimerKeyDisplay(timer_value.key()), - "Effect: Register Sleep timer" + "Register Sleep timer" ) } Timer::Invoke(service_invocation) => { @@ -2694,7 +3010,7 @@ impl StateMachine { restate.invocation.target = %service_invocation.invocation_target, restate.timer.wake_up_time = %timer_value.wake_up_time(), restate.timer.key = %TimerKeyDisplay(timer_value.key()), - "Effect: Register background invoke timer" + "Register background invoke timer" ) } Timer::NeoInvoke(invocation_id) => { @@ -2704,7 +3020,7 @@ impl StateMachine { restate.invocation.id = %invocation_id, restate.timer.wake_up_time = %timer_value.wake_up_time(), restate.timer.key = %TimerKeyDisplay(timer_value.key()), - "Effect: Register background invoke timer" + "Register background invoke timer" ) } Timer::CleanInvocationStatus(_) => { @@ -2712,7 +3028,7 @@ impl StateMachine { ctx.is_leader, restate.timer.wake_up_time = %timer_value.wake_up_time(), restate.timer.key = %TimerKeyDisplay(timer_value.key()), - "Effect: Register cleanup invocation status timer" + "Register cleanup invocation status timer" ) } }; @@ -3071,21 +3387,6 @@ impl StateMachine { .push(Action::IngressResponse(ingress_response)); } - fn do_ingress_submit_notification( - ctx: &mut StateMachineApplyContext<'_, S>, - attach_notification: IngressResponseEnvelope, - ) { - debug_if_leader!( - ctx.is_leader, - "Effect: Ingress attach invocation {} to {}", - attach_notification.inner.original_invocation_id, - attach_notification.inner.attached_invocation_id, - ); - - ctx.action_collector - .push(Action::IngressSubmitNotification(attach_notification)); - } - async fn do_put_promise( ctx: &mut StateMachineApplyContext<'_, S>, service_id: ServiceId, @@ -3113,53 +3414,6 @@ impl StateMachine { Ok(()) } - async fn pop_from_inbox( - state_storage: &mut S, - collector: &mut ActionCollector, - service_id: ServiceId, - ) -> Result<(), Error> - where - S: StateStorage + ReadOnlyInvocationStatusTable, - { - // Pop until we find the first inbox entry. - // Note: the inbox seq numbers can have gaps. - while let Some(inbox_entry) = state_storage.pop_inbox(&service_id).await? { - match inbox_entry.inbox_entry { - InboxEntry::Invocation(_, invocation_id) => { - let inboxed_status = - state_storage.get_invocation_status(&invocation_id).await?; - - let_assert!( - InvocationStatus::Inboxed(inboxed_invocation) = inboxed_status, - "InvocationStatus must contain an Inboxed invocation for the id {}", - invocation_id - ); - - let (in_flight_invocation_meta, invocation_input) = - InFlightInvocationMetadata::from_inboxed_invocation(inboxed_invocation); - Self::invoke_service( - state_storage, - collector, - invocation_id, - in_flight_invocation_meta, - invocation_input, - ) - .await?; - return Ok(()); - } - InboxEntry::StateMutation(state_mutation) => { - Self::mutate_state(state_storage, state_mutation).await?; - } - } - } - - state_storage - .store_service_status(&service_id, VirtualObjectStatus::Unlocked) - .await?; - - Ok(()) - } - async fn mutate_state( state_storage: &mut S, state_mutation: ExternalStateMutation, @@ -3201,72 +3455,6 @@ impl StateMachine { Ok(()) } - async fn invoke_service( - state_storage: &mut S, - collector: &mut ActionCollector, - invocation_id: InvocationId, - mut in_flight_invocation_metadata: InFlightInvocationMetadata, - invocation_input: InvocationInput, - ) -> Result<(), Error> { - // In our current data model, ServiceInvocation has always an input, so initial length is 1 - in_flight_invocation_metadata.journal_metadata.length = 1; - - let invocation_target_type = in_flight_invocation_metadata - .invocation_target - .invocation_target_ty(); - if invocation_target_type - == InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) - || invocation_target_type - == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) - { - state_storage - .store_service_status( - &in_flight_invocation_metadata - .invocation_target - .as_keyed_service_id() - .unwrap(), - VirtualObjectStatus::Locked(invocation_id), - ) - .await?; - } - state_storage - .store_invocation_status( - &invocation_id, - InvocationStatus::Invoked(in_flight_invocation_metadata.clone()), - ) - .await?; - - let input_entry = - Codec::serialize_as_input_entry(invocation_input.headers, invocation_input.argument); - let (entry_header, serialized_entry) = input_entry.into_inner(); - - collector.push(Action::Invoke { - invocation_id, - invocation_target: in_flight_invocation_metadata.invocation_target, - invoke_input_journal: InvokeInputJournal::CachedJournal( - restate_invoker_api::JournalMetadata::new( - in_flight_invocation_metadata.journal_metadata.length, - in_flight_invocation_metadata.journal_metadata.span_context, - None, - ), - vec![PlainRawEntry::new( - entry_header.clone().erase_enrichment(), - serialized_entry.clone(), - )], - ), - }); - - state_storage - .store_journal_entry( - &invocation_id, - 0, - EnrichedRawEntry::new(entry_header, serialized_entry), - ) - .await?; - - Ok(()) - } - /// Stores the given completion. Returns `true` if an [`RawEntry`] was completed. async fn store_completion( state_storage: &mut S, diff --git a/crates/worker/src/partition/state_machine/tests/delayed_send.rs b/crates/worker/src/partition/state_machine/tests/delayed_send.rs new file mode 100644 index 000000000..389241c71 --- /dev/null +++ b/crates/worker/src/partition/state_machine/tests/delayed_send.rs @@ -0,0 +1,319 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use super::*; + +use restate_storage_api::inbox_table::ReadOnlyInboxTable; +use restate_types::invocation::SubmitNotificationSink; +use restate_types::time::MillisSinceEpoch; +use std::time::{Duration, SystemTime}; +use test_log::test; + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn send_with_delay() { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut state_machine = tc + .run_in_scope( + "mock-state-machine", + None, + MockStateMachine::create_with_neo_invocation_status_table(), + ) + .await; + + let invocation_target = InvocationTarget::mock_service(); + let invocation_id = InvocationId::mock_random(); + + let node_id = GenerationalNodeId::new(1, 1); + let request_id = IngressRequestId::default(); + + let wake_up_time = MillisSinceEpoch::from(SystemTime::now() + Duration::from_secs(60)); + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + invocation_target: invocation_target.clone(), + response_sink: None, + submit_notification_sink: Some(SubmitNotificationSink::Ingress { + node_id, + request_id, + }), + // Doesn't matter the execution time here, just needs to be filled + execution_time: Some(wake_up_time), + ..ServiceInvocation::mock() + })) + .await; + assert_that!( + actions, + all!( + not(contains(matchers::actions::invoke_for_id(invocation_id))), + contains(pat!(Action::RegisterTimer { .. })), + contains(pat!(Action::IngressSubmitNotification(eq( + IngressResponseEnvelope { + target_node: node_id, + inner: ingress::SubmittedInvocationNotification { + request_id, + original_invocation_id: invocation_id, + attached_invocation_id: invocation_id + }, + } + )))) + ) + ); + + // Now fire the timer + let actions = state_machine + .apply(Command::Timer(TimerKeyValue::neo_invoke( + wake_up_time, + invocation_id, + ))) + .await; + + assert_that!( + actions, + all!( + contains(matchers::actions::invoke_for_id(invocation_id)), + not(contains(pat!(Action::IngressSubmitNotification(eq( + IngressResponseEnvelope { + target_node: node_id, + inner: ingress::SubmittedInvocationNotification { + request_id, + original_invocation_id: invocation_id, + attached_invocation_id: invocation_id + }, + } + ))))) + ) + ); + assert_that!( + state_machine + .rocksdb_storage + .get_invocation_status(&invocation_id) + .await, + ok(pat!(InvocationStatus::Invoked { .. })) + ); +} + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn send_with_delay_to_locked_virtual_object() { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut state_machine = tc + .run_in_scope( + "mock-state-machine", + None, + MockStateMachine::create_with_neo_invocation_status_table(), + ) + .await; + + let invocation_target = InvocationTarget::mock_virtual_object(); + let invocation_id = InvocationId::generate(&invocation_target); + + let node_id = GenerationalNodeId::new(1, 1); + let request_id = IngressRequestId::default(); + + let wake_up_time = MillisSinceEpoch::from(SystemTime::now() + Duration::from_secs(60)); + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + invocation_target: invocation_target.clone(), + response_sink: None, + submit_notification_sink: Some(SubmitNotificationSink::Ingress { + node_id, + request_id, + }), + // Doesn't matter the execution time here, just needs to be filled + execution_time: Some(wake_up_time), + ..ServiceInvocation::mock() + })) + .await; + assert_that!( + actions, + all!( + not(contains(matchers::actions::invoke_for_id(invocation_id))), + contains(pat!(Action::RegisterTimer { .. })), + contains(pat!(Action::IngressSubmitNotification(eq( + IngressResponseEnvelope { + target_node: node_id, + inner: ingress::SubmittedInvocationNotification { + request_id, + original_invocation_id: invocation_id, + attached_invocation_id: invocation_id + }, + } + )))) + ) + ); + + // Now lock the service_id + let mut tx = state_machine.rocksdb_storage.transaction(); + tx.put_virtual_object_status( + &invocation_target.as_keyed_service_id().unwrap(), + VirtualObjectStatus::Locked(InvocationId::generate(&invocation_target)), + ) + .await; + tx.commit().await.unwrap(); + + // Now fire the timer + let actions = state_machine + .apply(Command::Timer(TimerKeyValue::neo_invoke( + wake_up_time, + invocation_id, + ))) + .await; + + assert_that!( + actions, + all!( + not(contains(matchers::actions::invoke_for_id(invocation_id))), + not(contains(pat!(Action::IngressSubmitNotification(eq( + IngressResponseEnvelope { + target_node: node_id, + inner: ingress::SubmittedInvocationNotification { + request_id, + original_invocation_id: invocation_id, + attached_invocation_id: invocation_id + }, + } + ))))) + ) + ); + assert_that!( + state_machine + .rocksdb_storage + .get_invocation_status(&invocation_id) + .await, + ok(pat!(InvocationStatus::Inboxed { .. })) + ); + assert_that!( + state_machine + .rocksdb_storage + .inbox(&invocation_target.as_keyed_service_id().unwrap()) + .try_collect::>() + .await, + ok(contains(matchers::storage::invocation_inbox_entry( + invocation_id, + &invocation_target + ))) + ); +} + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn send_with_delay_and_idempotency_key() { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut state_machine = tc + .run_in_scope( + "mock-state-machine", + None, + MockStateMachine::create_with_neo_invocation_status_table(), + ) + .await; + + let idempotency_key = ByteString::from_static("my-idempotency-key"); + let retention = Duration::from_secs(60) * 60 * 24; + let invocation_target = InvocationTarget::mock_virtual_object(); + let first_invocation_id = InvocationId::generate_with_idempotency_key( + &invocation_target, + Some(idempotency_key.clone()), + ); + + let node_id = GenerationalNodeId::new(1, 1); + let request_id_1 = IngressRequestId::default(); + + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id: first_invocation_id, + invocation_target: invocation_target.clone(), + idempotency_key: Some(idempotency_key.clone()), + response_sink: None, + submit_notification_sink: Some(SubmitNotificationSink::Ingress { + node_id, + request_id: request_id_1, + }), + completion_retention_time: Some(retention), + // Doesn't matter the execution time here, just needs to be filled + execution_time: Some(MillisSinceEpoch::from( + SystemTime::now() + Duration::from_secs(60), + )), + ..ServiceInvocation::mock() + })) + .await; + assert_that!( + actions, + all!( + not(contains(matchers::actions::invoke_for_id( + first_invocation_id + ))), + contains(pat!(Action::RegisterTimer { .. })), + contains(pat!(Action::IngressSubmitNotification(eq( + IngressResponseEnvelope { + target_node: node_id, + inner: ingress::SubmittedInvocationNotification { + request_id: request_id_1, + original_invocation_id: first_invocation_id, + attached_invocation_id: first_invocation_id + }, + } + )))) + ) + ); + + // Send another invocation which reattaches to the original one + let second_invocation_id = InvocationId::generate_with_idempotency_key( + &invocation_target, + Some(idempotency_key.clone()), + ); + let request_id_2 = IngressRequestId::default(); + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id: second_invocation_id, + invocation_target: invocation_target.clone(), + idempotency_key: Some(idempotency_key), + response_sink: None, + submit_notification_sink: Some(SubmitNotificationSink::Ingress { + node_id, + request_id: request_id_2, + }), + completion_retention_time: Some(retention), + // Doesn't matter the execution time here, just needs to be filled + execution_time: Some(MillisSinceEpoch::from( + SystemTime::now() + Duration::from_secs(60), + )), + ..ServiceInvocation::mock() + })) + .await; + assert_that!( + actions, + all!( + not(contains(matchers::actions::invoke_for_id( + first_invocation_id + ))), + not(contains(matchers::actions::invoke_for_id( + second_invocation_id + ))), + contains(pat!(Action::IngressSubmitNotification(eq( + IngressResponseEnvelope { + target_node: node_id, + inner: ingress::SubmittedInvocationNotification { + request_id: request_id_2, + original_invocation_id: second_invocation_id, + attached_invocation_id: first_invocation_id + }, + } + )))) + ) + ); +} diff --git a/crates/worker/src/partition/state_machine/tests/matchers.rs b/crates/worker/src/partition/state_machine/tests/matchers.rs new file mode 100644 index 000000000..38a542afa --- /dev/null +++ b/crates/worker/src/partition/state_machine/tests/matchers.rs @@ -0,0 +1,44 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use googletest::prelude::*; + +pub mod storage { + use super::*; + + use restate_storage_api::inbox_table::{InboxEntry, SequenceNumberInboxEntry}; + use restate_types::identifiers::InvocationId; + use restate_types::invocation::InvocationTarget; + + pub fn invocation_inbox_entry( + invocation_id: InvocationId, + invocation_target: &InvocationTarget, + ) -> impl Matcher { + pat!(SequenceNumberInboxEntry { + inbox_entry: pat!(InboxEntry::Invocation( + eq(invocation_target.as_keyed_service_id().unwrap()), + eq(invocation_id) + )) + }) + } +} + +pub mod actions { + use super::*; + + use crate::partition::state_machine::Action; + use restate_types::identifiers::InvocationId; + + pub fn invoke_for_id(invocation_id: InvocationId) -> impl Matcher { + pat!(Action::Invoke { + invocation_id: eq(invocation_id) + }) + } +} diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index 4178d4fc2..67786933a 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -10,8 +10,10 @@ use super::*; +mod delayed_send; mod idempotency; mod kill_cancel; +mod matchers; mod workflow; use crate::partition::types::{InvokerEffect, InvokerEffectKind}; @@ -27,13 +29,16 @@ use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_rocksdb::RocksDbManager; use restate_service_protocol::awakeable_id::AwakeableIdentifier; use restate_service_protocol::codec::ProtobufRawEntryCodec; +use restate_storage_api::inbox_table::ReadOnlyInboxTable; use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, InvocationStatusTable, ReadOnlyInvocationStatusTable, SourceTable, }; use restate_storage_api::journal_table::{JournalEntry, ReadOnlyJournalTable}; use restate_storage_api::outbox_table::OutboxTable; -use restate_storage_api::service_status_table::{VirtualObjectStatus, VirtualObjectStatusTable}; +use restate_storage_api::service_status_table::{ + ReadOnlyVirtualObjectStatusTable, VirtualObjectStatus, VirtualObjectStatusTable, +}; use restate_storage_api::state_table::{ReadOnlyStateTable, StateTable}; use restate_storage_api::Transaction; use restate_test_util::matchers::*; @@ -82,6 +87,17 @@ impl MockStateMachine { .await } + pub async fn create_with_neo_invocation_status_table() -> Self { + Self::create_with_state_machine(StateMachine::new( + 0, /* inbox_seq_number */ + 0, /* outbox_seq_number */ + None, /* outbox_head_seq_number */ + PartitionKey::MIN..=PartitionKey::MAX, + SourceTable::New, + )) + .await + } + pub async fn create_with_state_machine( state_machine: StateMachine, ) -> Self { @@ -845,6 +861,123 @@ async fn truncate_outbox_with_gap() -> Result<(), Error> { Ok(()) } +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn consecutive_exclusive_handler_invocations_will_use_inbox() -> TestResult { + let tc = TaskCenterBuilder::default() + .default_runtime_handle(tokio::runtime::Handle::current()) + .build() + .expect("task_center builds"); + let mut state_machine = tc + .run_in_scope("mock-state-machine", None, MockStateMachine::create()) + .await; + + let (first_invocation_id, invocation_target) = + InvocationId::mock_with(InvocationTarget::mock_virtual_object()); + let keyed_service_id = invocation_target.as_keyed_service_id().unwrap(); + let second_invocation_id = InvocationId::generate(&invocation_target); + + // Let's start the first invocation + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id: first_invocation_id, + invocation_target: invocation_target.clone(), + ..ServiceInvocation::mock() + })) + .await; + assert_that!( + actions, + contains(matchers::actions::invoke_for_id(first_invocation_id)) + ); + assert_that!( + state_machine + .rocksdb_storage + .get_virtual_object_status(&keyed_service_id) + .await, + ok(eq(VirtualObjectStatus::Locked(first_invocation_id))) + ); + + // Let's start the second invocation + let actions = state_machine + .apply(Command::Invoke(ServiceInvocation { + invocation_id: second_invocation_id, + invocation_target: invocation_target.clone(), + ..ServiceInvocation::mock() + })) + .await; + + // This should have not been invoked, but it should rather be in the inbox + assert_that!( + actions, + not(contains(matchers::actions::invoke_for_id( + second_invocation_id + ))) + ); + assert_that!( + state_machine + .rocksdb_storage + .inbox(&keyed_service_id) + .try_collect::>() + .await, + ok(contains(matchers::storage::invocation_inbox_entry( + second_invocation_id, + &invocation_target + ))) + ); + assert_that!( + state_machine + .rocksdb_storage + .get_virtual_object_status(&keyed_service_id) + .await, + ok(eq(VirtualObjectStatus::Locked(first_invocation_id))) + ); + + // Send the End Effect to terminate the first invocation + let actions = state_machine + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id: first_invocation_id, + kind: InvokerEffectKind::End, + })) + .await; + // At this point we expect the invoke for the second, and also the lock updated + assert_that!( + actions, + contains(matchers::actions::invoke_for_id(second_invocation_id)) + ); + assert_that!( + state_machine + .rocksdb_storage + .get_virtual_object_status(&keyed_service_id) + .await, + ok(eq(VirtualObjectStatus::Locked(second_invocation_id))) + ); + + let _ = state_machine + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id: second_invocation_id, + kind: InvokerEffectKind::End, + })) + .await; + + // After the second was completed too, the inbox is empty and the service is unlocked + assert_that!( + state_machine + .rocksdb_storage + .inbox(&keyed_service_id) + .try_collect::>() + .await, + ok(empty()) + ); + assert_that!( + state_machine + .rocksdb_storage + .get_virtual_object_status(&keyed_service_id) + .await, + ok(eq(VirtualObjectStatus::Unlocked)) + ); + + Ok(()) +} + async fn mock_start_invocation_with_service_id( state_machine: &mut MockStateMachine, service_id: ServiceId, From c5fcb242a1bdadae7b4803249835bfa9c0f3a3d9 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 16 Aug 2024 09:04:45 +0200 Subject: [PATCH 2/2] Feedback --- .../src/invocation_status_table/mod.rs | 11 +++++++++-- .../src/invocation_status_table/mod.rs | 2 +- crates/worker/src/partition/state_machine/mod.rs | 16 ++++++++-------- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index 8d1b9e949..d57af778c 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -19,7 +19,7 @@ use restate_rocksdb::RocksDbPerfGuard; use restate_storage_api::invocation_status_table::{ CompletedInvocation, InFlightInvocationMetadata, InboxedInvocation, InvocationStatus, InvocationStatusTable, NeoInvocationStatus, PreFlightInvocationMetadata, - ReadOnlyInvocationStatusTable, SourceTable, + ReadOnlyInvocationStatusTable, ScheduledInvocation, SourceTable, }; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey}; @@ -117,7 +117,14 @@ fn put_invocation_status( } } } - InvocationStatus::Scheduled { .. } => { + InvocationStatus::Scheduled(ScheduledInvocation { + metadata: PreFlightInvocationMetadata { source_table, .. }, + }) => { + assert_eq!( + *source_table, + SourceTable::New, + "Scheduled status can be stored only for NeoInvocationStatus table" + ); // The scheduled variant is only on the NeoInvocationStatus storage.put_kv( create_neo_invocation_status_key(invocation_id), diff --git a/crates/storage-api/src/invocation_status_table/mod.rs b/crates/storage-api/src/invocation_status_table/mod.rs index 5de1b31b5..e64530cb8 100644 --- a/crates/storage-api/src/invocation_status_table/mod.rs +++ b/crates/storage-api/src/invocation_status_table/mod.rs @@ -24,7 +24,7 @@ use std::future::Future; use std::ops::RangeInclusive; use std::time::Duration; -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SourceTable { Old, New, diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index e9708905b..b81ab6be4 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -480,9 +480,9 @@ impl StateMachine { // Phases of an invocation // 1. Try deduplicate it first // 1.1. Deduplicate using idempotency id - // 1.2. Deduplicate for run once workflow semantics (only for workflow method of workflows) + // 1.2. Deduplicate for "run once" workflow semantics (only for workflow handlers of workflows services) // 2. Check if we need to schedule it - // 3. Check if we need to inbox it (only for exclusive methods of virtual objects) + // 3. Check if we need to inbox it (only for exclusive handlers of virtual objects services) // 4. Execute it // 1.1. Handle deduplication for idempotency id @@ -538,7 +538,7 @@ impl StateMachine { ) .await? else { - // Invocation was scheduled, send back the ingress attach notification and return + // Invocation was inboxed, send back the ingress attach notification and return Self::send_submit_notification_if_needed( ctx, invocation_id, @@ -571,7 +571,7 @@ impl StateMachine { .await } - /// Returns the invocation in case the invocation was not deduplicated + /// Returns the invocation in case the invocation is not a duplicate async fn handle_service_invocation_idempotency_id< State: StateReader + StateStorage + IdempotencyTable, >( @@ -603,7 +603,7 @@ impl StateMachine { debug_if_leader!( ctx.is_leader, restate.idempotency.id = ?idempotency_id, - "Invocation is deduplicated" + "Invocation is a duplicate" ); // Invocation was either resolved, or the sink was enqueued. Nothing else to do here. @@ -627,7 +627,7 @@ impl StateMachine { Ok(Some(service_invocation)) } - /// Returns the invocation in case the invocation was not deduplicated + /// Returns the invocation in case the invocation is not a duplicate async fn handle_service_invocation_workflow_run( &mut self, ctx: &mut StateMachineApplyContext<'_, State>, @@ -693,7 +693,7 @@ impl StateMachine { debug_if_leader!( ctx.is_leader, - "Invocation to workflow method is deduplicated" + "Invocation to workflow method is a duplicate" ); return Ok(None); @@ -713,7 +713,7 @@ impl StateMachine { Ok(Some(service_invocation)) } - /// Returns the invocation in case the invocation was not scheduled + /// Returns the invocation in case the invocation should run immediately async fn handle_service_invocation_execution_time( &mut self, ctx: &mut StateMachineApplyContext<'_, State>,