Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix idempotent send with delay #1843

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions crates/partition-store/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::invocation_status_table::{
CompletedInvocation, InFlightInvocationMetadata, InboxedInvocation, InvocationStatus,
InvocationStatusTable, NeoInvocationStatus, ReadOnlyInvocationStatusTable, ScheduledInvocation,
SourceTable,
InvocationStatusTable, NeoInvocationStatus, PreFlightInvocationMetadata,
ReadOnlyInvocationStatusTable, ScheduledInvocation, SourceTable,
};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey};
Expand Down Expand Up @@ -94,8 +94,10 @@ fn put_invocation_status<S: StorageAccess>(
status: InvocationStatus,
) {
match &status {
InvocationStatus::Scheduled(ScheduledInvocation { source_table, .. })
| InvocationStatus::Inboxed(InboxedInvocation { source_table, .. })
InvocationStatus::Inboxed(InboxedInvocation {
metadata: PreFlightInvocationMetadata { source_table, .. },
..
})
| InvocationStatus::Invoked(InFlightInvocationMetadata { source_table, .. })
| InvocationStatus::Suspended {
metadata: InFlightInvocationMetadata { source_table, .. },
Expand All @@ -115,6 +117,20 @@ fn put_invocation_status<S: StorageAccess>(
}
}
}
InvocationStatus::Scheduled(ScheduledInvocation {
metadata: PreFlightInvocationMetadata { source_table, .. },
}) => {
assert_eq!(
*source_table,
SourceTable::New,
"Scheduled status can be stored only for NeoInvocationStatus table"
);
// The scheduled variant is only on the NeoInvocationStatus
storage.put_kv(
create_neo_invocation_status_key(invocation_id),
NeoInvocationStatus(status),
);
}
InvocationStatus::Free => {
// TODO remove this once we remove the old InvocationStatus
storage.delete_key(&create_invocation_status_key(invocation_id));
Expand Down
149 changes: 51 additions & 98 deletions crates/storage-api/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::future::Future;
use std::ops::RangeInclusive;
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceTable {
Old,
New,
Expand Down Expand Up @@ -91,8 +91,8 @@ impl InvocationStatus {
#[inline]
pub fn invocation_target(&self) -> Option<&InvocationTarget> {
match self {
InvocationStatus::Scheduled(metadata) => Some(&metadata.invocation_target),
InvocationStatus::Inboxed(metadata) => Some(&metadata.invocation_target),
InvocationStatus::Scheduled(metadata) => Some(&metadata.metadata.invocation_target),
InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.invocation_target),
InvocationStatus::Invoked(metadata) => Some(&metadata.invocation_target),
InvocationStatus::Suspended { metadata, .. } => Some(&metadata.invocation_target),
InvocationStatus::Completed(completed) => Some(&completed.invocation_target),
Expand All @@ -103,8 +103,8 @@ impl InvocationStatus {
#[inline]
pub fn idempotency_key(&self) -> Option<&ByteString> {
match self {
InvocationStatus::Scheduled(metadata) => metadata.idempotency_key.as_ref(),
InvocationStatus::Inboxed(metadata) => metadata.idempotency_key.as_ref(),
InvocationStatus::Scheduled(metadata) => metadata.metadata.idempotency_key.as_ref(),
InvocationStatus::Inboxed(metadata) => metadata.metadata.idempotency_key.as_ref(),
InvocationStatus::Invoked(metadata) => metadata.idempotency_key.as_ref(),
InvocationStatus::Suspended { metadata, .. } => metadata.idempotency_key.as_ref(),
InvocationStatus::Completed(completed) => completed.idempotency_key.as_ref(),
Expand Down Expand Up @@ -171,8 +171,8 @@ impl InvocationStatus {
&mut self,
) -> Option<&mut HashSet<ServiceInvocationResponseSink>> {
match self {
InvocationStatus::Scheduled(metadata) => Some(&mut metadata.response_sinks),
InvocationStatus::Inboxed(metadata) => Some(&mut metadata.response_sinks),
InvocationStatus::Scheduled(metadata) => Some(&mut metadata.metadata.response_sinks),
InvocationStatus::Inboxed(metadata) => Some(&mut metadata.metadata.response_sinks),
InvocationStatus::Invoked(metadata) => Some(&mut metadata.response_sinks),
InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.response_sinks),
_ => None,
Expand All @@ -182,8 +182,8 @@ impl InvocationStatus {
#[inline]
pub fn get_response_sinks(&self) -> Option<&HashSet<ServiceInvocationResponseSink>> {
match self {
InvocationStatus::Scheduled(metadata) => Some(&metadata.response_sinks),
InvocationStatus::Inboxed(metadata) => Some(&metadata.response_sinks),
InvocationStatus::Scheduled(metadata) => Some(&metadata.metadata.response_sinks),
InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.response_sinks),
InvocationStatus::Invoked(metadata) => Some(&metadata.response_sinks),
InvocationStatus::Suspended { metadata, .. } => Some(&metadata.response_sinks),
_ => None,
Expand All @@ -193,8 +193,8 @@ impl InvocationStatus {
#[inline]
pub fn get_timestamps(&self) -> Option<&StatusTimestamps> {
match self {
InvocationStatus::Scheduled(metadata) => Some(&metadata.timestamps),
InvocationStatus::Inboxed(metadata) => Some(&metadata.timestamps),
InvocationStatus::Scheduled(metadata) => Some(&metadata.metadata.timestamps),
InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.timestamps),
InvocationStatus::Invoked(metadata) => Some(&metadata.timestamps),
InvocationStatus::Suspended { metadata, .. } => Some(&metadata.timestamps),
InvocationStatus::Completed(completed) => Some(&completed.timestamps),
Expand All @@ -204,8 +204,8 @@ impl InvocationStatus {

pub fn update_timestamps(&mut self) {
match self {
InvocationStatus::Scheduled(metadata) => metadata.timestamps.update(),
InvocationStatus::Inboxed(metadata) => metadata.timestamps.update(),
InvocationStatus::Scheduled(metadata) => metadata.metadata.timestamps.update(),
InvocationStatus::Inboxed(metadata) => metadata.metadata.timestamps.update(),
InvocationStatus::Invoked(metadata) => metadata.timestamps.update(),
InvocationStatus::Suspended { metadata, .. } => metadata.timestamps.update(),
_ => {}
Expand Down Expand Up @@ -243,7 +243,7 @@ impl JournalMetadata {

/// This is similar to [ServiceInvocation].
#[derive(Debug, Clone, PartialEq)]
pub struct ScheduledInvocation {
pub struct PreFlightInvocationMetadata {
pub response_sinks: HashSet<ServiceInvocationResponseSink>,
pub timestamps: StatusTimestamps,

Expand All @@ -256,7 +256,7 @@ pub struct ScheduledInvocation {
pub span_context: ServiceInvocationSpanContext,
pub headers: Vec<Header>,
/// Time when the request should be executed
pub execution_time: MillisSinceEpoch,
pub execution_time: Option<MillisSinceEpoch>,
/// If zero, the invocation completion will not be retained.
pub completion_retention_time: Duration,
pub idempotency_key: Option<ByteString>,
Expand All @@ -265,7 +265,18 @@ pub struct ScheduledInvocation {
pub source_table: SourceTable,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ScheduledInvocation {
pub metadata: PreFlightInvocationMetadata,
}

impl ScheduledInvocation {
pub fn from_pre_flight_invocation_metadata(metadata: PreFlightInvocationMetadata) -> Self {
Self { metadata }
}
}

impl PreFlightInvocationMetadata {
pub fn from_service_invocation(
service_invocation: ServiceInvocation,
source_table: SourceTable,
Expand All @@ -278,9 +289,7 @@ impl ScheduledInvocation {
source: service_invocation.source,
span_context: service_invocation.span_context,
headers: service_invocation.headers,
execution_time: service_invocation
.execution_time
.expect("Scheduled invocations must have an execution time set"),
execution_time: service_invocation.execution_time,
completion_retention_time: service_invocation
.completion_retention_time
.unwrap_or_default(),
Expand All @@ -295,68 +304,29 @@ impl ScheduledInvocation {
#[derive(Debug, Clone, PartialEq)]
pub struct InboxedInvocation {
pub inbox_sequence_number: u64,
pub response_sinks: HashSet<ServiceInvocationResponseSink>,
pub timestamps: StatusTimestamps,

// --- From ServiceInvocation
pub invocation_target: InvocationTarget,

// Could be split out of ServiceInvocation, e.g. InvocationContent or similar.
pub argument: Bytes,
pub source: Source,
pub span_context: ServiceInvocationSpanContext,
pub headers: Vec<Header>,
/// Time when the request should be executed
pub execution_time: Option<MillisSinceEpoch>,
/// If zero, the invocation completion will not be retained.
pub completion_retention_time: Duration,
pub idempotency_key: Option<ByteString>,

/// Used by the Table implementation to pick where to write
pub source_table: SourceTable,
pub metadata: PreFlightInvocationMetadata,
}

impl InboxedInvocation {
pub fn from_service_invocation(
service_invocation: ServiceInvocation,
pub fn from_pre_flight_invocation_metadata(
metadata: PreFlightInvocationMetadata,
inbox_sequence_number: u64,
source_table: SourceTable,
) -> Self {
Self {
inbox_sequence_number,
response_sinks: service_invocation.response_sink.into_iter().collect(),
timestamps: StatusTimestamps::now(),
invocation_target: service_invocation.invocation_target,
argument: service_invocation.argument,
source: service_invocation.source,
span_context: service_invocation.span_context,
headers: service_invocation.headers,
execution_time: service_invocation.execution_time,
completion_retention_time: service_invocation
.completion_retention_time
.unwrap_or_default(),
idempotency_key: service_invocation.idempotency_key,
source_table,
metadata,
}
}

pub fn from_scheduled_invocation(
scheduled_invocation: ScheduledInvocation,
mut scheduled_invocation: ScheduledInvocation,
inbox_sequence_number: u64,
) -> Self {
scheduled_invocation.metadata.timestamps.update();

Self {
inbox_sequence_number,
response_sinks: scheduled_invocation.response_sinks,
timestamps: scheduled_invocation.timestamps,
invocation_target: scheduled_invocation.invocation_target,
argument: scheduled_invocation.argument,
source: scheduled_invocation.source,
span_context: scheduled_invocation.span_context,
headers: scheduled_invocation.headers,
execution_time: Some(scheduled_invocation.execution_time),
completion_retention_time: scheduled_invocation.completion_retention_time,
idempotency_key: scheduled_invocation.idempotency_key,
source_table: scheduled_invocation.source_table,
metadata: scheduled_invocation.metadata,
}
}
}
Expand All @@ -378,53 +348,36 @@ pub struct InFlightInvocationMetadata {
}

impl InFlightInvocationMetadata {
pub fn from_service_invocation(
service_invocation: ServiceInvocation,
source_table: SourceTable,
pub fn from_pre_flight_invocation_metadata(
pre_flight_invocation_metadata: PreFlightInvocationMetadata,
) -> (Self, InvocationInput) {
(
Self {
invocation_target: service_invocation.invocation_target,
journal_metadata: JournalMetadata::initialize(service_invocation.span_context),
invocation_target: pre_flight_invocation_metadata.invocation_target,
journal_metadata: JournalMetadata::initialize(
pre_flight_invocation_metadata.span_context,
),
pinned_deployment: None,
response_sinks: service_invocation.response_sink.into_iter().collect(),
timestamps: StatusTimestamps::now(),
source: service_invocation.source,
completion_retention_time: service_invocation
.completion_retention_time
.unwrap_or_default(),
idempotency_key: service_invocation.idempotency_key,
source_table,
response_sinks: pre_flight_invocation_metadata.response_sinks,
timestamps: pre_flight_invocation_metadata.timestamps,
source: pre_flight_invocation_metadata.source,
completion_retention_time: pre_flight_invocation_metadata.completion_retention_time,
idempotency_key: pre_flight_invocation_metadata.idempotency_key,
source_table: pre_flight_invocation_metadata.source_table,
},
InvocationInput {
argument: service_invocation.argument,
headers: service_invocation.headers,
argument: pre_flight_invocation_metadata.argument,
headers: pre_flight_invocation_metadata.headers,
},
)
}

pub fn from_inboxed_invocation(
mut inboxed_invocation: InboxedInvocation,
) -> (Self, InvocationInput) {
inboxed_invocation.timestamps.update();
inboxed_invocation.metadata.timestamps.update();

(
Self {
invocation_target: inboxed_invocation.invocation_target,
journal_metadata: JournalMetadata::initialize(inboxed_invocation.span_context),
pinned_deployment: None,
response_sinks: inboxed_invocation.response_sinks,
timestamps: inboxed_invocation.timestamps,
source: inboxed_invocation.source,
completion_retention_time: inboxed_invocation.completion_retention_time,
idempotency_key: inboxed_invocation.idempotency_key,
source_table: inboxed_invocation.source_table,
},
InvocationInput {
argument: inboxed_invocation.argument,
headers: inboxed_invocation.headers,
},
)
Self::from_pre_flight_invocation_metadata(inboxed_invocation.metadata)
}

pub fn set_pinned_deployment(&mut self, pinned_deployment: PinnedDeployment) {
Expand Down
Loading
Loading