Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Virtual journal #1287

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions crates/storage-api/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,6 @@ impl StatusTimestamps {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct NotificationTarget {
pub service: ServiceId,
pub method: String,
}

/// Status of an invocation.
#[derive(Debug, Default, Clone, PartialEq)]
pub enum InvocationStatus {
Expand All @@ -78,12 +71,6 @@ pub enum InvocationStatus {
metadata: InvocationMetadata,
waiting_for_completed_entries: HashSet<EntryIndex>,
},
Virtual {
journal_metadata: JournalMetadata,
timestamps: StatusTimestamps,
completion_notification_target: NotificationTarget,
kill_notification_target: NotificationTarget,
},
/// Service instance is currently not invoked
#[default]
Free,
Expand All @@ -105,9 +92,6 @@ impl InvocationStatus {
InvocationStatus::Invoked(metadata) => Some(metadata.journal_metadata),
InvocationStatus::Suspended { metadata, .. } => Some(metadata.journal_metadata),
InvocationStatus::Free => None,
InvocationStatus::Virtual {
journal_metadata, ..
} => Some(journal_metadata),
}
}

Expand All @@ -117,9 +101,6 @@ impl InvocationStatus {
InvocationStatus::Invoked(metadata) => Some(&metadata.journal_metadata),
InvocationStatus::Suspended { metadata, .. } => Some(&metadata.journal_metadata),
InvocationStatus::Free => None,
InvocationStatus::Virtual {
journal_metadata, ..
} => Some(journal_metadata),
}
}

Expand All @@ -129,9 +110,6 @@ impl InvocationStatus {
InvocationStatus::Invoked(metadata) => Some(&mut metadata.journal_metadata),
InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.journal_metadata),
InvocationStatus::Free => None,
InvocationStatus::Virtual {
journal_metadata, ..
} => Some(journal_metadata),
}
}

Expand All @@ -141,15 +119,13 @@ impl InvocationStatus {
InvocationStatus::Invoked(metadata) => Some(&metadata.timestamps),
InvocationStatus::Suspended { metadata, .. } => Some(&metadata.timestamps),
InvocationStatus::Free => None,
InvocationStatus::Virtual { timestamps, .. } => Some(timestamps),
}
}

pub fn update_timestamps(&mut self) {
match self {
InvocationStatus::Invoked(metadata) => metadata.timestamps.update(),
InvocationStatus::Suspended { metadata, .. } => metadata.timestamps.update(),
InvocationStatus::Virtual { timestamps, .. } => timestamps.update(),
InvocationStatus::Free => {}
}
}
Expand Down
13 changes: 0 additions & 13 deletions crates/storage-proto/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,10 @@ message InvocationStatus {
message Free {
}

message Virtual {
JournalMeta journal_meta = 2;
string completion_notification_target_service_name = 3;
bytes completion_notification_target_service_key = 4;
string completion_notification_target_method = 5;
uint64 creation_time = 6;
uint64 modification_time = 7;
string kill_notification_target_service_name = 8;
bytes kill_notification_target_service_key = 9;
string kill_notification_target_method = 10;
}

oneof status {
Invoked invoked = 1;
Suspended suspended = 2;
Free free = 3;
Virtual virtual = 4;
}
}

Expand Down
121 changes: 1 addition & 120 deletions crates/storage-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod storage {
Awakeable, BackgroundCall, ClearAllState, ClearState, CompleteAwakeable, Custom,
GetState, GetStateKeys, Invoke, OutputStream, PollInputStream, SetState, Sleep,
};
use crate::storage::v1::invocation_status::{Free, Invoked, Suspended, Virtual};
use crate::storage::v1::invocation_status::{Free, Invoked, Suspended};
use crate::storage::v1::journal_entry::completion_result::{Empty, Failure, Success};
use crate::storage::v1::journal_entry::{
completion_result, CompletionResult, Entry, Kind,
Expand Down Expand Up @@ -147,20 +147,6 @@ pub mod storage {
waiting_for_completed_entries,
}
}
invocation_status::Status::Virtual(r#virtual) => {
let (
journal_metadata,
completion_notification_target,
kill_notification_target,
timestamps,
) = r#virtual.try_into()?;
restate_storage_api::invocation_status_table::InvocationStatus::Virtual {
journal_metadata,
completion_notification_target,
kill_notification_target,
timestamps,
}
}
invocation_status::Status::Free(_) => {
restate_storage_api::invocation_status_table::InvocationStatus::Free
}
Expand All @@ -185,17 +171,6 @@ pub mod storage {
metadata,
waiting_for_completed_entries,
))),
restate_storage_api::invocation_status_table::InvocationStatus::Virtual {
journal_metadata,
completion_notification_target,
kill_notification_target,
timestamps,
} => invocation_status::Status::Virtual(Virtual::from((
journal_metadata,
completion_notification_target,
kill_notification_target,
timestamps,
))),
restate_storage_api::invocation_status_table::InvocationStatus::Free => {
invocation_status::Status::Free(Free {})
}
Expand Down Expand Up @@ -404,100 +379,6 @@ pub mod storage {
}
}

impl TryFrom<Virtual>
for (
restate_storage_api::invocation_status_table::JournalMetadata,
restate_storage_api::invocation_status_table::NotificationTarget,
restate_storage_api::invocation_status_table::NotificationTarget,
restate_storage_api::invocation_status_table::StatusTimestamps,
)
{
type Error = ConversionError;

fn try_from(value: Virtual) -> Result<Self, Self::Error> {
let journal_metadata =
restate_storage_api::invocation_status_table::JournalMetadata::try_from(
value
.journal_meta
.ok_or(ConversionError::missing_field("journal_meta"))?,
)?;
let completion_notification_target =
restate_storage_api::invocation_status_table::NotificationTarget {
service: restate_types::identifiers::ServiceId::new(
value.completion_notification_target_service_name,
value.completion_notification_target_service_key,
),
method: value.completion_notification_target_method,
};
let kill_notification_target =
restate_storage_api::invocation_status_table::NotificationTarget {
service: restate_types::identifiers::ServiceId::new(
value.kill_notification_target_service_name,
value.kill_notification_target_service_key,
),
method: value.kill_notification_target_method,
};
let timestamps =
restate_storage_api::invocation_status_table::StatusTimestamps::new(
MillisSinceEpoch::new(value.creation_time),
MillisSinceEpoch::new(value.modification_time),
);

Ok((
journal_metadata,
completion_notification_target,
kill_notification_target,
timestamps,
))
}
}

impl
From<(
restate_storage_api::invocation_status_table::JournalMetadata,
restate_storage_api::invocation_status_table::NotificationTarget,
restate_storage_api::invocation_status_table::NotificationTarget,
restate_storage_api::invocation_status_table::StatusTimestamps,
)> for Virtual
{
fn from(
(
journal_metadata,
completion_notification_target,
kill_notification_target,
timestamps,
): (
restate_storage_api::invocation_status_table::JournalMetadata,
restate_storage_api::invocation_status_table::NotificationTarget,
restate_storage_api::invocation_status_table::NotificationTarget,
restate_storage_api::invocation_status_table::StatusTimestamps,
),
) -> Self {
let journal_meta = JournalMeta::from(journal_metadata);

Virtual {
journal_meta: Some(journal_meta),
completion_notification_target_service_name: completion_notification_target
.service
.service_name
.to_string(),
completion_notification_target_service_key: completion_notification_target
.service
.key,
completion_notification_target_method: completion_notification_target
.method,
kill_notification_target_service_name: kill_notification_target
.service
.service_name
.to_string(),
kill_notification_target_service_key: kill_notification_target.service.key,
kill_notification_target_method: kill_notification_target.method,
creation_time: timestamps.creation_time().as_u64(),
modification_time: timestamps.modification_time().as_u64(),
}
}
}

impl TryFrom<JournalMeta> for restate_storage_api::invocation_status_table::JournalMetadata {
type Error = ConversionError;

Expand Down
4 changes: 0 additions & 4 deletions crates/storage-query-datafusion/src/invocation_status/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ pub(crate) fn append_invocation_status_row(
row.status("suspended");
Some(metadata)
}
InvocationStatus::Virtual { .. } => {
row.status("virtual");
None
}
InvocationStatus::Free => {
row.status("free");
None
Expand Down
24 changes: 2 additions & 22 deletions crates/wal-protocol/src/effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@
use std::borrow::Cow;

use bytes::Bytes;
use restate_storage_api::invocation_status_table::NotificationTarget;
use restate_storage_api::outbox_table::OutboxMessage;
use restate_types::errors::InvocationError;
use restate_types::identifiers::{EntryIndex, FullInvocationId, InvocationId};
use restate_types::identifiers::{EntryIndex, FullInvocationId};
use restate_types::ingress::IngressResponse;
use restate_types::invocation::{
ServiceInvocationResponseSink, ServiceInvocationSpanContext, Source as InvocationSource,
};
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::invocation::{ServiceInvocationResponseSink, Source as InvocationSource};
use restate_types::time::MillisSinceEpoch;

#[derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -45,22 +41,6 @@ impl BuiltinServiceEffects {
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum BuiltinServiceEffect {
CreateJournal {
invocation_id: InvocationId,
span_context: ServiceInvocationSpanContext,
completion_notification_target: NotificationTarget,
kill_notification_target: NotificationTarget,
},
StoreEntry {
invocation_id: InvocationId,
entry_index: EntryIndex,
journal_entry: EnrichedRawEntry,
},
DropJournal {
invocation_id: InvocationId,
journal_length: EntryIndex,
},

SetState {
key: Cow<'static, str>,
value: Bytes,
Expand Down
3 changes: 0 additions & 3 deletions crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ impl Worker {
channel_size,
invoker_sender,
rocksdb_storage.clone(),
schemas.clone(),
)
})
.collect();
Expand All @@ -289,7 +288,6 @@ impl Worker {
channel_size: usize,
invoker_sender: InvokerChannelServiceHandle,
rocksdb_storage: RocksDBStorage,
schemas: Schemas,
) -> PartitionProcessor {
PartitionProcessor::new(
partition_id,
Expand All @@ -298,7 +296,6 @@ impl Worker {
channel_size,
invoker_sender,
rocksdb_storage,
schemas,
)
}

Expand Down
8 changes: 0 additions & 8 deletions crates/worker/src/partition/action_effect_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,6 @@ impl ActionEffectHandler {
.await?;
}
}
ActionEffect::Invocation(service_invocation) => {
let header = self.create_header(service_invocation.fid.partition_key());
append_envelope_to_bifrost(
&mut self.bifrost,
Envelope::new(header, Command::Invoke(service_invocation)),
)
.await?;
}
};

Ok(())
Expand Down
2 changes: 0 additions & 2 deletions crates/worker/src/partition/leadership/action_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use crate::partition::services::non_deterministic;
use crate::partition::shuffle;
use futures::{Stream, StreamExt};
use restate_types::invocation::ServiceInvocation;
use restate_wal_protocol::effects::BuiltinServiceEffects;
use restate_wal_protocol::timer::TimerValue;
use std::ops::DerefMut;
Expand Down Expand Up @@ -51,7 +50,6 @@ pub(crate) enum ActionEffect {
Shuffle(shuffle::OutboxTruncation),
Timer(TimerValue),
BuiltInInvoker(BuiltinServiceEffects),
Invocation(ServiceInvocation),
}

impl Stream for ActionEffectStream {
Expand Down
Loading
Loading