Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement runtime-provided periodic timer input for source operators #51

Merged
merged 9 commits into from
Aug 5, 2022
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apis/rust/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ eyre = "0.6.7"
futures = "0.3.21"
futures-concurrency = "2.0.3"
futures-time = "1.0.0"
once_cell = "1.13.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
thiserror = "1.0.30"
Expand Down
137 changes: 120 additions & 17 deletions apis/rust/node/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, BTreeSet},
convert::Infallible,
fmt::Write as _,
fmt::{self, Write as _},
str::FromStr,
time::Duration,
};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -84,23 +86,70 @@ impl std::ops::Deref for DataId {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputMapping {
pub source: NodeId,
pub operator: Option<OperatorId>,
pub output: DataId,
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum InputMapping {
Timer { interval: Duration },
User(UserInputMapping),
}

impl InputMapping {
pub fn source(&self) -> &NodeId {
static DORA_NODE_ID: OnceCell<NodeId> = OnceCell::new();

match self {
InputMapping::User(mapping) => &mapping.source,
InputMapping::Timer { .. } => DORA_NODE_ID.get_or_init(|| NodeId("dora".to_string())),
}
}

pub fn operator(&self) -> &Option<OperatorId> {
match self {
InputMapping::User(mapping) => &mapping.operator,
InputMapping::Timer { .. } => &None,
}
}
}

impl fmt::Display for InputMapping {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InputMapping::Timer { interval } => {
let duration = format_duration(*interval);
write!(f, "dora/timer/{duration}")
}
InputMapping::User(mapping) => {
if let Some(operator) = &mapping.operator {
write!(f, "{}/{operator}/{}", mapping.source, mapping.output)
} else {
write!(f, "{}/{}", mapping.source, mapping.output)
}
}
}
}
}

pub struct FormattedDuration(pub Duration);

impl fmt::Display for FormattedDuration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.0.subsec_millis() == 0 {
write!(f, "secs/{}", self.0.as_secs())
} else {
write!(f, "millis/{}", self.0.as_millis())
}
}
}

pub fn format_duration(interval: Duration) -> FormattedDuration {
FormattedDuration(interval)
}

impl Serialize for InputMapping {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if let Some(operator) = &self.operator {
serializer.collect_str(&format_args!("{}/{operator}/{}", self.source, self.output))
} else {
serializer.collect_str(&format_args!("{}/{}", self.source, self.output))
}
serializer.collect_str(self)
}
}

Expand All @@ -119,15 +168,69 @@ impl<'de> Deserialize<'de> for InputMapping {
.map(|(op, out)| (Some(op), out))
.unwrap_or((None, rest));

Ok(Self {
source: source.to_owned().into(),
operator: operator.map(|o| o.to_owned().into()),
output: output.to_owned().into(),
})
let deserialized = match source {
"dora" => match operator {
Some("timer") => {
let (unit, value) = output.split_once('/').ok_or_else(|| {
serde::de::Error::custom(
"timer input must specify unit and value (e.g. `secs/5` or `millis/100`)",
)
})?;
let interval = match unit {
"secs" => {
let value = value.parse().map_err(|_| {
serde::de::Error::custom(format!(
"secs must be an integer (got `{value}`)"
))
})?;
Duration::from_secs(value)
}
"millis" => {
let value = value.parse().map_err(|_| {
serde::de::Error::custom(format!(
"millis must be an integer (got `{value}`)"
))
})?;
Duration::from_millis(value)
}
other => {
return Err(serde::de::Error::custom(format!(
"timer unit must be either secs or millis (got `{other}`"
)))
}
};
Self::Timer { interval }
}
Some(other) => {
return Err(serde::de::Error::custom(format!(
"unknown dora input `{other}`"
)))
}
None => {
return Err(serde::de::Error::custom(format!(
"dora input has invalid format"
)))
}
},
_ => Self::User(UserInputMapping {
source: source.to_owned().into(),
operator: operator.map(|o| o.to_owned().into()),
output: output.to_owned().into(),
}),
};

Ok(deserialized)
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct UserInputMapping {
pub source: NodeId,
pub operator: Option<OperatorId>,
pub output: DataId,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum CommunicationConfig {
Zenoh {
Expand Down
17 changes: 3 additions & 14 deletions apis/rust/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,8 @@ impl DoraNode {

pub async fn inputs(&self) -> eyre::Result<impl futures::Stream<Item = Input> + '_> {
let mut streams = Vec::new();
for (
input,
config::InputMapping {
source,
operator,
output,
},
) in &self.node_config.inputs
{
let topic = match operator {
Some(operator) => format!("{source}/{operator}/{output}"),
None => format!("{source}/{output}"),
};
for (input, mapping) in &self.node_config.inputs {
let topic = mapping.to_string();
let sub = self
.communication
.subscribe(&topic)
Expand All @@ -81,7 +70,7 @@ impl DoraNode {
.node_config
.inputs
.values()
.map(|v| (&v.source, &v.operator))
.map(|v| (v.source(), v.operator()))
.collect();
for (source, operator) in &sources {
let topic = match operator {
Expand Down
6 changes: 3 additions & 3 deletions binaries/coordinator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ There are drawbacks too, for example:
```bash
cargo build -p dora-runtime --release
cargo build -p dora-coordinator --examples --release
cargo build --manifest-path ../runtime/examples/example-operator/Cargo.toml --release
cargo build --manifest-path ../examples/example-operator/Cargo.toml --release
```
- Compile the C example operator through:
```bash
cd ../runtime/examples/c-operator
cp ../../../apis/c/operator/api.h .
cd ../../examples/c-operator
cp ../../apis/c/operator/api.h .
clang -c operator.c
clang -shared -v operator.o -o operator.so
```
Expand Down
9 changes: 5 additions & 4 deletions binaries/coordinator/examples/mini-dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@ nodes:
- id: runtime-node
operators:
- id: op-1
shared-library: ../target/release/libexample_operator.so
shared-library: ../../target/release/libexample_operator.so
inputs:
random: random/number
time: timer/time
outputs:
- timestamped-random
- id: op-2
shared-library: ../runtime/examples/c-operator/operator.so
shared-library: ../../examples/c-operator/operator.so
inputs:
time: timer/time
outputs:
- counter
- id: op-3
python: ../runtime/examples/python-operator/op.py
python: ../../examples/python-operator/op.py
inputs:
time: timer/time
test: python-operator/counter
Expand All @@ -60,8 +60,9 @@ nodes:

- id: python-operator
operator:
python: ../runtime/examples/python-operator/op.py
python: ../../examples/python-operator/op.py
inputs:
time: timer/time
dora_time: dora/timer/millis/500
outputs:
- counter
31 changes: 29 additions & 2 deletions binaries/coordinator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use dora_core::descriptor::{self, CoreNodeKind, Descriptor};
use dora_node_api::config::NodeId;
use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor};
use dora_node_api::{
communication,
config::{format_duration, NodeId},
};
use eyre::{bail, eyre, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
use std::path::{Path, PathBuf};
use tokio_stream::wrappers::IntervalStream;

#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Dora coordinator")]
Expand Down Expand Up @@ -57,6 +61,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
})?;

let nodes = descriptor.resolve_aliases();
let dora_timers = collect_dora_timers(&nodes);
let mut communication = descriptor.communication;

if nodes
Expand Down Expand Up @@ -94,6 +99,28 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
}
}

for interval in dora_timers {
let communication = communication.clone();
let task = tokio::spawn(async move {
let communication = communication::init(&communication)
.await
.wrap_err("failed to init communication layer")?;
let topic = {
let duration = format_duration(interval);
format!("dora/timer/{duration}")
};
let mut stream = IntervalStream::new(tokio::time::interval(interval));
while let Some(_) = stream.next().await {
let publish = communication.publish(&topic, &[]);
publish
.await
.wrap_err("failed to publish timer tick message")?;
}
Ok(())
});
tasks.push(task);
}

while let Some(task_result) = tasks.next().await {
task_result
.wrap_err("failed to join async task")?
Expand Down
Loading