Skip to content

Commit

Permalink
Added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasLYang committed Sep 5, 2024
1 parent 3f50fff commit 2221f05
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 12 deletions.
3 changes: 3 additions & 0 deletions crates/turborepo-ui/src/wui/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub struct WebUISender {
}

impl WebUISender {
pub fn new(tx: tokio::sync::mpsc::UnboundedSender<WebUIEvent>) -> Self {
Self { tx }
}
pub fn start_task(&self, task: String, output_logs: OutputLogs) {
self.tx
.send(WebUIEvent::StartTask { task, output_logs })
Expand Down
16 changes: 14 additions & 2 deletions crates/turborepo-ui/src/wui/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ impl<'a> CurrentRun<'a> {
}
}

struct Query {
pub struct Query {
state: Arc<Mutex<RefCell<WebUIState>>>,
}

impl Query {
pub fn new(state: Arc<Mutex<RefCell<WebUIState>>>) -> Self {
Self { state }
}
}

#[Object]
impl Query {
async fn current_run(&self) -> CurrentRun {
Expand All @@ -67,8 +73,14 @@ pub async fn start_server(
) -> Result<(), crate::Error> {
let state = Arc::new(Mutex::new(RefCell::new(WebUIState::default())));
let subscriber = Subscriber::new(rx);
subscriber.watch(state.clone());
tokio::spawn(subscriber.watch(state.clone()));

run_server(state.clone()).await?;

Ok(())
}

pub(crate) async fn run_server(state: Arc<Mutex<RefCell<WebUIState>>>) -> Result<(), crate::Error> {
let cors = CorsLayer::new()
// allow `GET` and `POST` when accessing the resource
.allow_methods([Method::GET, Method::POST])
Expand Down
156 changes: 146 additions & 10 deletions crates/turborepo-ui/src/wui/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@ impl Subscriber {
Self { rx }
}

pub fn watch(
pub async fn watch(
self,
// We use a tokio::sync::Mutex here because we want this future to be Send.
#[allow(clippy::type_complexity)] state: Arc<Mutex<RefCell<WebUIState>>>,
) {
tokio::spawn(async move {
let mut rx = self.rx;
while let Some(event) = rx.recv().await {
Self::add_message(&state, event).await;
}
});
let mut rx = self.rx;
while let Some(event) = rx.recv().await {
Self::add_message(&state, event).await;
}
}

async fn add_message(state: &Arc<Mutex<RefCell<WebUIState>>>, event: WebUIEvent) {
Expand Down Expand Up @@ -69,8 +67,10 @@ impl Subscriber {
} => {
let mut state_ref = state.borrow_mut();

if result == CacheResult::Hit {
state_ref.tasks.get_mut(&task).unwrap().status = TaskStatus::Cached;
}
state_ref.tasks.get_mut(&task).unwrap().cache_result = Some(result);

state_ref.tasks.get_mut(&task).unwrap().cache_message = Some(message);
}
WebUIEvent::Stop => {
Expand Down Expand Up @@ -118,13 +118,13 @@ pub enum TaskStatus {
Running,
Cached,
Failed,
Success,
Succeeded,
}

impl From<TaskResult> for TaskStatus {
fn from(result: TaskResult) -> Self {
match result {
TaskResult::Success => Self::Success,
TaskResult::Success => Self::Succeeded,
TaskResult::CacheHit => Self::Cached,
TaskResult::Failure => Self::Failed,
}
Expand All @@ -150,3 +150,139 @@ impl WebUIState {
&self.tasks
}
}

#[cfg(test)]
mod test {
use async_graphql::{EmptyMutation, EmptySubscription, Schema};

use super::*;
use crate::{
tui::event::OutputLogs,
wui::{sender::WebUISender, server::Query},
};

#[tokio::test]
async fn test_web_ui_state() -> Result<(), crate::Error> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let state = Arc::new(Mutex::new(RefCell::new(WebUIState::default())));
let subscriber = Subscriber::new(rx);

let sender = WebUISender::new(tx);

// Start a successful task
sender.start_task("task".to_string(), OutputLogs::Full);
sender.output("task".to_string(), b"this is my output".to_vec())?;
sender.end_task("task".to_string(), TaskResult::Success);

// Start a cached task
sender.start_task("task2".to_string(), OutputLogs::Full);
sender.status("task2".to_string(), "status".to_string(), CacheResult::Hit);

// Start a failing task
sender.start_task("task3".to_string(), OutputLogs::Full);
sender.end_task("task3".to_string(), TaskResult::Failure);

// Drop the sender so the subscriber can terminate
drop(sender);

// Run the subscriber blocking
subscriber.watch(state.clone()).await;

let state_handle = state.lock().await.borrow().clone();
assert_eq!(state_handle.tasks().len(), 3);
assert_eq!(
state_handle.tasks().get("task2").unwrap().status,
TaskStatus::Cached
);
assert_eq!(
state_handle.tasks().get("task").unwrap().status,
TaskStatus::Succeeded
);
assert_eq!(
state_handle.tasks().get("task").unwrap().output,
b"this is my output"
);
assert_eq!(
state_handle.tasks().get("task3").unwrap().status,
TaskStatus::Failed
);

// Now let's check with the GraphQL API
let schema = Schema::new(Query::new(state), EmptyMutation, EmptySubscription);
let result = schema
.execute("query { currentRun { tasks { name state { status } } } }")
.await;
assert!(result.errors.is_empty());
assert_eq!(
result.data,
async_graphql::Value::from_json(serde_json::json!({
"currentRun": {
"tasks": [
{
"name": "task",
"state": {
"status": "SUCCEEDED"
}
},
{
"name": "task2",
"state": {
"status": "CACHED"
}
},
{
"name": "task3",
"state": {
"status": "FAILED"
}
}
]
}
}))
.unwrap()
);

Ok(())
}

#[tokio::test]
async fn test_restart_tasks() -> Result<(), crate::Error> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let state = Arc::new(Mutex::new(RefCell::new(WebUIState::default())));
let subscriber = Subscriber::new(rx);

let sender = WebUISender::new(tx);

// Start a successful task
sender.start_task("task".to_string(), OutputLogs::Full);
sender.output("task".to_string(), b"this is my output".to_vec())?;
sender.end_task("task".to_string(), TaskResult::Success);

// Start a cached task
sender.start_task("task2".to_string(), OutputLogs::Full);
sender.status("task2".to_string(), "status".to_string(), CacheResult::Hit);

// Restart a task
sender.restart_tasks(vec!["task".to_string()])?;

// Drop the sender so the subscriber can terminate
drop(sender);

// Run the subscriber blocking
subscriber.watch(state.clone()).await;

let state_handle = state.lock().await.borrow().clone();
assert_eq!(state_handle.tasks().len(), 3);
assert_eq!(
state_handle.tasks().get("task2").unwrap().status,
TaskStatus::Cached
);
assert_eq!(
state_handle.tasks().get("task").unwrap().status,
TaskStatus::Running
);
assert_eq!(state_handle.tasks().get("task").unwrap().output, vec![]);

Check failure on line 284 in crates/turborepo-ui/src/wui/subscriber.rs

View workflow job for this annotation

GitHub Actions / Turborepo Rust testing on ubuntu

type annotations needed

Check failure on line 284 in crates/turborepo-ui/src/wui/subscriber.rs

View workflow job for this annotation

GitHub Actions / Turborepo Rust testing on ubuntu

type annotations needed

Check failure on line 284 in crates/turborepo-ui/src/wui/subscriber.rs

View workflow job for this annotation

GitHub Actions / Turborepo Rust testing on macos

type annotations needed

Check failure on line 284 in crates/turborepo-ui/src/wui/subscriber.rs

View workflow job for this annotation

GitHub Actions / Turborepo Rust testing on macos

type annotations needed

Check failure on line 284 in crates/turborepo-ui/src/wui/subscriber.rs

View workflow job for this annotation

GitHub Actions / Turborepo Rust testing on windows

type annotations needed

Check failure on line 284 in crates/turborepo-ui/src/wui/subscriber.rs

View workflow job for this annotation

GitHub Actions / Turborepo Rust testing on windows

type annotations needed

Ok(())
}
}

0 comments on commit 2221f05

Please sign in to comment.