diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index a9d410bd2..cbbe45bf6 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -89,6 +89,7 @@ use std::marker::PhantomData; use std::ops::RangeInclusive; use std::time::Duration; use std::time::Instant; +use tracing::error; use utils::SpanExt; pub struct StateMachine { @@ -1847,7 +1848,7 @@ impl StateMachine { .unwrap_or(CompletionResult::Empty); Codec::write_completion(&mut journal_entry, completion_result.clone())?; - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, completion_result), @@ -1857,7 +1858,7 @@ impl StateMachine { "Trying to process entry {} for a target that has no state", journal_entry.header().as_entry_type() ); - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, CompletionResult::Empty), @@ -1976,7 +1977,7 @@ impl StateMachine { Codec::write_completion(&mut journal_entry, completion_result.clone())?; // We can already forward the completion - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, completion_result), @@ -2008,7 +2009,7 @@ impl StateMachine { )?; // Forward completion - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, completion_result), @@ -2047,7 +2048,7 @@ impl StateMachine { "Trying to process entry {} for a target that has no promises", journal_entry.header().as_entry_type() ); - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, CompletionResult::Success(Bytes::new())), @@ -2078,7 +2079,7 @@ impl StateMachine { Codec::write_completion(&mut journal_entry, completion_result.clone())?; // Forward completion - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, completion_result), @@ -2088,7 +2089,7 @@ impl StateMachine { "Trying to process entry {} for a target that has no promises", journal_entry.header().as_entry_type() ); - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, CompletionResult::Empty), @@ -2165,7 +2166,7 @@ impl StateMachine { Codec::write_completion(&mut journal_entry, completion_result.clone())?; // Forward completion - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, completion_result), @@ -2175,7 +2176,7 @@ impl StateMachine { "Trying to process entry {} for a target that has no promises", journal_entry.header().as_entry_type() ); - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, CompletionResult::Empty), @@ -2337,7 +2338,7 @@ impl StateMachine { { Codec::write_completion(&mut journal_entry, completion_result.clone())?; - Self::do_forward_completion( + Self::forward_completion( ctx, invocation_id, Completion::new(entry_index, completion_result), @@ -2433,7 +2434,7 @@ impl StateMachine { waiting_for_completed_entries: &HashSet, ) -> Result { let resume_invocation = waiting_for_completed_entries.contains(&completion.entry_index); - Self::do_store_completion(ctx, invocation_id, completion).await?; + Self::store_completion(ctx, invocation_id, completion).await?; Ok(resume_invocation) } @@ -2443,8 +2444,9 @@ impl StateMachine { invocation_id: InvocationId, completion: Completion, ) -> Result<(), Error> { - Self::do_store_completion(ctx, invocation_id, completion.clone()).await?; - Self::do_forward_completion(ctx, invocation_id, completion); + if Self::store_completion(ctx, invocation_id, completion.clone()).await? { + Self::forward_completion(ctx, invocation_id, completion); + } Ok(()) } @@ -3162,27 +3164,80 @@ impl StateMachine { Ok(()) } - async fn do_store_completion( + /// Returns `true` if the completion should be forwarded. + async fn store_completion( ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, Completion { entry_index, result, }: Completion, - ) -> Result<(), Error> { + ) -> Result { debug_if_leader!( ctx.is_leader, restate.journal.index = entry_index, - "Effect: Store completion {}", + "Store completion {}", CompletionResultFmt(&result) ); - Self::store_completion(ctx.storage, &invocation_id, entry_index, result).await?; + if let Some(mut journal_entry) = ctx + .storage + .get_journal_entry(&invocation_id, entry_index) + .await? + .and_then(|journal_entry| match journal_entry { + JournalEntry::Entry(entry) => Some(entry), + JournalEntry::Completion(_) => None, + }) + { + if journal_entry.ty() == EntryType::Awakeable + && journal_entry.header().is_completed() == Some(true) + { + // We can ignore when we get an awakeable completion twice as they might be a result of + // some request being retried from the ingress to complete the awakeable. + // We'll use only the first completion, because changing the awakeable result + // after it has been completed for the first time can cause non-deterministic execution. + warn!( + restate.invocation.id = %invocation_id, + restate.journal.index = entry_index, + "Trying to complete an awakeable already completed. Ignoring this completion"); + debug!("Discarded awakeable completion: {:?}", result); + return Ok(false); + } + if journal_entry.header().is_completed() == Some(true) { + // We use error level here as this can happen only in case there is some bug + // in the Partition Processor/Invoker. + error!( + restate.invocation.id = %invocation_id, + restate.journal.index = entry_index, + "Trying to complete the entry {:?}, but it's already completed. This is a bug.", + journal_entry.ty()); + return Ok(false); + } - Ok(()) + Codec::write_completion(&mut journal_entry, result)?; + ctx.storage + .put_journal_entry( + &invocation_id, + entry_index, + &JournalEntry::Entry(journal_entry), + ) + .await; + Ok(true) + } else { + // In case we don't have the journal entry (only awakeables case), + // we'll send the completion afterward once we receive the entry. + ctx.storage + .put_journal_entry( + &invocation_id, + entry_index, + &JournalEntry::Completion(result), + ) + .await; + Ok(false) + } } - fn do_forward_completion( + fn forward_completion( ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, completion: Completion, @@ -3190,7 +3245,7 @@ impl StateMachine { debug_if_leader!( ctx.is_leader, restate.journal.index = completion.entry_index, - "Effect: Forward completion {} to deployment", + "Forward completion {} to deployment", CompletionResultFmt(&completion.result) ); @@ -3357,58 +3412,6 @@ impl StateMachine { Ok(()) } - - /// Stores the given completion. Returns `true` if an [`RawEntry`] was completed. - async fn store_completion( - state_storage: &mut State, - invocation_id: &InvocationId, - entry_index: EntryIndex, - completion_result: CompletionResult, - ) -> Result { - if let Some(mut journal_entry) = state_storage - .get_journal_entry(invocation_id, entry_index) - .await? - .and_then(|journal_entry| match journal_entry { - JournalEntry::Entry(entry) => Some(entry), - JournalEntry::Completion(_) => None, - }) - { - if journal_entry.ty() == EntryType::Awakeable - && journal_entry.header().is_completed() == Some(true) - { - // We can ignore when we get an awakeable completion twice as they might be a result of - // some request being retried from the ingress to complete the awakeable. - // We'll use only the first completion, because changing the awakeable result - // after it has been completed for the first time can cause non-deterministic execution. - warn!( - restate.invocation.id = %invocation_id, - restate.journal.index = entry_index, - "Trying to complete an awakeable already completed. Ignoring this completion"); - debug!("Discarded awakeable completion: {:?}", completion_result); - return Ok(false); - } - Codec::write_completion(&mut journal_entry, completion_result)?; - state_storage - .put_journal_entry( - invocation_id, - entry_index, - &JournalEntry::Entry(journal_entry), - ) - .await; - Ok(true) - } else { - // In case we don't have the journal entry (only awakeables case), - // we'll send the completion afterward once we receive the entry. - state_storage - .put_journal_entry( - invocation_id, - entry_index, - &JournalEntry::Completion(completion_result), - ) - .await; - Ok(false) - } - } } // To write completions in the effects log