Skip to content

Commit

Permalink
Merge pull request #50 from dora-rs/python-operator-alias
Browse files Browse the repository at this point in the history
Add support for single operator nodes in YAML dataflow specification
  • Loading branch information
phil-opp authored Jul 27, 2022
2 parents 0db7c6a + 2750ea5 commit e98fc07
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 50 deletions.
2 changes: 1 addition & 1 deletion apis/rust/node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
str::FromStr,
};

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct NodeRunConfig {
#[serde(default)]
Expand Down
9 changes: 9 additions & 0 deletions binaries/coordinator/examples/mini-dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,14 @@ nodes:
python: ../runtime/examples/python-operator/op.py
inputs:
time: timer/time
test: python-operator/counter
outputs:
- counter

- id: python-operator
operator:
python: ../runtime/examples/python-operator/op.py
inputs:
time: timer/time
outputs:
- counter
20 changes: 12 additions & 8 deletions binaries/coordinator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use dora_core::descriptor::{self, Descriptor, NodeKind};
use dora_core::descriptor::{self, CoreNodeKind, Descriptor};
use dora_node_api::config::NodeId;
use eyre::{bail, eyre, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
Expand Down Expand Up @@ -49,17 +49,21 @@ async fn main() -> eyre::Result<()> {
}

async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> {
let Descriptor {
mut communication,
nodes,
} = read_descriptor(&dataflow_path).await.wrap_err_with(|| {
let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| {
format!(
"failed to read dataflow descriptor at {}",
dataflow_path.display()
)
})?;

if nodes.iter().any(|n| matches!(n.kind, NodeKind::Runtime(_))) && !runtime.is_file() {
let nodes = descriptor.resolve_aliases();
let mut communication = descriptor.communication;

if nodes
.iter()
.any(|n| matches!(n.kind, CoreNodeKind::Runtime(_)))
&& !runtime.is_file()
{
bail!(
"There is no runtime at {}, or it is not a file",
runtime.display()
Expand All @@ -74,12 +78,12 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
let node_id = node.id.clone();

match node.kind {
descriptor::NodeKind::Custom(node) => {
descriptor::CoreNodeKind::Custom(node) => {
let result = spawn_custom_node(node_id.clone(), &node, &communication)
.wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?;
tasks.push(result);
}
descriptor::NodeKind::Runtime(node) => {
descriptor::CoreNodeKind::Runtime(node) => {
if !node.operators.is_empty() {
let result =
spawn_runtime_node(runtime, node_id.clone(), &node, &communication)
Expand Down
14 changes: 7 additions & 7 deletions binaries/runtime/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_core::descriptor::OperatorConfig;
use dora_core::descriptor::OperatorDefinition;
use dora_node_api::{
self,
communication::{self, CommunicationLayer},
Expand Down Expand Up @@ -37,7 +37,7 @@ async fn main() -> eyre::Result<()> {
.wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize communication config")?
};
let operators: Vec<OperatorConfig> = {
let operators: Vec<OperatorDefinition> = {
let raw =
std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
Expand Down Expand Up @@ -107,7 +107,7 @@ async fn main() -> eyre::Result<()> {
.ok_or_else(|| eyre!("received event from unknown operator {id}"))?;
match event {
OperatorEvent::Output { id: data_id, value } => {
if !operator.config().outputs.contains(&data_id) {
if !operator.definition().config.outputs.contains(&data_id) {
eyre::bail!("unknown output {data_id} for operator {id}");
}
publish(&node_id, id, data_id, &value, communication.as_ref())
Expand Down Expand Up @@ -154,7 +154,7 @@ async fn main() -> eyre::Result<()> {
}

async fn subscribe<'a>(
operators: &'a [OperatorConfig],
operators: &'a [OperatorDefinition],
communication: &'a dyn CommunicationLayer,
) -> eyre::Result<impl futures::Stream<Item = SubscribeEvent> + 'a> {
let mut streams = Vec::new();
Expand All @@ -168,11 +168,11 @@ async fn subscribe<'a>(
}

async fn subscribe_operator<'a>(
operator: &'a OperatorConfig,
operator: &'a OperatorDefinition,
communication: &'a dyn CommunicationLayer,
) -> Result<impl futures::Stream<Item = SubscribeEvent> + 'a, eyre::Error> {
let stop_messages = FuturesUnordered::new();
for input in operator.inputs.values() {
for input in operator.config.inputs.values() {
let InputMapping {
source, operator, ..
} = input;
Expand All @@ -189,7 +189,7 @@ async fn subscribe_operator<'a>(
let finished = Box::pin(stop_messages.all(|_| async { true }).shared());

let mut streams = Vec::new();
for (input, mapping) in &operator.inputs {
for (input, mapping) in &operator.config.inputs {
let InputMapping {
source,
operator: source_operator,
Expand Down
23 changes: 13 additions & 10 deletions binaries/runtime/src/operator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use dora_core::descriptor::{OperatorConfig, OperatorSource};
use dora_core::descriptor::{OperatorDefinition, OperatorSource};
use dora_node_api::config::DataId;
use eyre::{eyre, Context};
use std::any::Any;
Expand All @@ -9,28 +9,31 @@ mod shared_lib;

pub struct Operator {
operator_task: Sender<OperatorInput>,
config: OperatorConfig,
definition: OperatorDefinition,
}

impl Operator {
pub async fn init(
operator_config: OperatorConfig,
operator_definition: OperatorDefinition,
events_tx: Sender<OperatorEvent>,
) -> eyre::Result<Self> {
let (operator_task, operator_rx) = mpsc::channel(10);

match &operator_config.source {
match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
shared_lib::spawn(path, events_tx, operator_rx).wrap_err_with(|| {
format!(
"failed ot spawn shared library operator for {}",
operator_config.id
operator_definition.id
)
})?;
}
OperatorSource::Python(path) => {
python::spawn(path, events_tx, operator_rx).wrap_err_with(|| {
format!("failed ot spawn Python operator for {}", operator_config.id)
format!(
"failed ot spawn Python operator for {}",
operator_definition.id
)
})?;
}
OperatorSource::Wasm(path) => {
Expand All @@ -39,7 +42,7 @@ impl Operator {
}
Ok(Self {
operator_task,
config: operator_config,
definition: operator_definition,
})
}

Expand All @@ -52,10 +55,10 @@ impl Operator {
})
}

/// Get a reference to the operator's config.
/// Get a reference to the operator's definition.
#[must_use]
pub fn config(&self) -> &OperatorConfig {
&self.config
pub fn definition(&self) -> &OperatorDefinition {
&self.definition
}
}

Expand Down
112 changes: 105 additions & 7 deletions libraries/core/src/descriptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use dora_node_api::config::{
CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::{
collections::{BTreeMap, BTreeSet},
Expand All @@ -18,14 +19,68 @@ pub struct Descriptor {
}

impl Descriptor {
pub fn resolve_aliases(&self) -> Vec<ResolvedNode> {
let default_op_id = OperatorId::from("op".to_string());

let single_operator_nodes: HashMap<_, _> = self
.nodes
.iter()
.filter_map(|n| match &n.kind {
NodeKind::Operator(op) => Some((&n.id, op.id.as_ref().unwrap_or(&default_op_id))),
_ => None,
})
.collect();

let mut resolved = vec![];
for mut node in self.nodes.clone() {
// adjust input mappings
let input_mappings: Vec<_> = match &mut node.kind {
NodeKind::Runtime(node) => node
.operators
.iter_mut()
.flat_map(|op| op.config.inputs.values_mut())
.collect(),
NodeKind::Custom(node) => node.run_config.inputs.values_mut().collect(),
NodeKind::Operator(operator) => operator.config.inputs.values_mut().collect(),
};
for mapping in input_mappings {
if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() {
if mapping.operator.is_none() {
mapping.operator = Some(op_name.to_owned());
}
}
}

// resolve nodes
let kind = match node.kind {
NodeKind::Custom(node) => CoreNodeKind::Custom(node),
NodeKind::Runtime(node) => CoreNodeKind::Runtime(node),
NodeKind::Operator(op) => CoreNodeKind::Runtime(RuntimeNode {
operators: vec![OperatorDefinition {
id: op.id.unwrap_or_else(|| default_op_id.clone()),
config: op.config,
}],
}),
};
resolved.push(ResolvedNode {
id: node.id,
name: node.name,
description: node.description,
kind,
});
}
resolved
}

pub fn visualize_as_mermaid(&self) -> eyre::Result<String> {
let flowchart = visualize::visualize_nodes(&self.nodes);
let resolved = self.resolve_aliases();
let flowchart = visualize::visualize_nodes(&resolved);

Ok(flowchart)
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
pub id: NodeId,
pub name: Option<String>,
Expand All @@ -35,24 +90,58 @@ pub struct Node {
pub kind: NodeKind,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeKind {
/// Dora runtime node
#[serde(rename = "operators")]
Runtime(RuntimeNode),
Custom(CustomNode),
Operator(SingleOperatorDefinition),
}

/// A dataflow node after alias resolution.
///
/// Built by `Descriptor::resolve_aliases`, which copies `id`, `name`, and
/// `description` from the source `Node` and replaces its kind with a
/// `CoreNodeKind`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ResolvedNode {
/// Identifier of the node, taken over from the source node.
pub id: NodeId,
/// Optional name, taken over from the source node.
pub name: Option<String>,
/// Optional description, taken over from the source node.
pub description: Option<String>,

// Flattened: the fields of the kind are (de)serialized inline with the
// fields above instead of under a separate `kind` key.
#[serde(flatten)]
pub kind: CoreNodeKind,
}

/// The node kinds that remain after alias resolution.
///
/// Has the same `Runtime` and `Custom` variants as `NodeKind` but no
/// `Operator` variant: `Descriptor::resolve_aliases` turns single-operator
/// nodes into a `Runtime` node with one operator.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CoreNodeKind {
/// Dora runtime node
// Serialized under the key `operators` rather than `runtime`.
#[serde(rename = "operators")]
Runtime(RuntimeNode),
/// Node described by a `CustomNode`; serialized under the key `custom`.
Custom(CustomNode),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RuntimeNode {
pub operators: Vec<OperatorConfig>,
pub operators: Vec<OperatorDefinition>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OperatorConfig {
pub struct OperatorDefinition {
pub id: OperatorId,
#[serde(flatten)]
pub config: OperatorConfig,
}

/// Operator definition used by the `NodeKind::Operator` shorthand, where a
/// node consists of a single operator.
///
/// Differs from `OperatorDefinition` only in that the operator ID may be
/// omitted; `Descriptor::resolve_aliases` then falls back to the ID `op`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SingleOperatorDefinition {
/// ID is optional if there is only a single operator.
pub id: Option<OperatorId>,
// Flattened: the operator config fields are (de)serialized at the same
// level as `id`.
#[serde(flatten)]
pub config: OperatorConfig,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OperatorConfig {
pub name: Option<String>,
pub description: Option<String>,

Expand All @@ -73,7 +162,16 @@ pub enum OperatorSource {
Wasm(PathBuf),
}

#[derive(Debug, Serialize, Deserialize)]
/// Configuration consisting of a path plus input and output declarations.
///
/// NOTE(review): nothing in the visible part of this change references this
/// type — confirm whether it is used elsewhere or can be removed.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PythonOperatorConfig {
/// Presumably the location of the Python operator source file — TODO confirm.
pub path: PathBuf,
/// Input mappings keyed by input ID; empty when the key is absent.
#[serde(default)]
pub inputs: BTreeMap<DataId, InputMapping>,
/// Declared output IDs; empty when the key is absent.
#[serde(default)]
pub outputs: BTreeSet<DataId>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomNode {
pub run: String,
pub env: Option<BTreeMap<String, EnvValue>>,
Expand All @@ -83,7 +181,7 @@ pub struct CustomNode {
pub run_config: NodeRunConfig,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum EnvValue {
Bool(bool),
Expand Down
Loading

0 comments on commit e98fc07

Please sign in to comment.