Skip to content

Commit

Permalink
Split invocation status in invocation status/service status, Change k…
Browse files Browse the repository at this point in the history
…eying of journal to invocation id (#1169)

* Split invocation status in invocation status/service status, Change keying of journal to invocation id
* ServiceStatus::Locked now has InvocationId, and not InvocationUuid
* Make sure CLI uses the right tables
  • Loading branch information
slinkydeveloper authored Feb 28, 2024
1 parent 2f7669b commit 230d139
Show file tree
Hide file tree
Showing 52 changed files with 1,789 additions and 1,450 deletions.
17 changes: 7 additions & 10 deletions cli/src/clients/datafusion_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ pub async fn count_deployment_active_inv(
Ok(client
.run_count_agg_query(format!(
"SELECT COUNT(id) AS inv_count \
FROM sys_status \
FROM sys_invocation_status \
WHERE pinned_deployment_id = '{}' \
GROUP BY pinned_deployment_id",
deployment_id
Expand Down Expand Up @@ -393,7 +393,7 @@ pub async fn count_deployment_active_inv_by_method(
service,
method,
COUNT(id) AS inv_count
FROM sys_status
FROM sys_invocation_status
WHERE pinned_deployment_id = '{}'
GROUP BY pinned_deployment_id, service, method",
deployment_id
Expand Down Expand Up @@ -480,7 +480,7 @@ pub async fn get_services_status(
END AS combined_status,
ss.id,
ss.created_at
FROM sys_status ss
FROM sys_invocation_status ss
LEFT JOIN sys_invocation_state sis ON ss.id = sis.id
WHERE ss.service IN {}
)
Expand Down Expand Up @@ -627,7 +627,7 @@ pub async fn get_locked_keys_status(
sis.last_attempt_deployment_id,
sis.next_retry_at,
sis.last_start_at
FROM sys_status ss
FROM sys_invocation_status ss
LEFT JOIN sys_invocation_state sis ON ss.id = sis.id
WHERE ss.service IN {}
)
Expand Down Expand Up @@ -737,7 +737,7 @@ pub async fn find_active_invocations(
svc.deployment_id as svc_latest_deployment,
dp.id as known_deployment_id,
ss.trace_id
FROM sys_status ss
FROM sys_invocation_status ss
LEFT JOIN sys_invocation_state sis ON ss.id = sis.id
LEFT JOIN sys_service svc ON svc.name = ss.service
LEFT JOIN sys_deployment dp ON dp.id = ss.pinned_deployment_id
Expand Down Expand Up @@ -973,12 +973,9 @@ pub async fn get_invocation_journal(
sj.invoked_method,
sj.invoked_service_key,
sj.sleep_wakeup_at
FROM sys_status ss
LEFT JOIN sys_journal sj
ON ss.service_key = sj.service_key
AND ss.service = sj.service
FROM sys_journal sj
WHERE
ss.id = '{}'
sj.invocation_id = '{}'
ORDER BY index DESC
LIMIT {}",
invocation_id, JOURNAL_QUERY_LIMIT,
Expand Down
12 changes: 7 additions & 5 deletions crates/ingress-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use prost::Message;
use restate_pb::restate::Event;
use restate_schema_api::subscription::{EventReceiverServiceInstanceType, Sink, Subscription};
use restate_types::errors::InvocationError;
use restate_types::identifiers::{FullInvocationId, InvocationUuid, WithPartitionKey};
use restate_types::identifiers::{FullInvocationId, InvocationUuid, ServiceId, WithPartitionKey};
use restate_types::invocation::{ServiceInvocation, ServiceInvocationSpanContext, SpanRelation};
use restate_types::message::MessageIndex;
use restate_types::GenerationalNodeId;
Expand Down Expand Up @@ -186,7 +186,7 @@ impl IngressRequest {
} = subscription.sink();

// Generate fid
let target_fid = FullInvocationId::generate(
let target_fid = FullInvocationId::generate(ServiceId::new(
&**name,
// TODO This should probably live somewhere and be unified with the rest of the key extraction logic
match instance_type {
Expand All @@ -208,7 +208,7 @@ impl IngressRequest {
}
EventReceiverServiceInstanceType::Singleton => Bytes::new(),
},
);
));

// Generate span context
let span_context = ServiceInvocationSpanContext::start(&target_fid, related_span);
Expand All @@ -222,8 +222,10 @@ impl IngressRequest {

Ok(if let Some(proxying_key) = proxying_key {
// For keyed events, we dispatch them through the Proxy service, to avoid scattering the offset info throughout all the partitions
let proxy_fid =
FullInvocationId::generate(restate_pb::PROXY_SERVICE_NAME, proxying_key);
let proxy_fid = FullInvocationId::generate(ServiceId::new(
restate_pb::PROXY_SERVICE_NAME,
proxying_key,
));

(
IngressRequest {
Expand Down
8 changes: 4 additions & 4 deletions crates/ingress-dispatcher/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ impl DispatcherLoopHandler {

(
ServiceInvocation {
fid: FullInvocationId::generate(
fid: FullInvocationId::generate(ServiceId::new(
restate_pb::IDEMPOTENT_INVOKER_SERVICE_NAME,
idempotency_fid_key.freeze(),
),
)),
method_name: restate_pb::IDEMPOTENT_INVOKER_INVOKE_METHOD_NAME
.to_string()
.into(),
Expand Down Expand Up @@ -375,7 +375,7 @@ mod tests {
.unwrap();

// Ask for a response, then drop the receiver
let fid = FullInvocationId::generate("MySvc", "MyKey");
let fid = FullInvocationId::generate(ServiceId::new("MySvc", "MyKey"));
let (invocation, response_rx) = IngressRequest::invocation(
fid.clone(),
"pippo",
Expand Down Expand Up @@ -420,7 +420,7 @@ mod tests {
.unwrap();

// Ask for a response, then drop the receiver
let fid = FullInvocationId::generate("MySvc", "MyKey");
let fid = FullInvocationId::generate(ServiceId::new("MySvc", "MyKey"));
let argument = Bytes::from_static(b"nbfjksdfs");
let idempotency_key = Bytes::copy_from_slice(b"123");
let (invocation, res) = IngressRequest::invocation(
Expand Down
3 changes: 2 additions & 1 deletion crates/ingress-grpc/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use restate_schema_api::key::KeyExtractor;
use restate_schema_api::proto_symbol::ProtoSymbolResolver;
use restate_schema_api::service::ServiceMetadataResolver;
use restate_types::errors::InvocationError;
use restate_types::identifiers::ServiceId;
use restate_types::invocation::SpanRelation;
use std::sync::Arc;
use std::task::Poll;
Expand Down Expand Up @@ -332,7 +333,7 @@ where
})?;


let fid = FullInvocationId::generate(service_name, key);
let fid = FullInvocationId::generate(ServiceId::new(service_name, key));
let span_relation = SpanRelation::Parent(ingress_span_context);

// Check if Idempotency-Key is available
Expand Down
2 changes: 2 additions & 0 deletions crates/invoker-api/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum InvokeInputJournal {
CachedJournal(JournalMetadata, Vec<PlainRawEntry>),
}

// TODO We can remove FullInvocationId awareness from the invoker completely

pub trait ServiceHandle {
type Future: Future<Output = Result<(), NotRunningError>>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::Result;
use bytestring::ByteString;
use futures_util::Stream;
use restate_types::identifiers::{
DeploymentId, EntryIndex, FullInvocationId, InvocationUuid, PartitionKey, ServiceId,
DeploymentId, EntryIndex, FullInvocationId, InvocationId, PartitionKey, ServiceId,
};
use restate_types::invocation::{
ServiceInvocationResponseSink, ServiceInvocationSpanContext, Source,
Expand Down Expand Up @@ -70,7 +70,7 @@ pub struct NotificationTarget {
pub method: String,
}

/// Status of a service instance.
/// Status of an invocation.
#[derive(Debug, Default, Clone, PartialEq)]
pub enum InvocationStatus {
Invoked(InvocationMetadata),
Expand All @@ -79,7 +79,6 @@ pub enum InvocationStatus {
waiting_for_completed_entries: HashSet<EntryIndex>,
},
Virtual {
invocation_uuid: InvocationUuid,
journal_metadata: JournalMetadata,
timestamps: StatusTimestamps,
completion_notification_target: NotificationTarget,
Expand All @@ -92,14 +91,11 @@ pub enum InvocationStatus {

impl InvocationStatus {
#[inline]
pub fn invocation_uuid(&self) -> Option<InvocationUuid> {
pub fn service_id(&self) -> Option<ServiceId> {
match self {
InvocationStatus::Invoked(metadata) => Some(metadata.invocation_uuid),
InvocationStatus::Suspended { metadata, .. } => Some(metadata.invocation_uuid),
InvocationStatus::Virtual {
invocation_uuid, ..
} => Some(*invocation_uuid),
InvocationStatus::Free => None,
InvocationStatus::Invoked(metadata) => Some(metadata.service_id.clone()),
InvocationStatus::Suspended { metadata, .. } => Some(metadata.service_id.clone()),
_ => None,
}
}

Expand Down Expand Up @@ -181,7 +177,7 @@ impl JournalMetadata {

#[derive(Debug, Clone, PartialEq)]
pub struct InvocationMetadata {
pub invocation_uuid: InvocationUuid,
pub service_id: ServiceId,
pub journal_metadata: JournalMetadata,
pub deployment_id: Option<DeploymentId>,
pub method: ByteString,
Expand All @@ -192,7 +188,7 @@ pub struct InvocationMetadata {

impl InvocationMetadata {
pub fn new(
invocation_uuid: InvocationUuid,
service_id: ServiceId,
journal_metadata: JournalMetadata,
deployment_id: Option<DeploymentId>,
method: ByteString,
Expand All @@ -201,7 +197,7 @@ impl InvocationMetadata {
source: Source,
) -> Self {
Self {
invocation_uuid,
service_id,
journal_metadata,
deployment_id,
method,
Expand All @@ -212,35 +208,29 @@ impl InvocationMetadata {
}
}

pub trait StatusTable: ReadOnlyStatusTable {
fn put_invocation_status(
pub trait ReadOnlyInvocationStatusTable {
fn get_invocation_status(
&mut self,
service_id: &ServiceId,
status: InvocationStatus,
) -> impl Future<Output = ()> + Send;
invocation_id: &InvocationId,
) -> impl Future<Output = Result<InvocationStatus>> + Send;

fn delete_invocation_status(
fn invoked_invocations(
&mut self,
service_id: &ServiceId,
) -> impl Future<Output = ()> + Send;
partition_key_range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<FullInvocationId>> + Send;
}

pub trait ReadOnlyStatusTable {
fn get_invocation_status(
&mut self,
service_id: &ServiceId,
) -> impl Future<Output = Result<Option<InvocationStatus>>> + Send;

fn get_invocation_status_from(
pub trait InvocationStatusTable: ReadOnlyInvocationStatusTable {
fn put_invocation_status(
&mut self,
partition_key: PartitionKey,
invocation_uuid: InvocationUuid,
) -> impl Future<Output = Result<Option<(ServiceId, InvocationStatus)>>> + Send;
invocation_id: &InvocationId,
status: InvocationStatus,
) -> impl Future<Output = ()> + Send;

fn invoked_invocations(
fn delete_invocation_status(
&mut self,
partition_key_range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<FullInvocationId>> + Send;
invocation_id: &InvocationId,
) -> impl Future<Output = ()> + Send;
}

#[cfg(any(test, feature = "mocks"))]
Expand All @@ -250,7 +240,7 @@ mod mocks {
impl InvocationMetadata {
pub fn mock() -> Self {
InvocationMetadata {
invocation_uuid: InvocationUuid::new(),
service_id: ServiceId::new("MyService", "MyKey"),
journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()),
deployment_id: None,
method: ByteString::from("mock"),
Expand Down
10 changes: 5 additions & 5 deletions crates/storage-api/src/journal_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use crate::Result;
use futures_util::Stream;
use restate_types::identifiers::{EntryIndex, ServiceId};
use restate_types::identifiers::{EntryIndex, InvocationId};
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::journal::CompletionResult;
use std::future::Future;
Expand All @@ -25,28 +25,28 @@ pub enum JournalEntry {
pub trait ReadOnlyJournalTable {
fn get_journal_entry(
&mut self,
service_id: &ServiceId,
invocation_id: &InvocationId,
journal_index: u32,
) -> impl Future<Output = Result<Option<JournalEntry>>> + Send;

fn get_journal(
&mut self,
service_id: &ServiceId,
invocation_id: &InvocationId,
journal_length: EntryIndex,
) -> impl Stream<Item = Result<(EntryIndex, JournalEntry)>> + Send;
}

pub trait JournalTable: ReadOnlyJournalTable {
fn put_journal_entry(
&mut self,
service_id: &ServiceId,
invocation_id: &InvocationId,
journal_index: u32,
journal_entry: JournalEntry,
) -> impl Future<Output = ()> + Send;

fn delete_journal(
&mut self,
service_id: &ServiceId,
invocation_id: &InvocationId,
journal_length: EntryIndex,
) -> impl Future<Output = ()> + Send;
}
6 changes: 4 additions & 2 deletions crates/storage-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ pub type Result<T> = std::result::Result<T, StorageError>;
pub mod deduplication_table;
pub mod fsm_table;
pub mod inbox_table;
pub mod invocation_status_table;
pub mod journal_table;
pub mod outbox_table;
pub mod service_status_table;
pub mod state_table;
pub mod status_table;
pub mod timer_table;

pub trait Storage {
Expand All @@ -44,7 +45,8 @@ pub trait Storage {

pub trait Transaction:
state_table::StateTable
+ status_table::StatusTable
+ invocation_status_table::InvocationStatusTable
+ service_status_table::ServiceStatusTable
+ inbox_table::InboxTable
+ outbox_table::OutboxTable
+ deduplication_table::DeduplicationTable
Expand Down
37 changes: 37 additions & 0 deletions crates/storage-api/src/service_status_table/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::Result;
use restate_types::identifiers::{InvocationId, ServiceId};
use std::future::Future;

#[derive(Debug, Default, Clone, PartialEq)]
pub enum ServiceStatus {
Locked(InvocationId),
#[default]
Unlocked,
}

pub trait ReadOnlyServiceStatusTable {
fn get_service_status(
&mut self,
service_id: &ServiceId,
) -> impl Future<Output = Result<ServiceStatus>> + Send;
}

pub trait ServiceStatusTable: ReadOnlyServiceStatusTable {
fn put_service_status(
&mut self,
service_id: &ServiceId,
status: ServiceStatus,
) -> impl Future<Output = ()> + Send;

fn delete_service_status(&mut self, service_id: &ServiceId) -> impl Future<Output = ()> + Send;
}
Loading

0 comments on commit 230d139

Please sign in to comment.