diff --git a/Cargo.lock b/Cargo.lock index 818069b9d..58234124a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -685,6 +685,7 @@ dependencies = [ "futures", "futures-concurrency", "futures-time", + "once_cell", "serde", "serde_yaml", "thiserror", @@ -1523,9 +1524,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" +checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" [[package]] name = "opaque-debug" diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 4af1a1616..5b6840fbe 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -12,6 +12,7 @@ eyre = "0.6.7" futures = "0.3.21" futures-concurrency = "2.0.3" futures-time = "1.0.0" +once_cell = "1.13.0" serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.8.23" thiserror = "1.0.30" diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index 18bda2530..a3b3ccd01 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -1,9 +1,11 @@ +use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, BTreeSet}, convert::Infallible, - fmt::Write as _, + fmt::{self, Write as _}, str::FromStr, + time::Duration, }; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -84,11 +86,62 @@ impl std::ops::Deref for DataId { } } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct InputMapping { - pub source: NodeId, - pub operator: Option, - pub output: DataId, +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum InputMapping { + Timer { interval: Duration }, + User(UserInputMapping), +} + +impl InputMapping { + pub fn source(&self) -> &NodeId { + static DORA_NODE_ID: OnceCell = OnceCell::new(); + + match self { + InputMapping::User(mapping) => &mapping.source, + InputMapping::Timer { .. } => DORA_NODE_ID.get_or_init(|| NodeId("dora".to_string())), + } + } + + pub fn operator(&self) -> &Option { + match self { + InputMapping::User(mapping) => &mapping.operator, + InputMapping::Timer { .. } => &None, + } + } +} + +impl fmt::Display for InputMapping { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + InputMapping::Timer { interval } => { + let duration = format_duration(*interval); + write!(f, "dora/timer/{duration}") + } + InputMapping::User(mapping) => { + if let Some(operator) = &mapping.operator { + write!(f, "{}/{operator}/{}", mapping.source, mapping.output) + } else { + write!(f, "{}/{}", mapping.source, mapping.output) + } + } + } + } +} + +pub struct FormattedDuration(pub Duration); + +impl fmt::Display for FormattedDuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.0.subsec_millis() == 0 { + write!(f, "secs/{}", self.0.as_secs()) + } else { + write!(f, "millis/{}", self.0.as_millis()) + } + } +} + +pub fn format_duration(interval: Duration) -> FormattedDuration { + FormattedDuration(interval) } impl Serialize for InputMapping { @@ -96,11 +149,7 @@ impl Serialize for InputMapping { where S: serde::Serializer, { - if let Some(operator) = &self.operator { - serializer.collect_str(&format_args!("{}/{operator}/{}", self.source, self.output)) - } else { - serializer.collect_str(&format_args!("{}/{}", self.source, self.output)) - } + serializer.collect_str(self) } } @@ -119,15 +168,69 @@ impl<'de> Deserialize<'de> for InputMapping { .map(|(op, out)| (Some(op), out)) .unwrap_or((None, rest)); - Ok(Self { - source: source.to_owned().into(), - operator: operator.map(|o| o.to_owned().into()), - output: output.to_owned().into(), - }) + let deserialized = match source { + "dora" => match operator { + Some("timer") => { + let (unit, value) = output.split_once('/').ok_or_else(|| { + serde::de::Error::custom( + "timer input must specify unit and value (e.g. `secs/5` or `millis/100`)", + ) + })?; + let interval = match unit { + "secs" => { + let value = value.parse().map_err(|_| { + serde::de::Error::custom(format!( + "secs must be an integer (got `{value}`)" + )) + })?; + Duration::from_secs(value) + } + "millis" => { + let value = value.parse().map_err(|_| { + serde::de::Error::custom(format!( + "millis must be an integer (got `{value}`)" + )) + })?; + Duration::from_millis(value) + } + other => { + return Err(serde::de::Error::custom(format!( + "timer unit must be either secs or millis (got `{other}`" + ))) + } + }; + Self::Timer { interval } + } + Some(other) => { + return Err(serde::de::Error::custom(format!( + "unknown dora input `{other}`" + ))) + } + None => { + return Err(serde::de::Error::custom(format!( + "dora input has invalid format" + ))) + } + }, + _ => Self::User(UserInputMapping { + source: source.to_owned().into(), + operator: operator.map(|o| o.to_owned().into()), + output: output.to_owned().into(), + }), + }; + + Ok(deserialized) } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct UserInputMapping { + pub source: NodeId, + pub operator: Option, + pub output: DataId, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub enum CommunicationConfig { Zenoh { diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 21510d119..21aa79efd 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -52,19 +52,8 @@ impl DoraNode { pub async fn inputs(&self) -> eyre::Result + '_> { let mut streams = Vec::new(); - for ( - input, - config::InputMapping { - source, - operator, - output, - }, - ) in &self.node_config.inputs - { - let topic = match operator { - Some(operator) => format!("{source}/{operator}/{output}"), - None => format!("{source}/{output}"), - }; + for (input, mapping) in &self.node_config.inputs { + let topic = mapping.to_string(); let sub = self .communication .subscribe(&topic) @@ -81,7 +70,7 @@ impl DoraNode { .node_config .inputs .values() - .map(|v| (&v.source, &v.operator)) + .map(|v| (v.source(), v.operator())) .collect(); for (source, operator) in &sources { let topic = match operator { diff --git a/binaries/coordinator/README.md b/binaries/coordinator/README.md index 3d6c73198..daf75599a 100644 --- a/binaries/coordinator/README.md +++ b/binaries/coordinator/README.md @@ -31,12 +31,12 @@ There are drawbacks too, for example: ```bash cargo build -p dora-runtime --release cargo build -p dora-coordinator --examples --release -cargo build --manifest-path ../runtime/examples/example-operator/Cargo.toml --release +cargo build --manifest-path ../examples/example-operator/Cargo.toml --release ``` - Compile the C example operator through: ```bash -cd ../runtime/examples/c-operator -cp ../../../apis/c/operator/api.h . +cd ../../examples/c-operator +cp ../../apis/c/operator/api.h . clang -c operator.c clang -shared -v operator.o -o operator.so ``` diff --git a/binaries/coordinator/examples/mini-dataflow.yml b/binaries/coordinator/examples/mini-dataflow.yml index 9d5af6113..6723bf9b4 100644 --- a/binaries/coordinator/examples/mini-dataflow.yml +++ b/binaries/coordinator/examples/mini-dataflow.yml @@ -38,20 +38,20 @@ nodes: - id: runtime-node operators: - id: op-1 - shared-library: ../target/release/libexample_operator.so + shared-library: ../../target/release/libexample_operator.so inputs: random: random/number time: timer/time outputs: - timestamped-random - id: op-2 - shared-library: ../runtime/examples/c-operator/operator.so + shared-library: ../../examples/c-operator/operator.so inputs: time: timer/time outputs: - counter - id: op-3 - python: ../runtime/examples/python-operator/op.py + python: ../../examples/python-operator/op.py inputs: time: timer/time test: python-operator/counter @@ -60,8 +60,9 @@ nodes: - id: python-operator operator: - python: ../runtime/examples/python-operator/op.py + python: ../../examples/python-operator/op.py inputs: time: timer/time + dora_time: dora/timer/millis/500 outputs: - counter diff --git a/binaries/coordinator/src/main.rs b/binaries/coordinator/src/main.rs index f6a743f09..e860c58ea 100644 --- a/binaries/coordinator/src/main.rs +++ b/binaries/coordinator/src/main.rs @@ -1,8 +1,12 @@ -use dora_core::descriptor::{self, CoreNodeKind, Descriptor}; -use dora_node_api::config::NodeId; +use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor}; +use dora_node_api::{ + communication, + config::{format_duration, NodeId}, +}; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; use std::path::{Path, PathBuf}; +use tokio_stream::wrappers::IntervalStream; #[derive(Debug, Clone, clap::Parser)] #[clap(about = "Dora coordinator")] @@ -57,6 +61,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() })?; let nodes = descriptor.resolve_aliases(); + let dora_timers = collect_dora_timers(&nodes); let mut communication = descriptor.communication; if nodes @@ -94,6 +99,28 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() } } + for interval in dora_timers { + let communication = communication.clone(); + let task = tokio::spawn(async move { + let communication = communication::init(&communication) + .await + .wrap_err("failed to init communication layer")?; + let topic = { + let duration = format_duration(interval); + format!("dora/timer/{duration}") + }; + let mut stream = IntervalStream::new(tokio::time::interval(interval)); + while let Some(_) = stream.next().await { + let publish = communication.publish(&topic, &[]); + publish + .await + .wrap_err("failed to publish timer tick message")?; + } + Ok(()) + }); + tasks.push(task); + } + while let Some(task_result) = tasks.next().await { task_result .wrap_err("failed to join async task")? diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index 0838d644c..27f25b23a 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -4,19 +4,20 @@ use dora_core::descriptor::OperatorDefinition; use dora_node_api::{ self, communication::{self, CommunicationLayer}, - config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, + config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId, UserInputMapping}, STOP_TOPIC, }; use eyre::{bail, eyre, Context}; use futures::{ stream::{self, FuturesUnordered}, - FutureExt, StreamExt, + Future, FutureExt, StreamExt, }; use futures_concurrency::Merge; use operator::{Operator, OperatorEvent}; use std::{ collections::{BTreeMap, HashMap}, mem, + pin::Pin, }; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamMap}; @@ -171,34 +172,34 @@ async fn subscribe_operator<'a>( operator: &'a OperatorDefinition, communication: &'a dyn CommunicationLayer, ) -> Result + 'a, eyre::Error> { - let stop_messages = FuturesUnordered::new(); - for input in operator.config.inputs.values() { - let InputMapping { - source, operator, .. - } = input; - let topic = match operator { - Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"), - None => format!("{source}/{STOP_TOPIC}"), - }; - let sub = communication - .subscribe(&topic) - .await - .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - stop_messages.push(sub.into_future()); + let stop_messages: FuturesUnordered>>> = + FuturesUnordered::new(); + for mapping in operator.config.inputs.values() { + match mapping { + InputMapping::User(UserInputMapping { + source, operator, .. + }) => { + let topic = match operator { + Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"), + None => format!("{source}/{STOP_TOPIC}"), + }; + let sub = communication + .subscribe(&topic) + .await + .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; + stop_messages.push(Box::pin(sub.into_future().map(|_| ()))); + } + InputMapping::Timer { .. } => { + // dora timer inputs run forever + stop_messages.push(Box::pin(futures::future::pending())); + } + } } - let finished = Box::pin(stop_messages.all(|_| async { true }).shared()); + let finished = Box::pin(stop_messages.all(|()| async { true }).shared()); let mut streams = Vec::new(); for (input, mapping) in &operator.config.inputs { - let InputMapping { - source, - operator: source_operator, - output, - } = mapping; - let topic = match source_operator { - Some(operator) => format!("{source}/{operator}/{output}"), - None => format!("{source}/{output}"), - }; + let topic = mapping.to_string(); let sub = communication .subscribe(&topic) .await @@ -210,16 +211,15 @@ async fn subscribe_operator<'a>( data, }) .map(SubscribeEvent::Input) - .take_until(finished.clone()) - .chain(stream::once(async { - SubscribeEvent::InputsStopped { - target_operator: operator.id.clone(), - } - })); + .take_until(finished.clone()); streams.push(stream); } - Ok(streams.merge()) + Ok(streams.merge().chain(stream::once(async { + SubscribeEvent::InputsStopped { + target_operator: operator.id.clone(), + } + }))) } async fn publish( diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index e4a56171e..5ae271295 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -8,6 +8,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, path::PathBuf, }; +pub use visualize::collect_dora_timers; mod visualize; @@ -43,7 +44,10 @@ impl Descriptor { NodeKind::Custom(node) => node.run_config.inputs.values_mut().collect(), NodeKind::Operator(operator) => operator.config.inputs.values_mut().collect(), }; - for mapping in input_mappings { + for mapping in input_mappings.into_iter().filter_map(|m| match m { + InputMapping::Timer { .. } => None, + InputMapping::User(m) => Some(m), + }) { if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() { if mapping.operator.is_none() { mapping.operator = Some(op_name.to_owned()); diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index 6f2566519..8d4a8d0ab 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,8 +1,9 @@ use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode}; -use dora_node_api::config::{DataId, InputMapping, NodeId}; +use dora_node_api::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping}; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, fmt::Write as _, + time::Duration, }; pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { @@ -14,6 +15,18 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { all_nodes.insert(&node.id, node); } + let dora_timers = collect_dora_timers(nodes); + if !dora_timers.is_empty() { + writeln!(flowchart, "subgraph ___dora___ [dora]").unwrap(); + writeln!(flowchart, " subgraph ___timer_timer___ [timer]").unwrap(); + for interval in dora_timers { + let duration = format_duration(interval); + writeln!(flowchart, " dora/timer/{duration}[\\{duration}/]").unwrap(); + } + flowchart.push_str(" end\n"); + flowchart.push_str("end\n"); + } + for node in nodes { visualize_node_inputs(node, &mut flowchart, &all_nodes) } @@ -21,6 +34,37 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { flowchart } +pub fn collect_dora_timers(nodes: &[ResolvedNode]) -> BTreeSet { + let mut dora_timers = BTreeSet::new(); + for node in nodes { + match &node.kind { + CoreNodeKind::Runtime(node) => { + for operator in &node.operators { + collect_dora_nodes(operator.config.inputs.values(), &mut dora_timers); + } + } + CoreNodeKind::Custom(node) => { + collect_dora_nodes(node.run_config.inputs.values(), &mut dora_timers); + } + } + } + dora_timers +} + +fn collect_dora_nodes( + values: std::collections::btree_map::Values, + dora_timers: &mut BTreeSet, +) { + for input in values { + match input { + InputMapping::User(_) => {} + InputMapping::Timer { interval } => { + dora_timers.insert(*interval); + } + } + } +} + fn visualize_node(node: &ResolvedNode, flowchart: &mut String) { let node_id = &node.id; match &node.kind { @@ -100,46 +144,66 @@ fn visualize_inputs( nodes: &HashMap<&NodeId, &ResolvedNode>, ) { for (input_id, mapping) in inputs { - let InputMapping { - source, - operator, - output, - } = mapping; + match mapping { + mapping @ InputMapping::Timer { .. } => { + writeln!( + flowchart, + " {} -- {input_id} --> {target}", + mapping.to_string() + ) + .unwrap(); + } + InputMapping::User(mapping) => { + visualize_user_mapping(mapping, target, nodes, input_id, flowchart) + } + } + } +} - let mut source_found = false; - if let Some(source_node) = nodes.get(source) { - match (&source_node.kind, operator) { - (CoreNodeKind::Custom(custom_node), None) => { - if custom_node.run_config.outputs.contains(output) { +fn visualize_user_mapping( + mapping: &UserInputMapping, + target: &str, + nodes: &HashMap<&NodeId, &ResolvedNode>, + input_id: &DataId, + flowchart: &mut String, +) { + let UserInputMapping { + source, + operator, + output, + } = mapping; + let mut source_found = false; + if let Some(source_node) = nodes.get(source) { + match (&source_node.kind, operator) { + (CoreNodeKind::Custom(custom_node), None) => { + if custom_node.run_config.outputs.contains(output) { + let data = if output == input_id { + format!("{output}") + } else { + format!("{output} as {input_id}") + }; + writeln!(flowchart, " {source} -- {data} --> {target}").unwrap(); + source_found = true; + } + } + (CoreNodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => { + if let Some(operator) = operators.iter().find(|o| &o.id == operator_id) { + if operator.config.outputs.contains(output) { let data = if output == input_id { format!("{output}") } else { format!("{output} as {input_id}") }; - writeln!(flowchart, " {source} -- {data} --> {target}").unwrap(); + writeln!(flowchart, " {source}/{operator_id} -- {data} --> {target}") + .unwrap(); source_found = true; } } - (CoreNodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => { - if let Some(operator) = operators.iter().find(|o| &o.id == operator_id) { - if operator.config.outputs.contains(output) { - let data = if output == input_id { - format!("{output}") - } else { - format!("{output} as {input_id}") - }; - writeln!(flowchart, " {source}/{operator_id} -- {data} --> {target}") - .unwrap(); - source_found = true; - } - } - } - (CoreNodeKind::Custom(_), Some(_)) | (CoreNodeKind::Runtime(_), None) => {} } + (CoreNodeKind::Custom(_), Some(_)) | (CoreNodeKind::Runtime(_), None) => {} } - - if !source_found { - writeln!(flowchart, " missing>missing] -- {input_id} --> {target}").unwrap(); - } + } + if !source_found { + writeln!(flowchart, " missing>missing] -- {input_id} --> {target}").unwrap(); } }