From 3c8ee3779508f562c5e88841e130192df3a053c2 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 27 Jul 2022 19:35:16 +0200 Subject: [PATCH 1/9] Implement parsing of new dora timer input keys --- Cargo.lock | 5 +- apis/rust/node/Cargo.toml | 1 + apis/rust/node/src/config.rs | 125 ++++++++++++++++++++++++++++++----- apis/rust/node/src/lib.rs | 17 +---- 4 files changed, 117 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 818069b9d..58234124a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -685,6 +685,7 @@ dependencies = [ "futures", "futures-concurrency", "futures-time", + "once_cell", "serde", "serde_yaml", "thiserror", @@ -1523,9 +1524,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" +checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" [[package]] name = "opaque-debug" diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 4af1a1616..5b6840fbe 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -12,6 +12,7 @@ eyre = "0.6.7" futures = "0.3.21" futures-concurrency = "2.0.3" futures-time = "1.0.0" +once_cell = "1.13.0" serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.8.23" thiserror = "1.0.30" diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index 18bda2530..5102b0b8d 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -1,9 +1,11 @@ +use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, BTreeSet}, convert::Infallible, fmt::Write as _, str::FromStr, + time::Duration, }; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -84,11 +86,54 @@ impl std::ops::Deref for DataId { } } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct InputMapping { - pub source: 
NodeId, - pub operator: Option, - pub output: DataId, +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum InputMapping { + Timer { interval: Duration }, + User(UserInputMapping), +} + +impl InputMapping { + pub fn source(&self) -> &NodeId { + static DORA_NODE_ID: OnceCell = OnceCell::new(); + + match self { + InputMapping::User(mapping) => &mapping.source, + InputMapping::Timer { .. } => DORA_NODE_ID.get_or_init(|| NodeId("dora".to_string())), + } + } + + pub fn operator(&self) -> &Option { + match self { + InputMapping::User(mapping) => &mapping.operator, + InputMapping::Timer { .. } => &None, + } + } +} + +impl ToString for InputMapping { + fn to_string(&self) -> String { + match self { + InputMapping::Timer { interval } => { + let duration = format_duration(*interval); + format!("dora/timer/{duration}") + } + InputMapping::User(mapping) => { + if let Some(operator) = &mapping.operator { + format!("{}/{operator}/{}", mapping.source, mapping.output) + } else { + format!("{}/{}", mapping.source, mapping.output) + } + } + } + } +} + +pub fn format_duration(interval: Duration) -> String { + if interval.subsec_millis() == 0 { + format!("secs/{}", interval.as_secs()) + } else { + format!("millis/{}", interval.as_millis()) + } } impl Serialize for InputMapping { @@ -96,11 +141,7 @@ impl Serialize for InputMapping { where S: serde::Serializer, { - if let Some(operator) = &self.operator { - serializer.collect_str(&format_args!("{}/{operator}/{}", self.source, self.output)) - } else { - serializer.collect_str(&format_args!("{}/{}", self.source, self.output)) - } + serializer.serialize_str(&self.to_string()) } } @@ -119,14 +160,68 @@ impl<'de> Deserialize<'de> for InputMapping { .map(|(op, out)| (Some(op), out)) .unwrap_or((None, rest)); - Ok(Self { - source: source.to_owned().into(), - operator: operator.map(|o| o.to_owned().into()), - output: output.to_owned().into(), - }) + let deserialized = match source { + "dora" => match operator { + Some("timer") => { 
+ let (unit, value) = output.split_once('/').ok_or_else(|| { + serde::de::Error::custom( + "timer input must specify unit and value (e.g. `secs/5` or `millis/100`)", + ) + })?; + let interval = match unit { + "secs" => { + let value = value.parse().map_err(|_| { + serde::de::Error::custom(format!( + "secs must be an integer (got `{value}`)" + )) + })?; + Duration::from_secs(value) + } + "millis" => { + let value = value.parse().map_err(|_| { + serde::de::Error::custom(format!( + "millis must be an integer (got `{value}`)" + )) + })?; + Duration::from_millis(value) + } + other => { + return Err(serde::de::Error::custom(format!( + "timer unit must be either secs or millis (got `{other}`" + ))) + } + }; + Self::Timer { interval } + } + Some(other) => { + return Err(serde::de::Error::custom(format!( + "unknown dora input `{other}`" + ))) + } + None => { + return Err(serde::de::Error::custom(format!( + "dora input has invalid format" + ))) + } + }, + _ => Self::User(UserInputMapping { + source: source.to_owned().into(), + operator: operator.map(|o| o.to_owned().into()), + output: output.to_owned().into(), + }), + }; + + Ok(deserialized) } } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct UserInputMapping { + pub source: NodeId, + pub operator: Option, + pub output: DataId, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub enum CommunicationConfig { diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 21510d119..21aa79efd 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -52,19 +52,8 @@ impl DoraNode { pub async fn inputs(&self) -> eyre::Result + '_> { let mut streams = Vec::new(); - for ( - input, - config::InputMapping { - source, - operator, - output, - }, - ) in &self.node_config.inputs - { - let topic = match operator { - Some(operator) => format!("{source}/{operator}/{output}"), - None => format!("{source}/{output}"), - }; + for (input, mapping) 
in &self.node_config.inputs { + let topic = mapping.to_string(); let sub = self .communication .subscribe(&topic) @@ -81,7 +70,7 @@ impl DoraNode { .node_config .inputs .values() - .map(|v| (&v.source, &v.operator)) + .map(|v| (v.source(), v.operator())) .collect(); for (source, operator) in &sources { let topic = match operator { From acd0f6193f2fccf91ee2907f27dc638da5497cd0 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 27 Jul 2022 19:38:16 +0200 Subject: [PATCH 2/9] Adjust libraries and binaries for new `InputMapping` format --- .../coordinator/examples/mini-dataflow.yml | 1 + binaries/runtime/src/main.rs | 19 ++--- libraries/core/src/descriptor/mod.rs | 5 +- libraries/core/src/descriptor/visualize.rs | 79 +++++++++++-------- 4 files changed, 59 insertions(+), 45 deletions(-) diff --git a/binaries/coordinator/examples/mini-dataflow.yml b/binaries/coordinator/examples/mini-dataflow.yml index 9d5af6113..5647525c4 100644 --- a/binaries/coordinator/examples/mini-dataflow.yml +++ b/binaries/coordinator/examples/mini-dataflow.yml @@ -63,5 +63,6 @@ nodes: python: ../runtime/examples/python-operator/op.py inputs: time: timer/time + dora_time: dora/timer/secs/5 outputs: - counter diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index 0838d644c..ab1ff1627 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -4,7 +4,7 @@ use dora_core::descriptor::OperatorDefinition; use dora_node_api::{ self, communication::{self, CommunicationLayer}, - config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, + config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId, UserInputMapping}, STOP_TOPIC, }; use eyre::{bail, eyre, Context}; @@ -172,8 +172,11 @@ async fn subscribe_operator<'a>( communication: &'a dyn CommunicationLayer, ) -> Result + 'a, eyre::Error> { let stop_messages = FuturesUnordered::new(); - for input in operator.config.inputs.values() { - let InputMapping { + for input in 
operator.config.inputs.values().filter_map(|m| match m { + InputMapping::Timer { .. } => None, + InputMapping::User(m) => Some(m), + }) { + let UserInputMapping { source, operator, .. } = input; let topic = match operator { @@ -190,15 +193,7 @@ async fn subscribe_operator<'a>( let mut streams = Vec::new(); for (input, mapping) in &operator.config.inputs { - let InputMapping { - source, - operator: source_operator, - output, - } = mapping; - let topic = match source_operator { - Some(operator) => format!("{source}/{operator}/{output}"), - None => format!("{source}/{output}"), - }; + let topic = mapping.to_string(); let sub = communication .subscribe(&topic) .await diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index e4a56171e..4797f06cb 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -43,7 +43,10 @@ impl Descriptor { NodeKind::Custom(node) => node.run_config.inputs.values_mut().collect(), NodeKind::Operator(operator) => operator.config.inputs.values_mut().collect(), }; - for mapping in input_mappings { + for mapping in input_mappings.into_iter().filter_map(|m| match m { + InputMapping::Timer { .. 
} => None, + InputMapping::User(m) => Some(m), + }) { if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() { if mapping.operator.is_none() { mapping.operator = Some(op_name.to_owned()); diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index 6f2566519..1801743fd 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,8 +1,9 @@ use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode}; -use dora_node_api::config::{DataId, InputMapping, NodeId}; +use dora_node_api::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping}; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, fmt::Write as _, + time::Duration, }; pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { @@ -100,46 +101,60 @@ fn visualize_inputs( nodes: &HashMap<&NodeId, &ResolvedNode>, ) { for (input_id, mapping) in inputs { - let InputMapping { - source, - operator, - output, - } = mapping; + match mapping { + mapping @ InputMapping::Timer { .. 
} => { + } + InputMapping::User(mapping) => { + visualize_user_mapping(mapping, target, nodes, input_id, flowchart) + } + } + } +} - let mut source_found = false; - if let Some(source_node) = nodes.get(source) { - match (&source_node.kind, operator) { - (CoreNodeKind::Custom(custom_node), None) => { - if custom_node.run_config.outputs.contains(output) { +fn visualize_user_mapping( + mapping: &UserInputMapping, + target: &str, + nodes: &HashMap<&NodeId, &ResolvedNode>, + input_id: &DataId, + flowchart: &mut String, +) { + let UserInputMapping { + source, + operator, + output, + } = mapping; + let mut source_found = false; + if let Some(source_node) = nodes.get(source) { + match (&source_node.kind, operator) { + (CoreNodeKind::Custom(custom_node), None) => { + if custom_node.run_config.outputs.contains(output) { + let data = if output == input_id { + format!("{output}") + } else { + format!("{output} as {input_id}") + }; + writeln!(flowchart, " {source} -- {data} --> {target}").unwrap(); + source_found = true; + } + } + (CoreNodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => { + if let Some(operator) = operators.iter().find(|o| &o.id == operator_id) { + if operator.config.outputs.contains(output) { let data = if output == input_id { format!("{output}") } else { format!("{output} as {input_id}") }; - writeln!(flowchart, " {source} -- {data} --> {target}").unwrap(); + writeln!(flowchart, " {source}/{operator_id} -- {data} --> {target}") + .unwrap(); source_found = true; } } - (CoreNodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => { - if let Some(operator) = operators.iter().find(|o| &o.id == operator_id) { - if operator.config.outputs.contains(output) { - let data = if output == input_id { - format!("{output}") - } else { - format!("{output} as {input_id}") - }; - writeln!(flowchart, " {source}/{operator_id} -- {data} --> {target}") - .unwrap(); - source_found = true; - } - } - } - (CoreNodeKind::Custom(_), Some(_)) | 
(CoreNodeKind::Runtime(_), None) => {} } + (CoreNodeKind::Custom(_), Some(_)) | (CoreNodeKind::Runtime(_), None) => {} } - - if !source_found { - writeln!(flowchart, " missing>missing] -- {input_id} --> {target}").unwrap(); - } + } + if !source_found { + writeln!(flowchart, " missing>missing] -- {input_id} --> {target}").unwrap(); } } From 37c7970e492493691fbc054821032f1ac9c952e6 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 27 Jul 2022 19:38:35 +0200 Subject: [PATCH 3/9] Add dora input nodes in visualization --- libraries/core/src/descriptor/visualize.rs | 44 ++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index 1801743fd..aeed8842d 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -15,6 +15,30 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { all_nodes.insert(&node.id, node); } + let mut dora_timers = BTreeSet::new(); + for node in nodes { + match &node.kind { + CoreNodeKind::Runtime(node) => { + for operator in &node.operators { + collect_dora_nodes(operator.config.inputs.values(), &mut dora_timers); + } + } + CoreNodeKind::Custom(node) => { + collect_dora_nodes(node.run_config.inputs.values(), &mut dora_timers); + } + } + } + if !dora_timers.is_empty() { + writeln!(flowchart, "subgraph ___dora___ [dora]").unwrap(); + writeln!(flowchart, " subgraph ___timer_timer___ [timer]").unwrap(); + for interval in dora_timers { + let duration = format_duration(interval); + writeln!(flowchart, " dora/timer/{duration}[\\{duration}/]").unwrap(); + } + flowchart.push_str(" end\n"); + flowchart.push_str("end\n"); + } + for node in nodes { visualize_node_inputs(node, &mut flowchart, &all_nodes) } @@ -22,6 +46,20 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { flowchart } +fn collect_dora_nodes( + values: std::collections::btree_map::Values, + dora_timers: &mut BTreeSet, +) { 
+ for input in values { + match input { + InputMapping::User(_) => {} + InputMapping::Timer { interval } => { + dora_timers.insert(*interval); + } + } + } +} + fn visualize_node(node: &ResolvedNode, flowchart: &mut String) { let node_id = &node.id; match &node.kind { @@ -103,6 +141,12 @@ fn visualize_inputs( for (input_id, mapping) in inputs { match mapping { mapping @ InputMapping::Timer { .. } => { + writeln!( + flowchart, + " {} -- {input_id} --> {target}", + mapping.to_string() + ) + .unwrap(); } InputMapping::User(mapping) => { visualize_user_mapping(mapping, target, nodes, input_id, flowchart) From 20501ca461fa7fe28339af80dbb3b325be17b399 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 28 Jul 2022 11:33:07 +0200 Subject: [PATCH 4/9] Send dora timer messages from coordinator --- apis/rust/node/src/config.rs | 2 +- binaries/coordinator/src/main.rs | 31 ++++++++++++++++++++-- libraries/core/src/descriptor/mod.rs | 1 + libraries/core/src/descriptor/visualize.rs | 31 +++++++++++++--------- 4 files changed, 49 insertions(+), 16 deletions(-) diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index 5102b0b8d..d2a206430 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -222,7 +222,7 @@ pub struct UserInputMapping { pub output: DataId, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub enum CommunicationConfig { Zenoh { diff --git a/binaries/coordinator/src/main.rs b/binaries/coordinator/src/main.rs index f6a743f09..e860c58ea 100644 --- a/binaries/coordinator/src/main.rs +++ b/binaries/coordinator/src/main.rs @@ -1,8 +1,12 @@ -use dora_core::descriptor::{self, CoreNodeKind, Descriptor}; -use dora_node_api::config::NodeId; +use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor}; +use dora_node_api::{ + communication, + config::{format_duration, NodeId}, +}; use eyre::{bail, eyre, 
WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; use std::path::{Path, PathBuf}; +use tokio_stream::wrappers::IntervalStream; #[derive(Debug, Clone, clap::Parser)] #[clap(about = "Dora coordinator")] @@ -57,6 +61,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() })?; let nodes = descriptor.resolve_aliases(); + let dora_timers = collect_dora_timers(&nodes); let mut communication = descriptor.communication; if nodes @@ -94,6 +99,28 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() } } + for interval in dora_timers { + let communication = communication.clone(); + let task = tokio::spawn(async move { + let communication = communication::init(&communication) + .await + .wrap_err("failed to init communication layer")?; + let topic = { + let duration = format_duration(interval); + format!("dora/timer/{duration}") + }; + let mut stream = IntervalStream::new(tokio::time::interval(interval)); + while let Some(_) = stream.next().await { + let publish = communication.publish(&topic, &[]); + publish + .await + .wrap_err("failed to publish timer tick message")?; + } + Ok(()) + }); + tasks.push(task); + } + while let Some(task_result) = tasks.next().await { task_result .wrap_err("failed to join async task")? 
diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 4797f06cb..5ae271295 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -8,6 +8,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, path::PathBuf, }; +pub use visualize::collect_dora_timers; mod visualize; diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index aeed8842d..8d4a8d0ab 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -15,19 +15,7 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { all_nodes.insert(&node.id, node); } - let mut dora_timers = BTreeSet::new(); - for node in nodes { - match &node.kind { - CoreNodeKind::Runtime(node) => { - for operator in &node.operators { - collect_dora_nodes(operator.config.inputs.values(), &mut dora_timers); - } - } - CoreNodeKind::Custom(node) => { - collect_dora_nodes(node.run_config.inputs.values(), &mut dora_timers); - } - } - } + let dora_timers = collect_dora_timers(nodes); if !dora_timers.is_empty() { writeln!(flowchart, "subgraph ___dora___ [dora]").unwrap(); writeln!(flowchart, " subgraph ___timer_timer___ [timer]").unwrap(); @@ -46,6 +34,23 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { flowchart } +pub fn collect_dora_timers(nodes: &[ResolvedNode]) -> BTreeSet { + let mut dora_timers = BTreeSet::new(); + for node in nodes { + match &node.kind { + CoreNodeKind::Runtime(node) => { + for operator in &node.operators { + collect_dora_nodes(operator.config.inputs.values(), &mut dora_timers); + } + } + CoreNodeKind::Custom(node) => { + collect_dora_nodes(node.run_config.inputs.values(), &mut dora_timers); + } + } + } + dora_timers +} + fn collect_dora_nodes( values: std::collections::btree_map::Values, dora_timers: &mut BTreeSet, From 9b711d39646936798c22165ad664ebd4c55bec6b Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 28 Jul 2022 
12:54:01 +0200 Subject: [PATCH 5/9] Fix paths in `mini-dataflow.yml` example --- binaries/coordinator/examples/mini-dataflow.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/binaries/coordinator/examples/mini-dataflow.yml b/binaries/coordinator/examples/mini-dataflow.yml index 5647525c4..6723bf9b4 100644 --- a/binaries/coordinator/examples/mini-dataflow.yml +++ b/binaries/coordinator/examples/mini-dataflow.yml @@ -38,20 +38,20 @@ nodes: - id: runtime-node operators: - id: op-1 - shared-library: ../target/release/libexample_operator.so + shared-library: ../../target/release/libexample_operator.so inputs: random: random/number time: timer/time outputs: - timestamped-random - id: op-2 - shared-library: ../runtime/examples/c-operator/operator.so + shared-library: ../../examples/c-operator/operator.so inputs: time: timer/time outputs: - counter - id: op-3 - python: ../runtime/examples/python-operator/op.py + python: ../../examples/python-operator/op.py inputs: time: timer/time test: python-operator/counter @@ -60,9 +60,9 @@ nodes: - id: python-operator operator: - python: ../runtime/examples/python-operator/op.py + python: ../../examples/python-operator/op.py inputs: time: timer/time - dora_time: dora/timer/secs/5 + dora_time: dora/timer/millis/500 outputs: - counter From ed59e69eca8b95e6bfc79c62084cc706bcb144a2 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 28 Jul 2022 12:54:14 +0200 Subject: [PATCH 6/9] Fix coordinator run instructions in README --- binaries/coordinator/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/binaries/coordinator/README.md b/binaries/coordinator/README.md index 3d6c73198..daf75599a 100644 --- a/binaries/coordinator/README.md +++ b/binaries/coordinator/README.md @@ -31,12 +31,12 @@ There are drawbacks too, for example: ```bash cargo build -p dora-runtime --release cargo build -p dora-coordinator --examples --release -cargo build --manifest-path 
../runtime/examples/example-operator/Cargo.toml --release +cargo build --manifest-path ../examples/example-operator/Cargo.toml --release ``` - Compile the C example operator through: ```bash -cd ../runtime/examples/c-operator -cp ../../../apis/c/operator/api.h . +cd ../../examples/c-operator +cp ../../apis/c/operator/api.h . clang -c operator.c clang -shared -v operator.o -o operator.so ``` From e677adbd58f261a7d33fdfec1f2e3d79774f0417 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 28 Jul 2022 13:01:46 +0200 Subject: [PATCH 7/9] Keep operators that use dora timers as inputs running indefinitely The dora timers produce values forever. We only want to stop operators when all their input streams are closed. --- binaries/runtime/src/main.rs | 44 ++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index ab1ff1627..aed8592a7 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -10,13 +10,14 @@ use dora_node_api::{ use eyre::{bail, eyre, Context}; use futures::{ stream::{self, FuturesUnordered}, - FutureExt, StreamExt, + Future, FutureExt, StreamExt, }; use futures_concurrency::Merge; use operator::{Operator, OperatorEvent}; use std::{ collections::{BTreeMap, HashMap}, mem, + pin::Pin, }; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamMap}; @@ -171,25 +172,30 @@ async fn subscribe_operator<'a>( operator: &'a OperatorDefinition, communication: &'a dyn CommunicationLayer, ) -> Result + 'a, eyre::Error> { - let stop_messages = FuturesUnordered::new(); - for input in operator.config.inputs.values().filter_map(|m| match m { - InputMapping::Timer { .. } => None, - InputMapping::User(m) => Some(m), - }) { - let UserInputMapping { - source, operator, .. 
- } = input; - let topic = match operator { - Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"), - None => format!("{source}/{STOP_TOPIC}"), - }; - let sub = communication - .subscribe(&topic) - .await - .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - stop_messages.push(sub.into_future()); + let stop_messages: FuturesUnordered>>> = + FuturesUnordered::new(); + for mapping in operator.config.inputs.values() { + match mapping { + InputMapping::User(UserInputMapping { + source, operator, .. + }) => { + let topic = match operator { + Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"), + None => format!("{source}/{STOP_TOPIC}"), + }; + let sub = communication + .subscribe(&topic) + .await + .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; + stop_messages.push(Box::pin(sub.into_future().map(|_| ()))); + } + InputMapping::Timer { .. } => { + // dora timer inputs run forever + stop_messages.push(Box::pin(futures::future::pending())); + } + } } - let finished = Box::pin(stop_messages.all(|_| async { true }).shared()); + let finished = Box::pin(stop_messages.all(|()| async { true }).shared()); let mut streams = Vec::new(); for (input, mapping) in &operator.config.inputs { From 7158ff2d38c1b964865144dbed13834cca666fec Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 28 Jul 2022 13:03:45 +0200 Subject: [PATCH 8/9] Send `InputsStopped` event only once, after all inputs are closed Ensures that the event is only sent after all input streams are exhausted. This avoids a potential race when there are multiple inputs. 
--- binaries/runtime/src/main.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index aed8592a7..27f25b23a 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -211,16 +211,15 @@ async fn subscribe_operator<'a>( data, }) .map(SubscribeEvent::Input) - .take_until(finished.clone()) - .chain(stream::once(async { - SubscribeEvent::InputsStopped { - target_operator: operator.id.clone(), - } - })); + .take_until(finished.clone()); streams.push(stream); } - Ok(streams.merge()) + Ok(streams.merge().chain(stream::once(async { + SubscribeEvent::InputsStopped { + target_operator: operator.id.clone(), + } + }))) } async fn publish( From 047252be6bea9e8608b11e61e11ba27a6bc2479e Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 29 Jul 2022 11:20:12 +0200 Subject: [PATCH 9/9] Avoid intermediate allocations by implementing Display instead of ToString --- apis/rust/node/src/config.rs | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index d2a206430..a3b3ccd01 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, BTreeSet}, convert::Infallible, - fmt::Write as _, + fmt::{self, Write as _}, str::FromStr, time::Duration, }; @@ -110,38 +110,46 @@ impl InputMapping { } } -impl ToString for InputMapping { - fn to_string(&self) -> String { +impl fmt::Display for InputMapping { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { InputMapping::Timer { interval } => { let duration = format_duration(*interval); - format!("dora/timer/{duration}") + write!(f, "dora/timer/{duration}") } InputMapping::User(mapping) => { if let Some(operator) = &mapping.operator { - format!("{}/{operator}/{}", mapping.source, mapping.output) + 
write!(f, "{}/{operator}/{}", mapping.source, mapping.output) } else { - format!("{}/{}", mapping.source, mapping.output) + write!(f, "{}/{}", mapping.source, mapping.output) } } } } } -pub fn format_duration(interval: Duration) -> String { - if interval.subsec_millis() == 0 { - format!("secs/{}", interval.as_secs()) - } else { - format!("millis/{}", interval.as_millis()) +pub struct FormattedDuration(pub Duration); + +impl fmt::Display for FormattedDuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.0.subsec_millis() == 0 { + write!(f, "secs/{}", self.0.as_secs()) + } else { + write!(f, "millis/{}", self.0.as_millis()) + } } } +pub fn format_duration(interval: Duration) -> FormattedDuration { + FormattedDuration(interval) +} + impl Serialize for InputMapping { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { - serializer.serialize_str(&self.to_string()) + serializer.collect_str(self) } }