From 6ef1332f1144e3fecf656960ebd34aaac8d40773 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 23 Sep 2024 14:03:28 +0200 Subject: [PATCH 01/36] Rename `Frontend` to `DummyFrontend` --- crates/amalthea/tests/client.rs | 2 +- crates/amalthea/tests/frontend/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/amalthea/tests/client.rs b/crates/amalthea/tests/client.rs index 59aca3ce1..a78af30fd 100644 --- a/crates/amalthea/tests/client.rs +++ b/crates/amalthea/tests/client.rs @@ -43,7 +43,7 @@ mod shell; #[test] fn test_kernel() { - let frontend = frontend::Frontend::new(); + let frontend = frontend::DummyFrontend::new(); let connection_file = frontend.get_connection_file(); let mut kernel = Kernel::new("amalthea", connection_file).unwrap(); diff --git a/crates/amalthea/tests/frontend/mod.rs b/crates/amalthea/tests/frontend/mod.rs index 976402118..bb9bebddb 100644 --- a/crates/amalthea/tests/frontend/mod.rs +++ b/crates/amalthea/tests/frontend/mod.rs @@ -12,7 +12,7 @@ use amalthea::wire::jupyter_message::JupyterMessage; use amalthea::wire::jupyter_message::Message; use amalthea::wire::jupyter_message::ProtocolMessage; -pub struct Frontend { +pub struct DummyFrontend { pub _control_socket: Socket, pub shell_socket: Socket, pub iopub_socket: Socket, @@ -27,7 +27,7 @@ pub struct Frontend { heartbeat_port: u16, } -impl Frontend { +impl DummyFrontend { pub fn new() -> Self { use rand::Rng; From 6ecf5a9c7de907dc42b050a0736e7d8e2b83eec9 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 23 Sep 2024 14:12:15 +0200 Subject: [PATCH 02/36] Export `DummyFrontend` from Amalthea --- crates/amalthea/Cargo.toml | 4 ++-- crates/amalthea/src/lib.rs | 1 + .../mod.rs => src/test/dummy_frontend.rs} | 16 ++++++++-------- crates/amalthea/src/test/mod.rs | 1 + crates/amalthea/tests/client.rs | 4 ++-- 5 files changed, 14 insertions(+), 12 deletions(-) rename crates/amalthea/{tests/frontend/mod.rs => src/test/dummy_frontend.rs} (93%) create mode 100644 crates/amalthea/src/test/mod.rs diff --git a/crates/amalthea/Cargo.toml b/crates/amalthea/Cargo.toml index 3fa289f39..5613605e5 100644 --- a/crates/amalthea/Cargo.toml +++ b/crates/amalthea/Cargo.toml @@ -19,6 +19,8 @@ hex = "0.4.3" hmac = "0.12.1" log = "0.4.17" nix = "0.26.2" +portpicker = "0.1.1" +rand = "0.8.5" serde = { version = "1.0.154", features = ["derive"] } serde_json = { version = "1.0.94", features = ["preserve_order"]} sha2 = "0.10.6" @@ -34,6 +36,4 @@ serde_repr = "0.1.17" tracing = "0.1.40" [dev-dependencies] -rand = "0.8.5" -portpicker = "0.1.1" env_logger = "0.10.0" diff --git a/crates/amalthea/src/lib.rs b/crates/amalthea/src/lib.rs index b056087a1..bbebb4d4c 100644 --- a/crates/amalthea/src/lib.rs +++ b/crates/amalthea/src/lib.rs @@ -16,6 +16,7 @@ pub mod session; pub mod socket; pub mod stream_capture; pub mod sys; +pub mod test; pub mod wire; pub use error::Error; diff --git a/crates/amalthea/tests/frontend/mod.rs b/crates/amalthea/src/test/dummy_frontend.rs similarity index 93% rename from crates/amalthea/tests/frontend/mod.rs rename to crates/amalthea/src/test/dummy_frontend.rs index bb9bebddb..30b12c056 100644 --- a/crates/amalthea/tests/frontend/mod.rs +++ b/crates/amalthea/src/test/dummy_frontend.rs @@ -1,16 +1,16 @@ /* - * mod.rs + * dummy_frontend.rs * - * Copyright (C) 2022 Posit Software, PBC. All rights reserved. + * Copyright (C) 2022-2024 Posit Software, PBC. All rights reserved. * */ -use amalthea::connection_file::ConnectionFile; -use amalthea::session::Session; -use amalthea::socket::socket::Socket; -use amalthea::wire::jupyter_message::JupyterMessage; -use amalthea::wire::jupyter_message::Message; -use amalthea::wire::jupyter_message::ProtocolMessage; +use crate::connection_file::ConnectionFile; +use crate::session::Session; +use crate::socket::socket::Socket; +use crate::wire::jupyter_message::JupyterMessage; +use crate::wire::jupyter_message::Message; +use crate::wire::jupyter_message::ProtocolMessage; pub struct DummyFrontend { pub _control_socket: Socket, diff --git a/crates/amalthea/src/test/mod.rs b/crates/amalthea/src/test/mod.rs new file mode 100644 index 000000000..4e9fa5f0e --- /dev/null +++ b/crates/amalthea/src/test/mod.rs @@ -0,0 +1 @@ +pub mod dummy_frontend; diff --git a/crates/amalthea/tests/client.rs b/crates/amalthea/tests/client.rs index a78af30fd..320b957ad 100644 --- a/crates/amalthea/tests/client.rs +++ b/crates/amalthea/tests/client.rs @@ -15,6 +15,7 @@ use amalthea::kernel::StreamBehavior; use amalthea::socket::comm::CommInitiator; use amalthea::socket::comm::CommSocket; use amalthea::socket::stdin::StdInRequest; +use amalthea::test::dummy_frontend::DummyFrontend; use amalthea::wire::comm_close::CommClose; use amalthea::wire::comm_info_reply::CommInfoTargetName; use amalthea::wire::comm_info_request::CommInfoRequest; @@ -38,12 +39,11 @@ use log::info; use serde_json; mod control; -mod frontend; mod shell; #[test] fn test_kernel() { - let frontend = frontend::DummyFrontend::new(); + let frontend = DummyFrontend::new(); let connection_file = frontend.get_connection_file(); let mut kernel = Kernel::new("amalthea", connection_file).unwrap(); From 843f7ccfd3d1d0507cb720c5386161064ef30e56 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 23 Sep 2024 14:38:20 +0200 Subject: [PATCH 03/36] Rename `shell_tx` to `iopub_tx` --- crates/amalthea/tests/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/amalthea/tests/client.rs b/crates/amalthea/tests/client.rs index 320b957ad..3bf0c0ef0 100644 --- a/crates/amalthea/tests/client.rs +++ b/crates/amalthea/tests/client.rs @@ -47,14 +47,14 @@ fn test_kernel() { let connection_file = frontend.get_connection_file(); let mut kernel = Kernel::new("amalthea", connection_file).unwrap(); - let shell_tx = kernel.create_iopub_tx(); + let iopub_tx = kernel.create_iopub_tx(); let comm_manager_tx = kernel.create_comm_manager_tx(); let (stdin_request_tx, stdin_request_rx) = bounded::(1); let (stdin_reply_tx, stdin_reply_rx) = unbounded(); let shell = Arc::new(Mutex::new(shell::Shell::new( - shell_tx, + iopub_tx, stdin_request_tx, stdin_reply_rx, ))); From 7392c20489162c03c2b1b162491ead8c4f4e8181 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 23 Sep 2024 14:42:40 +0200 Subject: [PATCH 04/36] Export `start_kernel()` for tests --- crates/ark/src/lib.rs | 1 + crates/ark/src/main.rs | 288 ++++++++++++---------------------------- crates/ark/src/start.rs | 129 ++++++++++++++++++ 3 files changed, 216 insertions(+), 202 deletions(-) create mode 100644 crates/ark/src/start.rs diff --git a/crates/ark/src/lib.rs b/crates/ark/src/lib.rs index 4654244e4..cc7b34417 100644 --- a/crates/ark/src/lib.rs +++ b/crates/ark/src/lib.rs @@ -29,6 +29,7 @@ pub mod request; pub mod shell; pub mod signals; pub mod srcref; +pub mod start; pub mod startup; pub mod sys; pub mod test; diff --git a/crates/ark/src/main.rs b/crates/ark/src/main.rs index e958c50fc..41dab0b79 100644 --- a/crates/ark/src/main.rs +++ b/crates/ark/src/main.rs @@ -9,26 +9,15 @@ use std::cell::Cell; use std::env; -use std::sync::Arc; -use std::sync::Mutex; use amalthea::connection_file::ConnectionFile; -use amalthea::kernel::Kernel; use amalthea::kernel_spec::KernelSpec; -use amalthea::socket::stdin::StdInRequest; -use ark::control::Control; -use ark::dap; use ark::interface::SessionMode; use ark::logger; -use ark::lsp; -use ark::request::KernelRequest; -use ark::request::RRequest; -use ark::shell::Shell; use ark::signals::initialize_signal_block; +use ark::start::start_kernel; use ark::traps::register_trap_handlers; use ark::version::detect_r; -use bus::Bus; -use crossbeam::channel::bounded; use crossbeam::channel::unbounded; use notify::Watcher; use stdext::unwrap; @@ -37,196 +26,6 @@ thread_local! { pub static ON_R_THREAD: Cell = Cell::new(false); } -fn start_kernel( - connection_file: ConnectionFile, - r_args: Vec, - startup_file: Option, - session_mode: SessionMode, - capture_streams: bool, -) { - // Create a new kernel from the connection file - let mut kernel = match Kernel::new("ark", connection_file) { - Ok(k) => k, - Err(err) => { - log::error!("Failed to create kernel: {err}"); - return; - }, - }; - - // Create the channels used for communication. These are created here - // as they need to be shared across different components / threads. - let iopub_tx = kernel.create_iopub_tx(); - - // A broadcast channel (bus) used to notify clients when the kernel - // has finished initialization. - let mut kernel_init_tx = Bus::new(1); - - // A channel pair used for shell requests. - // These events are used to manage the runtime state, and also to - // handle message delivery, among other things. - let (r_request_tx, r_request_rx) = bounded::(1); - let (kernel_request_tx, kernel_request_rx) = bounded::(1); - - // Create the LSP and DAP clients. - // Not all Amalthea kernels provide these, but ark does. - // They must be able to deliver messages to the shell channel directly. - let lsp = Arc::new(Mutex::new(lsp::handler::Lsp::new(kernel_init_tx.add_rx()))); - - // DAP needs the `RRequest` channel to communicate with - // `read_console()` and send commands to the debug interpreter - let dap = dap::Dap::new_shared(r_request_tx.clone()); - - // Communication channel between the R main thread and the Amalthea - // StdIn socket thread - let (stdin_request_tx, stdin_request_rx) = bounded::(1); - - // Communication channel for `CommEvent` - let comm_manager_tx = kernel.create_comm_manager_tx(); - - // Create the shell. - let kernel_init_rx = kernel_init_tx.add_rx(); - let shell = Shell::new( - comm_manager_tx.clone(), - iopub_tx.clone(), - r_request_tx.clone(), - stdin_request_tx.clone(), - kernel_init_rx, - kernel_request_tx, - kernel_request_rx, - session_mode.clone(), - ); - - // Create the control handler; this is used to handle shutdown/interrupt and - // related requests - let control = Arc::new(Mutex::new(Control::new(r_request_tx.clone()))); - - // Create the stream behavior; this determines whether the kernel should - // capture stdout/stderr and send them to the frontend as IOPub messages - let stream_behavior = match capture_streams { - true => amalthea::kernel::StreamBehavior::Capture, - false => amalthea::kernel::StreamBehavior::None, - }; - - // Create the kernel - let kernel_clone = shell.kernel.clone(); - let shell = Arc::new(Mutex::new(shell)); - - let (stdin_reply_tx, stdin_reply_rx) = unbounded(); - - let res = kernel.connect( - shell, - control, - Some(lsp), - Some(dap.clone()), - stream_behavior, - stdin_request_rx, - stdin_reply_tx, - ); - if let Err(err) = res { - panic!("Couldn't connect to frontend: {err:?}"); - } - - // Start the R REPL (does not return for the duration of the session) - ark::interface::start_r( - r_args, - startup_file, - kernel_clone, - comm_manager_tx, - r_request_rx, - stdin_request_tx, - stdin_reply_rx, - iopub_tx, - kernel_init_tx, - dap, - session_mode, - ) -} - -// Installs the kernelspec JSON file into one of Jupyter's search paths. -fn install_kernel_spec() { - // Create the environment set for the kernel spec - let mut env = serde_json::Map::new(); - - // Detect the active version of R and set the R_HOME environment variable - // accordingly - let r_version = detect_r().unwrap(); - env.insert( - "R_HOME".to_string(), - serde_json::Value::String(r_version.r_home.clone()), - ); - - // Point `LD_LIBRARY_PATH` to a folder with some `libR.so`. It doesn't - // matter which one, but the linker needs to be able to find a file of that - // name, even though we won't use it for symbol resolution. - // https://github.com/posit-dev/positron/issues/1619#issuecomment-1971552522 - if cfg!(target_os = "linux") { - let lib = format!("{}/lib", r_version.r_home.clone()); - env.insert("LD_LIBRARY_PATH".into(), serde_json::Value::String(lib)); - } - - // Create the kernelspec - let exe_path = unwrap!(env::current_exe(), Err(error) => { - eprintln!("Failed to determine path to Ark. {}", error); - return; - }); - - let spec = KernelSpec { - argv: vec![ - String::from(exe_path.to_string_lossy()), - String::from("--connection_file"), - String::from("{connection_file}"), - String::from("--session-mode"), - String::from("notebook"), - ], - language: String::from("R"), - display_name: String::from("Ark R Kernel"), - env, - }; - - let dest = unwrap!(spec.install(String::from("ark")), Err(error) => { - eprintln!("Failed to install Ark's Jupyter kernelspec. {}", error); - return; - }); - - println!( - "Successfully installed Ark Jupyter kernelspec. - - R ({}.{}.{}): {} - Kernel: {} - ", - r_version.major, - r_version.minor, - r_version.patch, - r_version.r_home, - dest.to_string_lossy() - ); -} - -fn parse_file( - connection_file: &String, - r_args: Vec, - startup_file: Option, - session_mode: SessionMode, - capture_streams: bool, -) { - match ConnectionFile::from_file(connection_file) { - Ok(connection) => { - log::info!("Loaded connection information from frontend in {connection_file}"); - log::info!("Connection data: {:?}", connection); - start_kernel( - connection, - r_args, - startup_file, - session_mode, - capture_streams, - ); - }, - Err(error) => { - log::error!("Couldn't read connection file {connection_file}: {error:?}"); - }, - } -} - fn print_usage() { println!("Ark {}, an R Kernel.", env!("CARGO_PKG_VERSION")); println!( @@ -504,3 +303,88 @@ fn main() { ); } } + +fn parse_file( + connection_file: &String, + r_args: Vec, + startup_file: Option, + session_mode: SessionMode, + capture_streams: bool, +) { + match ConnectionFile::from_file(connection_file) { + Ok(connection) => { + log::info!("Loaded connection information from frontend in {connection_file}"); + log::info!("Connection data: {:?}", connection); + start_kernel( + connection, + r_args, + startup_file, + session_mode, + capture_streams, + ); + }, + Err(error) => { + log::error!("Couldn't read connection file {connection_file}: {error:?}"); + }, + } +} + +// Install the kernelspec JSON file into one of Jupyter's search paths. +fn install_kernel_spec() { + // Create the environment set for the kernel spec + let mut env = serde_json::Map::new(); + + // Detect the active version of R and set the R_HOME environment variable + // accordingly + let r_version = detect_r().unwrap(); + env.insert( + "R_HOME".to_string(), + serde_json::Value::String(r_version.r_home.clone()), + ); + + // Point `LD_LIBRARY_PATH` to a folder with some `libR.so`. It doesn't + // matter which one, but the linker needs to be able to find a file of that + // name, even though we won't use it for symbol resolution. + // https://github.com/posit-dev/positron/issues/1619#issuecomment-1971552522 + if cfg!(target_os = "linux") { + let lib = format!("{}/lib", r_version.r_home.clone()); + env.insert("LD_LIBRARY_PATH".into(), serde_json::Value::String(lib)); + } + + // Create the kernelspec + let exe_path = unwrap!(env::current_exe(), Err(error) => { + eprintln!("Failed to determine path to Ark. {}", error); + return; + }); + + let spec = KernelSpec { + argv: vec![ + String::from(exe_path.to_string_lossy()), + String::from("--connection_file"), + String::from("{connection_file}"), + String::from("--session-mode"), + String::from("notebook"), + ], + language: String::from("R"), + display_name: String::from("Ark R Kernel"), + env, + }; + + let dest = unwrap!(spec.install(String::from("ark")), Err(error) => { + eprintln!("Failed to install Ark's Jupyter kernelspec. {}", error); + return; + }); + + println!( + "Successfully installed Ark Jupyter kernelspec. + + R ({}.{}.{}): {} + Kernel: {} + ", + r_version.major, + r_version.minor, + r_version.patch, + r_version.r_home, + dest.to_string_lossy() + ); +} diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs new file mode 100644 index 000000000..ea96f27a9 --- /dev/null +++ b/crates/ark/src/start.rs @@ -0,0 +1,129 @@ +// +// start.rs +// +// Copyright (C) 2023-2024 Posit Software, PBC. All rights reserved. +// +// + +use std::sync::Arc; +use std::sync::Mutex; + +use amalthea::connection_file::ConnectionFile; +use amalthea::socket::stdin::StdInRequest; +use bus::Bus; +use crossbeam::channel::bounded; +use crossbeam::channel::unbounded; + +use crate::control::Control; +use crate::dap; +use crate::interface::SessionMode; +use crate::lsp; +use crate::request::KernelRequest; +use crate::request::RRequest; +use crate::shell::Shell; + +/// Exported for unit tests. Does not return. +pub fn start_kernel( + connection_file: ConnectionFile, + r_args: Vec, + startup_file: Option, + session_mode: SessionMode, + capture_streams: bool, +) { + // Create a new kernel from the connection file + let mut kernel = match amalthea::kernel::Kernel::new("ark", connection_file) { + Ok(k) => k, + Err(err) => { + log::error!("Failed to create kernel: {err}"); + return; + }, + }; + + // Create the channels used for communication. These are created here + // as they need to be shared across different components / threads. + let iopub_tx = kernel.create_iopub_tx(); + + // A broadcast channel (bus) used to notify clients when the kernel + // has finished initialization. + let mut kernel_init_tx = Bus::new(1); + + // A channel pair used for shell requests. + // These events are used to manage the runtime state, and also to + // handle message delivery, among other things. + let (r_request_tx, r_request_rx) = bounded::(1); + let (kernel_request_tx, kernel_request_rx) = bounded::(1); + + // Create the LSP and DAP clients. + // Not all Amalthea kernels provide these, but ark does. + // They must be able to deliver messages to the shell channel directly. + let lsp = Arc::new(Mutex::new(lsp::handler::Lsp::new(kernel_init_tx.add_rx()))); + + // DAP needs the `RRequest` channel to communicate with + // `read_console()` and send commands to the debug interpreter + let dap = dap::Dap::new_shared(r_request_tx.clone()); + + // Communication channel between the R main thread and the Amalthea + // StdIn socket thread + let (stdin_request_tx, stdin_request_rx) = bounded::(1); + + // Communication channel for `CommEvent` + let comm_manager_tx = kernel.create_comm_manager_tx(); + + // Create the shell. + let kernel_init_rx = kernel_init_tx.add_rx(); + let shell = Shell::new( + comm_manager_tx.clone(), + iopub_tx.clone(), + r_request_tx.clone(), + stdin_request_tx.clone(), + kernel_init_rx, + kernel_request_tx, + kernel_request_rx, + session_mode.clone(), + ); + + // Create the control handler; this is used to handle shutdown/interrupt and + // related requests + let control = Arc::new(Mutex::new(Control::new(r_request_tx.clone()))); + + // Create the stream behavior; this determines whether the kernel should + // capture stdout/stderr and send them to the frontend as IOPub messages + let stream_behavior = match capture_streams { + true => amalthea::kernel::StreamBehavior::Capture, + false => amalthea::kernel::StreamBehavior::None, + }; + + // Create the kernel + let kernel_clone = shell.kernel.clone(); + let shell = Arc::new(Mutex::new(shell)); + + let (stdin_reply_tx, stdin_reply_rx) = unbounded(); + + let res = kernel.connect( + shell, + control, + Some(lsp), + Some(dap.clone()), + stream_behavior, + stdin_request_rx, + stdin_reply_tx, + ); + if let Err(err) = res { + panic!("Couldn't connect to frontend: {err:?}"); + } + + // Start the R REPL (does not return for the duration of the session) + crate::interface::start_r( + r_args, + startup_file, + kernel_clone, + comm_manager_tx, + r_request_rx, + stdin_request_tx, + stdin_reply_rx, + iopub_tx, + kernel_init_tx, + dap, + session_mode, + ) +} From 84576522c394b48a4f6afe99de41762436a05bac Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 23 Sep 2024 14:53:43 +0200 Subject: [PATCH 05/36] Add scaffholding for protocol tests --- crates/amalthea/src/test/dummy_frontend.rs | 21 +++++++ crates/ark/src/interface.rs | 11 +++- crates/ark/tests/kernel.rs | 65 ++++++++++++++++++++++ 3 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 crates/ark/tests/kernel.rs diff --git a/crates/amalthea/src/test/dummy_frontend.rs b/crates/amalthea/src/test/dummy_frontend.rs index 30b12c056..756e0a05a 100644 --- a/crates/amalthea/src/test/dummy_frontend.rs +++ b/crates/amalthea/src/test/dummy_frontend.rs @@ -5,12 +5,15 @@ * */ +use stdext::assert_match; + use crate::connection_file::ConnectionFile; use crate::session::Session; use crate::socket::socket::Socket; use crate::wire::jupyter_message::JupyterMessage; use crate::wire::jupyter_message::Message; use crate::wire::jupyter_message::ProtocolMessage; +use crate::wire::status::ExecutionState; pub struct DummyFrontend { pub _control_socket: Socket, @@ -146,6 +149,24 @@ impl DummyFrontend { Message::read_from_socket(&self.iopub_socket).unwrap() } + /// Receive from IOPub and assert Busy message + pub fn receive_iopub_busy(&self) -> () { + let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + + assert_match!(msg, Message::Status(status) => { + assert_eq!(status.content.execution_state, ExecutionState::Busy); + }); + } + + /// Receive from IOPub and assert Idle message + pub fn receive_iopub_idle(&self) -> () { + let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + + assert_match!(msg, Message::Status(status) => { + assert_eq!(status.content.execution_state, ExecutionState::Idle); + }); + } + /// Receives a Jupyter message from the Stdin socket pub fn receive_stdin(&self) -> Message { Message::read_from_socket(&self.stdin_socket).unwrap() diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 61629cef1..51f62a877 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -215,7 +215,16 @@ pub fn start_r( // Get `R_HOME`, set by Positron / CI / kernel specification let r_home = match std::env::var("R_HOME") { Ok(home) => PathBuf::from(home), - Err(err) => panic!("Can't find `R_HOME`: {err:?}"), + Err(_) => { + // Get `R_HOME` from `PATH`, via R + let Ok(result) = std::process::Command::new("R").arg("RHOME").output() else { + panic!("Can't find R or `R_HOME`"); + }; + let r_home = String::from_utf8(result.stdout).unwrap(); + let r_home = r_home.trim(); + std::env::set_var("R_HOME", r_home); + PathBuf::from(r_home) + }, }; let libraries = RLibraries::from_r_home_path(&r_home); diff --git a/crates/ark/tests/kernel.rs b/crates/ark/tests/kernel.rs new file mode 100644 index 000000000..8c6c913e1 --- /dev/null +++ b/crates/ark/tests/kernel.rs @@ -0,0 +1,65 @@ +use amalthea::test::dummy_frontend::DummyFrontend; +use amalthea::wire::execute_request::ExecuteRequest; +use amalthea::wire::jupyter_message::Message; +use amalthea::wire::kernel_info_request::KernelInfoRequest; +use ark::interface::SessionMode; +use serde_json::Value; +use stdext::assert_match; +use stdext::spawn; + +fn spawn_r() -> DummyFrontend { + let frontend = DummyFrontend::new(); + let connection_file = frontend.get_connection_file(); + + spawn!("dummy_kernel", || { + ark::start::start_kernel(connection_file, vec![], None, SessionMode::Console, false); + }); + + // Can we do better? + log::info!("Waiting 500ms for kernel startup to complete"); + std::thread::sleep(std::time::Duration::from_millis(500)); + + frontend.complete_intialization(); + + frontend +} + +#[test] +fn test_kernel() { + let frontend = spawn_r(); + + // --- Kernel info + frontend.send_shell(KernelInfoRequest {}); + + assert_match!(frontend.receive_shell(), Message::KernelInfoReply(reply) => { + assert_eq!(reply.content.language_info.name, "R"); + }); + + frontend.receive_iopub_busy(); + frontend.receive_iopub_idle(); + + // --- Execute request + frontend.send_shell(ExecuteRequest { + code: "42".to_string(), + silent: false, + store_history: true, + user_expressions: serde_json::Value::Null, + allow_stdin: false, + stop_on_error: false, + }); + + frontend.receive_iopub_busy(); + + // Input rebroadcast + assert_match!(frontend.receive_iopub(), Message::ExecuteInput(msg) => { + assert_eq!(msg.content.code, "42"); + }); + + assert_match!(frontend.receive_iopub(), Message::ExecuteResult(msg) => { + assert_match!(msg.content.data, Value::Object(map) => { + assert_eq!(map["text/plain"], serde_json::to_value("[1] 42").unwrap()); + }) + }); + + frontend.receive_iopub_idle(); +} From 001850f8ebcefa4ba509026d06fbc1906b39c452 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 23 Sep 2024 16:02:50 +0200 Subject: [PATCH 06/36] Fix R arguments --- crates/ark/tests/kernel.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/ark/tests/kernel.rs b/crates/ark/tests/kernel.rs index 8c6c913e1..933ba0844 100644 --- a/crates/ark/tests/kernel.rs +++ b/crates/ark/tests/kernel.rs @@ -12,7 +12,13 @@ fn spawn_r() -> DummyFrontend { let connection_file = frontend.get_connection_file(); spawn!("dummy_kernel", || { - ark::start::start_kernel(connection_file, vec![], None, SessionMode::Console, false); + ark::start::start_kernel( + connection_file, + vec![String::from("--no-save"), String::from("--no-restore")], + None, + SessionMode::Console, + false, + ); }); // Can we do better? From 7210e8ac3f30467314dedc7e63345d4cae8eae23 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 23 Sep 2024 16:25:21 +0200 Subject: [PATCH 07/36] Implement suicide hook --- crates/ark/src/interface.rs | 6 ++++++ crates/ark/src/sys/unix/interface.rs | 3 +++ crates/libr/src/r.rs | 3 +++ 3 files changed, 12 insertions(+) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 51f62a877..407b60119 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -1670,6 +1670,12 @@ pub extern "C" fn r_busy(which: i32) { main.busy(which); } +#[no_mangle] +pub extern "C" fn r_suicide(buf: *const c_char) { + let msg = unsafe { CStr::from_ptr(buf) }; + panic!("Suicide: {}", msg.to_str().unwrap()); +} + #[no_mangle] pub unsafe extern "C" fn r_polled_events() { let main = RMain::get_mut(); diff --git a/crates/ark/src/sys/unix/interface.rs b/crates/ark/src/sys/unix/interface.rs index 5ab6d8980..e6d999068 100644 --- a/crates/ark/src/sys/unix/interface.rs +++ b/crates/ark/src/sys/unix/interface.rs @@ -11,6 +11,7 @@ use std::os::raw::c_char; use libr::ptr_R_Busy; use libr::ptr_R_ReadConsole; use libr::ptr_R_ShowMessage; +use libr::ptr_R_Suicide; use libr::ptr_R_WriteConsole; use libr::ptr_R_WriteConsoleEx; use libr::run_Rmainloop; @@ -32,6 +33,7 @@ use crate::interface::r_busy; use crate::interface::r_polled_events; use crate::interface::r_read_console; use crate::interface::r_show_message; +use crate::interface::r_suicide; use crate::interface::r_write_console; use crate::signals::initialize_signal_handlers; @@ -64,6 +66,7 @@ pub fn setup_r(mut args: Vec<*mut c_char>) { libr::set(ptr_R_ReadConsole, Some(r_read_console)); libr::set(ptr_R_ShowMessage, Some(r_show_message)); libr::set(ptr_R_Busy, Some(r_busy)); + libr::set(ptr_R_Suicide, Some(r_suicide)); // Set up main loop setup_Rmainloop(); diff --git a/crates/libr/src/r.rs b/crates/libr/src/r.rs index 5e841ecaa..5c5923dbd 100644 --- a/crates/libr/src/r.rs +++ b/crates/libr/src/r.rs @@ -726,6 +726,9 @@ mutable_globals::generate! { #[cfg(target_family = "unix")] pub static mut ptr_R_Busy: Option; + #[cfg(target_family = "unix")] + pub static mut ptr_R_Suicide: Option; + // ----------------------------------------------------------------------------------- // Windows From 850011256c622380a5b234e58766b84100a56bd1 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 24 Sep 2024 10:25:15 +0200 Subject: [PATCH 08/36] Wrap dummy Ark frontend in singleton Checks for crumbs on drop --- crates/amalthea/src/socket/socket.rs | 4 + crates/amalthea/src/test/dummy_frontend.rs | 16 ++++ crates/ark/tests/kernel.rs | 95 +++++++++++++++++----- 3 files changed, 93 insertions(+), 22 deletions(-) diff --git a/crates/amalthea/src/socket/socket.rs b/crates/amalthea/src/socket/socket.rs index 90b34b93a..fb07ab51e 100644 --- a/crates/amalthea/src/socket/socket.rs +++ b/crates/amalthea/src/socket/socket.rs @@ -180,6 +180,10 @@ impl Socket { } } + pub fn has_incoming_data(&self) -> zmq::Result { + Ok(self.socket.poll(zmq::PollEvents::POLLIN, 0)? != 0) + } + /// Subscribes a SUB socket to all the published messages from a PUB socket. /// /// Note that this needs to be called *after* the socket connection is diff --git a/crates/amalthea/src/test/dummy_frontend.rs b/crates/amalthea/src/test/dummy_frontend.rs index 756e0a05a..ae16835c6 100644 --- a/crates/amalthea/src/test/dummy_frontend.rs +++ b/crates/amalthea/src/test/dummy_frontend.rs @@ -199,4 +199,20 @@ impl DummyFrontend { key: self.key.clone(), } } + + /// Asserts that no socket has incoming data + pub fn assert_no_incoming(&self) { + if self.iopub_socket.has_incoming_data().unwrap() { + panic!("IOPub has incoming data"); + } + if self.shell_socket.has_incoming_data().unwrap() { + panic!("Shell has incoming data"); + } + if self.stdin_socket.has_incoming_data().unwrap() { + panic!("StdIn has incoming data"); + } + if self.heartbeat_socket.has_incoming_data().unwrap() { + panic!("Heartbeat has incoming data"); + } + } } diff --git a/crates/ark/tests/kernel.rs b/crates/ark/tests/kernel.rs index 933ba0844..bb80d5d0c 100644 --- a/crates/ark/tests/kernel.rs +++ b/crates/ark/tests/kernel.rs @@ -1,40 +1,83 @@ +use std::ops::Deref; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::MutexGuard; + use amalthea::test::dummy_frontend::DummyFrontend; use amalthea::wire::execute_request::ExecuteRequest; use amalthea::wire::jupyter_message::Message; +use amalthea::wire::jupyter_message::Status; use amalthea::wire::kernel_info_request::KernelInfoRequest; use ark::interface::SessionMode; +use once_cell::sync::Lazy; use serde_json::Value; use stdext::assert_match; use stdext::spawn; -fn spawn_r() -> DummyFrontend { - let frontend = DummyFrontend::new(); - let connection_file = frontend.get_connection_file(); - - spawn!("dummy_kernel", || { - ark::start::start_kernel( - connection_file, - vec![String::from("--no-save"), String::from("--no-restore")], - None, - SessionMode::Console, - false, - ); - }); +// There can be only one frontend per process. Needs to be in a mutex because +// the frontend wraps zmq sockets which are unsafe to send across threads. +static FRONTEND: Lazy>> = + Lazy::new(|| Arc::new(Mutex::new(DummyArkFrontend::init()))); + +/// Wrapper around `DummyFrontend` that checks sockets are empty on drop +struct DummyArkFrontend { + guard: MutexGuard<'static, DummyFrontend>, +} + +impl DummyArkFrontend { + fn lock() -> Self { + Self { + guard: FRONTEND.lock().unwrap(), + } + } - // Can we do better? - log::info!("Waiting 500ms for kernel startup to complete"); - std::thread::sleep(std::time::Duration::from_millis(500)); + fn init() -> DummyFrontend { + if Lazy::get(&FRONTEND).is_some() { + panic!("Can't spawn Ark more than once"); + } - frontend.complete_intialization(); + let frontend = DummyFrontend::new(); + let connection_file = frontend.get_connection_file(); + + spawn!("dummy_kernel", || { + ark::start::start_kernel( + connection_file, + vec![String::from("--no-save"), String::from("--no-restore")], + None, + SessionMode::Console, + false, + ); + }); + + // Can we do better? + log::info!("Waiting 500ms for kernel startup to complete"); + std::thread::sleep(std::time::Duration::from_millis(500)); + + frontend.complete_intialization(); + frontend + } +} - frontend +// Check that we haven't left crumbs behind +impl Drop for DummyArkFrontend { + fn drop(&mut self) { + self.assert_no_incoming() + } +} + +// Allow method calls to be forwarded to inner type +impl Deref for DummyArkFrontend { + type Target = DummyFrontend; + + fn deref(&self) -> &Self::Target { + Deref::deref(&self.guard) + } } #[test] -fn test_kernel() { - let frontend = spawn_r(); +fn test_kernel_info() { + let frontend = DummyArkFrontend::lock(); - // --- Kernel info frontend.send_shell(KernelInfoRequest {}); assert_match!(frontend.receive_shell(), Message::KernelInfoReply(reply) => { @@ -43,8 +86,12 @@ fn test_kernel() { frontend.receive_iopub_busy(); frontend.receive_iopub_idle(); +} + +#[test] +fn test_execute_request() { + let frontend = DummyArkFrontend::lock(); - // --- Execute request frontend.send_shell(ExecuteRequest { code: "42".to_string(), silent: false, @@ -68,4 +115,8 @@ fn test_kernel() { }); frontend.receive_iopub_idle(); + + assert_match!(frontend.receive_shell(), Message::ExecuteReply(msg) => { + assert_eq!(msg.content.status, Status::Ok); + }); } From 444151482debb2ce062ea0022134522baa9efd7b Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 24 Sep 2024 13:21:51 +0200 Subject: [PATCH 09/36] Export `DummyArkFrontend` from Ark --- crates/ark/src/test/dummy_frontend.rs | 69 ++++++++++++++++++++++++++ crates/ark/src/test/mod.rs | 5 ++ crates/ark/src/{ => test}/test.rs | 0 crates/ark/tests/kernel.rs | 70 +-------------------------- 4 files changed, 75 insertions(+), 69 deletions(-) create mode 100644 crates/ark/src/test/dummy_frontend.rs create mode 100644 crates/ark/src/test/mod.rs rename crates/ark/src/{ => test}/test.rs (100%) diff --git a/crates/ark/src/test/dummy_frontend.rs b/crates/ark/src/test/dummy_frontend.rs new file mode 100644 index 000000000..a2766c32d --- /dev/null +++ b/crates/ark/src/test/dummy_frontend.rs @@ -0,0 +1,69 @@ +use std::ops::Deref; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::MutexGuard; + +use amalthea::test::dummy_frontend::DummyFrontend; +use once_cell::sync::Lazy; + +use crate::interface::SessionMode; + +// There can be only one frontend per process. Needs to be in a mutex because +// the frontend wraps zmq sockets which are unsafe to send across threads. +static FRONTEND: Lazy>> = + Lazy::new(|| Arc::new(Mutex::new(DummyArkFrontend::init()))); + +/// Wrapper around `DummyFrontend` that checks sockets are empty on drop +pub struct DummyArkFrontend { + guard: MutexGuard<'static, DummyFrontend>, +} + +impl DummyArkFrontend { + pub fn lock() -> Self { + Self { + guard: FRONTEND.lock().unwrap(), + } + } + + fn init() -> DummyFrontend { + if Lazy::get(&FRONTEND).is_some() { + panic!("Can't spawn Ark more than once"); + } + + let frontend = DummyFrontend::new(); + let connection_file = frontend.get_connection_file(); + + stdext::spawn!("dummy_kernel", || { + crate::start::start_kernel( + connection_file, + vec![String::from("--no-save"), String::from("--no-restore")], + None, + SessionMode::Console, + false, + ); + }); + + // Can we do better? + log::info!("Waiting 500ms for kernel startup to complete"); + std::thread::sleep(std::time::Duration::from_millis(500)); + + frontend.complete_intialization(); + frontend + } +} + +// Check that we haven't left crumbs behind +impl Drop for DummyArkFrontend { + fn drop(&mut self) { + self.assert_no_incoming() + } +} + +// Allow method calls to be forwarded to inner type +impl Deref for DummyArkFrontend { + type Target = DummyFrontend; + + fn deref(&self) -> &Self::Target { + Deref::deref(&self.guard) + } +} diff --git a/crates/ark/src/test/mod.rs b/crates/ark/src/test/mod.rs new file mode 100644 index 000000000..31d28ca3f --- /dev/null +++ b/crates/ark/src/test/mod.rs @@ -0,0 +1,5 @@ +pub mod dummy_frontend; +pub mod test; + +pub use dummy_frontend::*; +pub use test::*; diff --git a/crates/ark/src/test.rs b/crates/ark/src/test/test.rs similarity index 100% rename from crates/ark/src/test.rs rename to crates/ark/src/test/test.rs diff --git a/crates/ark/tests/kernel.rs b/crates/ark/tests/kernel.rs index bb80d5d0c..3703f1aa3 100644 --- a/crates/ark/tests/kernel.rs +++ b/crates/ark/tests/kernel.rs @@ -1,78 +1,10 @@ -use std::ops::Deref; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::MutexGuard; - -use amalthea::test::dummy_frontend::DummyFrontend; use amalthea::wire::execute_request::ExecuteRequest; use amalthea::wire::jupyter_message::Message; use amalthea::wire::jupyter_message::Status; use amalthea::wire::kernel_info_request::KernelInfoRequest; -use ark::interface::SessionMode; -use once_cell::sync::Lazy; +use ark::test::DummyArkFrontend; use serde_json::Value; use stdext::assert_match; -use stdext::spawn; - -// There can be only one frontend per process. Needs to be in a mutex because -// the frontend wraps zmq sockets which are unsafe to send across threads. -static FRONTEND: Lazy>> = - Lazy::new(|| Arc::new(Mutex::new(DummyArkFrontend::init()))); - -/// Wrapper around `DummyFrontend` that checks sockets are empty on drop -struct DummyArkFrontend { - guard: MutexGuard<'static, DummyFrontend>, -} - -impl DummyArkFrontend { - fn lock() -> Self { - Self { - guard: FRONTEND.lock().unwrap(), - } - } - - fn init() -> DummyFrontend { - if Lazy::get(&FRONTEND).is_some() { - panic!("Can't spawn Ark more than once"); - } - - let frontend = DummyFrontend::new(); - let connection_file = frontend.get_connection_file(); - - spawn!("dummy_kernel", || { - ark::start::start_kernel( - connection_file, - vec![String::from("--no-save"), String::from("--no-restore")], - None, - SessionMode::Console, - false, - ); - }); - - // Can we do better? - log::info!("Waiting 500ms for kernel startup to complete"); - std::thread::sleep(std::time::Duration::from_millis(500)); - - frontend.complete_intialization(); - frontend - } -} - -// Check that we haven't left crumbs behind -impl Drop for DummyArkFrontend { - fn drop(&mut self) { - self.assert_no_incoming() - } -} - -// Allow method calls to be forwarded to inner type -impl Deref for DummyArkFrontend { - type Target = DummyFrontend; - - fn deref(&self) -> &Self::Target { - Deref::deref(&self.guard) - } -} #[test] fn test_kernel_info() { From 45478e4efe6249116a4a98dcdc61fa4c4b6e8229 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 24 Sep 2024 16:55:39 +0200 Subject: [PATCH 10/36] Add `send_execute_request()` method --- crates/amalthea/src/test/dummy_frontend.rs | 12 ++++++++++++ crates/ark/tests/kernel.rs | 10 +--------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/crates/amalthea/src/test/dummy_frontend.rs b/crates/amalthea/src/test/dummy_frontend.rs index ae16835c6..7ee3063b5 100644 --- a/crates/amalthea/src/test/dummy_frontend.rs +++ b/crates/amalthea/src/test/dummy_frontend.rs @@ -10,6 +10,7 @@ use stdext::assert_match; use crate::connection_file::ConnectionFile; use crate::session::Session; use crate::socket::socket::Socket; +use crate::wire::execute_request::ExecuteRequest; use crate::wire::jupyter_message::JupyterMessage; use crate::wire::jupyter_message::Message; use crate::wire::jupyter_message::ProtocolMessage; @@ -133,6 +134,17 @@ impl DummyFrontend { id } + pub fn send_execute_request(&self, code: &str) -> String { + self.send_shell(ExecuteRequest { + code: String::from(code), + silent: false, + store_history: true, + user_expressions: serde_json::Value::Null, + allow_stdin: false, + stop_on_error: false, + }) + } + /// Sends a Jupyter message on the Stdin socket pub fn send_stdin(&self, msg: T) { let message = JupyterMessage::create(msg, None, &self.session); diff --git a/crates/ark/tests/kernel.rs b/crates/ark/tests/kernel.rs index 3703f1aa3..79ab34aae 100644 --- a/crates/ark/tests/kernel.rs +++ b/crates/ark/tests/kernel.rs @@ -24,15 +24,7 @@ fn test_kernel_info() { fn test_execute_request() { let frontend = DummyArkFrontend::lock(); - frontend.send_shell(ExecuteRequest { - code: "42".to_string(), - silent: false, - store_history: true, - user_expressions: serde_json::Value::Null, - allow_stdin: false, - stop_on_error: false, - }); - + frontend.send_execute_request("42"); frontend.receive_iopub_busy(); // Input rebroadcast From 788b0caa26e094c72c7223f99de86658cb74936e Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 24 Sep 2024 17:24:07 +0200 Subject: [PATCH 11/36] Add more wrapper methods --- crates/amalthea/src/test/dummy_frontend.rs | 43 ++++++++++++++++++++-- crates/ark/tests/kernel.rs | 19 ++-------- crates/stdext/src/lib.rs | 11 ++---- 3 files changed, 47 insertions(+), 26 deletions(-) diff --git a/crates/amalthea/src/test/dummy_frontend.rs b/crates/amalthea/src/test/dummy_frontend.rs index 7ee3063b5..34051076d 100644 --- a/crates/amalthea/src/test/dummy_frontend.rs +++ b/crates/amalthea/src/test/dummy_frontend.rs @@ -5,11 +5,14 @@ * */ +use serde_json::Value; use stdext::assert_match; use crate::connection_file::ConnectionFile; use crate::session::Session; use crate::socket::socket::Socket; +use crate::wire::execute_input::ExecuteInput; +use crate::wire::execute_reply::ExecuteReply; use crate::wire::execute_request::ExecuteRequest; use crate::wire::jupyter_message::JupyterMessage; use crate::wire::jupyter_message::Message; @@ -156,6 +159,15 @@ impl DummyFrontend { Message::read_from_socket(&self.shell_socket).unwrap() } + /// Receive from Shell and assert ExecuteReply message + pub fn receive_shell_execute_reply(&self) -> ExecuteReply { + let msg = Message::read_from_socket(&self.shell_socket).unwrap(); + + assert_match!(msg, Message::ExecuteReply(data) => { + data.content + }) + } + /// Receives a Jupyter message from the IOPub socket pub fn receive_iopub(&self) -> Message { Message::read_from_socket(&self.iopub_socket).unwrap() @@ -165,8 +177,8 @@ impl DummyFrontend { pub fn receive_iopub_busy(&self) -> () { let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); - assert_match!(msg, Message::Status(status) => { - assert_eq!(status.content.execution_state, ExecutionState::Busy); + assert_match!(msg, Message::Status(data) => { + assert_eq!(data.content.execution_state, ExecutionState::Busy); }); } @@ -174,11 +186,34 @@ impl DummyFrontend { pub fn receive_iopub_idle(&self) -> () { let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); - assert_match!(msg, Message::Status(status) => { - assert_eq!(status.content.execution_state, ExecutionState::Idle); + assert_match!(msg, Message::Status(data) => { + assert_eq!(data.content.execution_state, ExecutionState::Idle); }); } + /// Receive from IOPub and assert ExecuteInput message + pub fn receive_iopub_execute_input(&self) -> ExecuteInput { + let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + + assert_match!(msg, Message::ExecuteInput(data) => { + data.content + }) + } + + /// Receive from IOPub and assert ExecuteResult message. Returns compulsory + /// `plain/text` result. + pub fn receive_iopub_execute_result(&self) -> String { + let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + + assert_match!(msg, Message::ExecuteResult(data) => { + assert_match!(data.content.data, Value::Object(map) => { + assert_match!(map["text/plain"], Value::String(ref string) => { + string.clone() + }) + }) + }) + } + /// Receives a Jupyter message from the Stdin socket pub fn receive_stdin(&self) -> Message { Message::read_from_socket(&self.stdin_socket).unwrap() diff --git a/crates/ark/tests/kernel.rs b/crates/ark/tests/kernel.rs index 79ab34aae..6fe8e4e46 100644 --- a/crates/ark/tests/kernel.rs +++ b/crates/ark/tests/kernel.rs @@ -1,9 +1,7 @@ -use amalthea::wire::execute_request::ExecuteRequest; use amalthea::wire::jupyter_message::Message; use amalthea::wire::jupyter_message::Status; use amalthea::wire::kernel_info_request::KernelInfoRequest; use ark::test::DummyArkFrontend; -use serde_json::Value; use stdext::assert_match; #[test] @@ -27,20 +25,11 @@ fn test_execute_request() { frontend.send_execute_request("42"); frontend.receive_iopub_busy(); - // Input rebroadcast - assert_match!(frontend.receive_iopub(), Message::ExecuteInput(msg) => { - assert_eq!(msg.content.code, "42"); - }); - - assert_match!(frontend.receive_iopub(), Message::ExecuteResult(msg) => { - assert_match!(msg.content.data, Value::Object(map) => { - assert_eq!(map["text/plain"], serde_json::to_value("[1] 42").unwrap()); - }) - }); + assert_eq!(frontend.receive_iopub_execute_input().code, "42"); + assert_eq!(frontend.receive_iopub_execute_result(), "[1] 42"); frontend.receive_iopub_idle(); - assert_match!(frontend.receive_shell(), Message::ExecuteReply(msg) => { - assert_eq!(msg.content.status, Status::Ok); - }); + let reply = frontend.receive_shell_execute_reply(); + assert_eq!(reply.status, Status::Ok); } diff --git a/crates/stdext/src/lib.rs b/crates/stdext/src/lib.rs index 59feab9ce..34cff1db1 100644 --- a/crates/stdext/src/lib.rs +++ b/crates/stdext/src/lib.rs @@ -60,13 +60,10 @@ macro_rules! cstr { #[macro_export] macro_rules! assert_match { ($expression:expr, $pattern:pat_param => $code:block) => { - assert!(match $expression { - $pattern => { - $code - true - }, - _ => false - }) + match $expression { + $pattern => $code, + _ => panic!("Expected {}", stringify!($pattern)), + } }; ($expression:expr, $pattern:pat_param) => { From 7f024e91535927380acb0483e9ae351914ba0150 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 24 Sep 2024 17:30:06 +0200 Subject: [PATCH 12/36] Rename `receive_` prefix to `recv_` --- crates/amalthea/src/test/dummy_frontend.rs | 18 +++---- crates/amalthea/tests/client.rs | 56 +++++++++++----------- crates/ark/tests/kernel.rs | 16 +++---- 3 files changed, 45 insertions(+), 45 deletions(-) diff --git a/crates/amalthea/src/test/dummy_frontend.rs b/crates/amalthea/src/test/dummy_frontend.rs index 34051076d..b7d56fea5 100644 --- a/crates/amalthea/src/test/dummy_frontend.rs +++ b/crates/amalthea/src/test/dummy_frontend.rs @@ -155,12 +155,12 @@ impl DummyFrontend { } /// Receives a Jupyter message from the Shell socket - pub fn receive_shell(&self) -> Message { + pub fn recv_shell(&self) -> Message { Message::read_from_socket(&self.shell_socket).unwrap() } /// Receive from Shell and assert ExecuteReply message - pub fn receive_shell_execute_reply(&self) -> ExecuteReply { + pub fn recv_shell_execute_reply(&self) -> ExecuteReply { let msg = Message::read_from_socket(&self.shell_socket).unwrap(); assert_match!(msg, Message::ExecuteReply(data) => { @@ -169,12 +169,12 @@ impl DummyFrontend { } /// Receives a Jupyter message from the IOPub socket - pub fn receive_iopub(&self) -> Message { + pub fn recv_iopub(&self) -> Message { Message::read_from_socket(&self.iopub_socket).unwrap() } /// Receive from IOPub and assert Busy message - pub fn receive_iopub_busy(&self) -> () { + pub fn recv_iopub_busy(&self) -> () { let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); assert_match!(msg, Message::Status(data) => { @@ -183,7 +183,7 @@ impl DummyFrontend { } /// Receive from IOPub and assert Idle message - pub fn receive_iopub_idle(&self) -> () { + pub fn recv_iopub_idle(&self) -> () { let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); assert_match!(msg, Message::Status(data) => { @@ -192,7 +192,7 @@ impl DummyFrontend { } /// Receive from IOPub and assert ExecuteInput message - pub fn receive_iopub_execute_input(&self) -> ExecuteInput { + pub fn recv_iopub_execute_input(&self) -> ExecuteInput { let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); assert_match!(msg, Message::ExecuteInput(data) => { @@ -202,7 +202,7 @@ impl DummyFrontend { /// Receive from IOPub and assert ExecuteResult message. Returns compulsory /// `plain/text` result. - pub fn receive_iopub_execute_result(&self) -> String { + pub fn recv_iopub_execute_result(&self) -> String { let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); assert_match!(msg, Message::ExecuteResult(data) => { @@ -215,12 +215,12 @@ impl DummyFrontend { } /// Receives a Jupyter message from the Stdin socket - pub fn receive_stdin(&self) -> Message { + pub fn recv_stdin(&self) -> Message { Message::read_from_socket(&self.stdin_socket).unwrap() } /// Receives a (raw) message from the heartbeat socket - pub fn receive_heartbeat(&self) -> zmq::Message { + pub fn recv_heartbeat(&self) -> zmq::Message { let mut msg = zmq::Message::new(); self.heartbeat_socket.recv(&mut msg).unwrap(); msg diff --git a/crates/amalthea/tests/client.rs b/crates/amalthea/tests/client.rs index 3bf0c0ef0..d185e5649 100644 --- a/crates/amalthea/tests/client.rs +++ b/crates/amalthea/tests/client.rs @@ -90,7 +90,7 @@ fn test_kernel() { frontend.send_shell(KernelInfoRequest {}); info!("Waiting for kernel info reply"); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::KernelInfoReply(reply) => { info!("Kernel info received: {:?}", reply); @@ -114,7 +114,7 @@ fn test_kernel() { // The kernel should send an execute reply message indicating that the execute succeeded info!("Waiting for execute reply"); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::ExecuteReply(reply) => { info!("Received execute reply: {:?}", reply); @@ -139,7 +139,7 @@ fn test_kernel() { // (for the execute_request) info!("Waiting for IOPub execution information messsage 1 of 6: Status"); - let iopub_1 = frontend.receive_iopub(); + let iopub_1 = frontend.recv_iopub(); match iopub_1 { Message::Status(status) => { info!("Got kernel status: {:?}", status); @@ -155,7 +155,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 2 of 6: Status"); - let iopub_2 = frontend.receive_iopub(); + let iopub_2 = frontend.recv_iopub(); match iopub_2 { Message::Status(status) => { info!("Got kernel status: {:?}", status); @@ -171,7 +171,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 3 of 6: Status"); - let iopub_3 = frontend.receive_iopub(); + let iopub_3 = frontend.recv_iopub(); match iopub_3 { Message::Status(status) => { info!("Got kernel status: {:?}", status); @@ -186,7 +186,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 4 of 6: Input Broadcast"); - let iopub_4 = frontend.receive_iopub(); + let iopub_4 = frontend.recv_iopub(); match iopub_4 { Message::ExecuteInput(input) => { info!("Got input rebroadcast: {:?}", input); @@ -201,7 +201,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 5 of 6: Execution Result"); - let iopub_5 = frontend.receive_iopub(); + let iopub_5 = frontend.recv_iopub(); match iopub_5 { Message::ExecuteResult(result) => { info!("Got execution result: {:?}", result); @@ -215,7 +215,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 6 of 6: Status"); - let iopub_6 = frontend.receive_iopub(); + let iopub_6 = frontend.recv_iopub(); match iopub_6 { Message::Status(status) => { info!("Got kernel status: {:?}", status); @@ -240,7 +240,7 @@ fn test_kernel() { }); info!("Waiting for kernel to send an input request"); - let request = frontend.receive_stdin(); + let request = frontend.recv_stdin(); match request { Message::InputRequest(request) => { info!("Got input request: {:?}", request); @@ -263,35 +263,35 @@ fn test_kernel() { // processing of the above `prompt` execution request assert_eq!( // Status: Busy - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), KernelStatus::message_type() ); assert_eq!( // ExecuteInput (re-broadcast of 'Prompt') - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), ExecuteInput::message_type() ); assert_eq!( // StreamOutput (echoed input) - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), StreamOutput::message_type() ); assert_eq!( // ExecuteResult - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), ExecuteResult::message_type() ); assert_eq!( // Status: Idle - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), KernelStatus::message_type() @@ -300,7 +300,7 @@ fn test_kernel() { // The kernel should send an execute reply message indicating that the execute // of the 'prompt' command succeeded info!("Waiting for execute reply"); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::ExecuteReply(reply) => { info!("Received execute reply: {:?}", reply); @@ -318,7 +318,7 @@ fn test_kernel() { frontend.send_heartbeat(msg); info!("Waiting for heartbeat reply"); - let reply = frontend.receive_heartbeat(); + let reply = frontend.recv_heartbeat(); assert_eq!(reply, zmq::Message::from("Heartbeat")); // Test the comms @@ -332,14 +332,14 @@ fn test_kernel() { // Absorb the IOPub messages that the kernel sends back during the // processing of the above `CommOpen` request - frontend.receive_iopub(); // Busy - frontend.receive_iopub(); // Idle + frontend.recv_iopub(); // Busy + frontend.recv_iopub(); // Idle info!("Requesting comm info from the kernel (to test opening from the frontend)"); frontend.send_shell(CommInfoRequest { target_name: "".to_string(), }); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::CommInfoReply(request) => { info!("Got comm info: {:?}", request); @@ -362,7 +362,7 @@ fn test_kernel() { frontend.send_shell(CommInfoRequest { target_name: "i-think-not".to_string(), }); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::CommInfoReply(request) => { info!("Got comm info: {:?}", request); @@ -383,7 +383,7 @@ fn test_kernel() { data: serde_json::Value::Null, }); loop { - let msg = frontend.receive_iopub(); + let msg = frontend.recv_iopub(); match msg { Message::CommMsg(msg) => { // This is the message we were looking for; break out of the @@ -419,8 +419,8 @@ fn test_kernel() { // Absorb the IOPub messages that the kernel sends back during the // processing of the above `CommClose` request info!("Receiving comm close IOPub messages from the kernel"); - frontend.receive_iopub(); // Busy - frontend.receive_iopub(); // Idle + frontend.recv_iopub(); // Busy + frontend.recv_iopub(); // Idle // Test to see if the comm is still in the list of comms after closing it // (it should not be) @@ -428,7 +428,7 @@ fn test_kernel() { frontend.send_shell(CommInfoRequest { target_name: "variables".to_string(), }); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::CommInfoReply(request) => { info!("Got comm info: {:?}", request); @@ -466,7 +466,7 @@ fn test_kernel() { // // We do this in a loop because we expect a number of other messages, e.g. busy/idle loop { - let msg = frontend.receive_iopub(); + let msg = frontend.recv_iopub(); match msg { Message::CommOpen(msg) => { assert_eq!(msg.content.comm_id, test_comm_id); @@ -487,7 +487,7 @@ fn test_kernel() { frontend.send_shell(CommInfoRequest { target_name: test_comm_name.clone(), }); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::CommInfoReply(request) => { info!("Got comm info: {:?}", request); @@ -517,7 +517,7 @@ fn test_kernel() { // Wait for the comm data message to be received by the frontend. loop { - let msg = frontend.receive_iopub(); + let msg = frontend.recv_iopub(); match msg { Message::CommMsg(msg) => { assert_eq!(msg.content.comm_id, test_comm_id); @@ -534,7 +534,7 @@ fn test_kernel() { // Ensure that the frontend is notified loop { - let msg = frontend.receive_iopub(); + let msg = frontend.recv_iopub(); match msg { Message::CommClose(msg) => { assert_eq!(msg.content.comm_id, test_comm_id); diff --git a/crates/ark/tests/kernel.rs b/crates/ark/tests/kernel.rs index 6fe8e4e46..8c9a13a44 100644 --- a/crates/ark/tests/kernel.rs +++ b/crates/ark/tests/kernel.rs @@ -10,12 +10,12 @@ fn test_kernel_info() { frontend.send_shell(KernelInfoRequest {}); - assert_match!(frontend.receive_shell(), Message::KernelInfoReply(reply) => { + assert_match!(frontend.recv_shell(), Message::KernelInfoReply(reply) => { assert_eq!(reply.content.language_info.name, "R"); }); - frontend.receive_iopub_busy(); - frontend.receive_iopub_idle(); + frontend.recv_iopub_busy(); + frontend.recv_iopub_idle(); } #[test] @@ -23,13 +23,13 @@ fn test_execute_request() { let frontend = DummyArkFrontend::lock(); frontend.send_execute_request("42"); - frontend.receive_iopub_busy(); + frontend.recv_iopub_busy(); - assert_eq!(frontend.receive_iopub_execute_input().code, "42"); - assert_eq!(frontend.receive_iopub_execute_result(), "[1] 42"); + assert_eq!(frontend.recv_iopub_execute_input().code, "42"); + assert_eq!(frontend.recv_iopub_execute_result(), "[1] 42"); - frontend.receive_iopub_idle(); + frontend.recv_iopub_idle(); - let reply = frontend.receive_shell_execute_reply(); + let reply = frontend.recv_shell_execute_reply(); assert_eq!(reply.status, Status::Ok); } From e5c2ea448a396594b033d2adb0a1c67c22f6055c Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 09:23:01 +0200 Subject: [PATCH 13/36] Rename `test/test.rs` to `test/utils.rs` --- crates/ark/src/test/mod.rs | 4 ++-- crates/ark/src/test/{test.rs => utils.rs} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename crates/ark/src/test/{test.rs => utils.rs} (100%) diff --git a/crates/ark/src/test/mod.rs b/crates/ark/src/test/mod.rs index 31d28ca3f..4cbec761b 100644 --- a/crates/ark/src/test/mod.rs +++ b/crates/ark/src/test/mod.rs @@ -1,5 +1,5 @@ pub mod dummy_frontend; -pub mod test; +pub mod utils; pub use dummy_frontend::*; -pub use test::*; +pub use utils::*; diff --git a/crates/ark/src/test/test.rs b/crates/ark/src/test/utils.rs similarity index 100% rename from crates/ark/src/test/test.rs rename to crates/ark/src/test/utils.rs From 0288a3b373f5c8c77da328a7a51370b3f08d494d Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 09:50:08 +0200 Subject: [PATCH 14/36] Don't set `R_HOME` in kernel spec Let Ark detect it from the search path via `R RHOME` --- crates/ark/src/main.rs | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/crates/ark/src/main.rs b/crates/ark/src/main.rs index 41dab0b79..f6d226bb0 100644 --- a/crates/ark/src/main.rs +++ b/crates/ark/src/main.rs @@ -334,19 +334,14 @@ fn install_kernel_spec() { // Create the environment set for the kernel spec let mut env = serde_json::Map::new(); - // Detect the active version of R and set the R_HOME environment variable - // accordingly - let r_version = detect_r().unwrap(); - env.insert( - "R_HOME".to_string(), - serde_json::Value::String(r_version.r_home.clone()), - ); - // Point `LD_LIBRARY_PATH` to a folder with some `libR.so`. It doesn't // matter which one, but the linker needs to be able to find a file of that // name, even though we won't use it for symbol resolution. // https://github.com/posit-dev/positron/issues/1619#issuecomment-1971552522 if cfg!(target_os = "linux") { + // Detect the active version of R + let r_version = detect_r().unwrap(); + let lib = format!("{}/lib", r_version.r_home.clone()); env.insert("LD_LIBRARY_PATH".into(), serde_json::Value::String(lib)); } @@ -370,21 +365,16 @@ fn install_kernel_spec() { env, }; - let dest = unwrap!(spec.install(String::from("ark")), Err(error) => { - eprintln!("Failed to install Ark's Jupyter kernelspec. {}", error); + let dest = unwrap!(spec.install(String::from("ark")), Err(err) => { + eprintln!("Failed to install Ark's Jupyter kernelspec. {err}"); return; }); println!( "Successfully installed Ark Jupyter kernelspec. - R ({}.{}.{}): {} Kernel: {} ", - r_version.major, - r_version.minor, - r_version.patch, - r_version.r_home, dest.to_string_lossy() ); } From 3f469a84441dc7ccb3ae93a685da31524795a84a Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 09:53:31 +0200 Subject: [PATCH 15/36] Set `RUST_LOG` in kernel spec --- crates/ark/src/main.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/ark/src/main.rs b/crates/ark/src/main.rs index f6d226bb0..4267e2dd9 100644 --- a/crates/ark/src/main.rs +++ b/crates/ark/src/main.rs @@ -334,6 +334,9 @@ fn install_kernel_spec() { // Create the environment set for the kernel spec let mut env = serde_json::Map::new(); + // Workaround for https://github.com/posit-dev/positron/issues/2098 + env.insert("RUST_LOG".into(), serde_json::Value::String("error".into())); + // Point `LD_LIBRARY_PATH` to a folder with some `libR.so`. It doesn't // matter which one, but the linker needs to be able to find a file of that // name, even though we won't use it for symbol resolution. From 6c6866f8889ae9be9729b44764d72e8e176c2e44 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 09:59:46 +0200 Subject: [PATCH 16/36] Move `start_r()` to `RMain::start()` --- crates/ark/src/interface.rs | 289 ++++++++++++++++++------------------ crates/ark/src/start.rs | 2 +- 2 files changed, 146 insertions(+), 145 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 407b60119..e04daa430 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -149,150 +149,6 @@ static INIT: Once = Once::new(); // `RMain::get_mut()`). static mut R_MAIN: Option = None; -/// Starts the main R thread. Doesn't return. -pub fn start_r( - r_args: Vec, - startup_file: Option, - kernel_mutex: Arc>, - comm_manager_tx: Sender, - r_request_rx: Receiver, - stdin_request_tx: Sender, - stdin_reply_rx: Receiver>, - iopub_tx: Sender, - kernel_init_tx: Bus, - dap: Arc>, - session_mode: SessionMode, -) { - // Initialize global state (ensure we only do this once!) - INIT.call_once(|| unsafe { - R_MAIN_THREAD_ID = Some(std::thread::current().id()); - - // Channels to send/receive tasks from auxiliary threads via `RTask`s - let (tasks_interrupt_tx, tasks_interrupt_rx) = unbounded::(); - let (tasks_idle_tx, tasks_idle_rx) = unbounded::(); - - r_task::initialize(tasks_interrupt_tx.clone(), tasks_idle_tx.clone()); - - R_MAIN = Some(RMain::new( - kernel_mutex, - tasks_interrupt_rx, - tasks_idle_rx, - comm_manager_tx, - r_request_rx, - stdin_request_tx, - stdin_reply_rx, - iopub_tx, - kernel_init_tx, - dap, - session_mode, - )); - }); - - let mut r_args = r_args.clone(); - - // Record if the user has requested that we don't load the site/user level R profiles - let ignore_site_r_profile = startup::should_ignore_site_r_profile(&r_args); - let ignore_user_r_profile = startup::should_ignore_user_r_profile(&r_args); - - // We always manually load site/user level R profiles rather than letting R do it - // to ensure that ark is fully set up before running code that could potentially call - // back into ark internals. - if !ignore_site_r_profile { - startup::push_ignore_site_r_profile(&mut r_args); - } - if !ignore_user_r_profile { - startup::push_ignore_user_r_profile(&mut r_args); - } - - // Build the argument list from the command line arguments. The default - // list is `--interactive` unless altered with the `--` passthrough - // argument. - let mut args = cargs!["ark"]; - for arg in r_args { - args.push(CString::new(arg).unwrap().into_raw()); - } - - // Get `R_HOME`, set by Positron / CI / kernel specification - let r_home = match std::env::var("R_HOME") { - Ok(home) => PathBuf::from(home), - Err(_) => { - // Get `R_HOME` from `PATH`, via R - let Ok(result) = std::process::Command::new("R").arg("RHOME").output() else { - panic!("Can't find R or `R_HOME`"); - }; - let r_home = String::from_utf8(result.stdout).unwrap(); - let r_home = r_home.trim(); - std::env::set_var("R_HOME", r_home); - PathBuf::from(r_home) - }, - }; - - let libraries = RLibraries::from_r_home_path(&r_home); - libraries.initialize_pre_setup_r(); - - crate::sys::interface::setup_r(args); - - libraries.initialize_post_setup_r(); - - unsafe { - // Register embedded routines - r_register_routines(); - - // Initialize harp (after routine registration) - harp::initialize(); - - // Optionally run a frontend specified R startup script (after harp init) - if let Some(file) = &startup_file { - harp::source(file) - .or_log_error(&format!("Failed to source startup file '{file}' due to")); - } - - // Initialize support functions (after routine registration) - if let Err(err) = modules::initialize(false) { - log::error!("Can't load R modules: {err:?}"); - } - - // Register all hooks once all modules have been imported - let hook_result = RFunction::from(".ps.register_all_hooks").call(); - if let Err(err) = hook_result { - log::error!("Error registering some hooks: {err:?}"); - } - - // Populate srcrefs for namespaces already loaded in the session. - // Namespaces of future loaded packages will be populated on load. - if do_resource_namespaces() { - if let Err(err) = resource_loaded_namespaces() { - log::error!("Can't populate srcrefs for loaded packages: {err:?}"); - } - } - - // Set up the global error handler (after support function initialization) - errors::initialize(); - } - - // Now that R has started (emitting any startup messages), and now that we have set - // up all hooks and handlers, officially finish the R initialization process to - // unblock the kernel-info request and also allow the LSP to start. - RMain::with_mut(|main| { - log::info!( - "R has started and ark handlers have been registered, completing initialization." - ); - main.complete_initialization(); - }); - - // Now that R has started and libr and ark have fully initialized, run site and user - // level R profiles, in that order - if !ignore_site_r_profile { - startup::source_site_r_profile(&r_home); - } - if !ignore_user_r_profile { - startup::source_user_r_profile(); - } - - // Does not return! - crate::sys::interface::run_r(); -} - pub struct RMain { initializing: bool, kernel_init_tx: Bus, @@ -421,6 +277,151 @@ pub enum ConsoleResult { } impl RMain { + /// Starts the main R thread and initializes the `R_MAIN` singleton. + /// Doesn't return. + pub fn start( + r_args: Vec, + startup_file: Option, + kernel_mutex: Arc>, + comm_manager_tx: Sender, + r_request_rx: Receiver, + stdin_request_tx: Sender, + stdin_reply_rx: Receiver>, + iopub_tx: Sender, + kernel_init_tx: Bus, + dap: Arc>, + session_mode: SessionMode, + ) { + // Initialize global state (ensure we only do this once!) + INIT.call_once(|| unsafe { + R_MAIN_THREAD_ID = Some(std::thread::current().id()); + + // Channels to send/receive tasks from auxiliary threads via `RTask`s + let (tasks_interrupt_tx, tasks_interrupt_rx) = unbounded::(); + let (tasks_idle_tx, tasks_idle_rx) = unbounded::(); + + r_task::initialize(tasks_interrupt_tx.clone(), tasks_idle_tx.clone()); + + R_MAIN = Some(RMain::new( + kernel_mutex, + tasks_interrupt_rx, + tasks_idle_rx, + comm_manager_tx, + r_request_rx, + stdin_request_tx, + stdin_reply_rx, + iopub_tx, + kernel_init_tx, + dap, + session_mode, + )); + }); + + let mut r_args = r_args.clone(); + + // Record if the user has requested that we don't load the site/user level R profiles + let ignore_site_r_profile = startup::should_ignore_site_r_profile(&r_args); + let ignore_user_r_profile = startup::should_ignore_user_r_profile(&r_args); + + // We always manually load site/user level R profiles rather than letting R do it + // to ensure that ark is fully set up before running code that could potentially call + // back into ark internals. + if !ignore_site_r_profile { + startup::push_ignore_site_r_profile(&mut r_args); + } + if !ignore_user_r_profile { + startup::push_ignore_user_r_profile(&mut r_args); + } + + // Build the argument list from the command line arguments. The default + // list is `--interactive` unless altered with the `--` passthrough + // argument. + let mut args = cargs!["ark"]; + for arg in r_args { + args.push(CString::new(arg).unwrap().into_raw()); + } + + // Get `R_HOME`, set by Positron / CI / kernel specification + let r_home = match std::env::var("R_HOME") { + Ok(home) => PathBuf::from(home), + Err(_) => { + // Get `R_HOME` from `PATH`, via R + let Ok(result) = std::process::Command::new("R").arg("RHOME").output() else { + panic!("Can't find R or `R_HOME`"); + }; + let r_home = String::from_utf8(result.stdout).unwrap(); + let r_home = r_home.trim(); + std::env::set_var("R_HOME", r_home); + PathBuf::from(r_home) + }, + }; + + let libraries = RLibraries::from_r_home_path(&r_home); + libraries.initialize_pre_setup_r(); + + crate::sys::interface::setup_r(args); + + libraries.initialize_post_setup_r(); + + unsafe { + // Register embedded routines + r_register_routines(); + + // Initialize harp (after routine registration) + harp::initialize(); + + // Optionally run a frontend specified R startup script (after harp init) + if let Some(file) = &startup_file { + harp::source(file) + .or_log_error(&format!("Failed to source startup file '{file}' due to")); + } + + // Initialize support functions (after routine registration) + if let Err(err) = modules::initialize(false) { + log::error!("Can't load R modules: {err:?}"); + } + + // Register all hooks once all modules have been imported + let hook_result = RFunction::from(".ps.register_all_hooks").call(); + if let Err(err) = hook_result { + log::error!("Error registering some hooks: {err:?}"); + } + + // Populate srcrefs for namespaces already loaded in the session. + // Namespaces of future loaded packages will be populated on load. + if do_resource_namespaces() { + if let Err(err) = resource_loaded_namespaces() { + log::error!("Can't populate srcrefs for loaded packages: {err:?}"); + } + } + + // Set up the global error handler (after support function initialization) + errors::initialize(); + } + + // Now that R has started (emitting any startup messages), and now that we have set + // up all hooks and handlers, officially finish the R initialization process to + // unblock the kernel-info request and also allow the LSP to start. + RMain::with_mut(|main| { + log::info!( + "R has started and ark handlers have been registered, completing initialization." + ); + main.complete_initialization(); + }); + + // Now that R has started and libr and ark have fully initialized, run site and user + // level R profiles, in that order + if !ignore_site_r_profile { + startup::source_site_r_profile(&r_home); + } + if !ignore_user_r_profile { + startup::source_user_r_profile(); + } + + // Does not return! + crate::sys::interface::run_r(); + } + pub fn new( kernel: Arc>, tasks_interrupt_rx: Receiver, diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs index ea96f27a9..bce4a707b 100644 --- a/crates/ark/src/start.rs +++ b/crates/ark/src/start.rs @@ -113,7 +113,7 @@ pub fn start_kernel( } // Start the R REPL (does not return for the duration of the session) - crate::interface::start_r( + crate::interface::RMain::start( r_args, startup_file, kernel_clone, From 13fc7e6463880b7387eedce266777bd76ca8a160 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 10:24:50 +0200 Subject: [PATCH 17/36] Add `RMain::wait_r_initialized()` And rename `RMain::initialized()` to `RMain::is_initialized()` and make it thread-safe. It seems safer for static methods of `RMain` should be made thread-safe. --- crates/ark/src/connections/r_connection.rs | 2 +- crates/ark/src/interface.rs | 50 ++++++++++++++++------ crates/ark/src/kernel.rs | 2 +- crates/ark/src/test/dummy_frontend.rs | 6 +-- 4 files changed, 41 insertions(+), 19 deletions(-) diff --git a/crates/ark/src/connections/r_connection.rs b/crates/ark/src/connections/r_connection.rs index 3d89a40d9..73e0a7c3d 100644 --- a/crates/ark/src/connections/r_connection.rs +++ b/crates/ark/src/connections/r_connection.rs @@ -278,7 +278,7 @@ pub unsafe extern "C" fn ps_connection_opened( // If RMain is not initialized, we are probably in testing mode, so we just don't start the connection // and let the testing code manually do it - if RMain::initialized() { + if RMain::is_initialized() { let main = RMain::get(); let metadata = Metadata { diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index e04daa430..fea77d6e8 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -17,7 +17,6 @@ use std::path::PathBuf; use std::result::Result::Ok; use std::sync::Arc; use std::sync::Mutex; -use std::sync::Once; use std::task::Poll; use std::time::Duration; @@ -138,8 +137,15 @@ pub enum SessionMode { // These values must be global in order for them to be accessible from R // callbacks, which do not have a facility for passing or returning context. -/// Ensures that the kernel is only ever initialized once -static INIT: Once = Once::new(); +// We use the `once_cell` crate for init synchronisation because the stdlib +// equivalent `std::sync::Once` does not have a `wait()` method. + +/// Ensures that the kernel is only ever initialized once. Used to wait for the +/// `RMain` singleton initialization in `RMain::wait_initialized()`. +static R_MAIN_INIT: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new(); + +/// Used to wait for complete R startup in `RMain::wait_r_initialized()`. +static R_INIT: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new(); // The global state used by R callbacks. // @@ -293,7 +299,7 @@ impl RMain { session_mode: SessionMode, ) { // Initialize global state (ensure we only do this once!) - INIT.call_once(|| unsafe { + R_MAIN_INIT.get_or_init(|| unsafe { R_MAIN_THREAD_ID = Some(std::thread::current().id()); // Channels to send/receive tasks from auxiliary threads via `RTask`s @@ -463,6 +469,27 @@ impl RMain { } } + /// Wait for complete R initialization + /// + /// Wait for R being ready to take input (i.e. `ReadConsole` was called). + /// Resolves as the same time as the `Bus` init channel does. + /// + /// Thread-safe. + pub fn wait_r_initialized() { + R_INIT.wait(); + } + + /// Has the `RMain` singleton completed initialization. + /// + /// This can return true when R might still not have finished starting up. + /// See `wait_r_initialized()`. + /// + /// Thread-safe. But note you can only get access to the singleton on the R + /// thread. + pub fn is_initialized() -> bool { + R_MAIN_INIT.get().is_some() + } + /// Access a reference to the singleton instance of this struct /// /// SAFETY: Accesses must occur after `start_r()` initializes it, and must @@ -471,16 +498,6 @@ impl RMain { RMain::get_mut() } - /// Indicate whether RMain has been created and is initialized. - pub fn initialized() -> bool { - unsafe { - match R_MAIN { - Some(ref main) => !main.initializing, - None => false, - } - } - } - /// Access a mutable reference to the singleton instance of this struct /// /// SAFETY: Accesses must occur after `start_r()` initializes it, and must @@ -546,7 +563,12 @@ impl RMain { log::info!("Sending kernel info: {version}"); self.kernel_init_tx.broadcast(kernel_info); + + // Internal initialisation flag, should only be used on the R thread self.initializing = false; + + // Used as thread-safe initialisation flag + R_INIT.set(()).unwrap(); } else { log::warn!("Initialization already complete!"); } diff --git a/crates/ark/src/kernel.rs b/crates/ark/src/kernel.rs index 6aa2f360b..d610dc114 100644 --- a/crates/ark/src/kernel.rs +++ b/crates/ark/src/kernel.rs @@ -75,7 +75,7 @@ impl Kernel { r_task::spawn_interrupt(|| async move { // Get the current busy status - let busy = if RMain::initialized() { + let busy = if RMain::is_initialized() { RMain::get().is_busy } else { false diff --git a/crates/ark/src/test/dummy_frontend.rs b/crates/ark/src/test/dummy_frontend.rs index a2766c32d..18981f23c 100644 --- a/crates/ark/src/test/dummy_frontend.rs +++ b/crates/ark/src/test/dummy_frontend.rs @@ -6,6 +6,7 @@ use std::sync::MutexGuard; use amalthea::test::dummy_frontend::DummyFrontend; use once_cell::sync::Lazy; +use crate::interface::RMain; use crate::interface::SessionMode; // There can be only one frontend per process. Needs to be in a mutex because @@ -43,9 +44,8 @@ impl DummyArkFrontend { ); }); - // Can we do better? - log::info!("Waiting 500ms for kernel startup to complete"); - std::thread::sleep(std::time::Duration::from_millis(500)); + // Wait for startup to complete + RMain::wait_r_initialized(); frontend.complete_intialization(); frontend From c7a4adcbc350a6b82b80f61c3abb7000ecbb98d4 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 11:09:13 +0200 Subject: [PATCH 18/36] Set unlimited stack size in unit tests --- crates/ark/src/interface.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index fea77d6e8..68cd4db63 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -365,6 +365,15 @@ impl RMain { let libraries = RLibraries::from_r_home_path(&r_home); libraries.initialize_pre_setup_r(); + // In tests R may be run from various threads. This confuses R's stack + // overflow checks so we disable those. This should not make it in + // production builds as it causes stack overflows to crash R instead of + // throwing an R error. + #[cfg(test)] + unsafe { + libr::set(libr::R_CStackLimit, usize::MAX); + } + crate::sys::interface::setup_r(args); libraries.initialize_post_setup_r(); From 242359d5b093368f9a4d233ebc31a530f8fca943 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 11:55:13 +0200 Subject: [PATCH 19/36] Enable test-only features for integration tests --- crates/ark/Cargo.toml | 1 + crates/ark/src/interface.rs | 7 ++++--- crates/harp/Cargo.toml | 3 +++ crates/harp/src/test.rs | 24 ++++++++++++++++++++++++ 4 files changed, 32 insertions(+), 3 deletions(-) diff --git a/crates/ark/Cargo.toml b/crates/ark/Cargo.toml index 6d512c7f0..1a80c1b35 100644 --- a/crates/ark/Cargo.toml +++ b/crates/ark/Cargo.toml @@ -64,6 +64,7 @@ tracing-error = "0.2.0" [dev-dependencies] insta = { version = "1.39.0" } +harp = { path = "../harp", features = ["testing"]} [build-dependencies] chrono = "0.4.23" diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 68cd4db63..f08f16e64 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -369,9 +369,10 @@ impl RMain { // overflow checks so we disable those. This should not make it in // production builds as it causes stack overflows to crash R instead of // throwing an R error. - #[cfg(test)] - unsafe { - libr::set(libr::R_CStackLimit, usize::MAX); + if harp::test::IS_TESTING { + unsafe { + libr::set(libr::R_CStackLimit, usize::MAX); + } } crate::sys::interface::setup_r(args); diff --git a/crates/harp/Cargo.toml b/crates/harp/Cargo.toml index 56b7680db..0a10d3d2f 100644 --- a/crates/harp/Cargo.toml +++ b/crates/harp/Cargo.toml @@ -28,3 +28,6 @@ serde = { version = "1.0.183", features = ["derive"] } serde_json = { version = "1.0.94", features = ["preserve_order"]} rust-embed = "8.2.0" tracing-error = "0.2.0" + +[features] +testing = [] diff --git a/crates/harp/src/test.rs b/crates/harp/src/test.rs index 194f3507a..a327286e3 100644 --- a/crates/harp/src/test.rs +++ b/crates/harp/src/test.rs @@ -35,6 +35,30 @@ use crate::R_MAIN_THREAD_ID; pub static mut R_TASK_BYPASS: bool = false; static mut R_RUNTIME_LOCK: Mutex<()> = Mutex::new(()); +// This global variable is a workaround to enable test-only features or +// behaviour in integration tests (i.e. tests that live in `crate/tests/` as +// opposed to tests living in `crate/src/`). +// +// - Unfortunately we can't use `cfg(test)` in integration tests because they +// are treated as an external crate. +// +// - Unfortunately we cannot move some of our integration tests to `src/` +// because they must be run in their own process (e.g. because they are +// running R). +// +// - Unfortunately we can't use the workaround described in +// https://github.com/rust-lang/cargo/issues/2911#issuecomment-749580481 +// to enable a test-only feature in a self dependency in the dev-deps section +// of the manifest file because Rust-Analyzer doesn't support such +// circular dependencies: https://github.com/rust-lang/rust-analyzer/issues/14167. +// So instead we use the same trick with harp rather than ark, so that there +// is no circular dependency, which fixes the issue with Rust-Analyzer. +// +// - Unfortunately we can't query the features enabled in a dependency with `cfg`. +// So instead we define a global variable here that can then be checked at +// runtime in Ark. +pub static IS_TESTING: bool = cfg!(feature = "testing"); + static INIT: Once = Once::new(); pub fn r_test(f: F) { From 51fdd2db19b2dd5579bd13fa0815c8cc47ee434d Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 13:17:30 +0200 Subject: [PATCH 20/36] Set stack size after initializing R --- crates/ark/src/interface.rs | 10 ---------- crates/ark/src/sys/unix/interface.rs | 11 +++++++++++ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index f08f16e64..fea77d6e8 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -365,16 +365,6 @@ impl RMain { let libraries = RLibraries::from_r_home_path(&r_home); libraries.initialize_pre_setup_r(); - // In tests R may be run from various threads. This confuses R's stack - // overflow checks so we disable those. This should not make it in - // production builds as it causes stack overflows to crash R instead of - // throwing an R error. - if harp::test::IS_TESTING { - unsafe { - libr::set(libr::R_CStackLimit, usize::MAX); - } - } - crate::sys::interface::setup_r(args); libraries.initialize_post_setup_r(); diff --git a/crates/ark/src/sys/unix/interface.rs b/crates/ark/src/sys/unix/interface.rs index e6d999068..56856f6da 100644 --- a/crates/ark/src/sys/unix/interface.rs +++ b/crates/ark/src/sys/unix/interface.rs @@ -68,6 +68,17 @@ pub fn setup_r(mut args: Vec<*mut c_char>) { libr::set(ptr_R_Busy, Some(r_busy)); libr::set(ptr_R_Suicide, Some(r_suicide)); + // In tests R may be run from various threads. This confuses R's stack + // overflow checks so we disable those. This should not make it in + // production builds as it causes stack overflows to crash R instead of + // throwing an R error. + // + // This must be called _after_ `Rf_initialize_R()`, since that's where R + // detects the stack size and sets the default limit. + if harp::test::IS_TESTING { + libr::set(libr::R_CStackLimit, usize::MAX); + } + // Set up main loop setup_Rmainloop(); } From beb5ff1f37ab1808602578bcc2fa803b14b62a24 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 13:40:56 +0200 Subject: [PATCH 21/36] Set `Suicide` on Windows --- crates/ark/src/sys/windows/interface.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/ark/src/sys/windows/interface.rs b/crates/ark/src/sys/windows/interface.rs index 94698b305..6a5a9e2e9 100644 --- a/crates/ark/src/sys/windows/interface.rs +++ b/crates/ark/src/sys/windows/interface.rs @@ -74,6 +74,7 @@ pub fn setup_r(mut _args: Vec<*mut c_char>) { (*params).ShowMessage = Some(r_show_message); (*params).YesNoCancel = Some(r_yes_no_cancel); (*params).Busy = Some(r_busy); + (*params).Suicide = Some(r_suicide); // This is assigned to `ptr_ProcessEvents` (which we don't set on Unix), // in `R_SetParams()` by `R_SetWin32()` and gets called by `R_ProcessEvents()`. From ba228907e77b988bf699747a0b87d88fe0bcac56 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 25 Sep 2024 15:01:27 +0200 Subject: [PATCH 22/36] Use `IS_TESTING` in `modules::initialize()` --- crates/ark/src/interface.rs | 2 +- crates/ark/src/modules.rs | 5 +++-- crates/ark/src/test/utils.rs | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index fea77d6e8..811634b05 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -383,7 +383,7 @@ impl RMain { } // Initialize support functions (after routine registration) - if let Err(err) = modules::initialize(false) { + if let Err(err) = modules::initialize() { log::error!("Can't load R modules: {err:?}"); } diff --git a/crates/ark/src/modules.rs b/crates/ark/src/modules.rs index 4d9f70095..1c772a487 100644 --- a/crates/ark/src/modules.rs +++ b/crates/ark/src/modules.rs @@ -11,6 +11,7 @@ use harp::environment::R_ENVS; use harp::exec::RFunction; use harp::exec::RFunctionExt; use harp::r_symbol; +use harp::test::IS_TESTING; use harp::utils::r_poke_option; use libr::Rf_ScalarLogical; use libr::SEXP; @@ -74,9 +75,9 @@ pub struct ArkEnvs { pub rstudio_ns: SEXP, } -pub fn initialize(testing: bool) -> anyhow::Result<()> { +pub fn initialize() -> anyhow::Result<()> { // If we are `testing`, set the corresponding R level global option - if testing { + if IS_TESTING { r_poke_option_ark_testing() } diff --git a/crates/ark/src/test/utils.rs b/crates/ark/src/test/utils.rs index 14676269a..dec6a7e58 100644 --- a/crates/ark/src/test/utils.rs +++ b/crates/ark/src/test/utils.rs @@ -32,7 +32,7 @@ fn initialize_ark() { INIT.call_once(|| { // Initialize the positron module so tests can use them. // Routines are already registered by `harp::test::r_test()`. - modules::initialize(true).unwrap(); + modules::initialize().unwrap(); }); } From b1847e939fed83d5b670ec8e048a401662d1cf1c Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 26 Sep 2024 09:50:48 +0200 Subject: [PATCH 23/36] Add `RMain::wait_initialized()` --- crates/ark/src/interface.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 811634b05..0b988e9e6 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -490,6 +490,17 @@ impl RMain { R_MAIN_INIT.get().is_some() } + /// Wait for `RMain` singleton initialization + /// + /// Note that R might still not have finished starting up. + /// See `wait_r_initialized()`. + /// + /// Thread-safe. But note you can only get access to the singleton on the R + /// thread. + pub fn wait_initialized() { + R_MAIN_INIT.wait(); + } + /// Access a reference to the singleton instance of this struct /// /// SAFETY: Accesses must occur after `start_r()` initializes it, and must From ff4ef91470141b672c3fd2773aab91e018d021cb Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 26 Sep 2024 14:55:26 +0200 Subject: [PATCH 24/36] Provide debug information about incoming data --- crates/amalthea/src/test/dummy_frontend.rs | 40 +++++++++++++++++++--- crates/ark/src/test/dummy_frontend.rs | 7 ++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/crates/amalthea/src/test/dummy_frontend.rs b/crates/amalthea/src/test/dummy_frontend.rs index b7d56fea5..bfa279f55 100644 --- a/crates/amalthea/src/test/dummy_frontend.rs +++ b/crates/amalthea/src/test/dummy_frontend.rs @@ -18,6 +18,7 @@ use crate::wire::jupyter_message::JupyterMessage; use crate::wire::jupyter_message::Message; use crate::wire::jupyter_message::ProtocolMessage; use crate::wire::status::ExecutionState; +use crate::wire::wire_message::WireMessage; pub struct DummyFrontend { pub _control_socket: Socket, @@ -214,6 +215,16 @@ impl DummyFrontend { }) } + /// Receive from IOPub and assert ExecuteResult message. Returns compulsory + /// `evalue` field. + pub fn recv_iopub_execute_error(&self) -> String { + let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + + assert_match!(msg, Message::ExecuteError(data) => { + data.content.exception.evalue + }) + } + /// Receives a Jupyter message from the Stdin socket pub fn recv_stdin(&self) -> Message { Message::read_from_socket(&self.stdin_socket).unwrap() @@ -248,18 +259,37 @@ impl DummyFrontend { } /// Asserts that no socket has incoming data - pub fn assert_no_incoming(&self) { + pub fn assert_no_incoming(&mut self) { + let mut has_incoming = false; + if self.iopub_socket.has_incoming_data().unwrap() { - panic!("IOPub has incoming data"); + has_incoming = true; + Self::flush_incoming("IOPub", &self.iopub_socket); } if self.shell_socket.has_incoming_data().unwrap() { - panic!("Shell has incoming data"); + has_incoming = true; + Self::flush_incoming("Shell", &self.shell_socket); } if self.stdin_socket.has_incoming_data().unwrap() { - panic!("StdIn has incoming data"); + has_incoming = true; + Self::flush_incoming("StdIn", &self.stdin_socket); } if self.heartbeat_socket.has_incoming_data().unwrap() { - panic!("Heartbeat has incoming data"); + has_incoming = true; + Self::flush_incoming("Heartbeat", &self.heartbeat_socket); + } + + if has_incoming { + panic!("Sockets must be empty on exit (see details above)"); + } + } + + fn flush_incoming(name: &str, socket: &Socket) { + println!("{name} has incoming data:"); + + while socket.has_incoming_data().unwrap() { + dbg!(WireMessage::read_from_socket(socket).unwrap()); + println!("---"); } } } diff --git a/crates/ark/src/test/dummy_frontend.rs b/crates/ark/src/test/dummy_frontend.rs index 18981f23c..9a8daa0dc 100644 --- a/crates/ark/src/test/dummy_frontend.rs +++ b/crates/ark/src/test/dummy_frontend.rs @@ -1,4 +1,5 @@ use std::ops::Deref; +use std::ops::DerefMut; use std::sync::Arc; use std::sync::Mutex; use std::sync::MutexGuard; @@ -67,3 +68,9 @@ impl Deref for DummyArkFrontend { Deref::deref(&self.guard) } } + +impl DerefMut for DummyArkFrontend { + fn deref_mut(&mut self) -> &mut Self::Target { + DerefMut::deref_mut(&mut self.guard) + } +} From 1353cf8d701fd5a22ea5b319f6731285d0cbc2c5 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 26 Sep 2024 17:10:12 +0200 Subject: [PATCH 25/36] Comment on use of `once_cell::Lazy` --- crates/ark/src/test/dummy_frontend.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/ark/src/test/dummy_frontend.rs b/crates/ark/src/test/dummy_frontend.rs index 9a8daa0dc..2ead760b8 100644 --- a/crates/ark/src/test/dummy_frontend.rs +++ b/crates/ark/src/test/dummy_frontend.rs @@ -12,6 +12,10 @@ use crate::interface::SessionMode; // There can be only one frontend per process. Needs to be in a mutex because // the frontend wraps zmq sockets which are unsafe to send across threads. +// +// This is using `Lazy` from the `once_cell` crate instead of other standard +// types because the former provides a way of checking whether it has been +// initialized already. static FRONTEND: Lazy>> = Lazy::new(|| Arc::new(Mutex::new(DummyArkFrontend::init()))); From 4945eba33df452a23b2d055d226d556470481e07 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 26 Sep 2024 17:20:30 +0200 Subject: [PATCH 26/36] Use `OnceLock` instead of `once_cell::Lazy` --- crates/ark/src/test/dummy_frontend.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/crates/ark/src/test/dummy_frontend.rs b/crates/ark/src/test/dummy_frontend.rs index 2ead760b8..159ae45cf 100644 --- a/crates/ark/src/test/dummy_frontend.rs +++ b/crates/ark/src/test/dummy_frontend.rs @@ -3,9 +3,9 @@ use std::ops::DerefMut; use std::sync::Arc; use std::sync::Mutex; use std::sync::MutexGuard; +use std::sync::OnceLock; use amalthea::test::dummy_frontend::DummyFrontend; -use once_cell::sync::Lazy; use crate::interface::RMain; use crate::interface::SessionMode; @@ -13,11 +13,10 @@ use crate::interface::SessionMode; // There can be only one frontend per process. Needs to be in a mutex because // the frontend wraps zmq sockets which are unsafe to send across threads. // -// This is using `Lazy` from the `once_cell` crate instead of other standard -// types because the former provides a way of checking whether it has been -// initialized already. -static FRONTEND: Lazy>> = - Lazy::new(|| Arc::new(Mutex::new(DummyArkFrontend::init()))); +// This is using `OnceLock` because it provides a way of checking whether the +// value has been initialized already. Also we'll need to parameterize +// initialization in the future. +static FRONTEND: OnceLock>> = OnceLock::new(); /// Wrapper around `DummyFrontend` that checks sockets are empty on drop pub struct DummyArkFrontend { @@ -27,12 +26,16 @@ pub struct DummyArkFrontend { impl DummyArkFrontend { pub fn lock() -> Self { Self { - guard: FRONTEND.lock().unwrap(), + guard: Self::get_frontend().lock().unwrap(), } } + fn get_frontend() -> &'static Arc> { + FRONTEND.get_or_init(|| Arc::new(Mutex::new(DummyArkFrontend::init()))) + } + fn init() -> DummyFrontend { - if Lazy::get(&FRONTEND).is_some() { + if FRONTEND.get().is_some() { panic!("Can't spawn Ark more than once"); } From 3229efb8fdd50b9b45fb12089862e634c42af1ce Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 09:49:22 +0200 Subject: [PATCH 27/36] Move test fixtures to `src/fixtures` --- crates/amalthea/src/{test => fixtures}/dummy_frontend.rs | 0 crates/amalthea/src/{test => fixtures}/mod.rs | 0 crates/amalthea/src/lib.rs | 2 +- crates/amalthea/tests/client.rs | 2 +- crates/ark/src/analysis/input_boundaries.rs | 2 +- crates/ark/src/dap/dap_variables.rs | 2 +- crates/ark/src/data_explorer/export_selection.rs | 2 +- crates/ark/src/data_explorer/format.rs | 2 +- crates/ark/src/data_explorer/histogram.rs | 2 +- crates/ark/src/data_explorer/summary_stats.rs | 2 +- crates/ark/src/{test => fixtures}/dummy_frontend.rs | 2 +- crates/ark/src/{test => fixtures}/mod.rs | 0 crates/ark/src/{test => fixtures}/utils.rs | 2 +- crates/ark/src/lib.rs | 2 +- crates/ark/src/lsp/completions/sources/composite.rs | 2 +- crates/ark/src/lsp/completions/sources/composite/call.rs | 2 +- crates/ark/src/lsp/completions/sources/composite/pipe.rs | 2 +- crates/ark/src/lsp/completions/sources/composite/subset.rs | 2 +- crates/ark/src/lsp/completions/sources/unique/comment.rs | 4 ++-- crates/ark/src/lsp/completions/sources/unique/custom.rs | 4 ++-- crates/ark/src/lsp/completions/sources/unique/extractor.rs | 4 ++-- crates/ark/src/lsp/completions/sources/unique/namespace.rs | 2 +- crates/ark/src/lsp/completions/sources/unique/string.rs | 4 ++-- crates/ark/src/lsp/completions/sources/unique/subset.rs | 4 ++-- crates/ark/src/lsp/completions/sources/utils.rs | 2 +- crates/ark/src/lsp/diagnostics.rs | 2 +- crates/ark/src/lsp/help.rs | 2 +- crates/ark/src/lsp/help_topic.rs | 2 +- crates/ark/src/lsp/selection_range.rs | 2 +- crates/ark/src/lsp/signature_help.rs | 2 +- crates/ark/src/lsp/traits/node.rs | 2 +- crates/ark/src/modules.rs | 2 +- crates/ark/tests/connections.rs | 4 ++-- crates/ark/tests/data_explorer.rs | 4 ++-- crates/ark/tests/help.rs | 2 +- crates/ark/tests/kernel.rs | 2 +- crates/ark/tests/ui.rs | 2 +- 37 files changed, 41 insertions(+), 41 deletions(-) rename crates/amalthea/src/{test => fixtures}/dummy_frontend.rs (100%) rename crates/amalthea/src/{test => fixtures}/mod.rs (100%) rename crates/ark/src/{test => fixtures}/dummy_frontend.rs (97%) rename crates/ark/src/{test => fixtures}/mod.rs (100%) rename crates/ark/src/{test => fixtures}/utils.rs (98%) diff --git a/crates/amalthea/src/test/dummy_frontend.rs b/crates/amalthea/src/fixtures/dummy_frontend.rs similarity index 100% rename from crates/amalthea/src/test/dummy_frontend.rs rename to crates/amalthea/src/fixtures/dummy_frontend.rs diff --git a/crates/amalthea/src/test/mod.rs b/crates/amalthea/src/fixtures/mod.rs similarity index 100% rename from crates/amalthea/src/test/mod.rs rename to crates/amalthea/src/fixtures/mod.rs diff --git a/crates/amalthea/src/lib.rs b/crates/amalthea/src/lib.rs index bbebb4d4c..7fd85f73c 100644 --- a/crates/amalthea/src/lib.rs +++ b/crates/amalthea/src/lib.rs @@ -8,6 +8,7 @@ pub mod comm; pub mod connection_file; pub mod error; +pub mod fixtures; pub mod kernel; pub mod kernel_dirs; pub mod kernel_spec; @@ -16,7 +17,6 @@ pub mod session; pub mod socket; pub mod stream_capture; pub mod sys; -pub mod test; pub mod wire; pub use error::Error; diff --git a/crates/amalthea/tests/client.rs b/crates/amalthea/tests/client.rs index d185e5649..33ac1f806 100644 --- a/crates/amalthea/tests/client.rs +++ b/crates/amalthea/tests/client.rs @@ -10,12 +10,12 @@ use std::sync::Mutex; use amalthea::comm::comm_channel::CommMsg; use amalthea::comm::event::CommManagerEvent; +use amalthea::fixtures::dummy_frontend::DummyFrontend; use amalthea::kernel::Kernel; use amalthea::kernel::StreamBehavior; use amalthea::socket::comm::CommInitiator; use amalthea::socket::comm::CommSocket; use amalthea::socket::stdin::StdInRequest; -use amalthea::test::dummy_frontend::DummyFrontend; use amalthea::wire::comm_close::CommClose; use amalthea::wire::comm_info_reply::CommInfoTargetName; use amalthea::wire::comm_info_request::CommInfoRequest; diff --git a/crates/ark/src/analysis/input_boundaries.rs b/crates/ark/src/analysis/input_boundaries.rs index d07f60a81..43c584513 100644 --- a/crates/ark/src/analysis/input_boundaries.rs +++ b/crates/ark/src/analysis/input_boundaries.rs @@ -258,7 +258,7 @@ fn fill_gaps( #[cfg(test)] mod tests { use crate::analysis::input_boundaries::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn p(text: &str) -> Vec { let mut boundaries = input_boundaries(text).unwrap(); diff --git a/crates/ark/src/dap/dap_variables.rs b/crates/ark/src/dap/dap_variables.rs index 4d240996a..a23f4de25 100644 --- a/crates/ark/src/dap/dap_variables.rs +++ b/crates/ark/src/dap/dap_variables.rs @@ -411,7 +411,7 @@ mod tests { use libr::*; use crate::dap::dap_variables::env_binding_variable; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_env_binding_variable_base() { diff --git a/crates/ark/src/data_explorer/export_selection.rs b/crates/ark/src/data_explorer/export_selection.rs index 8f29de701..4b44e9235 100644 --- a/crates/ark/src/data_explorer/export_selection.rs +++ b/crates/ark/src/data_explorer/export_selection.rs @@ -117,7 +117,7 @@ mod tests { use harp::object::RObject; use super::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn export_selection_helper(data: RObject, selection: TableSelection) -> String { export_selection_helper_with_format(data, selection, ExportFormat::Csv) diff --git a/crates/ark/src/data_explorer/format.rs b/crates/ark/src/data_explorer/format.rs index af8067471..04855c9f1 100644 --- a/crates/ark/src/data_explorer/format.rs +++ b/crates/ark/src/data_explorer/format.rs @@ -440,7 +440,7 @@ impl Into for FormattedValue { #[cfg(test)] mod tests { use super::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn default_options() -> FormatOptions { FormatOptions { diff --git a/crates/ark/src/data_explorer/histogram.rs b/crates/ark/src/data_explorer/histogram.rs index b77d542af..a933ee908 100644 --- a/crates/ark/src/data_explorer/histogram.rs +++ b/crates/ark/src/data_explorer/histogram.rs @@ -161,7 +161,7 @@ mod tests { use stdext::assert_match; use super::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn default_options() -> FormatOptions { FormatOptions { diff --git a/crates/ark/src/data_explorer/summary_stats.rs b/crates/ark/src/data_explorer/summary_stats.rs index 256716de9..9197f048f 100644 --- a/crates/ark/src/data_explorer/summary_stats.rs +++ b/crates/ark/src/data_explorer/summary_stats.rs @@ -167,7 +167,7 @@ fn get_stat(stats: &HashMap, name: &str) -> anyhow::Result< #[cfg(test)] mod tests { use super::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn default_options() -> FormatOptions { FormatOptions { diff --git a/crates/ark/src/test/dummy_frontend.rs b/crates/ark/src/fixtures/dummy_frontend.rs similarity index 97% rename from crates/ark/src/test/dummy_frontend.rs rename to crates/ark/src/fixtures/dummy_frontend.rs index 159ae45cf..d38e3e956 100644 --- a/crates/ark/src/test/dummy_frontend.rs +++ b/crates/ark/src/fixtures/dummy_frontend.rs @@ -5,7 +5,7 @@ use std::sync::Mutex; use std::sync::MutexGuard; use std::sync::OnceLock; -use amalthea::test::dummy_frontend::DummyFrontend; +use amalthea::fixtures::dummy_frontend::DummyFrontend; use crate::interface::RMain; use crate::interface::SessionMode; diff --git a/crates/ark/src/test/mod.rs b/crates/ark/src/fixtures/mod.rs similarity index 100% rename from crates/ark/src/test/mod.rs rename to crates/ark/src/fixtures/mod.rs diff --git a/crates/ark/src/test/utils.rs b/crates/ark/src/fixtures/utils.rs similarity index 98% rename from crates/ark/src/test/utils.rs rename to crates/ark/src/fixtures/utils.rs index dec6a7e58..ea5f7601e 100644 --- a/crates/ark/src/test/utils.rs +++ b/crates/ark/src/fixtures/utils.rs @@ -97,7 +97,7 @@ where mod tests { use tree_sitter::Point; - use crate::test::point_from_cursor; + use crate::fixtures::point_from_cursor; #[test] #[rustfmt::skip] diff --git a/crates/ark/src/lib.rs b/crates/ark/src/lib.rs index cc7b34417..8dcbaf379 100644 --- a/crates/ark/src/lib.rs +++ b/crates/ark/src/lib.rs @@ -32,7 +32,7 @@ pub mod srcref; pub mod start; pub mod startup; pub mod sys; -pub mod test; +pub mod fixtures; pub mod thread; pub mod traps; pub mod treesitter; diff --git a/crates/ark/src/lsp/completions/sources/composite.rs b/crates/ark/src/lsp/completions/sources/composite.rs index 953df4f07..62de6f5f5 100644 --- a/crates/ark/src/lsp/completions/sources/composite.rs +++ b/crates/ark/src/lsp/completions/sources/composite.rs @@ -153,7 +153,7 @@ mod tests { use crate::lsp::completions::sources::composite::is_identifier_like; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; use crate::treesitter::NodeType; use crate::treesitter::NodeTypeExt; diff --git a/crates/ark/src/lsp/completions/sources/composite/call.rs b/crates/ark/src/lsp/completions/sources/composite/call.rs index 3a78b8ec7..7528d0c13 100644 --- a/crates/ark/src/lsp/completions/sources/composite/call.rs +++ b/crates/ark/src/lsp/completions/sources/composite/call.rs @@ -289,7 +289,7 @@ mod tests { use crate::lsp::completions::sources::composite::call::completions_from_call; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_completions_after_user_types_part_of_an_argument_name() { diff --git a/crates/ark/src/lsp/completions/sources/composite/pipe.rs b/crates/ark/src/lsp/completions/sources/composite/pipe.rs index b9fc5ec3d..4951742bd 100644 --- a/crates/ark/src/lsp/completions/sources/composite/pipe.rs +++ b/crates/ark/src/lsp/completions/sources/composite/pipe.rs @@ -174,7 +174,7 @@ mod tests { use crate::lsp::completions::sources::composite::pipe::find_pipe_root; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_find_pipe_root_works_with_native_and_magrittr() { diff --git a/crates/ark/src/lsp/completions/sources/composite/subset.rs b/crates/ark/src/lsp/completions/sources/composite/subset.rs index 7bba1e302..0097a674a 100644 --- a/crates/ark/src/lsp/completions/sources/composite/subset.rs +++ b/crates/ark/src/lsp/completions/sources/composite/subset.rs @@ -79,7 +79,7 @@ mod tests { use crate::lsp::completions::sources::composite::subset::completions_from_subset; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_subset_completions() { diff --git a/crates/ark/src/lsp/completions/sources/unique/comment.rs b/crates/ark/src/lsp/completions/sources/unique/comment.rs index acff7aa7f..6c7750fdd 100644 --- a/crates/ark/src/lsp/completions/sources/unique/comment.rs +++ b/crates/ark/src/lsp/completions/sources/unique/comment.rs @@ -129,7 +129,7 @@ fn test_comment() { use tree_sitter::Point; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; r_test(|| { // If not in a comment, return `None` @@ -154,7 +154,7 @@ fn test_roxygen_comment() { use tree_sitter::Point; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; r_test(|| unsafe { let installed = RFunction::new("", ".ps.is_installed") diff --git a/crates/ark/src/lsp/completions/sources/unique/custom.rs b/crates/ark/src/lsp/completions/sources/unique/custom.rs index 9b7ac7911..4f0396bb6 100644 --- a/crates/ark/src/lsp/completions/sources/unique/custom.rs +++ b/crates/ark/src/lsp/completions/sources/unique/custom.rs @@ -228,8 +228,8 @@ mod tests { use crate::lsp::completions::sources::unique::custom::completions_from_custom_source; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::point_from_cursor; - use crate::test::r_test; + use crate::fixtures::point_from_cursor; + use crate::fixtures::r_test; #[test] fn test_completion_custom_library() { diff --git a/crates/ark/src/lsp/completions/sources/unique/extractor.rs b/crates/ark/src/lsp/completions/sources/unique/extractor.rs index e5f4443de..2bb7df52c 100644 --- a/crates/ark/src/lsp/completions/sources/unique/extractor.rs +++ b/crates/ark/src/lsp/completions/sources/unique/extractor.rs @@ -168,8 +168,8 @@ mod tests { use crate::lsp::completions::sources::unique::extractor::completions_from_dollar; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::point_from_cursor; - use crate::test::r_test; + use crate::fixtures::point_from_cursor; + use crate::fixtures::r_test; #[test] fn test_dollar_completions() { diff --git a/crates/ark/src/lsp/completions/sources/unique/namespace.rs b/crates/ark/src/lsp/completions/sources/unique/namespace.rs index ce31a0ab8..5a00836c2 100644 --- a/crates/ark/src/lsp/completions/sources/unique/namespace.rs +++ b/crates/ark/src/lsp/completions/sources/unique/namespace.rs @@ -225,7 +225,7 @@ mod tests { use crate::lsp::completions::sources::unique::namespace::completions_from_namespace; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_completions_after_colons() { diff --git a/crates/ark/src/lsp/completions/sources/unique/string.rs b/crates/ark/src/lsp/completions/sources/unique/string.rs index 7577bdaf1..bf01954b7 100644 --- a/crates/ark/src/lsp/completions/sources/unique/string.rs +++ b/crates/ark/src/lsp/completions/sources/unique/string.rs @@ -61,8 +61,8 @@ mod tests { use crate::lsp::completions::sources::unique::string::completions_from_string; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::point_from_cursor; - use crate::test::r_test; + use crate::fixtures::point_from_cursor; + use crate::fixtures::r_test; use crate::treesitter::node_find_string; use crate::treesitter::NodeTypeExt; diff --git a/crates/ark/src/lsp/completions/sources/unique/subset.rs b/crates/ark/src/lsp/completions/sources/unique/subset.rs index 4cb61c353..7c61b774e 100644 --- a/crates/ark/src/lsp/completions/sources/unique/subset.rs +++ b/crates/ark/src/lsp/completions/sources/unique/subset.rs @@ -162,8 +162,8 @@ mod tests { use crate::lsp::completions::sources::unique::subset::completions_from_string_subset; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::point_from_cursor; - use crate::test::r_test; + use crate::fixtures::point_from_cursor; + use crate::fixtures::r_test; use crate::treesitter::node_find_string; #[test] diff --git a/crates/ark/src/lsp/completions/sources/utils.rs b/crates/ark/src/lsp/completions/sources/utils.rs index 562af6acd..231f4d81e 100644 --- a/crates/ark/src/lsp/completions/sources/utils.rs +++ b/crates/ark/src/lsp/completions/sources/utils.rs @@ -264,7 +264,7 @@ mod tests { use crate::lsp::completions::sources::utils::CallNodePositionType; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; use crate::treesitter::NodeType; use crate::treesitter::NodeTypeExt; diff --git a/crates/ark/src/lsp/diagnostics.rs b/crates/ark/src/lsp/diagnostics.rs index c98e7daa8..ad093aad2 100644 --- a/crates/ark/src/lsp/diagnostics.rs +++ b/crates/ark/src/lsp/diagnostics.rs @@ -1034,7 +1034,7 @@ mod tests { use crate::lsp::diagnostics::generate_diagnostics; use crate::lsp::documents::Document; use crate::lsp::state::WorldState; - use crate::test::r_test; + use crate::fixtures::r_test; // Default state that includes installed packages and default scopes. static DEFAULT_STATE: Lazy = Lazy::new(|| current_state()); diff --git a/crates/ark/src/lsp/help.rs b/crates/ark/src/lsp/help.rs index d2522bd61..80e3606b5 100644 --- a/crates/ark/src/lsp/help.rs +++ b/crates/ark/src/lsp/help.rs @@ -373,7 +373,7 @@ fn for_each_section(doc: &Html, mut callback: impl FnMut(ElementRef, Vec Environment { let fun = exports.find(fun).unwrap(); diff --git a/crates/ark/tests/connections.rs b/crates/ark/tests/connections.rs index cc8875832..a1af21b72 100644 --- a/crates/ark/tests/connections.rs +++ b/crates/ark/tests/connections.rs @@ -14,8 +14,8 @@ use ark::connections::r_connection::Metadata; use ark::connections::r_connection::RConnection; use ark::modules::ARK_ENVS; use ark::r_task::r_task; -use ark::test::r_test; -use ark::test::socket_rpc_request; +use ark::fixtures::r_test; +use ark::fixtures::socket_rpc_request; use crossbeam::channel::bounded; use harp::exec::RFunction; use harp::object::RObject; diff --git a/crates/ark/tests/data_explorer.rs b/crates/ark/tests/data_explorer.rs index c5285aefa..a924195aa 100644 --- a/crates/ark/tests/data_explorer.rs +++ b/crates/ark/tests/data_explorer.rs @@ -57,8 +57,8 @@ use ark::data_explorer::r_data_explorer::DataObjectEnvInfo; use ark::data_explorer::r_data_explorer::RDataExplorer; use ark::lsp::events::EVENTS; use ark::r_task::r_task; -use ark::test::r_test; -use ark::test::socket_rpc_request; +use ark::fixtures::r_test; +use ark::fixtures::socket_rpc_request; use ark::thread::RThreadSafe; use crossbeam::channel::bounded; use harp::environment::R_ENVS; diff --git a/crates/ark/tests/help.rs b/crates/ark/tests/help.rs index 6e2880b06..0ccc85016 100644 --- a/crates/ark/tests/help.rs +++ b/crates/ark/tests/help.rs @@ -16,7 +16,7 @@ use amalthea::socket::comm::CommSocket; use ark::help::r_help::RHelp; use ark::help_proxy; use ark::r_task::r_task; -use ark::test::r_test; +use ark::fixtures::r_test; use harp::exec::RFunction; /** diff --git a/crates/ark/tests/kernel.rs b/crates/ark/tests/kernel.rs index 8c9a13a44..7e641e515 100644 --- a/crates/ark/tests/kernel.rs +++ b/crates/ark/tests/kernel.rs @@ -1,7 +1,7 @@ use amalthea::wire::jupyter_message::Message; use amalthea::wire::jupyter_message::Status; use amalthea::wire::kernel_info_request::KernelInfoRequest; -use ark::test::DummyArkFrontend; +use ark::fixtures::DummyArkFrontend; use stdext::assert_match; #[test] diff --git a/crates/ark/tests/ui.rs b/crates/ark/tests/ui.rs index 8ffec3ae8..19075744f 100644 --- a/crates/ark/tests/ui.rs +++ b/crates/ark/tests/ui.rs @@ -16,7 +16,7 @@ use amalthea::socket::comm::CommInitiator; use amalthea::socket::comm::CommSocket; use amalthea::socket::stdin::StdInRequest; use ark::r_task::r_task; -use ark::test::r_test; +use ark::fixtures::r_test; use ark::ui::UiComm; use ark::ui::UiCommMessage; use crossbeam::channel::bounded; From a50b20ea8b142e91dadbf694f68fbcfcf3a069e4 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 10:00:06 +0200 Subject: [PATCH 28/36] Reuse socket receivers --- crates/amalthea/src/fixtures/dummy_frontend.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/amalthea/src/fixtures/dummy_frontend.rs b/crates/amalthea/src/fixtures/dummy_frontend.rs index bfa279f55..b35f29412 100644 --- a/crates/amalthea/src/fixtures/dummy_frontend.rs +++ b/crates/amalthea/src/fixtures/dummy_frontend.rs @@ -162,7 +162,7 @@ impl DummyFrontend { /// Receive from Shell and assert ExecuteReply message pub fn recv_shell_execute_reply(&self) -> ExecuteReply { - let msg = Message::read_from_socket(&self.shell_socket).unwrap(); + let msg = self.recv_shell(); assert_match!(msg, Message::ExecuteReply(data) => { data.content @@ -176,7 +176,7 @@ impl DummyFrontend { /// Receive from IOPub and assert Busy message pub fn recv_iopub_busy(&self) -> () { - let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + let msg = self.recv_iopub(); assert_match!(msg, Message::Status(data) => { assert_eq!(data.content.execution_state, ExecutionState::Busy); @@ -185,7 +185,7 @@ impl DummyFrontend { /// Receive from IOPub and assert Idle message pub fn recv_iopub_idle(&self) -> () { - let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + let msg = self.recv_iopub(); assert_match!(msg, Message::Status(data) => { assert_eq!(data.content.execution_state, ExecutionState::Idle); @@ -194,7 +194,7 @@ impl DummyFrontend { /// Receive from IOPub and assert ExecuteInput message pub fn recv_iopub_execute_input(&self) -> ExecuteInput { - let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + let msg = self.recv_iopub(); assert_match!(msg, Message::ExecuteInput(data) => { data.content @@ -204,7 +204,7 @@ impl DummyFrontend { /// Receive from IOPub and assert ExecuteResult message. Returns compulsory /// `plain/text` result. pub fn recv_iopub_execute_result(&self) -> String { - let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + let msg = self.recv_iopub(); assert_match!(msg, Message::ExecuteResult(data) => { assert_match!(data.content.data, Value::Object(map) => { @@ -218,7 +218,7 @@ impl DummyFrontend { /// Receive from IOPub and assert ExecuteResult message. Returns compulsory /// `evalue` field. pub fn recv_iopub_execute_error(&self) -> String { - let msg = Message::read_from_socket(&self.iopub_socket).unwrap(); + let msg = self.recv_iopub(); assert_match!(msg, Message::ExecuteError(data) => { data.content.exception.evalue From dd5240aff7f2a3900446ad385a96c5360fd54083 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 10:00:57 +0200 Subject: [PATCH 29/36] Fix typo in `.complete_initialization()` --- crates/amalthea/src/fixtures/dummy_frontend.rs | 2 +- crates/amalthea/tests/client.rs | 2 +- crates/ark/src/fixtures/dummy_frontend.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/amalthea/src/fixtures/dummy_frontend.rs b/crates/amalthea/src/fixtures/dummy_frontend.rs index b35f29412..ca8465f0e 100644 --- a/crates/amalthea/src/fixtures/dummy_frontend.rs +++ b/crates/amalthea/src/fixtures/dummy_frontend.rs @@ -125,7 +125,7 @@ impl DummyFrontend { /// Completes initialization of the frontend (usually done after the kernel /// is ready and connected) - pub fn complete_intialization(&self) { + pub fn complete_initialization(&self) { self.iopub_socket.subscribe().unwrap(); } diff --git a/crates/amalthea/tests/client.rs b/crates/amalthea/tests/client.rs index 33ac1f806..ad270c303 100644 --- a/crates/amalthea/tests/client.rs +++ b/crates/amalthea/tests/client.rs @@ -82,7 +82,7 @@ fn test_kernel() { // Complete client initialization info!("Completing frontend initialization"); - frontend.complete_intialization(); + frontend.complete_initialization(); // Ask the kernel for the kernel info. This should return an object with the // language "Test" defined in our shell handler. diff --git a/crates/ark/src/fixtures/dummy_frontend.rs b/crates/ark/src/fixtures/dummy_frontend.rs index d38e3e956..3c8b820ed 100644 --- a/crates/ark/src/fixtures/dummy_frontend.rs +++ b/crates/ark/src/fixtures/dummy_frontend.rs @@ -55,7 +55,7 @@ impl DummyArkFrontend { // Wait for startup to complete RMain::wait_r_initialized(); - frontend.complete_intialization(); + frontend.complete_initialization(); frontend } } From 0e927ee0bfbe5b3717faf828f13c61bc4d85dbe3 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 10:01:39 +0200 Subject: [PATCH 30/36] Include review suggestions --- crates/ark/src/interface.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 0b988e9e6..211d527e9 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -347,7 +347,7 @@ impl RMain { args.push(CString::new(arg).unwrap().into_raw()); } - // Get `R_HOME`, set by Positron / CI / kernel specification + // Get `R_HOME`, typically set by Positron / CI / kernel specification let r_home = match std::env::var("R_HOME") { Ok(home) => PathBuf::from(home), Err(_) => { From a62db57a55cf1cda0c656614f876a3e696a954aa Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 10:27:24 +0200 Subject: [PATCH 31/36] Remove `wait_initialized()` --- crates/ark/src/interface.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 211d527e9..a4683a1ca 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -490,17 +490,6 @@ impl RMain { R_MAIN_INIT.get().is_some() } - /// Wait for `RMain` singleton initialization - /// - /// Note that R might still not have finished starting up. - /// See `wait_r_initialized()`. - /// - /// Thread-safe. But note you can only get access to the singleton on the R - /// thread. - pub fn wait_initialized() { - R_MAIN_INIT.wait(); - } - /// Access a reference to the singleton instance of this struct /// /// SAFETY: Accesses must occur after `start_r()` initializes it, and must From 755a0e9e14a0fa436ed8e187f190e07d74b476d3 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 10:27:56 +0200 Subject: [PATCH 32/36] Use `IS_TESTING` instead of `RMain::is_initialized()` --- crates/ark/src/connections/r_connection.rs | 43 +++++++++++----------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/crates/ark/src/connections/r_connection.rs b/crates/ark/src/connections/r_connection.rs index 73e0a7c3d..97faacaa6 100644 --- a/crates/ark/src/connections/r_connection.rs +++ b/crates/ark/src/connections/r_connection.rs @@ -275,32 +275,31 @@ pub unsafe extern "C" fn ps_connection_opened( code: SEXP, ) -> Result { let id = Uuid::new_v4().to_string(); + let id_r: RObject = id.clone().into(); - // If RMain is not initialized, we are probably in testing mode, so we just don't start the connection - // and let the testing code manually do it - if RMain::is_initialized() { - let main = RMain::get(); - - let metadata = Metadata { - name: RObject::view(name).to::()?, - language_id: String::from("r"), - host: RObject::view(host).to::>().unwrap_or(None), - r#type: RObject::view(r#type).to::>().unwrap_or(None), - code: RObject::view(code).to::>().unwrap_or(None), - }; - - unwrap! ( - RConnection::start(metadata, main.get_comm_manager_tx().clone(), id.clone()), - Err(err) => { - log::error!("Connection Pane: Failed to start connection: {err:?}"); - return Err(err); - } - ); - } else { + if harp::test::IS_TESTING { + // If RMain is not initialized, we are probably in testing mode, so we just don't start the connection + // and let the testing code manually do it log::warn!("Connection Pane: RMain is not initialized. Connection will not be started."); + return Ok(id_r.sexp); + } + + let main = RMain::get(); + + let metadata = Metadata { + name: RObject::view(name).to::()?, + language_id: String::from("r"), + host: RObject::view(host).to::>().unwrap_or(None), + r#type: RObject::view(r#type).to::>().unwrap_or(None), + code: RObject::view(code).to::>().unwrap_or(None), + }; + + if let Err(err) = RConnection::start(metadata, main.get_comm_manager_tx().clone(), id) { + log::error!("Connection Pane: Failed to start connection: {err:?}"); + return Err(err); } - Ok(RObject::from(id).into()) + return Ok(id_r.sexp); } #[harp::register] From 47af0ad0eb15f27a36dfc8dde4bbf9df6e7da596 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 10:29:33 +0200 Subject: [PATCH 33/36] Use `RMain::is_initialized()` after all --- crates/ark/src/connections/r_connection.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/ark/src/connections/r_connection.rs b/crates/ark/src/connections/r_connection.rs index 97faacaa6..b859811dc 100644 --- a/crates/ark/src/connections/r_connection.rs +++ b/crates/ark/src/connections/r_connection.rs @@ -277,9 +277,10 @@ pub unsafe extern "C" fn ps_connection_opened( let id = Uuid::new_v4().to_string(); let id_r: RObject = id.clone().into(); - if harp::test::IS_TESTING { - // If RMain is not initialized, we are probably in testing mode, so we just don't start the connection - // and let the testing code manually do it + if !RMain::is_initialized() { + // If RMain is not initialized, we are probably in unit tests, so we + // just don't start the connection and let the testing code manually do + // it. Note that RMain could be initialized in integration tests. log::warn!("Connection Pane: RMain is not initialized. Connection will not be started."); return Ok(id_r.sexp); } From fafbd8c7fe8c87fc5b8b8c8b919722a3063cc741 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 10:30:01 +0200 Subject: [PATCH 34/36] Panic if `RMain::start()` is called more than once --- crates/ark/src/interface.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index a4683a1ca..b7bee684c 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -284,7 +284,7 @@ pub enum ConsoleResult { impl RMain { /// Starts the main R thread and initializes the `R_MAIN` singleton. - /// Doesn't return. + /// Doesn't return. Must be called only once. pub fn start( r_args: Vec, startup_file: Option, @@ -298,16 +298,15 @@ impl RMain { dap: Arc>, session_mode: SessionMode, ) { - // Initialize global state (ensure we only do this once!) - R_MAIN_INIT.get_or_init(|| unsafe { - R_MAIN_THREAD_ID = Some(std::thread::current().id()); + unsafe { R_MAIN_THREAD_ID = Some(std::thread::current().id()) }; - // Channels to send/receive tasks from auxiliary threads via `RTask`s - let (tasks_interrupt_tx, tasks_interrupt_rx) = unbounded::(); - let (tasks_idle_tx, tasks_idle_rx) = unbounded::(); + // Channels to send/receive tasks from auxiliary threads via `RTask`s + let (tasks_interrupt_tx, tasks_interrupt_rx) = unbounded::(); + let (tasks_idle_tx, tasks_idle_rx) = unbounded::(); - r_task::initialize(tasks_interrupt_tx.clone(), tasks_idle_tx.clone()); + r_task::initialize(tasks_interrupt_tx.clone(), tasks_idle_tx.clone()); + unsafe { R_MAIN = Some(RMain::new( kernel_mutex, tasks_interrupt_rx, @@ -320,8 +319,12 @@ impl RMain { kernel_init_tx, dap, session_mode, - )); - }); + )) + }; + + // Let other threads know that `R_MAIN` is initialized. Deliberately + // panic if already set as `start()` must be called only once. + R_MAIN_INIT.set(()).expect("R can only be initialized once"); let mut r_args = r_args.clone(); From 1f5753da090c017ab27bed4666ea2f9526ec5ca0 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 11:09:07 +0200 Subject: [PATCH 35/36] Remove `RMain::initializing` flag And use `RMain::is_r_initialized()` instead --- crates/ark/src/interface.rs | 95 +++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 47 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index b7bee684c..804f69dfd 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -156,7 +156,6 @@ static R_INIT: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new(); static mut R_MAIN: Option = None; pub struct RMain { - initializing: bool, kernel_init_tx: Bus, /// Whether we are running in Console, Notebook, or Background mode. @@ -406,17 +405,15 @@ impl RMain { // Set up the global error handler (after support function initialization) errors::initialize(); - } - // Now that R has started (emitting any startup messages), and now that we have set - // up all hooks and handlers, officially finish the R initialization process to - // unblock the kernel-info request and also allow the LSP to start. - RMain::with_mut(|main| { - log::info!( - "R has started and ark handlers have been registered, completing initialization." - ); - main.complete_initialization(); - }); + // Now that R has started (emitting any startup messages), and now that we have set + // up all hooks and handlers, officially finish the R initialization process to + // unblock the kernel-info request and also allow the LSP to start. + RMain::with_mut(|main| { + log::info!("R has started and ark handlers have been registered, completing initialization."); + main.complete_initialization(); + }); + } // Now that R has started and libr and ark have fully initialized, run site and user // level R profiles, in that order @@ -431,6 +428,34 @@ impl RMain { crate::sys::interface::run_r(); } + /// Completes the kernel's initialization. + /// Unlike `RMain::start()`, this has access to `R_MAIN`'s state, such as + /// the kernel-info banner. + /// SAFETY: Can only be called from the R thread, and only once. + pub unsafe fn complete_initialization(&mut self) { + let version = unsafe { + let version = Rf_findVarInFrame(R_BaseNamespace, r_symbol!("R.version.string")); + RObject::new(version).to::().unwrap() + }; + + // Initial input and continuation prompts + let input_prompt: String = harp::get_option("prompt").try_into().unwrap(); + let continuation_prompt: String = harp::get_option("continue").try_into().unwrap(); + + let kernel_info = KernelInfo { + version: version.clone(), + banner: self.banner_output.clone(), + input_prompt: Some(input_prompt), + continuation_prompt: Some(continuation_prompt), + }; + + log::info!("Sending kernel info: {version}"); + self.kernel_init_tx.broadcast(kernel_info); + + // Thread-safe initialisation flag for R + R_INIT.set(()).expect("`R_INIT` can only be set once"); + } + pub fn new( kernel: Arc>, tasks_interrupt_rx: Receiver, @@ -445,7 +470,6 @@ impl RMain { session_mode: SessionMode, ) -> Self { Self { - initializing: true, r_request_rx, comm_manager_tx, stdin_request_tx, @@ -474,14 +498,23 @@ impl RMain { /// Wait for complete R initialization /// - /// Wait for R being ready to take input (i.e. `ReadConsole` was called). - /// Resolves as the same time as the `Bus` init channel does. + /// Wait for R being ready to evaluate R code. Resolves as the same time as + /// the `Bus` init channel does. /// /// Thread-safe. pub fn wait_r_initialized() { R_INIT.wait(); } + /// Has R completed initialization + /// + /// I.e. is it ready to evaluate R code. + /// + /// Thread-safe. + pub fn is_r_initialized() -> bool { + R_INIT.get().is_some() + } + /// Has the `RMain` singleton completed initialization. /// /// This can return true when R might still not have finished starting up. @@ -545,38 +578,6 @@ impl RMain { thread.id() == unsafe { R_MAIN_THREAD_ID.unwrap() } } - /// Completes the kernel's initialization - pub fn complete_initialization(&mut self) { - if self.initializing { - let version = unsafe { - let version = Rf_findVarInFrame(R_BaseNamespace, r_symbol!("R.version.string")); - RObject::new(version).to::().unwrap() - }; - - // Initial input and continuation prompts - let input_prompt: String = harp::get_option("prompt").try_into().unwrap(); - let continuation_prompt: String = harp::get_option("continue").try_into().unwrap(); - - let kernel_info = KernelInfo { - version: version.clone(), - banner: self.banner_output.clone(), - input_prompt: Some(input_prompt), - continuation_prompt: Some(continuation_prompt), - }; - - log::info!("Sending kernel info: {version}"); - self.kernel_init_tx.broadcast(kernel_info); - - // Internal initialisation flag, should only be used on the R thread - self.initializing = false; - - // Used as thread-safe initialisation flag - R_INIT.set(()).unwrap(); - } else { - log::warn!("Initialization already complete!"); - } - } - /// Provides read-only access to `iopub_tx` pub fn get_iopub_tx(&self) -> &Sender { &self.iopub_tx @@ -1323,7 +1324,7 @@ This is a Positron limitation we plan to fix. In the meantime, you can: Stream::Stderr }; - if self.initializing { + if !RMain::is_r_initialized() { // During init, consider all output to be part of the startup banner self.banner_output.push_str(&content); return; From 5af6ba670b3724fbb366dfae675466266b359fb9 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Fri, 27 Sep 2024 11:17:51 +0200 Subject: [PATCH 36/36] Document reliance on resolver's version 2 --- Cargo.toml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ed5dec75c..720075733 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,11 @@ # Was necessary after switching to dev tree-sitter to fix this warning: # > some crates are on edition 2021 which defaults to `resolver = "2"`, but -# > virtual workspaces default to `resolver = "1"` +# > virtual workspaces default to `resolver = "1"`. +# +# Also necessary to enable the `testing` feature of harp only when testing +# (i.e. when building downstream packages like Ark with Harp's `testing` +# feature set in `dev-dependencies`). resolver = "2" members = [