From 37ec635e8f045ec51506e8679fc822527776245b Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Tue, 19 Mar 2024 11:32:19 +0100 Subject: [PATCH] More terminology changes in tables (#1283) * Rename sys_service_status to sys_virtual_object_status. Rename fields using service term to component. Rename ServiceStatus to VirtualObjectStatus in storage-api * Fix queries in CLI + more terminology changes --- cli/src/clients/datafusion_helpers.rs | 210 +++++++++--------- cli/src/commands/components/status/mod.rs | 44 ++-- cli/src/commands/invocations/describe.rs | 13 +- cli/src/commands/invocations/list.rs | 18 +- cli/src/commands/state/util.rs | 2 +- cli/src/ui/invocations.rs | 12 +- crates/storage-api/src/lib.rs | 2 +- .../src/service_status_table/mod.rs | 19 +- crates/storage-proto/src/lib.rs | 10 +- .../storage-query-datafusion/src/inbox/row.rs | 10 +- .../src/inbox/schema.rs | 8 +- .../src/invocation_state/row.rs | 4 +- .../src/invocation_state/schema.rs | 4 +- .../src/invocation_status/row.rs | 10 +- .../src/invocation_status/schema.rs | 8 +- .../src/journal/row.rs | 8 +- .../src/journal/schema.rs | 6 +- crates/storage-query-datafusion/src/lib.rs | 2 +- .../storage-query-datafusion/src/options.rs | 2 +- .../storage-query-datafusion/src/state/row.rs | 4 +- .../src/state/schema.rs | 4 +- .../mod.rs | 0 .../row.rs | 20 +- .../schema.rs | 6 +- .../table.rs | 26 +-- .../src/service_status_table/mod.rs | 72 +++--- .../storage-rocksdb/tests/integration_test.rs | 4 +- .../mod.rs | 18 +- .../state_machine/command_interpreter/mod.rs | 18 +- .../command_interpreter/tests.rs | 15 +- .../state_machine/effect_interpreter.rs | 8 +- crates/worker/src/partition/storage/mod.rs | 19 +- 32 files changed, 318 insertions(+), 288 deletions(-) rename crates/storage-query-datafusion/src/{service_status => virtual_object_status}/mod.rs (100%) rename crates/storage-query-datafusion/src/{service_status => virtual_object_status}/row.rs (55%) rename crates/storage-query-datafusion/src/{service_status => virtual_object_status}/schema.rs (84%) rename crates/storage-query-datafusion/src/{service_status => virtual_object_status}/table.rs (74%) rename crates/storage-rocksdb/tests/{service_status_table_test => virtual_object_status_table_test}/mod.rs (62%) diff --git a/cli/src/clients/datafusion_helpers.rs b/cli/src/clients/datafusion_helpers.rs index ed8c07403..55d9adf96 100644 --- a/cli/src/clients/datafusion_helpers.rs +++ b/cli/src/clients/datafusion_helpers.rs @@ -157,9 +157,9 @@ impl Display for InvocationState { #[derive(Debug, Clone)] pub struct OutgoingInvoke { pub invocation_id: Option, - pub invoked_service: Option, - pub invoked_method: Option, - pub invoked_service_key: Option, + pub invoked_component: Option, + pub invoked_handler: Option, + pub invoked_component_key: Option, } #[derive(Debug, Clone)] @@ -243,13 +243,13 @@ pub struct InvocationDetailed { #[derive(Debug, Clone, Default)] pub struct Invocation { pub id: String, - pub service: String, - pub method: String, + pub component: String, + pub handler: String, pub key: Option, // Set only on keyed service pub created_at: chrono::DateTime, // None if invoked directly (e.g. ingress) pub invoked_by_id: Option, - pub invoked_by_service: Option, + pub invoked_by_component: Option, pub status: InvocationState, pub trace_id: Option, @@ -300,32 +300,32 @@ pub struct ComponentHandlerUsage { pub struct ComponentStatusMap(HashMap); impl ComponentStatusMap { - fn set_method_stats( + fn set_handler_stats( &mut self, - service: &str, - method: &str, + component: &str, + handler: &str, state: InvocationState, stats: HandlerStateStats, ) { - let svc_methods = self + let comp_handlers = self .0 - .entry(service.to_owned()) + .entry(component.to_owned()) .or_insert_with(|| ComponentStatus { handlers: HashMap::new(), }); - let method_info = svc_methods + let handler_info = comp_handlers .handlers - .entry(method.to_owned()) + .entry(handler.to_owned()) .or_insert_with(|| HandlerInfo { per_state_totals: HashMap::new(), }); - method_info.per_state_totals.insert(state, stats); + handler_info.per_state_totals.insert(state, stats); } - pub fn get_service_status(&self, service: &str) -> Option<&ComponentStatus> { - self.0.get(service) + pub fn get_component_status(&self, component: &str) -> Option<&ComponentStatus> { + self.0.get(component) } } @@ -335,7 +335,7 @@ pub struct ComponentStatus { } impl ComponentStatus { - pub fn get_method_stats( + pub fn get_handler_stats( &self, state: InvocationState, method: &str, @@ -343,8 +343,8 @@ impl ComponentStatus { self.handlers.get(method).and_then(|x| x.get_stats(state)) } - pub fn get_method(&self, method: &str) -> Option<&HandlerInfo> { - self.handlers.get(method) + pub fn get_handler(&self, handler: &str) -> Option<&HandlerInfo> { + self.handlers.get(handler) } } @@ -390,12 +390,12 @@ pub async fn count_deployment_active_inv_by_method( let query = format!( "SELECT - service, - method, + component, + handler, COUNT(id) AS inv_count FROM sys_invocation_status WHERE pinned_deployment_id = '{}' - GROUP BY pinned_deployment_id, service, method", + GROUP BY pinned_deployment_id, component, handler", deployment_id ); @@ -413,13 +413,13 @@ pub async fn count_deployment_active_inv_by_method( pub async fn get_components_status( client: &DataFusionHttpClient, - services_filter: impl IntoIterator>, + components_filter: impl IntoIterator>, ) -> Result { let mut status_map = ComponentStatusMap::default(); let query_filter = format!( "({})", - services_filter + components_filter .into_iter() .map(|x| format!("'{}'", x.as_ref())) .collect::>() @@ -429,20 +429,20 @@ pub async fn get_components_status( { let query = format!( "SELECT - service, - method, + component, + handler, COUNT(id), MIN(created_at), FIRST_VALUE(id ORDER BY created_at ASC) - FROM sys_inbox WHERE service IN {} - GROUP BY service, method", + FROM sys_inbox WHERE component IN {} + GROUP BY component, handler", query_filter ); let resp = client.run_query(query).await?; for batch in resp.batches { for i in 0..batch.num_rows() { - let service = batch.column(0).as_string::().value_string(i); - let method = batch.column(1).as_string::().value_string(i); + let component = batch.column(0).as_string::().value_string(i); + let handler = batch.column(1).as_string::().value_string(i); let num_invocations = batch .column(2) .as_primitive::() @@ -460,7 +460,7 @@ pub async fn get_components_status( oldest_at, oldest_invocation, }; - status_map.set_method_stats(&service, &method, InvocationState::Pending, stats); + status_map.set_handler_stats(&component, &handler, InvocationState::Pending, stats); } } } @@ -470,8 +470,8 @@ pub async fn get_components_status( let query = format!( "WITH enriched_invokes AS (SELECT - ss.service, - ss.method, + ss.component, + ss.handler, CASE WHEN ss.status = 'suspended' THEN 'suspended' WHEN sis.in_flight THEN 'running' @@ -482,17 +482,17 @@ pub async fn get_components_status( ss.created_at FROM sys_invocation_status ss LEFT JOIN sys_invocation_state sis ON ss.id = sis.id - WHERE ss.service IN {} + WHERE ss.component IN {} ) - SELECT service, method, combined_status, COUNT(id), MIN(created_at), FIRST_VALUE(id ORDER BY created_at ASC) - FROM enriched_invokes GROUP BY service, method, combined_status ORDER BY method", + SELECT component, handler, combined_status, COUNT(id), MIN(created_at), FIRST_VALUE(id ORDER BY created_at ASC) + FROM enriched_invokes GROUP BY component, handler, combined_status ORDER BY method", query_filter ); let resp = client.run_query(query).await?; for batch in resp.batches { for i in 0..batch.num_rows() { - let service = value_as_string(&batch, 0, i); - let method = value_as_string(&batch, 1, i); + let component = value_as_string(&batch, 0, i); + let handler = value_as_string(&batch, 1, i); let status = value_as_string(&batch, 2, i); let stats = HandlerStateStats { @@ -501,7 +501,7 @@ pub async fn get_components_status( oldest_invocation: value_as_string(&batch, 5, i), }; - status_map.set_method_stats(&service, &method, status.parse().unwrap(), stats); + status_map.set_handler_stats(&component, &handler, status.parse().unwrap(), stats); } } } @@ -509,9 +509,9 @@ pub async fn get_components_status( Ok(status_map) } -// Service -> Locked Keys +// Component -> Locked Keys #[derive(Default)] -pub struct ComponentMethodLockedKeysMap { +pub struct ComponentHandlerLockedKeysMap { components: HashMap>, } @@ -537,14 +537,14 @@ pub struct LockedKeyInfo { pub last_attempt_deployment_id: Option, } -impl ComponentMethodLockedKeysMap { - fn insert(&mut self, service: &str, key: String, info: LockedKeyInfo) { - let locked_keys = self.components.entry(service.to_owned()).or_default(); +impl ComponentHandlerLockedKeysMap { + fn insert(&mut self, component: &str, key: String, info: LockedKeyInfo) { + let locked_keys = self.components.entry(component.to_owned()).or_default(); locked_keys.insert(key.to_owned(), info); } - fn locked_key_info_mut(&mut self, service: &str, key: &str) -> &mut LockedKeyInfo { - let locked_keys = self.components.entry(service.to_owned()).or_default(); + fn locked_key_info_mut(&mut self, component: &str, key: &str) -> &mut LockedKeyInfo { + let locked_keys = self.components.entry(component.to_owned()).or_default(); locked_keys.entry(key.to_owned()).or_default() } @@ -559,37 +559,37 @@ impl ComponentMethodLockedKeysMap { pub async fn get_locked_keys_status( client: &DataFusionHttpClient, - services_filter: impl IntoIterator>, -) -> Result { - let mut key_map = ComponentMethodLockedKeysMap::default(); - let quoted_service_names = services_filter + components_filter: impl IntoIterator>, +) -> Result { + let mut key_map = ComponentHandlerLockedKeysMap::default(); + let quoted_component_names = components_filter .into_iter() .map(|x| format!("'{}'", x.as_ref())) .collect::>(); - if quoted_service_names.is_empty() { + if quoted_component_names.is_empty() { return Ok(key_map); } - let query_filter = format!("({})", quoted_service_names.join(",")); + let query_filter = format!("({})", quoted_component_names.join(",")); // Inbox analysis (pending invocations).... { let query = format!( "SELECT - service, - service_key, + component, + component_key, COUNT(id), MIN(created_at) FROM sys_inbox - WHERE service IN {} - GROUP BY service, service_key + WHERE component IN {} + GROUP BY component, component_key ORDER BY COUNT(id) DESC", query_filter ); let resp = client.run_query(query).await?; for batch in resp.batches { for i in 0..batch.num_rows() { - let service = batch.column(0).as_string::().value(i); + let component = batch.column(0).as_string::().value(i); let key = value_as_string(&batch, 1, i); let num_pending = value_as_i64(&batch, 2, i); let oldest_pending = value_as_dt_opt(&batch, 3, i); @@ -599,7 +599,7 @@ pub async fn get_locked_keys_status( oldest_pending, ..LockedKeyInfo::default() }; - key_map.insert(service, key, info); + key_map.insert(component, key, info); } } } @@ -609,9 +609,9 @@ pub async fn get_locked_keys_status( let query = format!( "WITH enriched_invokes AS (SELECT - ss.service, - ss.method, - ss.service_key, + ss.component, + ss.handler, + ss.component_key, CASE WHEN ss.status = 'suspended' THEN 'suspended' WHEN sis.in_flight THEN 'running' @@ -632,11 +632,11 @@ pub async fn get_locked_keys_status( WHERE ss.service IN {} ) SELECT - service, - service_key, + component, + component_key, combined_status, first_value(id), - first_value(method), + first_value(handler), first_value(created_at), first_value(modified_at), first_value(pinned_deployment_id), @@ -645,14 +645,14 @@ pub async fn get_locked_keys_status( first_value(next_retry_at), first_value(last_start_at), sum(retry_count) - FROM enriched_invokes GROUP BY service, service_key, combined_status", + FROM enriched_invokes GROUP BY component, component_key, combined_status", query_filter ); let resp = client.run_query(query).await?; for batch in resp.batches { for i in 0..batch.num_rows() { - let service = value_as_string(&batch, 0, i); + let component = value_as_string(&batch, 0, i); let key = value_as_string(&batch, 1, i); let status = batch .column(2) @@ -661,7 +661,7 @@ pub async fn get_locked_keys_status( .parse() .expect("Unexpected status"); let id = value_as_string_opt(&batch, 3, i); - let method = value_as_string_opt(&batch, 4, i); + let handler = value_as_string_opt(&batch, 4, i); let created_at = value_as_dt_opt(&batch, 5, i); let modified_at = value_as_dt_opt(&batch, 6, i); let pinned_deployment_id = value_as_string_opt(&batch, 7, i); @@ -671,11 +671,11 @@ pub async fn get_locked_keys_status( let last_start = value_as_dt_opt(&batch, 11, i); let num_retries = value_as_u64_opt(&batch, 12, i); - let info = key_map.locked_key_info_mut(&service, &key); + let info = key_map.locked_key_info_mut(&component, &key); info.invocation_status = Some(status); info.invocation_holding_lock = id; - info.invocation_method_holding_lock = method; + info.invocation_method_holding_lock = handler; info.invocation_created_at = created_at; // Running duration @@ -714,9 +714,9 @@ pub async fn find_active_invocations( "WITH enriched_invocations AS (SELECT ss.id, - ss.service, - ss.method, - ss.service_key, + ss.component, + ss.handler, + ss.component_key, CASE WHEN ss.status = 'suspended' THEN 'suspended' WHEN sis.in_flight THEN 'running' @@ -732,14 +732,14 @@ pub async fn find_active_invocations( sis.next_retry_at, sis.last_start_at, ss.invoked_by_id, - ss.invoked_by_service, + ss.invoked_by_component, comp.ty, comp.deployment_id as comp_latest_deployment, dp.id as known_deployment_id, ss.trace_id FROM sys_invocation_status ss LEFT JOIN sys_invocation_state sis ON ss.id = sis.id - LEFT JOIN sys_component comp ON comp.name = ss.service + LEFT JOIN sys_component comp ON comp.name = ss.component LEFT JOIN sys_deployment dp ON dp.id = ss.pinned_deployment_id {} {} @@ -756,9 +756,9 @@ pub async fn find_active_invocations( full_count = value_as_i64(&batch, batch.num_columns() - 1, i) as usize; } let id = value_as_string(&batch, 0, i); - let service = value_as_string(&batch, 1, i); - let method = value_as_string(&batch, 2, i); - let service_key = value_as_string_opt(&batch, 3, i); + let component = value_as_string(&batch, 1, i); + let handler = value_as_string(&batch, 2, i); + let component_key = value_as_string_opt(&batch, 3, i); let status: InvocationState = value_as_string(&batch, 4, i) .parse() .expect("Unexpected status"); @@ -777,7 +777,7 @@ pub async fn find_active_invocations( let last_start = value_as_dt_opt(&batch, 12, i); let invoked_by_id = value_as_string_opt(&batch, 13, i); - let invoked_by_service = value_as_string_opt(&batch, 14, i); + let invoked_by_component = value_as_string_opt(&batch, 14, i); let component_type = parse_component_type(&value_as_string(&batch, 15, i)); let deployment_id_at_latest_svc_revision = value_as_string(&batch, 16, i); @@ -785,7 +785,7 @@ pub async fn find_active_invocations( let trace_id = value_as_string_opt(&batch, 18, i); let key = if component_type == ComponentType::VirtualObject { - service_key + component_key } else { None }; @@ -793,12 +793,12 @@ pub async fn find_active_invocations( let mut invocation = Invocation { id, status, - service, + component, key, - method, + handler, created_at, invoked_by_id, - invoked_by_service, + invoked_by_component, state_modified_at, num_retries, next_retry_at, @@ -840,17 +840,17 @@ pub async fn find_inbox_invocations( let query = format!( "WITH inbox_table AS (SELECT - ss.service, - ss.method, + ss.component, + ss.handler, ss.id, ss.created_at, ss.invoked_by_id, - ss.invoked_by_service, - ss.service_key, + ss.invoked_by_component, + ss.component_key, comp.ty, ss.trace_id FROM sys_inbox ss - LEFT JOIN sys_component comp ON comp.name = ss.service + LEFT JOIN sys_component comp ON comp.name = ss.component {} {} ) @@ -873,13 +873,13 @@ pub async fn find_inbox_invocations( let invocation = Invocation { status: InvocationState::Pending, - service: value_as_string(&batch, 0, i), - method: value_as_string(&batch, 1, i), + component: value_as_string(&batch, 0, i), + handler: value_as_string(&batch, 1, i), id: value_as_string(&batch, 2, i), created_at: value_as_dt_opt(&batch, 3, i).expect("Missing created_at"), key, invoked_by_id: value_as_string_opt(&batch, 4, i), - invoked_by_service: value_as_string_opt(&batch, 5, i), + invoked_by_component: value_as_string_opt(&batch, 5, i), trace_id: value_as_string_opt(&batch, 8, i), ..Default::default() }; @@ -892,14 +892,14 @@ pub async fn find_inbox_invocations( pub async fn get_component_invocations( client: &DataFusionHttpClient, - service: &str, + component: &str, limit_inbox: usize, limit_active: usize, ) -> Result<(Vec, Vec)> { // Inbox... let inbox: Vec = find_inbox_invocations( client, - &format!("WHERE ss.service = '{}'", service), + &format!("WHERE ss.component = '{}'", component), "ORDER BY ss.created_at DESC", limit_inbox, ) @@ -909,7 +909,7 @@ pub async fn get_component_invocations( // Active invocations analysis let active: Vec = find_active_invocations( client, - &format!("WHERE ss.service = '{}'", service), + &format!("WHERE ss.component = '{}'", component), "", "ORDER BY ss.created_at DESC", limit_active, @@ -968,9 +968,9 @@ pub async fn get_invocation_journal( sj.entry_type, sj.completed, sj.invoked_id, - sj.invoked_service, - sj.invoked_method, - sj.invoked_service_key, + sj.invoked_component, + sj.invoked_handler, + sj.invoked_component_key, sj.sleep_wakeup_at FROM sys_journal sj WHERE @@ -993,24 +993,24 @@ pub async fn get_invocation_journal( let entry_type = value_as_string(&batch, 1, i); let completed = batch.column(2).as_boolean().value(i); let outgoing_invocation_id = value_as_string_opt(&batch, 3, i); - let invoked_service = value_as_string_opt(&batch, 4, i); - let invoked_method = value_as_string_opt(&batch, 5, i); - let invoked_service_key = value_as_string_opt(&batch, 6, i); + let invoked_component = value_as_string_opt(&batch, 4, i); + let invoked_handler = value_as_string_opt(&batch, 5, i); + let invoked_component_key = value_as_string_opt(&batch, 6, i); let wakeup_at = value_as_dt_opt(&batch, 7, i); let entry_type = match entry_type.as_str() { "Sleep" => JournalEntryType::Sleep { wakeup_at }, "Invoke" => JournalEntryType::Invoke(OutgoingInvoke { invocation_id: outgoing_invocation_id, - invoked_service, - invoked_method, - invoked_service_key, + invoked_component, + invoked_handler, + invoked_component_key, }), "BackgroundInvoke" => JournalEntryType::BackgroundInvoke(OutgoingInvoke { invocation_id: outgoing_invocation_id, - invoked_service, - invoked_method, - invoked_service_key, + invoked_component, + invoked_handler, + invoked_component_key, }), "Awakeable" => JournalEntryType::Awakeable(AwakeableIdentifier::new( my_invocation_id.clone(), diff --git a/cli/src/commands/components/status/mod.rs b/cli/src/commands/components/status/mod.rs index b0fd89cb2..eb81f96e4 100644 --- a/cli/src/commands/components/status/mod.rs +++ b/cli/src/commands/components/status/mod.rs @@ -14,7 +14,7 @@ mod detailed_status; use crate::c_println; use crate::cli_env::CliEnv; use crate::clients::datafusion_helpers::{ - ComponentMethodLockedKeysMap, ComponentStatus, ComponentStatusMap, InvocationState, + ComponentHandlerLockedKeysMap, ComponentStatus, ComponentStatusMap, InvocationState, }; use crate::clients::MetasClient; use crate::ui::component_methods::icon_for_component_type; @@ -40,7 +40,7 @@ pub struct Status { sample_invocations_limit: usize, /// Service name, prints all services if omitted - service: Option, + component: Option, #[clap(flatten)] watch: Watch, @@ -54,7 +54,7 @@ async fn status(env: &CliEnv, opts: &Status) -> Result<()> { let metas_client = MetasClient::new(env)?; let sql_client = crate::clients::DataFusionHttpClient::new(env)?; - if let Some(svc) = &opts.service { + if let Some(svc) = &opts.component { detailed_status::run_detailed_status(env, svc, opts, metas_client, sql_client).await } else { agg_status::run_aggregated_status(env, opts, metas_client, sql_client).await @@ -78,7 +78,7 @@ async fn render_components_status( "OLDEST NON-SUSPENDED INVOCATION", ]); for svc in services { - let svc_status = status_map.get_service_status(&svc.name).unwrap_or(&empty); + let svc_status = status_map.get_component_status(&svc.name).unwrap_or(&empty); // Service title let flavor = icon_for_component_type(&svc.ty); let svc_title = format!("{} {}", svc.name, flavor); @@ -93,14 +93,14 @@ async fn render_components_status( Ok(()) } -fn render_method_state_stats( +fn render_handler_state_stats( svc_status: &ComponentStatus, method: &str, state: InvocationState, ) -> Cell { use comfy_table::Color; // Pending - if let Some(state_stats) = svc_status.get_method_stats(state, method) { + if let Some(state_stats) = svc_status.get_handler_stats(state, method) { let cell = Cell::new(state_stats.num_invocations); let color = match state { InvocationState::Unknown => Color::Magenta, @@ -121,46 +121,46 @@ async fn render_methods_status( svc: ComponentMetadata, svc_status: &ComponentStatus, ) -> Result<()> { - for method in svc.handlers { + for handler in svc.handlers { let mut row = vec![]; - row.push(Cell::new(format!(" {}", &method.name))); + row.push(Cell::new(format!(" {}", &handler.name))); // Pending - row.push(render_method_state_stats( + row.push(render_handler_state_stats( svc_status, - &method.name, + &handler.name, InvocationState::Pending, )); // Ready - row.push(render_method_state_stats( + row.push(render_handler_state_stats( svc_status, - &method.name, + &handler.name, InvocationState::Ready, )); // Running - row.push(render_method_state_stats( + row.push(render_handler_state_stats( svc_status, - &method.name, + &handler.name, InvocationState::Running, )); // Backing-off - row.push(render_method_state_stats( + row.push(render_handler_state_stats( svc_status, - &method.name, + &handler.name, InvocationState::BackingOff, )); - row.push(render_method_state_stats( + row.push(render_handler_state_stats( svc_status, - &method.name, + &handler.name, InvocationState::Suspended, )); - let oldest_cell = if let Some(current_method) = svc_status.get_method(&method.name) { + let oldest_cell = if let Some(current_handler) = svc_status.get_handler(&handler.name) { if let Some((oldest_state, oldest_stats)) = - current_method.oldest_non_suspended_invocation_state() + current_handler.oldest_non_suspended_invocation_state() { let dur = chrono::Local::now().signed_duration_since(oldest_stats.oldest_at); let style = if dur.num_seconds() < 60 { @@ -194,7 +194,7 @@ async fn render_methods_status( } async fn render_locked_keys( env: &CliEnv, - locked_keys: ComponentMethodLockedKeysMap, + locked_keys: ComponentHandlerLockedKeysMap, limit_per_service: usize, ) -> Result<()> { let locked_keys = locked_keys.into_inner(); @@ -203,7 +203,7 @@ async fn render_locked_keys( } let mut table = Table::new_styled(&env.ui_config); - table.set_styled_header(vec!["", "QUEUE", "LOCKED BY", "METHOD", "NOTES"]); + table.set_styled_header(vec!["", "QUEUE", "LOCKED BY", "HANDLER", "NOTES"]); for (svc_name, locked_keys) in locked_keys { let mut keys: Vec<_> = locked_keys.into_iter().collect(); keys.sort_by(|(_, a), (_, b)| b.num_pending.cmp(&a.num_pending)); diff --git a/cli/src/commands/invocations/describe.rs b/cli/src/commands/invocations/describe.rs index de1c251a9..f69fec061 100644 --- a/cli/src/commands/invocations/describe.rs +++ b/cli/src/commands/invocations/describe.rs @@ -62,12 +62,17 @@ async fn describe(env: &CliEnv, opts: &Describe) -> Result<()> { if let Some(key) = &inv.key { table.add_kv_row( "Component:", - format!("{} {} {}", inv.service, style("@").dim(), style(key).dim(),), + format!( + "{} {} {}", + inv.component, + style("@").dim(), + style(key).dim(), + ), ); } else { - table.add_kv_row("Component:", &inv.service); + table.add_kv_row("Component:", &inv.component); } - table.add_kv_row("Method:", &inv.method); + table.add_kv_row("Method:", &inv.handler); add_invocation_to_kv_table(&mut table, &inv); table.add_kv_row_if( || inv.state_modified_at.is_some(), @@ -97,7 +102,7 @@ async fn describe(env: &CliEnv, opts: &Describe) -> Result<()> { if let Some(parent) = &inv.invoked_by_id { c_println!( "{} {}", - inv.invoked_by_service + inv.invoked_by_component .as_ref() .map(|x| style(x.to_owned()).italic().blue()) .unwrap_or_else(|| style("".to_owned()).red()), diff --git a/cli/src/commands/invocations/list.rs b/cli/src/commands/invocations/list.rs index 99e75359c..9af3525cd 100644 --- a/cli/src/commands/invocations/list.rs +++ b/cli/src/commands/invocations/list.rs @@ -33,9 +33,9 @@ pub struct List { /// Component to list invocations for #[clap(long, visible_alias = "component", value_delimiter = ',')] component: Vec, - /// Filter by invocation on this method name + /// Filter by invocation on this handler name #[clap(long, value_delimiter = ',')] - method: Vec, + handler: Vec, /// Filter by status(es) #[clap(long, ignore_case = true, value_delimiter = ',')] status: Vec, @@ -44,7 +44,7 @@ pub struct List { deployment: Vec, /// Only list invocations on keyed components only #[clap(long)] - keyed_only: bool, + virtual_objects_only: bool, /// Filter by invocations on this component key #[clap(long, value_delimiter = ',')] key: Vec, @@ -96,7 +96,7 @@ async fn list(env: &CliEnv, opts: &List) -> Result<()> { if !opts.component.is_empty() { inbox_filters.push(format!( - "ss.service IN ({})", + "ss.component IN ({})", opts.component .iter() .map(|x| format!("'{}'", x)) @@ -104,21 +104,21 @@ async fn list(env: &CliEnv, opts: &List) -> Result<()> { )); } - if !opts.method.is_empty() { + if !opts.handler.is_empty() { inbox_filters.push(format!( - "ss.method IN ({})", - opts.method.iter().map(|x| format!("'{}'", x)).format(",") + "ss.handler IN ({})", + opts.handler.iter().map(|x| format!("'{}'", x)).format(",") )); } if !opts.key.is_empty() { inbox_filters.push(format!( - "ss.service_key IN ({})", + "ss.component_key IN ({})", opts.key.iter().map(|x| format!("'{}'", x)).format(",") )); } - if opts.keyed_only { + if opts.virtual_objects_only { inbox_filters.push("comp.ty = 'virtual_object'".to_owned()); } diff --git a/cli/src/commands/state/util.rs b/cli/src/commands/state/util.rs index bf4a1804b..6a704f059 100644 --- a/cli/src/commands/state/util.rs +++ b/cli/src/commands/state/util.rs @@ -34,7 +34,7 @@ pub(crate) async fn get_current_state( // let sql_client = crate::clients::DataFusionHttpClient::new(env)?; let sql = format!( - "select key, value from state where service = '{}' and service_key = '{}' ;", + "select key, value from state where component = '{}' and component_key = '{}' ;", service, key ); let res = sql_client.run_query(sql).await?; diff --git a/cli/src/ui/invocations.rs b/cli/src/ui/invocations.rs index 66ac9a828..4648bf876 100644 --- a/cli/src/ui/invocations.rs +++ b/cli/src/ui/invocations.rs @@ -98,14 +98,14 @@ pub fn invocation_qualified_name(invocation: &Invocation) -> String { let svc = if let Some(key) = &invocation.key { format!( "[{} {} {}]", - invocation.service, + invocation.component, style("@").dim(), style(key).dim(), ) } else { - invocation.service.to_string() + invocation.component.to_string() }; - format!("{}{}{}", svc, style("::").dim(), invocation.method) + format!("{}{}{}", svc, style("::").dim(), invocation.handler) } // ❯ [2023-12-14 15:38:52.500 +00:00] rIEqK14GCdkAYxo-wzTfrK2e6tJssIrtQ CheckoutProcess::checkout @@ -146,7 +146,7 @@ pub fn add_invocation_to_kv_table(table: &mut Table, invocation: &Invocation) { let invoked_by_msg = format!( "{} {}", invocation - .invoked_by_service + .invoked_by_component .as_ref() .map(|x| style(x.to_owned()).italic().blue()) .unwrap_or_else(|| style("".to_owned()).red()), @@ -256,9 +256,9 @@ pub fn format_entry_type_details(entry_type: &JournalEntryType) -> String { JournalEntryType::Invoke(inv) | JournalEntryType::BackgroundInvoke(inv) => { format!( "{}{}{} {}", - inv.invoked_service.as_ref().unwrap(), + inv.invoked_component.as_ref().unwrap(), style("::").dim(), - inv.invoked_method.as_ref().unwrap(), + inv.invoked_handler.as_ref().unwrap(), inv.invocation_id.as_deref().unwrap_or(""), ) } diff --git a/crates/storage-api/src/lib.rs b/crates/storage-api/src/lib.rs index 2f140b8f2..0bf5aaca3 100644 --- a/crates/storage-api/src/lib.rs +++ b/crates/storage-api/src/lib.rs @@ -46,7 +46,7 @@ pub trait Storage { pub trait Transaction: state_table::StateTable + invocation_status_table::InvocationStatusTable - + service_status_table::ServiceStatusTable + + service_status_table::VirtualObjectStatusTable + inbox_table::InboxTable + outbox_table::OutboxTable + deduplication_table::DeduplicationTable diff --git a/crates/storage-api/src/service_status_table/mod.rs b/crates/storage-api/src/service_status_table/mod.rs index a021975a3..c9c156f42 100644 --- a/crates/storage-api/src/service_status_table/mod.rs +++ b/crates/storage-api/src/service_status_table/mod.rs @@ -13,25 +13,28 @@ use restate_types::identifiers::{InvocationId, ServiceId}; use std::future::Future; #[derive(Debug, Default, Clone, PartialEq)] -pub enum ServiceStatus { +pub enum VirtualObjectStatus { Locked(InvocationId), #[default] Unlocked, } -pub trait ReadOnlyServiceStatusTable { - fn get_service_status( +pub trait ReadOnlyVirtualObjectStatusTable { + fn get_virtual_object_status( &mut self, service_id: &ServiceId, - ) -> impl Future> + Send; + ) -> impl Future> + Send; } -pub trait ServiceStatusTable: ReadOnlyServiceStatusTable { - fn put_service_status( +pub trait VirtualObjectStatusTable: ReadOnlyVirtualObjectStatusTable { + fn put_virtual_object_status( &mut self, service_id: &ServiceId, - status: ServiceStatus, + status: VirtualObjectStatus, ) -> impl Future + Send; - fn delete_service_status(&mut self, service_id: &ServiceId) -> impl Future + Send; + fn delete_virtual_object_status( + &mut self, + service_id: &ServiceId, + ) -> impl Future + Send; } diff --git a/crates/storage-proto/src/lib.rs b/crates/storage-proto/src/lib.rs index 7272cd3fb..cbe1d6462 100644 --- a/crates/storage-proto/src/lib.rs +++ b/crates/storage-proto/src/lib.rs @@ -102,10 +102,12 @@ pub mod storage { } } - impl From for ServiceStatus { - fn from(value: restate_storage_api::service_status_table::ServiceStatus) -> Self { + impl From for ServiceStatus { + fn from( + value: restate_storage_api::service_status_table::VirtualObjectStatus, + ) -> Self { match value { - restate_storage_api::service_status_table::ServiceStatus::Locked( + restate_storage_api::service_status_table::VirtualObjectStatus::Locked( invocation_id, ) => ServiceStatus { status: Some(service_status::Status::Locked(service_status::Locked { @@ -116,7 +118,7 @@ pub mod storage { .into(), })), }, - restate_storage_api::service_status_table::ServiceStatus::Unlocked => { + restate_storage_api::service_status_table::VirtualObjectStatus::Unlocked => { unreachable!("Nothing should be stored for unlocked") } } diff --git a/crates/storage-query-datafusion/src/inbox/row.rs b/crates/storage-query-datafusion/src/inbox/row.rs index 658a53707..2d060e4a8 100644 --- a/crates/storage-query-datafusion/src/inbox/row.rs +++ b/crates/storage-query-datafusion/src/inbox/row.rs @@ -36,10 +36,10 @@ pub(crate) fn append_inbox_row( let mut row = builder.row(); row.partition_key(fid.partition_key()); - row.service(&fid.service_id.service_name); - row.method(&method_name); + row.component(&fid.service_id.service_name); + row.handler(&method_name); - row.service_key( + row.component_key( std::str::from_utf8(&fid.service_id.key).expect("The key must be a string!"), ); @@ -51,8 +51,8 @@ pub(crate) fn append_inbox_row( match caller { Source::Service(caller) => { - row.invoked_by("service"); - row.invoked_by_service(&caller.service_id.service_name); + row.invoked_by("component"); + row.invoked_by_component(&caller.service_id.service_name); if row.is_invoked_by_id_defined() { row.invoked_by_id(format_using(output, &caller)); } diff --git a/crates/storage-query-datafusion/src/inbox/schema.rs b/crates/storage-query-datafusion/src/inbox/schema.rs index 43c36ee66..d87f68537 100644 --- a/crates/storage-query-datafusion/src/inbox/schema.rs +++ b/crates/storage-query-datafusion/src/inbox/schema.rs @@ -17,17 +17,17 @@ use datafusion::arrow::datatypes::DataType; define_table!(inbox( partition_key: DataType::UInt64, - service: DataType::LargeUtf8, - method: DataType::LargeUtf8, + component: DataType::LargeUtf8, + handler: DataType::LargeUtf8, - service_key: DataType::LargeUtf8, + component_key: DataType::LargeUtf8, id: DataType::LargeUtf8, sequence_number: DataType::UInt64, invoked_by: DataType::LargeUtf8, - invoked_by_service: DataType::LargeUtf8, + invoked_by_component: DataType::LargeUtf8, invoked_by_id: DataType::LargeUtf8, trace_id: DataType::LargeUtf8, diff --git a/crates/storage-query-datafusion/src/invocation_state/row.rs b/crates/storage-query-datafusion/src/invocation_state/row.rs index 901f1fe86..f0234048c 100644 --- a/crates/storage-query-datafusion/src/invocation_state/row.rs +++ b/crates/storage-query-datafusion/src/invocation_state/row.rs @@ -25,8 +25,8 @@ pub(crate) fn append_state_row( let invocation_id = status_row.full_invocation_id(); row.partition_key(invocation_id.service_id.partition_key()); - row.service(&invocation_id.service_id.service_name); - row.service_key( + row.component(&invocation_id.service_id.service_name); + row.component_key( std::str::from_utf8(&invocation_id.service_id.key).expect("The key must be a string!"), ); if row.is_id_defined() { diff --git a/crates/storage-query-datafusion/src/invocation_state/schema.rs b/crates/storage-query-datafusion/src/invocation_state/schema.rs index 57791e695..009c972a0 100644 --- a/crates/storage-query-datafusion/src/invocation_state/schema.rs +++ b/crates/storage-query-datafusion/src/invocation_state/schema.rs @@ -16,8 +16,8 @@ use datafusion::arrow::datatypes::DataType; define_table!(state( partition_key: DataType::UInt64, - service: DataType::LargeUtf8, - service_key: DataType::LargeUtf8, + component: DataType::LargeUtf8, + component_key: DataType::LargeUtf8, id: DataType::LargeUtf8, in_flight: DataType::Boolean, retry_count: DataType::UInt64, diff --git a/crates/storage-query-datafusion/src/invocation_status/row.rs b/crates/storage-query-datafusion/src/invocation_status/row.rs index 8bf55effe..3983253c1 100644 --- a/crates/storage-query-datafusion/src/invocation_status/row.rs +++ b/crates/storage-query-datafusion/src/invocation_status/row.rs @@ -27,8 +27,8 @@ pub(crate) fn append_invocation_status_row( row.partition_key(status_row.partition_key); if let Some(service_id) = status_row.invocation_status.service_id() { - row.service(&service_id.service_name); - row.service_key(std::str::from_utf8(&service_id.key).expect("The key must be a string!")); + row.component(&service_id.service_name); + row.component_key(std::str::from_utf8(&service_id.key).expect("The key must be a string!")); } // Invocation id @@ -80,14 +80,14 @@ fn fill_invocation_metadata( meta: InvocationMetadata, ) { // journal_metadata and stats are filled by other functions - row.method(meta.method); + row.handler(meta.method); if let Some(deployment_id) = meta.deployment_id { row.pinned_deployment_id(deployment_id.to_string()); } match meta.source { Source::Service(caller) => { - row.invoked_by("service"); - row.invoked_by_service(&caller.service_id.service_name); + row.invoked_by("component"); + row.invoked_by_component(&caller.service_id.service_name); if row.is_invoked_by_id_defined() { row.invoked_by_id(format_using(output, &caller)); } diff --git a/crates/storage-query-datafusion/src/invocation_status/schema.rs b/crates/storage-query-datafusion/src/invocation_status/schema.rs index 612297117..e7e4f05ee 100644 --- a/crates/storage-query-datafusion/src/invocation_status/schema.rs +++ b/crates/storage-query-datafusion/src/invocation_status/schema.rs @@ -16,13 +16,13 @@ use datafusion::arrow::datatypes::DataType; define_table!(invocation_status( partition_key: DataType::UInt64, - service: DataType::LargeUtf8, - method: DataType::LargeUtf8, - service_key: DataType::LargeUtf8, + component: DataType::LargeUtf8, + handler: DataType::LargeUtf8, + component_key: DataType::LargeUtf8, status: DataType::LargeUtf8, id: DataType::LargeUtf8, invoked_by: DataType::LargeUtf8, - invoked_by_service: DataType::LargeUtf8, + invoked_by_component: DataType::LargeUtf8, invoked_by_id: DataType::LargeUtf8, pinned_deployment_id: DataType::LargeUtf8, trace_id: DataType::LargeUtf8, diff --git a/crates/storage-query-datafusion/src/journal/row.rs b/crates/storage-query-datafusion/src/journal/row.rs index efb820c42..14702ab91 100644 --- a/crates/storage-query-datafusion/src/journal/row.rs +++ b/crates/storage-query-datafusion/src/journal/row.rs @@ -53,12 +53,12 @@ pub(crate) fn append_journal_row( | EnrichedEntryHeader::BackgroundInvoke { enrichment_result, .. } => { - row.invoked_service_key( + row.invoked_component_key( std::str::from_utf8(&enrichment_result.service_key) .expect("The key must be a string!"), ); - row.invoked_service(&enrichment_result.service_name); + row.invoked_component(&enrichment_result.service_name); if row.is_invoked_id_defined() { let partition_key = ServiceId::new( @@ -73,9 +73,9 @@ pub(crate) fn append_journal_row( )); } - if row.is_invoked_method_defined() { + if row.is_invoked_handler_defined() { if let Some(request) = deserialize_invocation_request(&entry) { - row.invoked_method(&request.method_name); + row.invoked_handler(&request.method_name); } } } diff --git a/crates/storage-query-datafusion/src/journal/schema.rs b/crates/storage-query-datafusion/src/journal/schema.rs index 8681bc9fa..4309f0e8d 100644 --- a/crates/storage-query-datafusion/src/journal/schema.rs +++ b/crates/storage-query-datafusion/src/journal/schema.rs @@ -21,8 +21,8 @@ define_table!(journal( entry_type: DataType::LargeUtf8, completed: DataType::Boolean, invoked_id: DataType::LargeUtf8, - invoked_service: DataType::LargeUtf8, - invoked_method: DataType::LargeUtf8, - invoked_service_key: DataType::LargeUtf8, + invoked_component: DataType::LargeUtf8, + invoked_handler: DataType::LargeUtf8, + invoked_component_key: DataType::LargeUtf8, sleep_wakeup_at: DataType::Date64, )); diff --git a/crates/storage-query-datafusion/src/lib.rs b/crates/storage-query-datafusion/src/lib.rs index 44df4038a..05916408e 100644 --- a/crates/storage-query-datafusion/src/lib.rs +++ b/crates/storage-query-datafusion/src/lib.rs @@ -19,9 +19,9 @@ mod invocation_status; mod journal; mod options; mod physical_optimizer; -mod service_status; mod state; mod table_macro; mod table_util; +mod virtual_object_status; pub use crate::options::{BuildError, Options, OptionsBuilder, OptionsBuilderError}; diff --git a/crates/storage-query-datafusion/src/options.rs b/crates/storage-query-datafusion/src/options.rs index 173f3d528..c1afa1c59 100644 --- a/crates/storage-query-datafusion/src/options.rs +++ b/crates/storage-query-datafusion/src/options.rs @@ -63,7 +63,7 @@ impl Options { let ctx = QueryContext::new(memory_limit, temp_folder, query_parallelism); crate::invocation_status::register_self(&ctx, rocksdb.clone())?; - crate::service_status::register_self(&ctx, rocksdb.clone())?; + crate::virtual_object_status::register_self(&ctx, rocksdb.clone())?; crate::state::register_self(&ctx, rocksdb.clone())?; crate::journal::register_self(&ctx, rocksdb.clone())?; crate::invocation_state::register_self(&ctx, status)?; diff --git a/crates/storage-query-datafusion/src/state/row.rs b/crates/storage-query-datafusion/src/state/row.rs index eff20ba38..5ef8c29c3 100644 --- a/crates/storage-query-datafusion/src/state/row.rs +++ b/crates/storage-query-datafusion/src/state/row.rs @@ -23,8 +23,8 @@ pub(crate) fn append_state_row(builder: &mut StateBuilder, state_row: OwnedState let mut row = builder.row(); row.partition_key(partition_key); - row.service(&service); - row.service_key(std::str::from_utf8(&service_key).expect("The key must be a string!")); + row.component(&service); + row.component_key(std::str::from_utf8(&service_key).expect("The key must be a string!")); if row.is_key_defined() { if let Ok(str) = std::str::from_utf8(&state_key) { row.key(str); diff --git a/crates/storage-query-datafusion/src/state/schema.rs b/crates/storage-query-datafusion/src/state/schema.rs index 4c502fc66..37534762e 100644 --- a/crates/storage-query-datafusion/src/state/schema.rs +++ b/crates/storage-query-datafusion/src/state/schema.rs @@ -16,8 +16,8 @@ use datafusion::arrow::datatypes::DataType; define_table!(state( partition_key: DataType::UInt64, - service: DataType::LargeUtf8, - service_key: DataType::LargeUtf8, + component: DataType::LargeUtf8, + component_key: DataType::LargeUtf8, key: DataType::LargeUtf8, value_utf8: DataType::LargeUtf8, value: DataType::LargeBinary, diff --git a/crates/storage-query-datafusion/src/service_status/mod.rs b/crates/storage-query-datafusion/src/virtual_object_status/mod.rs similarity index 100% rename from crates/storage-query-datafusion/src/service_status/mod.rs rename to crates/storage-query-datafusion/src/virtual_object_status/mod.rs diff --git a/crates/storage-query-datafusion/src/service_status/row.rs b/crates/storage-query-datafusion/src/virtual_object_status/row.rs similarity index 55% rename from crates/storage-query-datafusion/src/service_status/row.rs rename to crates/storage-query-datafusion/src/virtual_object_status/row.rs index 8909c10d5..4c0a42a53 100644 --- a/crates/storage-query-datafusion/src/service_status/row.rs +++ b/crates/storage-query-datafusion/src/virtual_object_status/row.rs @@ -8,28 +8,26 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::service_status::schema::ServiceStatusBuilder; use crate::table_util::format_using; -use restate_storage_api::service_status_table::ServiceStatus; -use restate_storage_rocksdb::service_status_table::OwnedServiceStatusRow; +use crate::virtual_object_status::schema::VirtualObjectStatusBuilder; +use restate_storage_api::service_status_table::VirtualObjectStatus; +use restate_storage_rocksdb::service_status_table::OwnedVirtualObjectStatusRow; #[inline] -pub(crate) fn append_service_status_row( - builder: &mut ServiceStatusBuilder, +pub(crate) fn append_virtual_object_status_row( + builder: &mut VirtualObjectStatusBuilder, output: &mut String, - status_row: OwnedServiceStatusRow, + status_row: OwnedVirtualObjectStatusRow, ) { let mut row = builder.row(); row.partition_key(status_row.partition_key); - row.service(&status_row.service); - row.service_key( - std::str::from_utf8(&status_row.service_key).expect("The key must be a string!"), - ); + row.name(&status_row.name); + row.key(std::str::from_utf8(&status_row.key).expect("The key must be a string!")); // Invocation id if row.is_invocation_id_defined() { - if let ServiceStatus::Locked(invocation_id) = status_row.service_status { + if let VirtualObjectStatus::Locked(invocation_id) = status_row.status { row.invocation_id(format_using(output, &invocation_id)); } } diff --git a/crates/storage-query-datafusion/src/service_status/schema.rs b/crates/storage-query-datafusion/src/virtual_object_status/schema.rs similarity index 84% rename from crates/storage-query-datafusion/src/service_status/schema.rs rename to crates/storage-query-datafusion/src/virtual_object_status/schema.rs index 1c3698f28..d83ebd4dc 100644 --- a/crates/storage-query-datafusion/src/service_status/schema.rs +++ b/crates/storage-query-datafusion/src/virtual_object_status/schema.rs @@ -14,9 +14,9 @@ use crate::table_macro::*; use datafusion::arrow::datatypes::DataType; -define_table!(service_status( +define_table!(virtual_object_status( partition_key: DataType::UInt64, - service: DataType::LargeUtf8, - service_key: DataType::LargeUtf8, + name: DataType::LargeUtf8, + key: DataType::LargeUtf8, invocation_id: DataType::LargeUtf8, )); diff --git a/crates/storage-query-datafusion/src/service_status/table.rs b/crates/storage-query-datafusion/src/virtual_object_status/table.rs similarity index 74% rename from crates/storage-query-datafusion/src/service_status/table.rs rename to crates/storage-query-datafusion/src/virtual_object_status/table.rs index 82d350339..c8ffe8f89 100644 --- a/crates/storage-query-datafusion/src/service_status/table.rs +++ b/crates/storage-query-datafusion/src/virtual_object_status/table.rs @@ -17,12 +17,12 @@ use datafusion::arrow::record_batch::RecordBatch; use crate::context::QueryContext; use crate::generic_table::{GenericTableProvider, RangeScanner}; -use crate::service_status::row::append_service_status_row; -use crate::service_status::schema::ServiceStatusBuilder; +use crate::virtual_object_status::row::append_virtual_object_status_row; +use crate::virtual_object_status::schema::VirtualObjectStatusBuilder; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; -use restate_storage_rocksdb::service_status_table::OwnedServiceStatusRow; +use restate_storage_rocksdb::service_status_table::OwnedVirtualObjectStatusRow; use restate_storage_rocksdb::RocksDBStorage; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; @@ -32,19 +32,19 @@ pub(crate) fn register_self( storage: RocksDBStorage, ) -> datafusion::common::Result<()> { let status_table = GenericTableProvider::new( - ServiceStatusBuilder::schema(), - Arc::new(ServiceStatusScanner(storage)), + VirtualObjectStatusBuilder::schema(), + Arc::new(VirtualObjectStatusScanner(storage)), ); ctx.as_ref() - .register_table("sys_service_status", Arc::new(status_table)) + .register_table("sys_virtual_object_status", Arc::new(status_table)) .map(|_| ()) } #[derive(Debug, Clone)] -struct ServiceStatusScanner(RocksDBStorage); +struct VirtualObjectStatusScanner(RocksDBStorage); -impl RangeScanner for ServiceStatusScanner { +impl RangeScanner for VirtualObjectStatusScanner { fn scan( &self, range: RangeInclusive, @@ -55,7 +55,7 @@ impl RangeScanner for ServiceStatusScanner { let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); let tx = stream_builder.tx(); let background_task = move || { - let rows = db.all_service_status(range); + let rows = db.all_virtual_object_status(range); for_each_status(schema, tx, rows); Ok(()) }; @@ -69,12 +69,12 @@ fn for_each_status<'a, I>( tx: Sender>, rows: I, ) where - I: Iterator + 'a, + I: Iterator + 'a, { - let mut builder = ServiceStatusBuilder::new(schema.clone()); + let mut builder = VirtualObjectStatusBuilder::new(schema.clone()); let mut temp = String::new(); for row in rows { - append_service_status_row(&mut builder, &mut temp, row); + append_virtual_object_status_row(&mut builder, &mut temp, row); if builder.full() { let batch = builder.finish(); if tx.blocking_send(Ok(batch)).is_err() { @@ -83,7 +83,7 @@ fn for_each_status<'a, I>( // we probably don't want to panic, is it will cause the entire process to exit return; } - builder = ServiceStatusBuilder::new(schema.clone()); + builder = VirtualObjectStatusBuilder::new(schema.clone()); } } if !builder.empty() { diff --git a/crates/storage-rocksdb/src/service_status_table/mod.rs b/crates/storage-rocksdb/src/service_status_table/mod.rs index aa382ad75..af44691bf 100644 --- a/crates/storage-rocksdb/src/service_status_table/mod.rs +++ b/crates/storage-rocksdb/src/service_status_table/mod.rs @@ -18,7 +18,7 @@ use bytes::Bytes; use bytestring::ByteString; use prost::Message; use restate_storage_api::service_status_table::{ - ReadOnlyServiceStatusTable, ServiceStatus, ServiceStatusTable, + ReadOnlyVirtualObjectStatusTable, VirtualObjectStatus, VirtualObjectStatusTable, }; use restate_storage_api::{Result, StorageError}; use restate_storage_proto::storage; @@ -45,24 +45,24 @@ fn write_status_key(service_id: &ServiceId) -> ServiceStatusKey { fn to_service_status( partition_key: PartitionKey, pb_status: storage::v1::ServiceStatus, -) -> Result { +) -> Result { let invocation_uuid = InvocationUuid::try_from(pb_status).map_err(StorageError::from)?; - Ok(ServiceStatus::Locked(InvocationId::new( + Ok(VirtualObjectStatus::Locked(InvocationId::new( partition_key, invocation_uuid, ))) } -fn put_service_status( +fn put_virtual_object_status( storage: &mut S, service_id: &ServiceId, - status: ServiceStatus, + status: VirtualObjectStatus, ) { let key = ServiceStatusKey::default() .partition_key(service_id.partition_key()) .service_name(service_id.service_name.clone()) .service_key(service_id.key.clone()); - if status == ServiceStatus::Unlocked { + if status == VirtualObjectStatus::Unlocked { storage.delete_key(&key); } else { let value = ProtoValue(storage::v1::ServiceStatus::from(status)); @@ -70,10 +70,10 @@ fn put_service_status( } } -fn get_service_status( +fn get_virtual_object_status( storage: &mut S, service_id: &ServiceId, -) -> Result { +) -> Result { let key = ServiceStatusKey::default() .partition_key(service_id.partition_key()) .service_name(service_id.service_name.clone()) @@ -81,7 +81,7 @@ fn get_service_status( storage.get_blocking(key, move |_, v| { if v.is_none() { - return Ok(ServiceStatus::Unlocked); + return Ok(VirtualObjectStatus::Unlocked); } let v = v.unwrap(); let proto = storage::v1::ServiceStatus::decode(v) @@ -90,57 +90,67 @@ fn get_service_status( }) } -fn delete_service_status(storage: &mut S, service_id: &ServiceId) { +fn delete_virtual_object_status(storage: &mut S, service_id: &ServiceId) { let key = write_status_key(service_id); storage.delete_key(&key); } -impl ReadOnlyServiceStatusTable for RocksDBStorage { - async fn get_service_status(&mut self, service_id: &ServiceId) -> Result { - get_service_status(self, service_id) +impl ReadOnlyVirtualObjectStatusTable for RocksDBStorage { + async fn get_virtual_object_status( + &mut self, + service_id: &ServiceId, + ) -> Result { + get_virtual_object_status(self, service_id) } } -impl<'a> ReadOnlyServiceStatusTable for RocksDBTransaction<'a> { - async fn get_service_status(&mut self, service_id: &ServiceId) -> Result { - get_service_status(self, service_id) +impl<'a> ReadOnlyVirtualObjectStatusTable for RocksDBTransaction<'a> { + async fn get_virtual_object_status( + &mut self, + service_id: &ServiceId, + ) -> Result { + get_virtual_object_status(self, service_id) } } -impl<'a> ServiceStatusTable for RocksDBTransaction<'a> { - async fn put_service_status(&mut self, service_id: &ServiceId, status: ServiceStatus) { - put_service_status(self, service_id, status) +impl<'a> VirtualObjectStatusTable for RocksDBTransaction<'a> { + async fn put_virtual_object_status( + &mut self, + service_id: &ServiceId, + status: VirtualObjectStatus, + ) { + put_virtual_object_status(self, service_id, status) } - async fn delete_service_status(&mut self, service_id: &ServiceId) { - delete_service_status(self, service_id) + async fn delete_virtual_object_status(&mut self, service_id: &ServiceId) { + delete_virtual_object_status(self, service_id) } } #[derive(Clone, Debug)] -pub struct OwnedServiceStatusRow { +pub struct OwnedVirtualObjectStatusRow { pub partition_key: PartitionKey, - pub service: ByteString, - pub service_key: Bytes, - pub service_status: ServiceStatus, + pub name: ByteString, + pub key: Bytes, + pub status: VirtualObjectStatus, } impl RocksDBStorage { - pub fn all_service_status( + pub fn all_virtual_object_status( &self, range: RangeInclusive, - ) -> impl Iterator + '_ { + ) -> impl Iterator + '_ { let iter = self.iterator_from(PartitionKeyRange::(range)); OwnedIterator::new(iter).map(|(mut key, value)| { let state_key = ServiceStatusKey::deserialize_from(&mut key).unwrap(); let state_value = storage::v1::ServiceStatus::decode(value).unwrap(); let state_value = to_service_status(state_key.partition_key.unwrap(), state_value).unwrap(); - OwnedServiceStatusRow { + OwnedVirtualObjectStatusRow { partition_key: state_key.partition_key.unwrap(), - service: state_key.service_name.unwrap(), - service_key: state_key.service_key.unwrap(), - service_status: state_value, + name: state_key.service_name.unwrap(), + key: state_key.service_key.unwrap(), + status: state_value, } }) } diff --git a/crates/storage-rocksdb/tests/integration_test.rs b/crates/storage-rocksdb/tests/integration_test.rs index 4950ce87e..dec1637d9 100644 --- a/crates/storage-rocksdb/tests/integration_test.rs +++ b/crates/storage-rocksdb/tests/integration_test.rs @@ -27,9 +27,9 @@ mod inbox_table_test; mod invocation_status_table_test; mod journal_table_test; mod outbox_table_test; -mod service_status_table_test; mod state_table_test; mod timer_table_test; +mod virtual_object_status_table_test; fn storage_test_environment() -> (RocksDBStorage, impl Future) { // @@ -71,7 +71,7 @@ async fn test_read_write() { outbox_table_test::run_tests(rocksdb.clone()).await; state_table_test::run_tests(rocksdb.clone()).await; invocation_status_table_test::run_tests(rocksdb.clone()).await; - service_status_table_test::run_tests(rocksdb.clone()).await; + virtual_object_status_table_test::run_tests(rocksdb.clone()).await; timer_table_test::run_tests(rocksdb).await; close.await; diff --git a/crates/storage-rocksdb/tests/service_status_table_test/mod.rs b/crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs similarity index 62% rename from crates/storage-rocksdb/tests/service_status_table_test/mod.rs rename to crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs index 140a5b7e6..ccac99437 100644 --- a/crates/storage-rocksdb/tests/service_status_table_test/mod.rs +++ b/crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs @@ -8,36 +8,36 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_storage_api::service_status_table::{ServiceStatus, ServiceStatusTable}; +use restate_storage_api::service_status_table::{VirtualObjectStatus, VirtualObjectStatusTable}; use restate_storage_rocksdb::RocksDBStorage; use restate_types::identifiers::{InvocationId, InvocationUuid, ServiceId}; const FIXTURE_INVOCATION: InvocationUuid = InvocationUuid::from_parts(1706027034946, 12345678900001); -async fn populate_data(txn: &mut T) { - txn.put_service_status( +async fn populate_data(txn: &mut T) { + txn.put_virtual_object_status( &ServiceId::with_partition_key(1337, "svc-1", "key-1"), - ServiceStatus::Locked(InvocationId::new(1337, FIXTURE_INVOCATION)), + VirtualObjectStatus::Locked(InvocationId::new(1337, FIXTURE_INVOCATION)), ) .await; - txn.put_service_status( + txn.put_virtual_object_status( &ServiceId::with_partition_key(1337, "svc-1", "key-2"), - ServiceStatus::Locked(InvocationId::new(1337, FIXTURE_INVOCATION)), + VirtualObjectStatus::Locked(InvocationId::new(1337, FIXTURE_INVOCATION)), ) .await; } -async fn verify_point_lookups(txn: &mut T) { +async fn verify_point_lookups(txn: &mut T) { let status = txn - .get_service_status(&ServiceId::with_partition_key(1337, "svc-1", "key-1")) + .get_virtual_object_status(&ServiceId::with_partition_key(1337, "svc-1", "key-1")) .await .expect("should not fail"); assert_eq!( status, - ServiceStatus::Locked(InvocationId::new(1337, FIXTURE_INVOCATION)) + VirtualObjectStatus::Locked(InvocationId::new(1337, FIXTURE_INVOCATION)) ); } diff --git a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs index b52697ac2..0fd19e04a 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs @@ -27,7 +27,7 @@ use restate_storage_api::invocation_status_table::{ }; use restate_storage_api::journal_table::JournalEntry; use restate_storage_api::outbox_table::OutboxMessage; -use restate_storage_api::service_status_table::ServiceStatus; +use restate_storage_api::service_status_table::VirtualObjectStatus; use restate_storage_api::timer_table::{Timer, TimerKey}; use restate_storage_api::Result as StorageResult; use restate_types::errors::{ @@ -62,10 +62,10 @@ use std::pin::pin; use tracing::{debug, instrument, trace}; pub trait StateReader { - fn get_service_status( + fn get_virtual_object_status( &mut self, service_id: &ServiceId, - ) -> impl Future> + Send; + ) -> impl Future> + Send; fn get_invocation_status( &mut self, @@ -154,7 +154,7 @@ where match command { Command::Invoke(service_invocation) => { let service_status = state - .get_service_status(&service_invocation.fid.service_id) + .get_virtual_object_status(&service_invocation.fid.service_id) .await?; let fid = service_invocation.fid.clone(); @@ -167,7 +167,7 @@ where effects, ) .await; - } else if let ServiceStatus::Unlocked = service_status { + } else if let VirtualObjectStatus::Unlocked = service_status { effects.invoke_service(service_invocation); } else { self.enqueue_into_inbox(effects, InboxEntry::Invocation(service_invocation)); @@ -226,13 +226,15 @@ where state: &mut State, effects: &mut Effects, ) -> Result<(Option, SpanRelation), Error> { - let service_status = state.get_service_status(&mutation.component_id).await?; + let service_status = state + .get_virtual_object_status(&mutation.component_id) + .await?; match service_status { - ServiceStatus::Locked(_) => { + VirtualObjectStatus::Locked(_) => { self.enqueue_into_inbox(effects, InboxEntry::StateMutation(mutation)) } - ServiceStatus::Unlocked => effects.apply_state_mutation(mutation), + VirtualObjectStatus::Unlocked => effects.apply_state_mutation(mutation), } Ok((None, SpanRelation::None)) diff --git a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs index f985d93ab..68219f6df 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs @@ -27,7 +27,7 @@ use crate::partition::state_machine::effects::Effect; #[derive(Default)] struct StateReaderMock { - services: HashMap, + services: HashMap, inboxes: HashMap>, invocations: HashMap, journals: HashMap>, @@ -51,7 +51,7 @@ impl StateReaderMock { fn lock_service(&mut self, service_id: ServiceId) { self.services.insert( service_id.clone(), - ServiceStatus::Locked(InvocationId::new( + VirtualObjectStatus::Locked(InvocationId::new( service_id.partition_key(), InvocationUuid::new(), )), @@ -68,7 +68,7 @@ impl StateReaderMock { self.services.insert( service_id.clone(), - ServiceStatus::Locked(invocation_id.clone()), + VirtualObjectStatus::Locked(invocation_id.clone()), ); self.register_invocation_status( invocation_id, @@ -91,7 +91,7 @@ impl StateReaderMock { self.services.insert( service_id.clone(), - ServiceStatus::Locked(invocation_id.clone()), + VirtualObjectStatus::Locked(invocation_id.clone()), ); self.register_invocation_status( invocation_id, @@ -154,12 +154,15 @@ impl StateReaderMock { } impl StateReader for StateReaderMock { - async fn get_service_status(&mut self, service_id: &ServiceId) -> StorageResult { + async fn get_virtual_object_status( + &mut self, + service_id: &ServiceId, + ) -> StorageResult { Ok(self .services .get(service_id) .cloned() - .unwrap_or(ServiceStatus::Unlocked)) + .unwrap_or(VirtualObjectStatus::Unlocked)) } async fn get_invocation_status( diff --git a/crates/worker/src/partition/state_machine/effect_interpreter.rs b/crates/worker/src/partition/state_machine/effect_interpreter.rs index a2ab655c7..c7cc2331b 100644 --- a/crates/worker/src/partition/state_machine/effect_interpreter.rs +++ b/crates/worker/src/partition/state_machine/effect_interpreter.rs @@ -21,7 +21,7 @@ use restate_storage_api::invocation_status_table::{ InvocationMetadata, InvocationStatus, JournalMetadata, StatusTimestamps, }; use restate_storage_api::outbox_table::OutboxMessage; -use restate_storage_api::service_status_table::ServiceStatus; +use restate_storage_api::service_status_table::VirtualObjectStatus; use restate_storage_api::timer_table::{Timer, TimerKey}; use restate_storage_api::Result as StorageResult; use restate_types::identifiers::{EntryIndex, FullInvocationId, InvocationId, ServiceId}; @@ -41,7 +41,7 @@ pub trait StateStorage { fn store_service_status( &mut self, service_id: &ServiceId, - service_status: ServiceStatus, + service_status: VirtualObjectStatus, ) -> impl Future> + Send; fn store_invocation_status( @@ -447,7 +447,7 @@ impl EffectInterpreter { } state_storage - .store_service_status(service_id, ServiceStatus::Unlocked) + .store_service_status(service_id, VirtualObjectStatus::Unlocked) .await?; Ok(()) @@ -508,7 +508,7 @@ impl EffectInterpreter { state_storage .store_service_status( &service_invocation.fid.service_id, - ServiceStatus::Locked(invocation_id.clone()), + VirtualObjectStatus::Locked(invocation_id.clone()), ) .await?; state_storage diff --git a/crates/worker/src/partition/storage/mod.rs b/crates/worker/src/partition/storage/mod.rs index 2d004d9c4..7e40271cd 100644 --- a/crates/worker/src/partition/storage/mod.rs +++ b/crates/worker/src/partition/storage/mod.rs @@ -23,7 +23,9 @@ use restate_storage_api::invocation_status_table::{ }; use restate_storage_api::journal_table::{JournalEntry, ReadOnlyJournalTable}; use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable}; -use restate_storage_api::service_status_table::{ReadOnlyServiceStatusTable, ServiceStatus}; +use restate_storage_api::service_status_table::{ + ReadOnlyVirtualObjectStatusTable, VirtualObjectStatus, +}; use restate_storage_api::state_table::ReadOnlyStateTable; use restate_storage_api::timer_table::{Timer, TimerKey, TimerTable}; use restate_storage_api::Result as StorageResult; @@ -104,7 +106,7 @@ impl PartitionStorage where Storage: ReadOnlyFsmTable + ReadOnlyInvocationStatusTable - + ReadOnlyServiceStatusTable + + ReadOnlyVirtualObjectStatusTable + ReadOnlyJournalTable + ReadOnlyStateTable + Send, @@ -276,9 +278,12 @@ impl super::state_machine::StateReader for Transaction StorageResult { + async fn get_virtual_object_status( + &mut self, + service_id: &ServiceId, + ) -> StorageResult { self.assert_partition_key(service_id); - self.inner.get_service_status(service_id).await + self.inner.get_virtual_object_status(service_id).await } async fn get_invocation_status( @@ -358,10 +363,12 @@ where async fn store_service_status( &mut self, service_id: &ServiceId, - status: ServiceStatus, + status: VirtualObjectStatus, ) -> StorageResult<()> { self.assert_partition_key(service_id); - self.inner.put_service_status(service_id, status).await; + self.inner + .put_virtual_object_status(service_id, status) + .await; Ok(()) }