Skip to content

Commit

Permalink
More terminology changes in tables (#1283)
Browse files Browse the repository at this point in the history
* Rename sys_service_status to sys_virtual_object_status.
Rename fields using service term to component.
Rename ServiceStatus to VirtualObjectStatus in storage-api

* Fix queries in CLI + more terminology changes
  • Loading branch information
slinkydeveloper authored Mar 19, 2024
1 parent e444dbf commit 37ec635
Show file tree
Hide file tree
Showing 32 changed files with 318 additions and 288 deletions.
210 changes: 105 additions & 105 deletions cli/src/clients/datafusion_helpers.rs

Large diffs are not rendered by default.

44 changes: 22 additions & 22 deletions cli/src/commands/components/status/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod detailed_status;
use crate::c_println;
use crate::cli_env::CliEnv;
use crate::clients::datafusion_helpers::{
ComponentMethodLockedKeysMap, ComponentStatus, ComponentStatusMap, InvocationState,
ComponentHandlerLockedKeysMap, ComponentStatus, ComponentStatusMap, InvocationState,
};
use crate::clients::MetasClient;
use crate::ui::component_methods::icon_for_component_type;
Expand All @@ -40,7 +40,7 @@ pub struct Status {
sample_invocations_limit: usize,

/// Service name, prints all services if omitted
service: Option<String>,
component: Option<String>,

#[clap(flatten)]
watch: Watch,
Expand All @@ -54,7 +54,7 @@ async fn status(env: &CliEnv, opts: &Status) -> Result<()> {
let metas_client = MetasClient::new(env)?;
let sql_client = crate::clients::DataFusionHttpClient::new(env)?;

if let Some(svc) = &opts.service {
if let Some(svc) = &opts.component {
detailed_status::run_detailed_status(env, svc, opts, metas_client, sql_client).await
} else {
agg_status::run_aggregated_status(env, opts, metas_client, sql_client).await
Expand All @@ -78,7 +78,7 @@ async fn render_components_status(
"OLDEST NON-SUSPENDED INVOCATION",
]);
for svc in services {
let svc_status = status_map.get_service_status(&svc.name).unwrap_or(&empty);
let svc_status = status_map.get_component_status(&svc.name).unwrap_or(&empty);
// Service title
let flavor = icon_for_component_type(&svc.ty);
let svc_title = format!("{} {}", svc.name, flavor);
Expand All @@ -93,14 +93,14 @@ async fn render_components_status(
Ok(())
}

fn render_method_state_stats(
fn render_handler_state_stats(
svc_status: &ComponentStatus,
method: &str,
state: InvocationState,
) -> Cell {
use comfy_table::Color;
// Pending
if let Some(state_stats) = svc_status.get_method_stats(state, method) {
if let Some(state_stats) = svc_status.get_handler_stats(state, method) {
let cell = Cell::new(state_stats.num_invocations);
let color = match state {
InvocationState::Unknown => Color::Magenta,
Expand All @@ -121,46 +121,46 @@ async fn render_methods_status(
svc: ComponentMetadata,
svc_status: &ComponentStatus,
) -> Result<()> {
for method in svc.handlers {
for handler in svc.handlers {
let mut row = vec![];
row.push(Cell::new(format!(" {}", &method.name)));
row.push(Cell::new(format!(" {}", &handler.name)));
// Pending
row.push(render_method_state_stats(
row.push(render_handler_state_stats(
svc_status,
&method.name,
&handler.name,
InvocationState::Pending,
));

// Ready
row.push(render_method_state_stats(
row.push(render_handler_state_stats(
svc_status,
&method.name,
&handler.name,
InvocationState::Ready,
));

// Running
row.push(render_method_state_stats(
row.push(render_handler_state_stats(
svc_status,
&method.name,
&handler.name,
InvocationState::Running,
));

// Backing-off
row.push(render_method_state_stats(
row.push(render_handler_state_stats(
svc_status,
&method.name,
&handler.name,
InvocationState::BackingOff,
));

row.push(render_method_state_stats(
row.push(render_handler_state_stats(
svc_status,
&method.name,
&handler.name,
InvocationState::Suspended,
));

let oldest_cell = if let Some(current_method) = svc_status.get_method(&method.name) {
let oldest_cell = if let Some(current_handler) = svc_status.get_handler(&handler.name) {
if let Some((oldest_state, oldest_stats)) =
current_method.oldest_non_suspended_invocation_state()
current_handler.oldest_non_suspended_invocation_state()
{
let dur = chrono::Local::now().signed_duration_since(oldest_stats.oldest_at);
let style = if dur.num_seconds() < 60 {
Expand Down Expand Up @@ -194,7 +194,7 @@ async fn render_methods_status(
}
async fn render_locked_keys(
env: &CliEnv,
locked_keys: ComponentMethodLockedKeysMap,
locked_keys: ComponentHandlerLockedKeysMap,
limit_per_service: usize,
) -> Result<()> {
let locked_keys = locked_keys.into_inner();
Expand All @@ -203,7 +203,7 @@ async fn render_locked_keys(
}

let mut table = Table::new_styled(&env.ui_config);
table.set_styled_header(vec!["", "QUEUE", "LOCKED BY", "METHOD", "NOTES"]);
table.set_styled_header(vec!["", "QUEUE", "LOCKED BY", "HANDLER", "NOTES"]);
for (svc_name, locked_keys) in locked_keys {
let mut keys: Vec<_> = locked_keys.into_iter().collect();
keys.sort_by(|(_, a), (_, b)| b.num_pending.cmp(&a.num_pending));
Expand Down
13 changes: 9 additions & 4 deletions cli/src/commands/invocations/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,17 @@ async fn describe(env: &CliEnv, opts: &Describe) -> Result<()> {
if let Some(key) = &inv.key {
table.add_kv_row(
"Component:",
format!("{} {} {}", inv.service, style("@").dim(), style(key).dim(),),
format!(
"{} {} {}",
inv.component,
style("@").dim(),
style(key).dim(),
),
);
} else {
table.add_kv_row("Component:", &inv.service);
table.add_kv_row("Component:", &inv.component);
}
table.add_kv_row("Method:", &inv.method);
table.add_kv_row("Method:", &inv.handler);
add_invocation_to_kv_table(&mut table, &inv);
table.add_kv_row_if(
|| inv.state_modified_at.is_some(),
Expand Down Expand Up @@ -97,7 +102,7 @@ async fn describe(env: &CliEnv, opts: &Describe) -> Result<()> {
if let Some(parent) = &inv.invoked_by_id {
c_println!(
"{} {}",
inv.invoked_by_service
inv.invoked_by_component
.as_ref()
.map(|x| style(x.to_owned()).italic().blue())
.unwrap_or_else(|| style("<UNKNOWN>".to_owned()).red()),
Expand Down
18 changes: 9 additions & 9 deletions cli/src/commands/invocations/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ pub struct List {
/// Component to list invocations for
#[clap(long, visible_alias = "component", value_delimiter = ',')]
component: Vec<String>,
/// Filter by invocation on this method name
/// Filter by invocation on this handler name
#[clap(long, value_delimiter = ',')]
method: Vec<String>,
handler: Vec<String>,
/// Filter by status(es)
#[clap(long, ignore_case = true, value_delimiter = ',')]
status: Vec<InvocationState>,
Expand All @@ -44,7 +44,7 @@ pub struct List {
deployment: Vec<String>,
/// Only list invocations on keyed components only
#[clap(long)]
keyed_only: bool,
virtual_objects_only: bool,
/// Filter by invocations on this component key
#[clap(long, value_delimiter = ',')]
key: Vec<String>,
Expand Down Expand Up @@ -96,29 +96,29 @@ async fn list(env: &CliEnv, opts: &List) -> Result<()> {

if !opts.component.is_empty() {
inbox_filters.push(format!(
"ss.service IN ({})",
"ss.component IN ({})",
opts.component
.iter()
.map(|x| format!("'{}'", x))
.format(",")
));
}

if !opts.method.is_empty() {
if !opts.handler.is_empty() {
inbox_filters.push(format!(
"ss.method IN ({})",
opts.method.iter().map(|x| format!("'{}'", x)).format(",")
"ss.handler IN ({})",
opts.handler.iter().map(|x| format!("'{}'", x)).format(",")
));
}

if !opts.key.is_empty() {
inbox_filters.push(format!(
"ss.service_key IN ({})",
"ss.component_key IN ({})",
opts.key.iter().map(|x| format!("'{}'", x)).format(",")
));
}

if opts.keyed_only {
if opts.virtual_objects_only {
inbox_filters.push("comp.ty = 'virtual_object'".to_owned());
}

Expand Down
2 changes: 1 addition & 1 deletion cli/src/commands/state/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub(crate) async fn get_current_state(
//
let sql_client = crate::clients::DataFusionHttpClient::new(env)?;
let sql = format!(
"select key, value from state where service = '{}' and service_key = '{}' ;",
"select key, value from state where component = '{}' and component_key = '{}' ;",
service, key
);
let res = sql_client.run_query(sql).await?;
Expand Down
12 changes: 6 additions & 6 deletions cli/src/ui/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ pub fn invocation_qualified_name(invocation: &Invocation) -> String {
let svc = if let Some(key) = &invocation.key {
format!(
"[{} {} {}]",
invocation.service,
invocation.component,
style("@").dim(),
style(key).dim(),
)
} else {
invocation.service.to_string()
invocation.component.to_string()
};
format!("{}{}{}", svc, style("::").dim(), invocation.method)
format!("{}{}{}", svc, style("::").dim(), invocation.handler)
}

// ❯ [2023-12-14 15:38:52.500 +00:00] rIEqK14GCdkAYxo-wzTfrK2e6tJssIrtQ CheckoutProcess::checkout
Expand Down Expand Up @@ -146,7 +146,7 @@ pub fn add_invocation_to_kv_table(table: &mut Table, invocation: &Invocation) {
let invoked_by_msg = format!(
"{} {}",
invocation
.invoked_by_service
.invoked_by_component
.as_ref()
.map(|x| style(x.to_owned()).italic().blue())
.unwrap_or_else(|| style("<UNKNOWN>".to_owned()).red()),
Expand Down Expand Up @@ -256,9 +256,9 @@ pub fn format_entry_type_details(entry_type: &JournalEntryType) -> String {
JournalEntryType::Invoke(inv) | JournalEntryType::BackgroundInvoke(inv) => {
format!(
"{}{}{} {}",
inv.invoked_service.as_ref().unwrap(),
inv.invoked_component.as_ref().unwrap(),
style("::").dim(),
inv.invoked_method.as_ref().unwrap(),
inv.invoked_handler.as_ref().unwrap(),
inv.invocation_id.as_deref().unwrap_or(""),
)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/storage-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub trait Storage {
pub trait Transaction:
state_table::StateTable
+ invocation_status_table::InvocationStatusTable
+ service_status_table::ServiceStatusTable
+ service_status_table::VirtualObjectStatusTable
+ inbox_table::InboxTable
+ outbox_table::OutboxTable
+ deduplication_table::DeduplicationTable
Expand Down
19 changes: 11 additions & 8 deletions crates/storage-api/src/service_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,28 @@ use restate_types::identifiers::{InvocationId, ServiceId};
use std::future::Future;

#[derive(Debug, Default, Clone, PartialEq)]
pub enum ServiceStatus {
pub enum VirtualObjectStatus {
Locked(InvocationId),
#[default]
Unlocked,
}

pub trait ReadOnlyServiceStatusTable {
fn get_service_status(
pub trait ReadOnlyVirtualObjectStatusTable {
fn get_virtual_object_status(
&mut self,
service_id: &ServiceId,
) -> impl Future<Output = Result<ServiceStatus>> + Send;
) -> impl Future<Output = Result<VirtualObjectStatus>> + Send;
}

pub trait ServiceStatusTable: ReadOnlyServiceStatusTable {
fn put_service_status(
pub trait VirtualObjectStatusTable: ReadOnlyVirtualObjectStatusTable {
fn put_virtual_object_status(
&mut self,
service_id: &ServiceId,
status: ServiceStatus,
status: VirtualObjectStatus,
) -> impl Future<Output = ()> + Send;

fn delete_service_status(&mut self, service_id: &ServiceId) -> impl Future<Output = ()> + Send;
fn delete_virtual_object_status(
&mut self,
service_id: &ServiceId,
) -> impl Future<Output = ()> + Send;
}
10 changes: 6 additions & 4 deletions crates/storage-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,12 @@ pub mod storage {
}
}

impl From<restate_storage_api::service_status_table::ServiceStatus> for ServiceStatus {
fn from(value: restate_storage_api::service_status_table::ServiceStatus) -> Self {
impl From<restate_storage_api::service_status_table::VirtualObjectStatus> for ServiceStatus {
fn from(
value: restate_storage_api::service_status_table::VirtualObjectStatus,
) -> Self {
match value {
restate_storage_api::service_status_table::ServiceStatus::Locked(
restate_storage_api::service_status_table::VirtualObjectStatus::Locked(
invocation_id,
) => ServiceStatus {
status: Some(service_status::Status::Locked(service_status::Locked {
Expand All @@ -116,7 +118,7 @@ pub mod storage {
.into(),
})),
},
restate_storage_api::service_status_table::ServiceStatus::Unlocked => {
restate_storage_api::service_status_table::VirtualObjectStatus::Unlocked => {
unreachable!("Nothing should be stored for unlocked")
}
}
Expand Down
10 changes: 5 additions & 5 deletions crates/storage-query-datafusion/src/inbox/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ pub(crate) fn append_inbox_row(
let mut row = builder.row();
row.partition_key(fid.partition_key());

row.service(&fid.service_id.service_name);
row.method(&method_name);
row.component(&fid.service_id.service_name);
row.handler(&method_name);

row.service_key(
row.component_key(
std::str::from_utf8(&fid.service_id.key).expect("The key must be a string!"),
);

Expand All @@ -51,8 +51,8 @@ pub(crate) fn append_inbox_row(

match caller {
Source::Service(caller) => {
row.invoked_by("service");
row.invoked_by_service(&caller.service_id.service_name);
row.invoked_by("component");
row.invoked_by_component(&caller.service_id.service_name);
if row.is_invoked_by_id_defined() {
row.invoked_by_id(format_using(output, &caller));
}
Expand Down
8 changes: 4 additions & 4 deletions crates/storage-query-datafusion/src/inbox/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ use datafusion::arrow::datatypes::DataType;
define_table!(inbox(
partition_key: DataType::UInt64,

service: DataType::LargeUtf8,
method: DataType::LargeUtf8,
component: DataType::LargeUtf8,
handler: DataType::LargeUtf8,

service_key: DataType::LargeUtf8,
component_key: DataType::LargeUtf8,

id: DataType::LargeUtf8,

sequence_number: DataType::UInt64,

invoked_by: DataType::LargeUtf8,
invoked_by_service: DataType::LargeUtf8,
invoked_by_component: DataType::LargeUtf8,
invoked_by_id: DataType::LargeUtf8,
trace_id: DataType::LargeUtf8,

Expand Down
4 changes: 2 additions & 2 deletions crates/storage-query-datafusion/src/invocation_state/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub(crate) fn append_state_row(
let invocation_id = status_row.full_invocation_id();

row.partition_key(invocation_id.service_id.partition_key());
row.service(&invocation_id.service_id.service_name);
row.service_key(
row.component(&invocation_id.service_id.service_name);
row.component_key(
std::str::from_utf8(&invocation_id.service_id.key).expect("The key must be a string!"),
);
if row.is_id_defined() {
Expand Down
Loading

0 comments on commit 37ec635

Please sign in to comment.