diff --git a/crates/amalthea/src/comm/data_explorer_comm.rs b/crates/amalthea/src/comm/data_explorer_comm.rs index c78cf1640..06faf9f54 100644 --- a/crates/amalthea/src/comm/data_explorer_comm.rs +++ b/crates/amalthea/src/comm/data_explorer_comm.rs @@ -1057,9 +1057,6 @@ pub struct SetSortColumnsParams { /// Parameters for the GetColumnProfiles method. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct GetColumnProfilesParams { - /// Async callback unique identifier - pub callback_id: String, - /// Array of requested profiles pub profiles: Vec, @@ -1067,16 +1064,6 @@ pub struct GetColumnProfilesParams { pub format_options: FormatOptions, } -/// Parameters for the ReturnColumnProfiles method. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct ReturnColumnProfilesParams { - /// Async callback unique identifier - pub callback_id: String, - - /// Array of individual column profile results - pub profiles: Vec, -} - /** * Backend RPC request types for the data_explorer comm */ @@ -1134,10 +1121,9 @@ pub enum DataExplorerBackendRequest { #[serde(rename = "set_sort_columns")] SetSortColumns(SetSortColumnsParams), - /// Async request a batch of column profiles + /// Request a batch of column profiles /// - /// Async request for a statistical summary or data profile for batch of - /// columns + /// Requests a statistical summary or data profile for batch of columns #[serde(rename = "get_column_profiles")] GetColumnProfiles(GetColumnProfilesParams), @@ -1178,8 +1164,7 @@ pub enum DataExplorerBackendReply { /// Reply for the set_sort_columns method (no result) SetSortColumnsReply(), - /// Reply for the get_column_profiles method (no result) - GetColumnProfilesReply(), + GetColumnProfilesReply(Vec), /// The current backend state for the data explorer GetStateReply(BackendState), @@ -1217,9 +1202,5 @@ pub enum DataExplorerFrontendEvent { #[serde(rename = "data_update")] DataUpdate, - /// Return async result of get_column_profiles request - #[serde(rename = "return_column_profiles")] - ReturnColumnProfiles(ReturnColumnProfilesParams), - } diff --git a/crates/ark/src/data_explorer/column_profile.rs b/crates/ark/src/data_explorer/column_profile.rs deleted file mode 100644 index b992f443d..000000000 --- a/crates/ark/src/data_explorer/column_profile.rs +++ /dev/null @@ -1,308 +0,0 @@ -// -// column_profile.rs -// -// Copyright (C) 2024 by Posit Software, PBC -// -// - -use amalthea::comm::comm_channel::CommMsg; -use amalthea::comm::data_explorer_comm::ColumnFrequencyTable; -use amalthea::comm::data_explorer_comm::ColumnHistogram; -use amalthea::comm::data_explorer_comm::ColumnProfileParams; -use amalthea::comm::data_explorer_comm::ColumnProfileRequest; -use amalthea::comm::data_explorer_comm::ColumnProfileResult; -use amalthea::comm::data_explorer_comm::ColumnProfileSpec; -use amalthea::comm::data_explorer_comm::ColumnProfileType; -use amalthea::comm::data_explorer_comm::ColumnSummaryStats; -use amalthea::comm::data_explorer_comm::DataExplorerFrontendEvent; -use amalthea::comm::data_explorer_comm::FormatOptions; -use amalthea::comm::data_explorer_comm::GetColumnProfilesParams; -use amalthea::comm::data_explorer_comm::ReturnColumnProfilesParams; -use amalthea::socket::comm::CommSocket; -use anyhow::anyhow; -use harp::exec::RFunction; -use harp::exec::RFunctionExt; -use harp::tbl_get_column; -use harp::RObject; -use harp::TableKind; -use stdext::unwrap; - -use crate::data_explorer::histogram; -use crate::data_explorer::summary_stats::summary_stats; -use crate::data_explorer::table::Table; -use crate::data_explorer::utils::display_type; -use crate::modules::ARK_ENVS; - -pub struct ProcessColumnsProfilesParams { - pub table: Table, - pub indices: Option>, - pub kind: TableKind, - pub request: GetColumnProfilesParams, -} - -pub async fn handle_columns_profiles_requests( - params: ProcessColumnsProfilesParams, - comm: CommSocket, -) -> anyhow::Result<()> { - let callback_id = params.request.callback_id; - let n_profiles = params.request.profiles.len(); - - let profiles = process_columns_profiles_requests( - params.table, - params.indices, - params.kind, - params.request.profiles, - params.request.format_options, - ) - .await - .unwrap_or_else(|e| { - // In case something goes wrong while computing the profiles, we send - // an empty response. Ideally, we would have a way to comunicate an that - // an error happened but it's not implemented yet. - log::error!("Error while producing profiles: {e}"); - std::iter::repeat(empty_column_profile_result()) - .take(n_profiles) - .collect() - }); - - let event = DataExplorerFrontendEvent::ReturnColumnProfiles(ReturnColumnProfilesParams { - callback_id, - profiles, - }); - - let json_event = serde_json::to_value(event)?; - comm.outgoing_tx.send(CommMsg::Data(json_event))?; - Ok(()) -} - -async fn process_columns_profiles_requests( - table: Table, - indices: Option>, - kind: TableKind, - profiles: Vec, - format_options: FormatOptions, -) -> anyhow::Result> { - // This is an R thread, so we can actually get the data frame. - // If it fails we quickly return an empty result set and end the task. - // This might happen if the task was spawned but the data explorer windows - // was later closed, before the task actually executed. - let data = table.get()?; - let mut results: Vec = Vec::with_capacity(profiles.len()); - - for profile in profiles.into_iter() { - log::trace!("Processing column!"); - results.push( - profile_column( - data.clone(), - indices.clone(), - profile, - &format_options, - kind, - ) - .await, - ); - // Yield to the idle event loop - tokio::task::yield_now().await; - } - - Ok(results) -} - -// This function does not return a Result because it must handle still handle other profile types -// if one of them fails. Thus it needs to gracefully handle the errors that might have resulted -// here. -// It's an async function just because we want to yield to R between each profile type. -async fn profile_column( - table: RObject, - filtered_indices: Option>, - request: ColumnProfileRequest, - format_options: &FormatOptions, - kind: TableKind, -) -> ColumnProfileResult { - let mut output = empty_column_profile_result(); - - let filtered_column = unwrap!(tbl_get_filtered_column( - &table, - request.column_index, - &filtered_indices, - kind, - ), Err(e) => { - // In the case something goes wrong here we log the error and return an empty output. - // This might still work for the other columns in the request. - log::error!("Error applying filter indices for column: {}. Err: {e}", request.column_index); - return output; - }); - - for profile_req in request.profiles { - match profile_req.profile_type { - ColumnProfileType::NullCount => { - output.null_count = profile_null_count(filtered_column.clone()) - .map_err(|err| { - log::error!( - "Error getting summary stats for column {}: {}", - request.column_index, - err - ); - }) - .ok(); - }, - ColumnProfileType::SummaryStats => { - output.summary_stats = - profile_summary_stats(filtered_column.clone(), format_options) - .map_err(|err| { - log::error!( - "Error getting null count for column {}: {}", - request.column_index, - err - ); - }) - .ok() - }, - ColumnProfileType::SmallHistogram | ColumnProfileType::LargeHistogram => { - let histogram = - profile_histogram(filtered_column.clone(), format_options, &profile_req) - .map_err(|err| { - log::error!( - "Error getting histogram for column {}: {}", - request.column_index, - err - ); - }) - .ok(); - - match profile_req.profile_type { - ColumnProfileType::SmallHistogram => { - output.small_histogram = histogram; - }, - ColumnProfileType::LargeHistogram => { - output.large_histogram = histogram; - }, - _ => { - // This is technically unreachable!(), but not worth panicking if - // this happens. - log::warn!("Unreachable"); - }, - } - }, - ColumnProfileType::SmallFrequencyTable | ColumnProfileType::LargeFrequencyTable => { - let frequency_table = - profile_frequency_table(filtered_column.clone(), format_options, &profile_req) - .map_err(|err| { - log::error!( - "Error getting frequency table for column {}: {}", - request.column_index, - err - ); - }) - .ok(); - - match profile_req.profile_type { - ColumnProfileType::SmallFrequencyTable => { - output.small_frequency_table = frequency_table; - }, - ColumnProfileType::LargeFrequencyTable => { - output.large_frequency_table = frequency_table; - }, - _ => { - // This is technically unreachable!(), but not worth panicking if - // this happens. - log::warn!("Unreachable. Unknown profile type.") - }, - } - }, - }; - - // Yield to the R console loop - tokio::task::yield_now().await; - } - output -} - -pub fn empty_column_profile_result() -> ColumnProfileResult { - ColumnProfileResult { - null_count: None, - summary_stats: None, - small_histogram: None, - small_frequency_table: None, - large_histogram: None, - large_frequency_table: None, - } -} - -fn profile_frequency_table( - column: RObject, - format_options: &FormatOptions, - profile_spec: &ColumnProfileSpec, -) -> anyhow::Result { - let params = match &profile_spec.params { - None => return Err(anyhow!("Missing parameters for the frequency table")), - Some(par) => match par { - ColumnProfileParams::SmallFrequencyTable(p) => p, - ColumnProfileParams::LargeFrequencyTable(p) => p, - _ => return Err(anyhow!("Wrong type of parameters for the frequency table.")), - }, - }; - let frequency_table = - histogram::profile_frequency_table(column.sexp, ¶ms, &format_options)?; - Ok(frequency_table) -} - -fn profile_histogram( - column: RObject, - format_options: &FormatOptions, - profile_spec: &ColumnProfileSpec, -) -> anyhow::Result { - let params = match &profile_spec.params { - None => return Err(anyhow!("Missing parameters for the histogram")), - Some(par) => match par { - ColumnProfileParams::SmallHistogram(p) => p, - ColumnProfileParams::LargeHistogram(p) => p, - _ => return Err(anyhow!("Wrong type of parameters for the histogram.")), - }, - }; - let histogram = histogram::profile_histogram(column.sexp, ¶ms, &format_options)?; - Ok(histogram) -} - -fn profile_summary_stats( - column: RObject, - format_options: &FormatOptions, -) -> anyhow::Result { - let dtype = display_type(column.sexp); - Ok(summary_stats(column.sexp, dtype, format_options)?) -} - -/// Counts the number of nulls in a column. As the intent is to provide an -/// idea of how complete the data is, NA values are considered to be null -/// for the purposes of these stats. -/// -/// Expects data to be filtered by the view indices. -/// -/// - `column_index`: The index of the column to count nulls in; 0-based. -fn profile_null_count(column: RObject) -> anyhow::Result { - // Compute the number of nulls in the column - let result: i32 = RFunction::new("", ".ps.null_count") - .param("column", column) - .call_in(ARK_ENVS.positron_ns)? - .try_into()?; - - // Return the count of nulls and NA values - Ok(result.try_into()?) -} - -fn tbl_get_filtered_column( - x: &RObject, - column_index: i64, - indices: &Option>, - kind: TableKind, -) -> anyhow::Result { - let column = tbl_get_column(x.sexp, column_index as i32, kind)?; - - Ok(match &indices { - Some(indices) => RFunction::from("col_filter_indices") - .add(column) - .add(RObject::try_from(indices)?) - .call_in(ARK_ENVS.positron_ns)?, - None => column, - }) -} diff --git a/crates/ark/src/data_explorer/mod.rs b/crates/ark/src/data_explorer/mod.rs index ce2d7c389..c06b21d58 100644 --- a/crates/ark/src/data_explorer/mod.rs +++ b/crates/ark/src/data_explorer/mod.rs @@ -5,11 +5,9 @@ // // -pub mod column_profile; pub mod export_selection; pub mod format; pub mod histogram; pub mod r_data_explorer; pub mod summary_stats; -pub mod table; pub mod utils; diff --git a/crates/ark/src/data_explorer/r_data_explorer.rs b/crates/ark/src/data_explorer/r_data_explorer.rs index 319aa123c..926859ab5 100644 --- a/crates/ark/src/data_explorer/r_data_explorer.rs +++ b/crates/ark/src/data_explorer/r_data_explorer.rs @@ -13,11 +13,18 @@ use amalthea::comm::data_explorer_comm::ArraySelection; use amalthea::comm::data_explorer_comm::BackendState; use amalthea::comm::data_explorer_comm::ColumnDisplayType; use amalthea::comm::data_explorer_comm::ColumnFilter; +use amalthea::comm::data_explorer_comm::ColumnFrequencyTable; +use amalthea::comm::data_explorer_comm::ColumnHistogram; +use amalthea::comm::data_explorer_comm::ColumnProfileParams; +use amalthea::comm::data_explorer_comm::ColumnProfileRequest; +use amalthea::comm::data_explorer_comm::ColumnProfileResult; +use amalthea::comm::data_explorer_comm::ColumnProfileSpec; use amalthea::comm::data_explorer_comm::ColumnProfileType; use amalthea::comm::data_explorer_comm::ColumnProfileTypeSupportStatus; use amalthea::comm::data_explorer_comm::ColumnSchema; use amalthea::comm::data_explorer_comm::ColumnSelection; use amalthea::comm::data_explorer_comm::ColumnSortKey; +use amalthea::comm::data_explorer_comm::ColumnSummaryStats; use amalthea::comm::data_explorer_comm::ColumnValue; use amalthea::comm::data_explorer_comm::DataExplorerBackendReply; use amalthea::comm::data_explorer_comm::DataExplorerBackendRequest; @@ -63,6 +70,10 @@ use harp::exec::RFunctionExt; use harp::object::RObject; use harp::r_symbol; use harp::tbl_get_column; +use harp::utils::r_inherits; +use harp::utils::r_is_object; +use harp::utils::r_is_s4; +use harp::utils::r_typeof; use harp::TableInfo; use harp::TableKind; use itertools::Itertools; @@ -73,16 +84,14 @@ use stdext::local; use stdext::result::ResultOrLog; use stdext::spawn; use stdext::unwrap; -use tracing::Instrument; use uuid::Uuid; -use crate::data_explorer::column_profile::handle_columns_profiles_requests; -use crate::data_explorer::column_profile::ProcessColumnsProfilesParams; use crate::data_explorer::export_selection; use crate::data_explorer::format; use crate::data_explorer::format::format_string; -use crate::data_explorer::table::Table; -use crate::data_explorer::utils::display_type; +use crate::data_explorer::histogram::profile_frequency_table; +use crate::data_explorer::histogram::profile_histogram; +use crate::data_explorer::summary_stats::summary_stats; use crate::data_explorer::utils::tbl_subset_with_view_indices; use crate::interface::RMain; use crate::lsp::events::EVENTS; @@ -113,7 +122,7 @@ pub struct RDataExplorer { title: String, /// The data object that the data viewer is currently viewing. - table: Table, + table: RThreadSafe, /// An optional binding to the environment containing the data object. /// This can be omitted for cases wherein the data object isn't in an @@ -153,18 +162,12 @@ pub struct RDataExplorer { /// A channel to send messages to the CommManager. comm_manager_tx: Sender, } + #[derive(Deserialize, Serialize)] struct Metadata { title: String, } -impl Drop for RDataExplorer { - fn drop(&mut self) { - // We guarantee that the table is deleted from the global store. - self.table.delete(); - } -} - impl RDataExplorer { pub fn start( title: String, @@ -182,18 +185,18 @@ impl RDataExplorer { // To be able to `Send` the `data` to the thread to be owned by the data // viewer, it needs to be made thread safe - let table = Table::new(RThreadSafe::new(data)); + let data = RThreadSafe::new(data); spawn!(format!("ark-data-viewer-{}-{}", title, id), move || { // Get the initial set of column schemas for the data object - let shape = r_task(|| Self::r_get_shape(table.get()?)); + let shape = r_task(|| Self::r_get_shape(&data)); match shape { // shape the columns; start the data viewer Ok(shape) => { // Create the initial state for the data viewer let viewer = Self { title, - table, + table: data, binding, shape, sorted_indices: None, @@ -299,7 +302,7 @@ impl RDataExplorer { let comm = self.comm.clone(); comm.handle_request(msg, |req| self.handle_rpc(req)); - }, + } } } @@ -322,9 +325,9 @@ impl RDataExplorer { return Ok(true); } - // See if the value has changed; this block returns true if the value has changed - // or false otherwise. It also sets the new value correctly. - let changed = r_task(|| { + // See if the value has changed; this block returns a new value if it + // has changed, or None if it hasn't + let new = r_task(|| { let binding = self.binding.as_ref().unwrap(); let env = binding.env.get().sexp; @@ -333,38 +336,28 @@ impl RDataExplorer { Rf_findVarInFrame(env, sym) }; - let old = self.table.get(); - let old = unwrap!(old, Err(_) => { - // This is AFAICT impossible because the table is only deleted when the data explorer instance is - // deleted and this method belongs to that data explorer instance. - log::error!("Old table has been deleted? This is unexpected, but we'll update the data explorer table."); - // It's `unsafe` because RObject::new calls protect, and it shouldn't - // be called outside of the R main thread. - self.table.set(RThreadSafe::new(unsafe { RObject::new(new) })); - return true; - }); - - if new == old.sexp { - false + let old = self.table.get().sexp; + if new == old { + None } else { - // Safety is same as above. We guarantee this is the R main thread. - self.table - .set(RThreadSafe::new(unsafe { RObject::new(new) })); - true + Some(RThreadSafe::new(unsafe { RObject::new(new) })) } }); // No change to the value, so we're done - if !changed { + if new.is_none() { return Ok(true); } + // Update the value + self.table = new.unwrap(); + // Now we need to check to see if the schema has changed or just a data // value. Regenerate the schema. // // Consider: there may be a cheaper way to test the schema for changes // than regenerating it, but it'd be a lot more complicated. - let new_shape = match r_task(|| Self::r_get_shape(self.table.get()?.clone())) { + let new_shape = match r_task(|| Self::r_get_shape(&self.table)) { Ok(shape) => shape, Err(_) => { // The most likely cause of this error is that the object is no @@ -465,12 +458,10 @@ impl RDataExplorer { DataExplorerBackendRequest::GetSchema(GetSchemaParams { column_indices }) => { self.get_schema(column_indices) }, - DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { columns, format_options, }) => r_task(|| self.r_get_data_values(columns, format_options)), - DataExplorerBackendRequest::SetSortColumns(SetSortColumnsParams { sort_keys: keys, }) => { @@ -489,7 +480,6 @@ impl RDataExplorer { Ok(DataExplorerBackendReply::SetSortColumnsReply()) }, - DataExplorerBackendRequest::SetRowFilters(SetRowFiltersParams { filters }) => { // Save the new row filters self.row_filters = filters; @@ -511,26 +501,24 @@ impl RDataExplorer { } })) }, + DataExplorerBackendRequest::GetColumnProfiles(GetColumnProfilesParams { + profiles: requests, + format_options, + }) => { + let profiles = requests + .into_iter() + .map(|request| r_task(|| self.r_get_column_profile(request, &format_options))) + .collect::>(); - DataExplorerBackendRequest::GetColumnProfiles(params) => { - // We respond imediately to this request, but first we launch an R idle task that will - // be responsible to compute the column profiles. - // This idle task yieldsß to the main event loop whenver possible, in order to allow for - // other requests to be computed. - self.launch_get_column_profiles_handler(params); - Ok(DataExplorerBackendReply::GetColumnProfilesReply()) + Ok(DataExplorerBackendReply::GetColumnProfilesReply(profiles)) }, - DataExplorerBackendRequest::GetState => r_task(|| self.r_get_state()), - DataExplorerBackendRequest::SearchSchema(_) => { return Err(anyhow!("Data Explorer: Not yet supported")); }, - DataExplorerBackendRequest::SetColumnFilters(_) => { return Err(anyhow!("Data Explorer: Not yet supported")); }, - DataExplorerBackendRequest::GetRowLabels(req) => { let row_labels = r_task(|| self.r_get_row_labels(req.selection, &req.format_options))?; @@ -540,7 +528,6 @@ impl RDataExplorer { }, )) }, - DataExplorerBackendRequest::ExportDataSelection(ExportDataSelectionParams { selection, format, @@ -556,9 +543,9 @@ impl RDataExplorer { // Methods that must be run on the main R thread impl RDataExplorer { - fn r_get_shape(table: RObject) -> anyhow::Result { + fn r_get_shape(table: &RThreadSafe) -> anyhow::Result { unsafe { - let table = table.clone(); + let table = table.get().clone(); let object = *table; let info = table_info_or_bail(object)?; @@ -612,23 +599,178 @@ impl RDataExplorer { } } - fn launch_get_column_profiles_handler(&self, params: GetColumnProfilesParams) { - let id = params.callback_id.clone(); - - let params = ProcessColumnsProfilesParams { - table: self.table.clone(), - indices: self.filtered_indices.clone(), - kind: self.shape.kind, - request: params, + fn r_get_column_profile( + &self, + request: ColumnProfileRequest, + format_options: &FormatOptions, + ) -> ColumnProfileResult { + let mut output = ColumnProfileResult { + null_count: None, + summary_stats: None, + small_histogram: None, + small_frequency_table: None, + large_histogram: None, + large_frequency_table: None, }; - let comm = self.comm.clone(); - r_task::spawn_idle(|| async move { - log::trace!("Processing GetColumnProfile request: {id}"); - handle_columns_profiles_requests(params, comm) - .instrument(tracing::info_span!("get_columns_profile", ns = id)) - .await - .or_log_error("Unable to handle get_columns_profile"); + + let filtered_column = unwrap!(tbl_get_filtered_column( + self.table.get(), + request.column_index, + &self.filtered_indices, + self.shape.kind, + ), Err(e) => { + // In the case something goes wrong here we log the error and return an empty output. + // This might still work for the other columns in the request. + log::error!("Error applying filter indices for column: {}. Err: {e}", request.column_index); + return output; }); + + for profile_req in request.profiles { + match profile_req.profile_type { + ColumnProfileType::NullCount => { + output.null_count = self + .profile_null_count(filtered_column.clone()) + .map_err(|err| { + log::error!( + "Error getting summary stats for column {}: {}", + request.column_index, + err + ); + }) + .ok(); + }, + ColumnProfileType::SummaryStats => { + output.summary_stats = self + .profile_summary_stats(filtered_column.clone(), format_options) + .map_err(|err| { + log::error!( + "Error getting null count for column {}: {}", + request.column_index, + err + ); + }) + .ok() + }, + ColumnProfileType::SmallHistogram | ColumnProfileType::LargeHistogram => { + let histogram = self + .profile_histogram(filtered_column.clone(), format_options, &profile_req) + .map_err(|err| { + log::error!( + "Error getting histogram for column {}: {}", + request.column_index, + err + ); + }) + .ok(); + + match profile_req.profile_type { + ColumnProfileType::SmallHistogram => { + output.small_histogram = histogram; + }, + ColumnProfileType::LargeHistogram => { + output.large_histogram = histogram; + }, + _ => { + // This is technically unreachable!(), but not worth panicking if + // this happens. + }, + } + }, + ColumnProfileType::SmallFrequencyTable | ColumnProfileType::LargeFrequencyTable => { + let frequency_table = self + .profile_frequency_table( + filtered_column.clone(), + format_options, + &profile_req, + ) + .map_err(|err| { + log::error!( + "Error getting frequency table for column {}: {}", + request.column_index, + err + ); + }) + .ok(); + + match profile_req.profile_type { + ColumnProfileType::SmallFrequencyTable => { + output.small_frequency_table = frequency_table; + }, + ColumnProfileType::LargeFrequencyTable => { + output.large_frequency_table = frequency_table; + }, + _ => { + // This is technically unreachable!(), but not worth panicking if + // this happens. + }, + } + }, + }; + } + output + } + + fn profile_frequency_table( + &self, + column: RObject, + format_options: &FormatOptions, + profile_spec: &ColumnProfileSpec, + ) -> anyhow::Result { + let params = match &profile_spec.params { + None => return Err(anyhow!("Missing parameters for the frequency table")), + Some(par) => match par { + ColumnProfileParams::SmallFrequencyTable(p) => p, + ColumnProfileParams::LargeFrequencyTable(p) => p, + _ => return Err(anyhow!("Wrong type of parameters for the frequency table.")), + }, + }; + let frequency_table = profile_frequency_table(column.sexp, ¶ms, &format_options)?; + Ok(frequency_table) + } + + fn profile_histogram( + &self, + column: RObject, + format_options: &FormatOptions, + profile_spec: &ColumnProfileSpec, + ) -> anyhow::Result { + let params = match &profile_spec.params { + None => return Err(anyhow!("Missing parameters for the histogram")), + Some(par) => match par { + ColumnProfileParams::SmallHistogram(p) => p, + ColumnProfileParams::LargeHistogram(p) => p, + _ => return Err(anyhow!("Wrong type of parameters for the histogram.")), + }, + }; + let histogram = profile_histogram(column.sexp, ¶ms, &format_options)?; + Ok(histogram) + } + + fn profile_summary_stats( + &self, + column: RObject, + format_options: &FormatOptions, + ) -> anyhow::Result { + let dtype = display_type(column.sexp); + Ok(summary_stats(column.sexp, dtype, format_options)?) + } + + /// Counts the number of nulls in a column. As the intent is to provide an + /// idea of how complete the data is, NA values are considered to be null + /// for the purposes of these stats. + /// + /// Expects data to be filtered by the view indices. + /// + /// - `column_index`: The index of the column to count nulls in; 0-based. + fn profile_null_count(&self, column: RObject) -> anyhow::Result { + // Compute the number of nulls in the column + let result: i32 = RFunction::new("", ".ps.null_count") + .param("column", column) + .call_in(ARK_ENVS.positron_ns)? + .try_into()?; + + // Return the count of nulls and NA values + Ok(result.try_into()?) } /// Sort the rows of the data object according to the sort keys in @@ -645,7 +787,7 @@ impl RDataExplorer { for key in &self.sort_keys { // Get the column to sort by order.add(tbl_get_column( - self.table.get()?.sexp, + self.table.get().sexp, key.column_index as i32, self.shape.kind, )?); @@ -691,7 +833,7 @@ impl RDataExplorer { // Pass the row filters to R and get the resulting row indices let filters = RObject::try_from(filters)?; let result: HashMap = RFunction::new("", ".ps.filter_rows") - .param("table", self.table.get()?.sexp) + .param("table", self.table.get().sexp) .param("row_filters", filters) .call_in(ARK_ENVS.positron_ns)? .try_into()?; @@ -888,7 +1030,7 @@ impl RDataExplorer { row_filters: self.row_filters.clone(), column_filters: self.col_filters.clone(), sort_keys: self.sort_keys.clone(), - has_row_labels: match self.table.get()?.attr("row.names") { + has_row_labels: match self.table.get().attr("row.names") { Some(_) => true, None => false, }, @@ -979,7 +1121,7 @@ impl RDataExplorer { let mut column_data: Vec> = Vec::with_capacity(columns.len()); for selection in columns { let tbl = tbl_subset_with_view_indices( - self.table.get()?.sexp, + self.table.get().sexp, &self.view_indices, Some(self.get_row_selection_indices(selection.spec)), Some(vec![selection.column_index]), @@ -1004,7 +1146,7 @@ impl RDataExplorer { format_options: &FormatOptions, ) -> anyhow::Result> { let tbl = tbl_subset_with_view_indices( - self.table.get()?.sexp, + self.table.get().sexp, &self.view_indices, Some(self.get_row_selection_indices(selection)), Some(vec![]), // Use empty vec, because we only need the row names. @@ -1058,7 +1200,7 @@ impl RDataExplorer { ) -> anyhow::Result { r_task(|| { export_selection::export_selection( - self.table.get()?.sexp, + self.table.get().sexp, &self.view_indices, selection, format, @@ -1067,10 +1209,88 @@ impl RDataExplorer { } } +// This returns the type of an _element_ of the column. In R atomic +// vectors do not have a distinct internal type but we pretend that they +// do for the purpose of integrating with Positron types. +fn display_type(x: SEXP) -> ColumnDisplayType { + if r_is_s4(x) { + return ColumnDisplayType::Unknown; + } + + if r_is_object(x) { + if r_inherits(x, "logical") { + return ColumnDisplayType::Boolean; + } + + if r_inherits(x, "integer") { + return ColumnDisplayType::Number; + } + if r_inherits(x, "double") { + return ColumnDisplayType::Number; + } + if r_inherits(x, "complex") { + return ColumnDisplayType::Number; + } + if r_inherits(x, "numeric") { + return ColumnDisplayType::Number; + } + + if r_inherits(x, "character") { + return ColumnDisplayType::String; + } + if r_inherits(x, "factor") { + return ColumnDisplayType::String; + } + + if r_inherits(x, "Date") { + return ColumnDisplayType::Date; + } + if r_inherits(x, "POSIXct") { + return ColumnDisplayType::Datetime; + } + if r_inherits(x, "POSIXlt") { + return ColumnDisplayType::Datetime; + } + + // TODO: vctrs's list_of + if r_inherits(x, "list") { + return ColumnDisplayType::Unknown; + } + + // Catch-all, including for data frame + return ColumnDisplayType::Unknown; + } + + match r_typeof(x) { + LGLSXP => return ColumnDisplayType::Boolean, + INTSXP | REALSXP | CPLXSXP => return ColumnDisplayType::Number, + STRSXP => return ColumnDisplayType::String, + VECSXP => return ColumnDisplayType::Unknown, + _ => return ColumnDisplayType::Unknown, + } +} + fn table_info_or_bail(x: SEXP) -> anyhow::Result { harp::table_info(x).ok_or(anyhow!("Unsupported type for data viewer")) } +fn tbl_get_filtered_column( + x: &RObject, + column_index: i64, + indices: &Option>, + kind: TableKind, +) -> anyhow::Result { + let column = tbl_get_column(x.sexp, column_index as i32, kind)?; + + Ok(match &indices { + Some(indices) => RFunction::from("col_filter_indices") + .add(column) + .add(RObject::try_from(indices)?) + .call_in(ARK_ENVS.positron_ns)?, + None => column, + }) +} + /// Open an R object in the data viewer. /// /// This function is called from the R side to open an R object in the data viewer. diff --git a/crates/ark/src/data_explorer/table.rs b/crates/ark/src/data_explorer/table.rs deleted file mode 100644 index 24dbed2ba..000000000 --- a/crates/ark/src/data_explorer/table.rs +++ /dev/null @@ -1,50 +0,0 @@ -// -// table.rs -// -// Copyright (C) 2024 by Posit Software, PBC -// -// - -use std::sync::Arc; -use std::sync::Mutex; - -use anyhow::anyhow; -use harp::RObject; - -use crate::thread::RThreadSafe; - -#[derive(Clone)] -pub struct Table { - table: Arc>>>, -} - -impl Table { - pub fn new(data: RThreadSafe) -> Self { - let table = Arc::new(Mutex::new(Some(data))); - Self { table } - } - - // Get can only be called from the main thread as it will also call - // get in the RThreadSafe object. - // Get only result in errors when the table is no longer available, so this - // failing to get, can be used as a sign to cancel a task. - pub fn get(&self) -> anyhow::Result { - let guard = self.table.lock().unwrap(); - let table = guard - .as_ref() - .ok_or(anyhow!("Table not found"))? - .get() - .clone(); - Ok(table) - } - - pub fn set(&mut self, data: RThreadSafe) { - let mut table = self.table.lock().unwrap(); - *table = Some(data); - } - - pub fn delete(&mut self) { - let mut table = self.table.lock().unwrap(); - *table = None; - } -} diff --git a/crates/ark/src/data_explorer/utils.rs b/crates/ark/src/data_explorer/utils.rs index 2749a1715..caf44d8bd 100644 --- a/crates/ark/src/data_explorer/utils.rs +++ b/crates/ark/src/data_explorer/utils.rs @@ -1,12 +1,7 @@ -use amalthea::comm::data_explorer_comm::ColumnDisplayType; use harp::exec::RFunction; use harp::exec::RFunctionExt; use harp::object::RObject; -use harp::utils::r_inherits; -use harp::utils::r_is_object; -use harp::utils::r_is_s4; -use harp::utils::r_typeof; -use libr::*; +use libr::SEXP; use crate::modules::ARK_ENVS; @@ -45,64 +40,3 @@ fn tbl_subset(x: SEXP, i: Option>, j: Option>) -> anyhow::Resu Ok(call.call_in(ARK_ENVS.positron_ns)?) } - -// This returns the type of an _element_ of the column. In R atomic -// vectors do not have a distinct internal type but we pretend that they -// do for the purpose of integrating with Positron types. -pub fn display_type(x: SEXP) -> ColumnDisplayType { - if r_is_s4(x) { - return ColumnDisplayType::Unknown; - } - - if r_is_object(x) { - if r_inherits(x, "logical") { - return ColumnDisplayType::Boolean; - } - - if r_inherits(x, "integer") { - return ColumnDisplayType::Number; - } - if r_inherits(x, "double") { - return ColumnDisplayType::Number; - } - if r_inherits(x, "complex") { - return ColumnDisplayType::Number; - } - if r_inherits(x, "numeric") { - return ColumnDisplayType::Number; - } - - if r_inherits(x, "character") { - return ColumnDisplayType::String; - } - if r_inherits(x, "factor") { - return ColumnDisplayType::String; - } - - if r_inherits(x, "Date") { - return ColumnDisplayType::Date; - } - if r_inherits(x, "POSIXct") { - return ColumnDisplayType::Datetime; - } - if r_inherits(x, "POSIXlt") { - return ColumnDisplayType::Datetime; - } - - // TODO: vctrs's list_of - if r_inherits(x, "list") { - return ColumnDisplayType::Unknown; - } - - // Catch-all, including for data frame - return ColumnDisplayType::Unknown; - } - - match r_typeof(x) { - LGLSXP => return ColumnDisplayType::Boolean, - INTSXP | REALSXP | CPLXSXP => return ColumnDisplayType::Number, - STRSXP => return ColumnDisplayType::String, - VECSXP => return ColumnDisplayType::Unknown, - _ => return ColumnDisplayType::Unknown, - } -} diff --git a/crates/ark/src/r_task.rs b/crates/ark/src/r_task.rs index 85ebd9c51..20c57f24e 100644 --- a/crates/ark/src/r_task.rs +++ b/crates/ark/src/r_task.rs @@ -253,7 +253,7 @@ where Fut: Future + 'static, { // Idle tasks are always run from the read-console loop - if unsafe { R_TASK_BYPASS } { + if !only_idle && unsafe { R_TASK_BYPASS } { // Escape hatch for unit tests futures::executor::block_on(fun()); return; diff --git a/crates/ark/tests/data_explorer.rs b/crates/ark/tests/data_explorer.rs index 92027423f..8b63f6c43 100644 --- a/crates/ark/tests/data_explorer.rs +++ b/crates/ark/tests/data_explorer.rs @@ -13,7 +13,6 @@ use amalthea::comm::data_explorer_comm::ColumnHistogramParams; use amalthea::comm::data_explorer_comm::ColumnHistogramParamsMethod; use amalthea::comm::data_explorer_comm::ColumnProfileParams; use amalthea::comm::data_explorer_comm::ColumnProfileRequest; -use amalthea::comm::data_explorer_comm::ColumnProfileResult; use amalthea::comm::data_explorer_comm::ColumnProfileSpec; use amalthea::comm::data_explorer_comm::ColumnProfileType; use amalthea::comm::data_explorer_comm::ColumnSelection; @@ -186,62 +185,6 @@ fn get_data_values_request( }) } -fn expect_column_profile_results( - socket: &DataExplorerSocket, - req: DataExplorerBackendRequest, - check: fn(Vec), -) { - // Randomly generate a unique ID for this request. - let id = uuid::Uuid::new_v4().to_string(); - - // Serialize the message for the wire - let json = serde_json::to_value(req).unwrap(); - println!("--> {:?}", json); - - // Convert the request to a CommMsg and send it. - let msg = CommMsg::Rpc(id, json); - socket.socket.incoming_tx.send(msg).unwrap(); - - let msg = socket - .socket - .outgoing_rx - .recv_timeout(std::time::Duration::from_secs(1)) - .unwrap(); - - // Because during tests, no threads are created with r_task::spawn_idle, the messages are in - // an incorrect order. We first receive the DataExplorerFrontndEvent with the column profiles - // and then receive the results. - assert_match!( - msg, - CommMsg::Data(value) => { - let event = serde_json::from_value::(value).unwrap(); - assert_match!( - event, - DataExplorerFrontendEvent::ReturnColumnProfiles(ev) => { - check(ev.profiles); - } - ); - } - ); - - let msg = socket - .socket - .outgoing_rx - .recv_timeout(std::time::Duration::from_secs(1)) - .unwrap(); - - let reply: DataExplorerBackendReply = match msg { - CommMsg::Rpc(_id, value) => { - println!("<-- {:?}", value); - let reply = serde_json::from_value(value).unwrap(); - reply - }, - _ => panic!("Unexpected Comm Message"), - }; - - assert_eq!(reply, DataExplorerBackendReply::GetColumnProfilesReply()); -} - fn test_mtcars_sort(socket: DataExplorerSocket, has_row_names: bool, display_name: String) { // Get the schema for the test data set. let req = DataExplorerBackendRequest::GetSchema(GetSchemaParams { @@ -641,7 +584,6 @@ fn test_null_counts() { // Ask for a count of nulls in the first column. let req = DataExplorerBackendRequest::GetColumnProfiles(GetColumnProfilesParams { - callback_id: String::from("id"), profiles: vec![ColumnProfileRequest { column_index: 0, profiles: vec![ColumnProfileSpec { @@ -652,10 +594,13 @@ fn test_null_counts() { format_options: default_format_options(), }); - expect_column_profile_results(&socket, req, |data| { - assert!(data.len() == 1); - assert_eq!(data[0].null_count, Some(3)); - }); + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetColumnProfilesReply(data) => { + // We asked for the null count of the first column, which has 3 NA values. + assert!(data.len() == 1); + assert_eq!(data[0].null_count, Some(3)); + } + ); // Next, apply a filter to the data set. Filter out all empty rows. let req = DataExplorerBackendRequest::SetRowFilters(SetRowFiltersParams { @@ -682,7 +627,6 @@ fn test_null_counts() { // Ask for a count of nulls in the first column again. Since a filter // has been applied, the null count should be 0. let req = DataExplorerBackendRequest::GetColumnProfiles(GetColumnProfilesParams { - callback_id: String::from("id"), profiles: vec![ColumnProfileRequest { column_index: 0, profiles: vec![ColumnProfileSpec { @@ -693,12 +637,14 @@ fn test_null_counts() { format_options: default_format_options(), }); - expect_column_profile_results(&socket, req, |data| { - // We asked for the null count of the first column, which has no - // NA values after the filter. - assert!(data.len() == 1); - assert_eq!(data[0].null_count, Some(0)); - }); + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetColumnProfilesReply(data) => { + // We asked for the null count of the first column, which has no + // NA values after the filter. + assert!(data.len() == 1); + assert_eq!(data[0].null_count, Some(0)); + } + ); // Let's look at JUST the empty rows. let req = DataExplorerBackendRequest::SetRowFilters(SetRowFiltersParams { @@ -740,7 +686,6 @@ fn test_summary_stats() { // Ask for summary stats for the columns let req = DataExplorerBackendRequest::GetColumnProfiles(GetColumnProfilesParams { - callback_id: String::from("id"), profiles: (0..3) .map(|i| ColumnProfileRequest { column_index: i, @@ -753,44 +698,47 @@ fn test_summary_stats() { format_options: default_format_options(), }); - expect_column_profile_results(&socket, req, |data| { - // We asked for summary stats for all 3 columns - assert!(data.len() == 3); - - // The first column is numeric and has 3 non-NA values. - assert!(data[0].summary_stats.is_some()); - let number_stats = data[0].summary_stats.clone().unwrap().number_stats; - assert!(number_stats.is_some()); - let number_stats = number_stats.unwrap(); - assert_eq!(number_stats, SummaryStatsNumber { - min_value: Some(String::from("1.00")), - max_value: Some(String::from("3.00")), - mean: Some(String::from("2.00")), - median: Some(String::from("2.00")), - stdev: Some(String::from("1.00")), - }); - - // The second column is a character column - assert!(data[1].summary_stats.is_some()); - let string_stats = data[1].summary_stats.clone().unwrap().string_stats; - assert!(string_stats.is_some()); - let string_stats = string_stats.unwrap(); - assert_eq!(string_stats, SummaryStatsString { - num_empty: 1, - num_unique: 3, // NA's are counted as unique values - }); - - // The third column is boolean - assert!(data[2].summary_stats.is_some()); - let boolean_stats = data[2].summary_stats.clone().unwrap().boolean_stats; - assert!(boolean_stats.is_some()); - let boolean_stats = boolean_stats.unwrap(); - assert_eq!(boolean_stats, SummaryStatsBoolean { - true_count: 2, - false_count: 1, - }); - }); - }); + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetColumnProfilesReply(data) => { + // We asked for summary stats for all 3 columns + assert!(data.len() == 3); + + // The first column is numeric and has 3 non-NA values. + assert!(data[0].summary_stats.is_some()); + let number_stats = data[0].summary_stats.clone().unwrap().number_stats; + assert!(number_stats.is_some()); + let number_stats = number_stats.unwrap(); + assert_eq!(number_stats, SummaryStatsNumber { + min_value: Some(String::from("1.00")), + max_value: Some(String::from("3.00")), + mean: Some(String::from("2.00")), + median: Some(String::from("2.00")), + stdev: Some(String::from("1.00")), + }); + + // The second column is a character column + assert!(data[1].summary_stats.is_some()); + let string_stats = data[1].summary_stats.clone().unwrap().string_stats; + assert!(string_stats.is_some()); + let string_stats = string_stats.unwrap(); + assert_eq!(string_stats, SummaryStatsString { + num_empty: 1, + num_unique: 3, // NA's are counted as unique values + }); + + // The third column is boolean + assert!(data[2].summary_stats.is_some()); + let boolean_stats = data[2].summary_stats.clone().unwrap().boolean_stats; + assert!(boolean_stats.is_some()); + let boolean_stats = boolean_stats.unwrap(); + assert_eq!(boolean_stats, SummaryStatsBoolean { + true_count: 2, + false_count: 1, + }); + + } + ); + }) } #[test] @@ -1768,9 +1716,8 @@ fn test_histogram() { let socket = open_data_explorer_from_expression("data.frame(x = rep(1:10, 10:1))", None).unwrap(); - let make_histogram_req = |id, column_index, method, num_bins, quantiles| { + let make_histogram_req = |column_index, method, num_bins, quantiles| { DataExplorerBackendRequest::GetColumnProfiles(GetColumnProfilesParams { - callback_id: id, profiles: vec![ColumnProfileRequest { column_index, profiles: vec![ColumnProfileSpec { @@ -1786,18 +1733,12 @@ fn test_histogram() { }) }; - let id = String::from("histogram_req"); - let req = make_histogram_req(id.clone(), 0, ColumnHistogramParamsMethod::Fixed, 10, None); + let req = make_histogram_req(0, ColumnHistogramParamsMethod::Fixed, 10, None); - expect_column_profile_results(&socket, req, |profiles| { + assert_match!(socket_rpc(&socket, req), DataExplorerBackendReply::GetColumnProfilesReply(profiles) => { let histogram = profiles[0].small_histogram.clone().unwrap(); assert_eq!(histogram, ColumnHistogram { - bin_edges: format_string( - harp::parse_eval_global("seq(1, 10, length.out=11)") - .unwrap() - .sexp, - &default_format_options() - ), + bin_edges: format_string(harp::parse_eval_global("seq(1, 10, length.out=11)").unwrap().sexp, &default_format_options()), bin_counts: vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1], // Pretty bind edges unite the first two intervals quantiles: vec![], }); @@ -1812,9 +1753,8 @@ fn test_frequency_table() { open_data_explorer_from_expression("data.frame(x = rep(letters[1:10], 10:1))", None) .unwrap(); - let make_freq_table_req = |id, column_index, limit| { + let make_freq_table_req = |column_index, limit| { DataExplorerBackendRequest::GetColumnProfiles(GetColumnProfilesParams { - callback_id: id, profiles: vec![ColumnProfileRequest { column_index, profiles: vec![ColumnProfileSpec { @@ -1828,16 +1768,12 @@ fn test_frequency_table() { }) }; - let id = String::from("freq_table"); - let req = make_freq_table_req(id.clone(), 0, 5); + let req = make_freq_table_req(0, 5); - expect_column_profile_results(&socket, req, |profiles| { + assert_match!(socket_rpc(&socket, req), DataExplorerBackendReply::GetColumnProfilesReply(profiles) => { let freq_table = profiles[0].small_frequency_table.clone().unwrap(); assert_eq!(freq_table, ColumnFrequencyTable { - values: format_string( - harp::parse_eval_global("letters[1:5]").unwrap().sexp, - &default_format_options() - ), + values: format_string(harp::parse_eval_global("letters[1:5]").unwrap().sexp, &default_format_options()), counts: vec![10, 9, 8, 7, 6], other_count: Some(5 + 4 + 3 + 2 + 1) });