Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup warnings #208

Merged
merged 4 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions binaries/cli/src/template/rust/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn create_operator(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR

let cargo_toml = CARGO_TOML
.replace("___name___", &name)
.replace("___version___", &VERSION);
.replace("___version___", VERSION);
let cargo_toml_path = root.join("Cargo.toml");
fs::write(&cargo_toml_path, &cargo_toml)
.with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?;
Expand Down Expand Up @@ -122,7 +122,7 @@ fn create_custom_node(name: String, path: Option<PathBuf>) -> Result<(), eyre::E

let cargo_toml = CARGO_TOML
.replace("___name___", &name)
.replace("___version___", &VERSION);
.replace("___version___", VERSION);
let cargo_toml_path = root.join("Cargo.toml");
fs::write(&cargo_toml_path, &cargo_toml)
.with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?;
Expand Down
41 changes: 9 additions & 32 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures_concurrency::stream::Merge;
use run::SpawnedDataflow;
use std::{
collections::{BTreeSet, HashMap},
path::{Path, PathBuf},
path::Path,
time::Duration,
};
use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
Expand All @@ -30,28 +30,11 @@ mod listener;
mod run;
mod tcp_utils;

#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Dora coordinator")]
pub struct Args {
#[clap(long)]
pub runtime: Option<PathBuf>,
}

pub async fn run(args: Args) -> eyre::Result<()> {
let Args { runtime } = args;

let runtime_path = runtime.unwrap_or_else(|| {
std::env::args()
.next()
.map(PathBuf::from)
.unwrap_or_default()
.with_file_name("dora-runtime")
});

pub async fn run() -> eyre::Result<()> {
let mut tasks = FuturesUnordered::new();

// start in daemon mode
start(&runtime_path, &tasks).await?;
start(&tasks).await?;

tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
while let Some(join_result) = tasks.next().await {
Expand All @@ -64,7 +47,7 @@ pub async fn run(args: Args) -> eyre::Result<()> {
Ok(())
}

async fn start(runtime_path: &Path, tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
let ctrlc_events = set_up_ctrlc_handler()?;

let listener = listener::create_listener(DORA_COORDINATOR_PORT_DEFAULT).await?;
Expand Down Expand Up @@ -191,13 +174,9 @@ async fn start(runtime_path: &Path, tasks: &FuturesUnordered<JoinHandle<()>>) ->
bail!("there is already a running dataflow with name `{name}`");
}
}
let dataflow = start_dataflow(
&dataflow_path,
name,
runtime_path,
&mut daemon_connections,
)
.await?;
let dataflow =
start_dataflow(&dataflow_path, name, &mut daemon_connections)
.await?;
Ok(dataflow)
};
inner.await.map(|dataflow| {
Expand Down Expand Up @@ -377,6 +356,7 @@ async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
}
}

#[allow(dead_code)] // Keeping the communication layer for later use.
struct RunningDataflow {
name: Option<String>,
uuid: Uuid,
Expand Down Expand Up @@ -432,16 +412,13 @@ async fn stop_dataflow(
async fn start_dataflow(
path: &Path,
name: Option<String>,
runtime_path: &Path,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<RunningDataflow> {
let runtime_path = runtime_path.to_owned();

let SpawnedDataflow {
uuid,
communication_config,
machines,
} = spawn_dataflow(&runtime_path, path, daemon_connections).await?;
} = spawn_dataflow(path, daemon_connections).await?;
Ok(RunningDataflow {
uuid,
name,
Expand Down
3 changes: 1 addition & 2 deletions binaries/coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ async fn main() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing().context("failed to set up tracing subscriber")?;

let args = clap::Parser::parse();
dora_coordinator::run(args).await
dora_coordinator::run().await
}
22 changes: 1 addition & 21 deletions binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@ use crate::tcp_utils::{tcp_receive, tcp_send};
use dora_core::{
config::CommunicationConfig,
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes},
descriptor::{CoreNodeKind, Descriptor},
descriptor::Descriptor,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use std::{
collections::{BTreeSet, HashMap},
env::consts::EXE_EXTENSION,
path::Path,
};
use tokio::net::TcpStream;
use uuid::Uuid;

pub async fn spawn_dataflow(
runtime: &Path,
dataflow_path: &Path,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<SpawnedDataflow> {
let mut runtime = runtime.with_extension(EXE_EXTENSION);
let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| {
format!(
"failed to read dataflow descriptor at {}",
Expand All @@ -40,23 +37,6 @@ pub async fn spawn_dataflow(
config.add_topic_prefix(&uuid.to_string());
config
};
if nodes
.iter()
.any(|n| matches!(n.kind, CoreNodeKind::Runtime(_)))
{
match which::which(runtime.as_os_str()) {
Ok(path) => {
runtime = path;
}
Err(err) => {
let err = eyre!(err).wrap_err(format!(
"There is no runtime at {}, or it is not a file",
runtime.display()
));
bail!("{err:?}")
}
}
}

let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
Expand Down
11 changes: 2 additions & 9 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
fmt, io,
net::SocketAddr,
path::{Path, PathBuf},
time::{Duration, Instant},
time::Duration,
};
use tcp_utils::tcp_receive;
use tokio::{
Expand Down Expand Up @@ -208,8 +208,6 @@ impl Daemon {
let mut events = incoming_events;

while let Some(event) = events.next().await {
let start = Instant::now();

match event {
Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
let (reply, status) = self.handle_coordinator_event(event).await;
Expand Down Expand Up @@ -256,11 +254,6 @@ impl Daemon {
}
}
}

let elapsed = start.elapsed();
// if elapsed.as_micros() > 10 {
// tracing::debug!("handled event in {elapsed:?}: {event_debug}");
// }
}

Ok(self.dataflow_errors)
Expand Down Expand Up @@ -732,7 +725,7 @@ impl Daemon {
dataflow.subscribe_channels.remove(id);
}

let data_bytes = match data {
let _data_bytes = match data {
Data::SharedMemory(data) => {
let bytes = unsafe { data.as_slice() }.to_owned();

Expand Down
12 changes: 6 additions & 6 deletions binaries/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,17 @@ async fn run(
}
OperatorEvent::Finished { reason } => {
if let StopReason::ExplicitStopAll = reason {
let hlc = dora_core::message::uhlc::HLC::default();
let metadata = dora_core::message::Metadata::new(hlc.new_timestamp());
let data = metadata
.serialize()
.wrap_err("failed to serialize stop message")?;
// let hlc = dora_core::message::uhlc::HLC::default();
// let metadata = dora_core::message::Metadata::new(hlc.new_timestamp());
// let data = metadata
// .serialize()
// .wrap_err("failed to serialize stop message")?;
todo!("instruct dora-daemon/dora-coordinator to stop other nodes");
// manual_stop_publisher
// .publish(&data)
// .map_err(|err| eyre::eyre!(err))
// .wrap_err("failed to send stop message")?;
break;
// break;
}

let Some(config) = operators.get(&operator_id) else {
Expand Down
6 changes: 6 additions & 0 deletions libraries/communication-layer/request-reply/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ impl TcpLayer {
}
}

impl Default for TcpLayer {
fn default() -> Self {
Self::new()
}
}

impl RequestReplyLayer for TcpLayer {
type Address = SocketAddr;
type RequestData = Vec<u8>;
Expand Down
6 changes: 1 addition & 5 deletions libraries/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,7 @@ impl<'de> Deserialize<'de> for InputMapping {
"unknown dora input `{other}`"
)))
}
None => {
return Err(serde::de::Error::custom(format!(
"dora input has invalid format"
)))
}
None => return Err(serde::de::Error::custom("dora input has invalid format")),
},
_ => Self::User(UserInputMapping {
source: source.to_owned().into(),
Expand Down
2 changes: 1 addition & 1 deletion libraries/core/src/descriptor/visualize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn visualize_user_mapping(
if let Some(operator) = operators.iter().find(|o| o.id.as_ref() == operator_id) {
if operator.config.outputs.contains(output) {
let data = if output == input_id.as_str() {
format!("{output}")
output.to_string()
} else {
format!("{output} as {input_id}")
};
Expand Down