From bc56b5947dc59c834cbfc65431b62c62cc7100a7 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 26 Jul 2022 13:35:35 +0200 Subject: [PATCH 1/3] Implement alias syntax for runtime nodes with single python operator --- apis/rust/node/src/config.rs | 2 +- .../coordinator/examples/mini-dataflow.yml | 9 ++ binaries/coordinator/src/main.rs | 20 ++-- libraries/core/src/descriptor/mod.rs | 95 ++++++++++++++++++- libraries/core/src/descriptor/visualize.rs | 28 +++--- 5 files changed, 128 insertions(+), 26 deletions(-) diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index 6130e22e9..18bda2530 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -6,7 +6,7 @@ use std::{ str::FromStr, }; -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct NodeRunConfig { #[serde(default)] diff --git a/binaries/coordinator/examples/mini-dataflow.yml b/binaries/coordinator/examples/mini-dataflow.yml index 8579a5db5..1083d8470 100644 --- a/binaries/coordinator/examples/mini-dataflow.yml +++ b/binaries/coordinator/examples/mini-dataflow.yml @@ -54,5 +54,14 @@ nodes: python: ../runtime/examples/python-operator/op.py inputs: time: timer/time + test: python-operator/counter outputs: - counter + + - id: python-operator + python: + path: ../runtime/examples/python-operator/op.py + inputs: + time: timer/time + outputs: + - counter diff --git a/binaries/coordinator/src/main.rs b/binaries/coordinator/src/main.rs index 376638a60..f6a743f09 100644 --- a/binaries/coordinator/src/main.rs +++ b/binaries/coordinator/src/main.rs @@ -1,4 +1,4 @@ -use dora_core::descriptor::{self, Descriptor, NodeKind}; +use dora_core::descriptor::{self, CoreNodeKind, Descriptor}; use dora_node_api::config::NodeId; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; @@ -49,17 +49,21 @@ async fn main() -> eyre::Result<()> { } async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> { - let Descriptor { - mut communication, - nodes, - } = read_descriptor(&dataflow_path).await.wrap_err_with(|| { + let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { format!( "failed to read dataflow descriptor at {}", dataflow_path.display() ) })?; - if nodes.iter().any(|n| matches!(n.kind, NodeKind::Runtime(_))) && !runtime.is_file() { + let nodes = descriptor.resolve_aliases(); + let mut communication = descriptor.communication; + + if nodes + .iter() + .any(|n| matches!(n.kind, CoreNodeKind::Runtime(_))) + && !runtime.is_file() + { bail!( "There is no runtime at {}, or it is not a file", runtime.display() @@ -74,12 +78,12 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() let node_id = node.id.clone(); match node.kind { - descriptor::NodeKind::Custom(node) => { + descriptor::CoreNodeKind::Custom(node) => { let result = spawn_custom_node(node_id.clone(), &node, &communication) .wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?; tasks.push(result); } - descriptor::NodeKind::Runtime(node) => { + descriptor::CoreNodeKind::Runtime(node) => { if !node.operators.is_empty() { let result = spawn_runtime_node(runtime, node_id.clone(), &node, &communication) diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 2d2240ec5..96d8c7db6 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -2,6 +2,7 @@ use dora_node_api::config::{ CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId, }; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::fmt; use std::{ collections::{BTreeMap, BTreeSet}, @@ -18,14 +19,69 @@ pub struct Descriptor { } impl Descriptor { + pub fn resolve_aliases(&self) -> Vec { + const PYTHON_OP_NAME: &str = "op"; + + let python_nodes: HashSet<_> = self + .nodes + .iter() + .filter(|n| matches!(n.kind, NodeKind::Python(_))) + .map(|n| &n.id) + .collect(); + + let mut resolved = vec![]; + for mut node in self.nodes.clone() { + // adjust input mappings + let input_mappings: Vec<_> = match &mut node.kind { + NodeKind::Runtime(node) => node + .operators + .iter_mut() + .flat_map(|op| op.inputs.values_mut()) + .collect(), + NodeKind::Custom(node) => node.run_config.inputs.values_mut().collect(), + NodeKind::Python(node) => node.inputs.values_mut().collect(), + }; + for mapping in input_mappings { + if python_nodes.contains(&mapping.source) { + assert_eq!(mapping.operator, None); + mapping.operator = Some(OperatorId::from(PYTHON_OP_NAME.to_string())); + } + } + + // resolve nodes + let kind = match node.kind { + NodeKind::Custom(node) => CoreNodeKind::Custom(node), + NodeKind::Runtime(node) => CoreNodeKind::Runtime(node), + NodeKind::Python(node) => CoreNodeKind::Runtime(RuntimeNode { + operators: vec![OperatorConfig { + id: OperatorId::from(PYTHON_OP_NAME.to_string()), + name: None, + description: None, + inputs: node.inputs, + outputs: node.outputs, + source: OperatorSource::Python(node.path), + }], + }), + }; + resolved.push(ResolvedNode { + id: node.id, + name: node.name, + description: node.description, + kind, + }); + } + resolved + } + pub fn visualize_as_mermaid(&self) -> eyre::Result { - let flowchart = visualize::visualize_nodes(&self.nodes); + let resolved = self.resolve_aliases(); + let flowchart = visualize::visualize_nodes(&resolved); Ok(flowchart) } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Node { pub id: NodeId, pub name: Option, @@ -35,16 +91,36 @@ pub struct Node { pub kind: NodeKind, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum NodeKind { /// Dora runtime node #[serde(rename = "operators")] Runtime(RuntimeNode), Custom(CustomNode), + Python(PythonOperatorConfig), } #[derive(Debug, Serialize, Deserialize)] +pub struct ResolvedNode { + pub id: NodeId, + pub name: Option, + pub description: Option, + + #[serde(flatten)] + pub kind: CoreNodeKind, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CoreNodeKind { + /// Dora runtime node + #[serde(rename = "operators")] + Runtime(RuntimeNode), + Custom(CustomNode), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(transparent)] pub struct RuntimeNode { pub operators: Vec, @@ -73,7 +149,16 @@ pub enum OperatorSource { Wasm(PathBuf), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct PythonOperatorConfig { + pub path: PathBuf, + #[serde(default)] + pub inputs: BTreeMap, + #[serde(default)] + pub outputs: BTreeSet, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CustomNode { pub run: String, pub env: Option>, @@ -83,7 +168,7 @@ pub struct CustomNode { pub run_config: NodeRunConfig, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum EnvValue { Bool(bool), diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index 0603dd24f..f39b2c932 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,11 +1,11 @@ -use super::{CustomNode, Node, NodeKind, OperatorConfig, RuntimeNode}; +use super::{CoreNodeKind, CustomNode, OperatorConfig, ResolvedNode, RuntimeNode}; use dora_node_api::config::{DataId, InputMapping, NodeId}; use std::{ collections::{BTreeMap, HashMap}, fmt::Write as _, }; -pub fn visualize_nodes(nodes: &[Node]) -> String { +pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { let mut flowchart = "flowchart TB\n".to_owned(); let mut all_nodes = HashMap::new(); @@ -21,11 +21,11 @@ pub fn visualize_nodes(nodes: &[Node]) -> String { flowchart } -fn visualize_node(node: &Node, flowchart: &mut String) { +fn visualize_node(node: &ResolvedNode, flowchart: &mut String) { let node_id = &node.id; match &node.kind { - NodeKind::Custom(node) => visualize_custom_node(node_id, node, flowchart), - NodeKind::Runtime(RuntimeNode { operators }) => { + CoreNodeKind::Custom(node) => visualize_custom_node(node_id, node, flowchart), + CoreNodeKind::Runtime(RuntimeNode { operators }) => { visualize_runtime_node(node_id, operators, flowchart) } } @@ -63,16 +63,20 @@ fn visualize_runtime_node(node_id: &NodeId, operators: &[OperatorConfig], flowch flowchart.push_str("end\n"); } -fn visualize_node_inputs(node: &Node, flowchart: &mut String, nodes: &HashMap<&NodeId, &Node>) { +fn visualize_node_inputs( + node: &ResolvedNode, + flowchart: &mut String, + nodes: &HashMap<&NodeId, &ResolvedNode>, +) { let node_id = &node.id; match &node.kind { - NodeKind::Custom(node) => visualize_inputs( + CoreNodeKind::Custom(node) => visualize_inputs( &node_id.to_string(), &node.run_config.inputs, flowchart, nodes, ), - NodeKind::Runtime(RuntimeNode { operators }) => { + CoreNodeKind::Runtime(RuntimeNode { operators }) => { for operator in operators { visualize_inputs( &format!("{node_id}/{}", operator.id), @@ -89,7 +93,7 @@ fn visualize_inputs( target: &str, inputs: &BTreeMap, flowchart: &mut String, - nodes: &HashMap<&NodeId, &Node>, + nodes: &HashMap<&NodeId, &ResolvedNode>, ) { for (input_id, mapping) in inputs { let InputMapping { @@ -101,7 +105,7 @@ fn visualize_inputs( let mut source_found = false; if let Some(source_node) = nodes.get(source) { match (&source_node.kind, operator) { - (NodeKind::Custom(custom_node), None) => { + (CoreNodeKind::Custom(custom_node), None) => { if custom_node.run_config.outputs.contains(output) { let data = if output == input_id { format!("{output}") @@ -112,7 +116,7 @@ fn visualize_inputs( source_found = true; } } - (NodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => { + (CoreNodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => { if let Some(operator) = operators.iter().find(|o| &o.id == operator_id) { if operator.outputs.contains(output) { let data = if output == input_id { @@ -126,7 +130,7 @@ fn visualize_inputs( } } } - (NodeKind::Custom(_), Some(_)) | (NodeKind::Runtime(_), None) => {} + (CoreNodeKind::Custom(_), Some(_)) | (CoreNodeKind::Runtime(_), None) => {} } } From 45378870ad06111f94d1f2b0f78874a517697556 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 26 Jul 2022 15:18:09 +0200 Subject: [PATCH 2/3] Generalize support for single-operator nodes, independent of their type --- .../coordinator/examples/mini-dataflow.yml | 4 +- binaries/runtime/src/main.rs | 14 ++--- binaries/runtime/src/operator/mod.rs | 10 ++-- libraries/core/src/descriptor/mod.rs | 55 ++++++++++++------- libraries/core/src/descriptor/visualize.rs | 16 ++++-- 5 files changed, 58 insertions(+), 41 deletions(-) diff --git a/binaries/coordinator/examples/mini-dataflow.yml b/binaries/coordinator/examples/mini-dataflow.yml index 1083d8470..9d5af6113 100644 --- a/binaries/coordinator/examples/mini-dataflow.yml +++ b/binaries/coordinator/examples/mini-dataflow.yml @@ -59,8 +59,8 @@ nodes: - counter - id: python-operator - python: - path: ../runtime/examples/python-operator/op.py + operator: + python: ../runtime/examples/python-operator/op.py inputs: time: timer/time outputs: diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index f61326187..b7a73fdf2 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -1,6 +1,6 @@ #![warn(unsafe_op_in_unsafe_fn)] -use dora_core::descriptor::OperatorConfig; +use dora_core::descriptor::OperatorDefinition; use dora_node_api::{ self, communication::{self, CommunicationLayer}, @@ -37,7 +37,7 @@ async fn main() -> eyre::Result<()> { .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; serde_yaml::from_str(&raw).context("failed to deserialize communication config")? }; - let operators: Vec = { + let operators: Vec = { let raw = std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; serde_yaml::from_str(&raw).context("failed to deserialize operator config")? @@ -107,7 +107,7 @@ async fn main() -> eyre::Result<()> { .ok_or_else(|| eyre!("received event from unknown operator {id}"))?; match event { OperatorEvent::Output { id: data_id, value } => { - if !operator.config().outputs.contains(&data_id) { + if !operator.config().config.outputs.contains(&data_id) { eyre::bail!("unknown output {data_id} for operator {id}"); } publish(&node_id, id, data_id, &value, communication.as_ref()) @@ -154,7 +154,7 @@ async fn main() -> eyre::Result<()> { } async fn subscribe<'a>( - operators: &'a [OperatorConfig], + operators: &'a [OperatorDefinition], communication: &'a dyn CommunicationLayer, ) -> eyre::Result + 'a> { let mut streams = Vec::new(); @@ -168,11 +168,11 @@ async fn subscribe<'a>( } async fn subscribe_operator<'a>( - operator: &'a OperatorConfig, + operator: &'a OperatorDefinition, communication: &'a dyn CommunicationLayer, ) -> Result + 'a, eyre::Error> { let stop_messages = FuturesUnordered::new(); - for input in operator.inputs.values() { + for input in operator.config.inputs.values() { let InputMapping { source, operator, .. } = input; @@ -189,7 +189,7 @@ async fn subscribe_operator<'a>( let finished = Box::pin(stop_messages.all(|_| async { true }).shared()); let mut streams = Vec::new(); - for (input, mapping) in &operator.inputs { + for (input, mapping) in &operator.config.inputs { let InputMapping { source, operator: source_operator, diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index d3c3d9007..60f7da78f 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,4 +1,4 @@ -use dora_core::descriptor::{OperatorConfig, OperatorSource}; +use dora_core::descriptor::{OperatorDefinition, OperatorSource}; use dora_node_api::config::DataId; use eyre::{eyre, Context}; use std::any::Any; @@ -9,17 +9,17 @@ mod shared_lib; pub struct Operator { operator_task: Sender, - config: OperatorConfig, + config: OperatorDefinition, } impl Operator { pub async fn init( - operator_config: OperatorConfig, + operator_config: OperatorDefinition, events_tx: Sender, ) -> eyre::Result { let (operator_task, operator_rx) = mpsc::channel(10); - match &operator_config.source { + match &operator_config.config.source { OperatorSource::SharedLibrary(path) => { shared_lib::spawn(path, events_tx, operator_rx).wrap_err_with(|| { format!( @@ -54,7 +54,7 @@ impl Operator { /// Get a reference to the operator's config. #[must_use] - pub fn config(&self) -> &OperatorConfig { + pub fn config(&self) -> &OperatorDefinition { &self.config } } diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 96d8c7db6..e4a56171e 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -2,7 +2,7 @@ use dora_node_api::config::{ CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId, }; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::HashMap; use std::fmt; use std::{ collections::{BTreeMap, BTreeSet}, @@ -20,13 +20,15 @@ pub struct Descriptor { impl Descriptor { pub fn resolve_aliases(&self) -> Vec { - const PYTHON_OP_NAME: &str = "op"; + let default_op_id = OperatorId::from("op".to_string()); - let python_nodes: HashSet<_> = self + let single_operator_nodes: HashMap<_, _> = self .nodes .iter() - .filter(|n| matches!(n.kind, NodeKind::Python(_))) - .map(|n| &n.id) + .filter_map(|n| match &n.kind { + NodeKind::Operator(op) => Some((&n.id, op.id.as_ref().unwrap_or(&default_op_id))), + _ => None, + }) .collect(); let mut resolved = vec![]; @@ -36,15 +38,16 @@ impl Descriptor { NodeKind::Runtime(node) => node .operators .iter_mut() - .flat_map(|op| op.inputs.values_mut()) + .flat_map(|op| op.config.inputs.values_mut()) .collect(), NodeKind::Custom(node) => node.run_config.inputs.values_mut().collect(), - NodeKind::Python(node) => node.inputs.values_mut().collect(), + NodeKind::Operator(operator) => operator.config.inputs.values_mut().collect(), }; for mapping in input_mappings { - if python_nodes.contains(&mapping.source) { - assert_eq!(mapping.operator, None); - mapping.operator = Some(OperatorId::from(PYTHON_OP_NAME.to_string())); + if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() { + if mapping.operator.is_none() { + mapping.operator = Some(op_name.to_owned()); + } } } @@ -52,14 +55,10 @@ impl Descriptor { let kind = match node.kind { NodeKind::Custom(node) => CoreNodeKind::Custom(node), NodeKind::Runtime(node) => CoreNodeKind::Runtime(node), - NodeKind::Python(node) => CoreNodeKind::Runtime(RuntimeNode { - operators: vec![OperatorConfig { - id: OperatorId::from(PYTHON_OP_NAME.to_string()), - name: None, - description: None, - inputs: node.inputs, - outputs: node.outputs, - source: OperatorSource::Python(node.path), + NodeKind::Operator(op) => CoreNodeKind::Runtime(RuntimeNode { + operators: vec![OperatorDefinition { + id: op.id.unwrap_or_else(|| default_op_id.clone()), + config: op.config, }], }), }; @@ -98,7 +97,7 @@ pub enum NodeKind { #[serde(rename = "operators")] Runtime(RuntimeNode), Custom(CustomNode), - Python(PythonOperatorConfig), + Operator(SingleOperatorDefinition), } #[derive(Debug, Serialize, Deserialize)] @@ -123,12 +122,26 @@ pub enum CoreNodeKind { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(transparent)] pub struct RuntimeNode { - pub operators: Vec, + pub operators: Vec, } #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct OperatorConfig { +pub struct OperatorDefinition { pub id: OperatorId, + #[serde(flatten)] + pub config: OperatorConfig, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct SingleOperatorDefinition { + /// ID is optional if there is only a single operator. + pub id: Option, + #[serde(flatten)] + pub config: OperatorConfig, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct OperatorConfig { pub name: Option, pub description: Option, diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index f39b2c932..6f2566519 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,4 +1,4 @@ -use super::{CoreNodeKind, CustomNode, OperatorConfig, ResolvedNode, RuntimeNode}; +use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode}; use dora_node_api::config::{DataId, InputMapping, NodeId}; use std::{ collections::{BTreeMap, HashMap}, @@ -44,14 +44,18 @@ fn visualize_custom_node(node_id: &NodeId, node: &CustomNode, flowchart: &mut St } } -fn visualize_runtime_node(node_id: &NodeId, operators: &[OperatorConfig], flowchart: &mut String) { +fn visualize_runtime_node( + node_id: &NodeId, + operators: &[OperatorDefinition], + flowchart: &mut String, +) { writeln!(flowchart, "subgraph {node_id}").unwrap(); for operator in operators { let operator_id = &operator.id; - if operator.inputs.is_empty() { + if operator.config.inputs.is_empty() { // source operator writeln!(flowchart, " {node_id}/{operator_id}[\\{operator_id}/]").unwrap(); - } else if operator.outputs.is_empty() { + } else if operator.config.outputs.is_empty() { // sink operator writeln!(flowchart, " {node_id}/{operator_id}[/{operator_id}\\]").unwrap(); } else { @@ -80,7 +84,7 @@ fn visualize_node_inputs( for operator in operators { visualize_inputs( &format!("{node_id}/{}", operator.id), - &operator.inputs, + &operator.config.inputs, flowchart, nodes, ) @@ -118,7 +122,7 @@ fn visualize_inputs( } (CoreNodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => { if let Some(operator) = operators.iter().find(|o| &o.id == operator_id) { - if operator.outputs.contains(output) { + if operator.config.outputs.contains(output) { let data = if output == input_id { format!("{output}") } else { From 2750ea59b1114bc424875b8e80738b154d02b1ff Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 27 Jul 2022 11:37:19 +0200 Subject: [PATCH 3/3] Rename config fields to 'definition' --- binaries/runtime/src/main.rs | 2 +- binaries/runtime/src/operator/mod.rs | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index b7a73fdf2..0838d644c 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -107,7 +107,7 @@ async fn main() -> eyre::Result<()> { .ok_or_else(|| eyre!("received event from unknown operator {id}"))?; match event { OperatorEvent::Output { id: data_id, value } => { - if !operator.config().config.outputs.contains(&data_id) { + if !operator.definition().config.outputs.contains(&data_id) { eyre::bail!("unknown output {data_id} for operator {id}"); } publish(&node_id, id, data_id, &value, communication.as_ref()) diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 60f7da78f..299347e2f 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -9,28 +9,31 @@ mod shared_lib; pub struct Operator { operator_task: Sender, - config: OperatorDefinition, + definition: OperatorDefinition, } impl Operator { pub async fn init( - operator_config: OperatorDefinition, + operator_definition: OperatorDefinition, events_tx: Sender, ) -> eyre::Result { let (operator_task, operator_rx) = mpsc::channel(10); - match &operator_config.config.source { + match &operator_definition.config.source { OperatorSource::SharedLibrary(path) => { shared_lib::spawn(path, events_tx, operator_rx).wrap_err_with(|| { format!( "failed ot spawn shared library operator for {}", - operator_config.id + operator_definition.id ) })?; } OperatorSource::Python(path) => { python::spawn(path, events_tx, operator_rx).wrap_err_with(|| { - format!("failed ot spawn Python operator for {}", operator_config.id) + format!( + "failed ot spawn Python operator for {}", + operator_definition.id + ) })?; } OperatorSource::Wasm(path) => { @@ -39,7 +42,7 @@ impl Operator { } Ok(Self { operator_task, - config: operator_config, + definition: operator_definition, }) } @@ -52,10 +55,10 @@ impl Operator { }) } - /// Get a reference to the operator's config. + /// Get a reference to the operator's definition. #[must_use] - pub fn config(&self) -> &OperatorDefinition { - &self.config + pub fn definition(&self) -> &OperatorDefinition { + &self.definition } }