Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallelize + show progress for input plugins #57

Merged
merged 2 commits into from
Nov 30, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 68 additions & 20 deletions rust/routee-compass/src/app/compass/compass_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ use crate::{
use chrono::{Duration, Local};
use config::Config;
use itertools::{Either, Itertools};
use kdam::{Bar, BarExt};
use rayon::{current_num_threads, prelude::*};
use routee_compass_core::{
algorithm::search::search_algorithm::SearchAlgorithm,
util::duration_extension::DurationExtension,
};
use std::path::{Path, PathBuf};
use serde_json::Value;
use std::{
path::{Path, PathBuf},
sync::{Arc, Mutex},
};

/// Instance of RouteE Compass as an application.
/// When constructed, it holds
Expand Down Expand Up @@ -196,38 +201,81 @@ impl CompassApp {
&self,
queries: Vec<serde_json::Value>,
) -> Result<Vec<serde_json::Value>, CompassAppError> {
let input_pb = Bar::builder()
.total(queries.len())
.animation("fillup")
.desc("input plugins")
.build()
.map_err(CompassAppError::UXError)?;
let input_pb_shared = Arc::new(Mutex::new(input_pb));

// the results of the input plugins need to be flattened (each query yields a list of
// processed queries), and queries that fail input processing need to be returned at the end.
let (input_bundles, input_error_responses): (
Vec<Vec<serde_json::Value>>,
Vec<serde_json::Value>,
) = queries
.iter()
.map(|q| apply_input_plugins(q, &self.input_plugins))
.partition_map(|r| match r {
Ok(values) => Either::Left(values),
Err(error_response) => Either::Right(error_response),
});
let input_queries: Vec<serde_json::Value> = input_bundles.into_iter().flatten().collect();
if input_queries.is_empty() {
return Ok(input_error_responses);
let plugin_chunk_size = (queries.len() as f64 / self.parallelism as f64).ceil() as usize;
let input_plugin_result: (Vec<_>, Vec<_>) = queries
.par_chunks(plugin_chunk_size)
.map(|queries| {
let result: (Vec<Vec<Value>>, Vec<Value>) = queries
.iter()
.map(|q| {
let inner_processed = apply_input_plugins(q, &self.input_plugins);
if let Ok(mut pb_local) = input_pb_shared.lock() {
let _ = pb_local.update(1);
}
inner_processed
})
.partition_map(|r| match r {
Ok(values) => Either::Left(values),
Err(error_response) => Either::Right(error_response),
});

result
})
.unzip();

println!();

// unpack input plugin results
let (processed_inputs_nested, error_inputs_nested) = input_plugin_result;
let processed_inputs: Vec<Value> = processed_inputs_nested
.into_iter()
.flatten()
.flatten()
.collect();
let error_inputs: Vec<Value> = error_inputs_nested.into_iter().flatten().collect();
if processed_inputs.is_empty() {
return Ok(error_inputs);
}

// run parallel searches using a rayon thread pool
let chunk_size = (input_queries.len() as f64 / self.parallelism as f64).ceil() as usize;
let query_chunk_size =
(processed_inputs.len() as f64 / self.parallelism as f64).ceil() as usize;
log::info!(
"creating {} parallel batches across {} threads to run queries with chunk size {}",
self.parallelism,
current_num_threads(),
chunk_size
query_chunk_size
);

let run_query_result = input_queries
.par_chunks(chunk_size)
let search_pb = Bar::builder()
.total(processed_inputs.len())
.animation("fillup")
.desc("search")
.build()
.map_err(CompassAppError::UXError)?;
let search_pb_shared = Arc::new(Mutex::new(search_pb));
let run_query_result = processed_inputs
.par_chunks(query_chunk_size)
.map(|queries| {
queries
.iter()
.map(|q| self.run_single_query(q.clone()))
.map(|q| {
let inner_search = self.run_single_query(q.clone());
if let Ok(mut pb_local) = search_pb_shared.lock() {
let _ = pb_local.update(1);
}
inner_search
})
.collect::<Result<Vec<Vec<serde_json::Value>>, CompassAppError>>()
})
.collect::<Result<Vec<Vec<Vec<serde_json::Value>>>, CompassAppError>>()?;
Expand All @@ -236,7 +284,7 @@ impl CompassApp {
.into_iter()
.flatten()
.flatten()
.chain(input_error_responses)
.chain(error_inputs)
.collect();

Ok(run_result)
Expand Down