Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coordinator stopped on bad control command #650

Merged
merged 5 commits into from
Sep 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 84 additions & 84 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,65 +390,104 @@ async fn start_inner(
dataflow_uuid,
grace_duration,
} => {
stop_dataflow_by_uuid(
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
let reply = ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_result(result, dataflow_uuid, &clock),
};
let _ = reply_sender.send(Ok(reply));

continue;
}

let dataflow = stop_dataflow(
&mut running_dataflows,
&dataflow_results,
dataflow_uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
&clock,
)
.await?;
.await;

match dataflow {
Ok(dataflow) => {
dataflow.reply_senders.push(reply_sender);
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
ControlRequest::StopByName {
name,
grace_duration,
} => match resolve_name(name, &running_dataflows, &archived_dataflows) {
Ok(uuid) => {
stop_dataflow_by_uuid(
Ok(dataflow_uuid) => {
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
let reply = ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_result(result, dataflow_uuid, &clock),
};
let _ = reply_sender.send(Ok(reply));

continue;
}

let dataflow = stop_dataflow(
&mut running_dataflows,
&dataflow_results,
uuid,
dataflow_uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
&clock,
)
.await?
.await;

match dataflow {
Ok(dataflow) => {
dataflow.reply_senders.push(reply_sender);
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
},
ControlRequest::Logs { uuid, name, node } => {
let dataflow_uuid = if let Some(uuid) = uuid {
uuid
Ok(uuid)
} else if let Some(name) = name {
resolve_name(name, &running_dataflows, &archived_dataflows)?
resolve_name(name, &running_dataflows, &archived_dataflows)
} else {
bail!("No uuid")
Err(eyre!("No uuid"))
};

let reply = retrieve_logs(
&running_dataflows,
&archived_dataflows,
dataflow_uuid,
node.into(),
&mut daemon_connections,
clock.new_timestamp(),
)
.await
.map(ControlRequestReply::Logs);
let _ = reply_sender.send(reply);
match dataflow_uuid {
Ok(uuid) => {
let reply = retrieve_logs(
&running_dataflows,
&archived_dataflows,
uuid,
node.into(),
&mut daemon_connections,
clock.new_timestamp(),
)
.await
.map(ControlRequestReply::Logs);
let _ = reply_sender.send(reply);
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
ControlRequest::Destroy => {
tracing::info!("Received destroy command");

let reply = handle_destroy(
&running_dataflows,
&mut running_dataflows,
&mut daemon_connections,
&abort_handle,
&mut daemon_events_tx,
Expand Down Expand Up @@ -556,7 +595,7 @@ async fn start_inner(
Event::CtrlC => {
tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
handle_destroy(
&running_dataflows,
&mut running_dataflows,
&mut daemon_connections,
&abort_handle,
&mut daemon_events_tx,
Expand Down Expand Up @@ -592,50 +631,6 @@ async fn start_inner(
Ok(())
}

/// Requests a stop of the dataflow identified by `dataflow_uuid` and routes the
/// outcome to `reply_sender`.
///
/// Three cases are handled:
/// - The UUID is present in `running_dataflows`: `stop_dataflow` is called for it.
///   On success, `reply_sender` is appended to the dataflow's `reply_senders`
///   (presumably answered later, once the dataflow has finished; confirm where
///   that list is drained). On failure, the error is sent through `reply_sender`.
/// - The UUID is not running but has an entry in `dataflow_results`: a
///   `ControlRequestReply::DataflowStopped` reply built from the stored results
///   is sent through `reply_sender` immediately.
/// - The UUID is in neither map: the function bails with an error and
///   `reply_sender` is dropped without any reply having been sent on it.
///
/// # Errors
///
/// Returns an error only when `dataflow_uuid` is found in neither
/// `running_dataflows` nor `dataflow_results`. A failure of `stop_dataflow`
/// itself is reported through `reply_sender`, and this function still returns
/// `Ok(())` in that case.
// The argument list is long, so the clippy lint is silenced for this function.
#[allow(clippy::too_many_arguments)]
async fn stop_dataflow_by_uuid(
running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
dataflow_results: &HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>>,
dataflow_uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
clock: &uhlc::HLC,
) -> Result<(), eyre::ErrReport> {
// Look the dataflow up among the running ones; everything in the `else`
// branch handles a UUID that is not currently running.
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
// Not running, but results are stored for it: answer right away with
// those results instead of treating the request as an error.
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
let reply = ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_result(result, dataflow_uuid, clock),
};
// A send error (receiver already gone) is deliberately ignored.
let _ = reply_sender.send(Ok(reply));
return Ok(());
}
// NOTE(review): unlike the stop-failure path below, this error is returned
// to the caller rather than sent through `reply_sender`. A caller that
// propagates it with `?` aborts its own control flow because of a bad
// UUID — confirm this is intended.
bail!("no known dataflow found with UUID `{dataflow_uuid}`")
};
// The stop call is wrapped in an async block so that its `?` produces a
// local `Result` instead of returning early from this function.
let stop = async {
stop_dataflow(
dataflow,
dataflow_uuid,
daemon_connections,
timestamp,
grace_duration,
)
.await?;
Result::<_, eyre::Report>::Ok(())
};
match stop.await {
Ok(()) => {
// Stop request succeeded: keep the sender with the dataflow rather
// than replying now.
dataflow.reply_senders.push(reply_sender);
}
Err(err) => {
// Stop request failed: report the error to the requester; a send
// error is deliberately ignored.
let _ = reply_sender.send(Err(err));
}
};
Ok(())
}

fn dataflow_result(
results: &BTreeMap<String, DataflowDaemonResult>,
dataflow_uuid: Uuid,
Expand Down Expand Up @@ -663,17 +658,17 @@ struct DaemonConnection {
}

async fn handle_destroy(
running_dataflows: &HashMap<Uuid, RunningDataflow>,
running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
abortable_events: &futures::stream::AbortHandle,
daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
clock: &HLC,
) -> Result<(), eyre::ErrReport> {
abortable_events.abort();
for (&uuid, dataflow) in running_dataflows {
stop_dataflow(
dataflow,
uuid,
for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
let _ = stop_dataflow(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to change the stop_dataflow return type if we are not using it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using it in the control loop for the `dora stop` command, but not for the `dora destroy` command. It's more readable, though, if we don't have two different functions to do that.

let dataflow = stop_dataflow(
    &mut running_dataflows,
    dataflow_uuid,
    &mut daemon_connections,
    clock.new_timestamp(),
    grace_duration,
)
.await;

match dataflow {
    Ok(dataflow) => {
        dataflow.reply_senders.push(reply_sender);
    }
    Err(err) => {
        let _ = reply_sender.send(Err(err));
    }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, OK, that makes sense! Thanks!

running_dataflows,
dataflow_uuid,
daemon_connections,
clock.new_timestamp(),
None,
Expand Down Expand Up @@ -737,16 +732,20 @@ impl PartialEq for RunningDataflow {

impl Eq for RunningDataflow {}

async fn stop_dataflow(
dataflow: &RunningDataflow,
uuid: Uuid,
async fn stop_dataflow<'a>(
running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
dataflow_uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
) -> eyre::Result<()> {
) -> eyre::Result<&'a mut RunningDataflow> {
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
};

let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::StopDataflow {
dataflow_id: uuid,
dataflow_id: dataflow_uuid,
grace_duration,
},
timestamp,
Expand All @@ -773,9 +772,10 @@ async fn stop_dataflow(
other => bail!("unexpected reply after sending stop: {other:?}"),
}
}
tracing::info!("successfully send stop dataflow `{uuid}` to all daemons");

Ok(())
tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons");

Ok(dataflow)
}

async fn reload_dataflow(
Expand Down
Loading