Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tui): avoid panic on restarting tasks during watch #9032

Merged
merged 2 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use cache::{CacheOutput, ConfigCache, Error as CacheError, RunCache, TaskCac
use chrono::{DateTime, Local};
use rayon::iter::ParallelBridge;
use tokio::{select, task::JoinHandle};
use tracing::debug;
use tracing::{debug, instrument};
use turbopath::AbsoluteSystemPathBuf;
use turborepo_api_client::{APIAuth, APIClient};
use turborepo_ci::Vendor;
Expand Down Expand Up @@ -130,6 +130,7 @@ impl Run {

// Produces the transitive closure of the filtered packages,
// i.e. the packages relevant for this run.
#[instrument(skip(self), ret)]
pub fn get_relevant_packages(&self) -> HashSet<PackageName> {
let packages: Vec<_> = self
.filtered_pkgs
Expand Down
3 changes: 3 additions & 0 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::{
sync::{Mutex, Notify},
task::JoinHandle,
};
use tracing::{instrument, trace};
use turborepo_repository::package_graph::PackageName;
use turborepo_telemetry::events::command::CommandEventBuilder;
use turborepo_ui::{tui, tui::AppSender};
Expand Down Expand Up @@ -200,6 +201,7 @@ impl WatchClient {
}
}

#[instrument(skip(changed_packages))]
async fn handle_change_event(
changed_packages: &Mutex<RefCell<ChangedPackages>>,
event: proto::package_change_event::Event,
Expand Down Expand Up @@ -233,6 +235,7 @@ impl WatchClient {

async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result<i32, Error> {
// Should we recover here?
trace!("handling run with changed packages: {changed_packages:?}");
match changed_packages {
ChangedPackages::Some(packages) => {
let packages = packages
Expand Down
34 changes: 33 additions & 1 deletion crates/turborepo-ui/src/tui/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ impl<W> App<W> {

#[tracing::instrument(skip(self))]
pub fn update_tasks(&mut self, tasks: Vec<String>) {
if tasks.is_empty() {
debug!("got request to update task list to empty list, ignoring request");
return;
}
debug!("updating task list: {tasks:?}");
let highlighted_task = self.active_task().to_owned();
// Make sure all tasks have a terminal output
Expand Down Expand Up @@ -615,7 +619,7 @@ fn update(
app.copy_selection();
}
Event::RestartTasks { tasks } => {
app.update_tasks(tasks);
app.restart_tasks(tasks);
}
Event::Resize { rows, cols } => {
app.resize(rows, cols);
Expand Down Expand Up @@ -954,4 +958,32 @@ mod test {
);
}
}

#[test]
fn test_update_empty_task_list() {
let mut app: App<()> = App::new(
100,
100,
vec!["a".to_string(), "b".to_string(), "c".to_string()],
);
app.next();
app.update_tasks(Vec::new());

assert_eq!(app.active_task(), "b", "selected b");
}

#[test]
fn test_restart_missing_task() {
let mut app: App<()> = App::new(
100,
100,
vec!["a".to_string(), "b".to_string(), "c".to_string()],
);
app.next();
app.restart_tasks(vec!["d".to_string()]);

assert_eq!(app.active_task(), "b", "selected b");

app.start_task("d", OutputLogs::Full).unwrap();
}
}
13 changes: 12 additions & 1 deletion crates/turborepo-ui/src/tui/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl TasksByStatus {
}

pub fn restart_tasks<'a>(&mut self, tasks: impl Iterator<Item = &'a str>) {
let tasks_to_restart = tasks.collect::<HashSet<_>>();
let mut tasks_to_restart = tasks.collect::<HashSet<_>>();

let (restarted_running, keep_running): (Vec<_>, Vec<_>) = mem::take(&mut self.running)
.into_iter()
Expand All @@ -171,6 +171,17 @@ impl TasksByStatus {
.map(|task| task.restart())
.chain(restarted_finished.into_iter().map(|task| task.restart())),
);
// There is a chance that watch might attempt to restart a task that did not
// exist before.
for task in &self.planned {
tasks_to_restart.remove(task.name());
}
self.planned.extend(
tasks_to_restart
.into_iter()
.map(ToOwned::to_owned)
.map(Task::new),
);
self.planned.sort_unstable();
}
}
Loading