Skip to content

Commit

Permalink
Fix #1972.
Browse files Browse the repository at this point in the history
I reorganized the code of store_completion (it went through an indirection that was not needed)
  • Loading branch information
slinkydeveloper committed Sep 18, 2024
1 parent 241bcdf commit b666d60
Showing 1 changed file with 75 additions and 72 deletions.
147 changes: 75 additions & 72 deletions crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::time::Duration;
use std::time::Instant;
use tracing::error;
use utils::SpanExt;

pub struct StateMachine<Codec> {
Expand Down Expand Up @@ -1847,7 +1848,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
.unwrap_or(CompletionResult::Empty);
Codec::write_completion(&mut journal_entry, completion_result.clone())?;

Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, completion_result),
Expand All @@ -1857,7 +1858,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
"Trying to process entry {} for a target that has no state",
journal_entry.header().as_entry_type()
);
Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, CompletionResult::Empty),
Expand Down Expand Up @@ -1976,7 +1977,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
Codec::write_completion(&mut journal_entry, completion_result.clone())?;

// We can already forward the completion
Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, completion_result),
Expand Down Expand Up @@ -2008,7 +2009,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
)?;

// Forward completion
Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, completion_result),
Expand Down Expand Up @@ -2047,7 +2048,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
"Trying to process entry {} for a target that has no promises",
journal_entry.header().as_entry_type()
);
Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, CompletionResult::Success(Bytes::new())),
Expand Down Expand Up @@ -2078,7 +2079,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
Codec::write_completion(&mut journal_entry, completion_result.clone())?;

// Forward completion
Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, completion_result),
Expand All @@ -2088,7 +2089,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
"Trying to process entry {} for a target that has no promises",
journal_entry.header().as_entry_type()
);
Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, CompletionResult::Empty),
Expand Down Expand Up @@ -2165,7 +2166,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
Codec::write_completion(&mut journal_entry, completion_result.clone())?;

// Forward completion
Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, completion_result),
Expand All @@ -2175,7 +2176,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
"Trying to process entry {} for a target that has no promises",
journal_entry.header().as_entry_type()
);
Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, CompletionResult::Empty),
Expand Down Expand Up @@ -2337,7 +2338,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
{
Codec::write_completion(&mut journal_entry, completion_result.clone())?;

Self::do_forward_completion(
Self::forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, completion_result),
Expand Down Expand Up @@ -2433,7 +2434,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
waiting_for_completed_entries: &HashSet<EntryIndex>,
) -> Result<bool, Error> {
let resume_invocation = waiting_for_completed_entries.contains(&completion.entry_index);
Self::do_store_completion(ctx, invocation_id, completion).await?;
Self::store_completion(ctx, invocation_id, completion).await?;

Ok(resume_invocation)
}
Expand All @@ -2443,8 +2444,9 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
invocation_id: InvocationId,
completion: Completion,
) -> Result<(), Error> {
Self::do_store_completion(ctx, invocation_id, completion.clone()).await?;
Self::do_forward_completion(ctx, invocation_id, completion);
if Self::store_completion(ctx, invocation_id, completion.clone()).await? {
Self::forward_completion(ctx, invocation_id, completion);
}
Ok(())
}

Expand Down Expand Up @@ -3162,35 +3164,88 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
Ok(())
}

async fn do_store_completion<State: JournalTable>(
/// Returns `true` if the completion should be forwarded.
async fn store_completion<State: JournalTable>(
ctx: &mut StateMachineApplyContext<'_, State>,
invocation_id: InvocationId,
Completion {
entry_index,
result,
}: Completion,
) -> Result<(), Error> {
) -> Result<bool, Error> {
debug_if_leader!(
ctx.is_leader,
restate.journal.index = entry_index,
"Effect: Store completion {}",
"Store completion {}",
CompletionResultFmt(&result)
);

Self::store_completion(ctx.storage, &invocation_id, entry_index, result).await?;
if let Some(mut journal_entry) = ctx
.storage
.get_journal_entry(&invocation_id, entry_index)
.await?
.and_then(|journal_entry| match journal_entry {
JournalEntry::Entry(entry) => Some(entry),
JournalEntry::Completion(_) => None,
})
{
if journal_entry.ty() == EntryType::Awakeable
&& journal_entry.header().is_completed() == Some(true)
{
// We can ignore when we get an awakeable completion twice as they might be a result of
// some request being retried from the ingress to complete the awakeable.
// We'll use only the first completion, because changing the awakeable result
// after it has been completed for the first time can cause non-deterministic execution.
warn!(
restate.invocation.id = %invocation_id,
restate.journal.index = entry_index,
"Trying to complete an awakeable already completed. Ignoring this completion");
debug!("Discarded awakeable completion: {:?}", result);
return Ok(false);
}
if journal_entry.header().is_completed() == Some(true) {
// We use error level here as this can happen only in case there is some bug
// in the Partition Processor/Invoker.
error!(
restate.invocation.id = %invocation_id,
restate.journal.index = entry_index,
"Trying to complete the entry {:?}, but it's already completed. This is a bug.",
journal_entry.ty());
return Ok(false);
}

Ok(())
Codec::write_completion(&mut journal_entry, result)?;
ctx.storage
.put_journal_entry(
&invocation_id,
entry_index,
&JournalEntry::Entry(journal_entry),
)
.await;
Ok(true)
} else {
// In case we don't have the journal entry (only awakeables case),
// we'll send the completion afterward once we receive the entry.
ctx.storage
.put_journal_entry(
&invocation_id,
entry_index,
&JournalEntry::Completion(result),
)
.await;
Ok(false)
}
}

fn do_forward_completion<State>(
fn forward_completion<State>(
ctx: &mut StateMachineApplyContext<'_, State>,
invocation_id: InvocationId,
completion: Completion,
) {
debug_if_leader!(
ctx.is_leader,
restate.journal.index = completion.entry_index,
"Effect: Forward completion {} to deployment",
"Forward completion {} to deployment",
CompletionResultFmt(&completion.result)
);

Expand Down Expand Up @@ -3357,58 +3412,6 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {

Ok(())
}

/// Stores the given completion. Returns `true` if a [`RawEntry`] was completed.
///
/// Looks up the journal entry at `entry_index` for `invocation_id`:
///
/// * If a `JournalEntry::Entry` is found, the completion result is written into it
///   and the entry is stored back; the function returns `Ok(true)`.
/// * If that entry is an awakeable whose header already reports it as completed,
///   the new completion is discarded and the function returns `Ok(false)`.
/// * If nothing is stored at that index, or only a `JournalEntry::Completion` is,
///   the completion result is stored as a `JournalEntry::Completion` and the
///   function returns `Ok(false)`.
///
/// # Errors
///
/// Propagates errors from `get_journal_entry` and from `Codec::write_completion`.
async fn store_completion<State: JournalTable>(
state_storage: &mut State,
invocation_id: &InvocationId,
entry_index: EntryIndex,
completion_result: CompletionResult,
) -> Result<bool, Error> {
// A previously stored `JournalEntry::Completion` is mapped to `None` here, so it is
// handled by the `else` branch below exactly like a missing entry.
if let Some(mut journal_entry) = state_storage
.get_journal_entry(invocation_id, entry_index)
.await?
.and_then(|journal_entry| match journal_entry {
JournalEntry::Entry(entry) => Some(entry),
JournalEntry::Completion(_) => None,
})
{
if journal_entry.ty() == EntryType::Awakeable
&& journal_entry.header().is_completed() == Some(true)
{
// We can ignore when we get an awakeable completion twice as they might be a result of
// some request being retried from the ingress to complete the awakeable.
// We'll use only the first completion, because changing the awakeable result
// after it has been completed for the first time can cause non-deterministic execution.
warn!(
restate.invocation.id = %invocation_id,
restate.journal.index = entry_index,
"Trying to complete an awakeable already completed. Ignoring this completion");
debug!("Discarded awakeable completion: {:?}", completion_result);
return Ok(false);
}
// Only the already-completed awakeable case is guarded above; any other entry type
// reaches this point regardless of what its header reports.
Codec::write_completion(&mut journal_entry, completion_result)?;
// NOTE(review): the value produced by `put_journal_entry(..).await` is not inspected
// here — confirm that the call is infallible.
state_storage
.put_journal_entry(
invocation_id,
entry_index,
&JournalEntry::Entry(journal_entry),
)
.await;
Ok(true)
} else {
// In case we don't have the journal entry (only awakeables case),
// we'll send the completion afterward once we receive the entry.
state_storage
.put_journal_entry(
invocation_id,
entry_index,
&JournalEntry::Completion(completion_result),
)
.await;
Ok(false)
}
}
}

// To write completions in the effects log
Expand Down

0 comments on commit b666d60

Please sign in to comment.