Skip to content

Commit

Permalink
feat: added salsa integration to plugin core
Browse files Browse the repository at this point in the history
  • Loading branch information
j-lanson authored and alilleybrinker committed Aug 23, 2024
1 parent bbdd162 commit 4ea121c
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 38 deletions.
89 changes: 89 additions & 0 deletions hipcheck/src/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#![allow(unused)]

use crate::plugin::{ActivePlugin, HcPluginCore, PluginExecutor, PluginResponse, PluginWithConfig};
use crate::{hc_error, Result};
use serde_json::Value;
use std::sync::{Arc, LazyLock};
use tokio::runtime::Runtime;

// Salsa doesn't natively support async functions, so our recursive `query()` function that
// interacts with plugins (which use async) has to get a handle to the underlying runtime,
// spawn and block on a task to query the plugin, then choose whether to recurse or return.

static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

#[salsa::query_group(HcEngineStorage)]
pub trait HcEngine: salsa::Database {
#[salsa::input]
fn core(&self) -> Arc<HcPluginCore>;

fn query(&self, publisher: String, plugin: String, query: String, key: Value) -> Result<Value>;
}

fn query(
db: &dyn HcEngine,
publisher: String,
plugin: String,
query: String,
key: Value,
) -> Result<Value> {
let runtime = RUNTIME.handle();
let core = db.core();
// Find the plugin
let Some(p_handle) = core.plugins.get(&plugin) else {
return Err(hc_error!("No such plugin {}::{}", publisher, plugin));
};
// Initiate the query. If remote closed or we got our response immediately,
// return
let mut ar = match runtime.block_on(p_handle.query(query, key))? {
PluginResponse::RemoteClosed => {
return Err(hc_error!("Plugin channel closed unexpected"));
}
PluginResponse::Completed(v) => return Ok(v),
PluginResponse::AwaitingResult(a) => a,
};
// Otherwise, the plugin needs more data to continue. Recursively query
// (with salsa memo-ization) to get the needed data, and resume our
// current query by providing the plugin the answer.
loop {
let answer = db.query(
ar.publisher.clone(),
ar.plugin.clone(),
ar.query.clone(),
ar.key.clone(),
)?;
ar = match runtime.block_on(p_handle.resume_query(ar, answer))? {
PluginResponse::RemoteClosed => {
return Err(hc_error!("Plugin channel closed unexpected"));
}
PluginResponse::Completed(v) => return Ok(v),
PluginResponse::AwaitingResult(a) => a,
};
}
}

#[salsa::database(HcEngineStorage)]
pub struct HcEngineImpl {
// Query storage
storage: salsa::Storage<Self>,
}

impl salsa::Database for HcEngineImpl {}

impl HcEngineImpl {
// Really HcEngineImpl and HcPluginCore do the same thing right now, except HcPluginCore
// has an async constructor. If we can manipulate salsa to accept async functions, we
// could consider merging the two structs. Although maybe its wise to keep HcPluginCore
// independent of Salsa.
pub fn new(executor: PluginExecutor, plugins: Vec<(PluginWithConfig)>) -> Result<Self> {
let runtime = RUNTIME.handle();
let core = runtime.block_on(HcPluginCore::new(executor, plugins))?;
let mut engine = HcEngineImpl {
storage: Default::default(),
};
engine.set_core(Arc::new(core));
Ok(engine)
}
// TODO - "run" function that takes analysis heirarchy and target, and queries each
// analysis plugin to kick off the execution
}
53 changes: 26 additions & 27 deletions hipcheck/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod command_util;
mod config;
mod context;
mod data;
mod engine;
mod error;
mod git2_log_shim;
mod git2_rustls_transport;
Expand Down Expand Up @@ -38,9 +39,10 @@ use crate::analysis::report_builder::Report;
use crate::analysis::score::score_results;
use crate::cache::HcCache;
use crate::context::Context as _;
use crate::engine::{HcEngine, HcEngineImpl};
use crate::error::Error;
use crate::error::Result;
use crate::plugin::{HcPluginCore, Plugin, PluginExecutor, PluginWithConfig};
use crate::plugin::{Plugin, PluginExecutor, PluginWithConfig};
use crate::session::session::Session;
use crate::setup::{resolve_and_transform_source, SourceType};
use crate::shell::verbosity::Verbosity;
Expand Down Expand Up @@ -639,7 +641,6 @@ fn check_github_token() -> StdResult<(), EnvVarCheckError> {
}

fn cmd_plugin() {
use tokio::runtime::Runtime;
let tgt_dir = "./target/debug";
let entrypoint = pathbuf![tgt_dir, "dummy_rand_data"];
let plugin = Plugin {
Expand All @@ -654,30 +655,29 @@ fn cmd_plugin() {
/* jitter_percent */ 10,
)
.unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(async move {
println!("Started executor");
let mut core = match HcPluginCore::new(
plugin_executor,
vec![PluginWithConfig(plugin, serde_json::json!(null))],
)
.await
{
Ok(c) => c,
Err(e) => {
println!("{e}");
return;
}
};
match core.run().await {
Ok(_) => {
println!("HcCore run completed");
}
Err(e) => {
println!("HcCore run failed with '{e}'");
}
};
});
let engine = match HcEngineImpl::new(
plugin_executor,
vec![PluginWithConfig(plugin, serde_json::json!(null))],
) {
Ok(e) => e,
Err(e) => {
println!("Failed to create engine: {e}");
return;
}
};
let res = match engine.query(
"MITRE".to_owned(),
"rand_data".to_owned(),
"rand_data".to_owned(),
serde_json::json!(7),
) {
Ok(r) => r,
Err(e) => {
println!("Query failed: {e}");
return;
}
};
println!("Result: {res}");
}

fn cmd_ready(config: &CliConfig) {
Expand Down Expand Up @@ -727,7 +727,6 @@ fn cmd_ready(config: &CliConfig) {
Err(e) => println!("{:<17} {}", "Policy Path:", e),
}


match &ready.github_token_check {
Ok(_) => println!("{:<17} Found!", "GitHub Token:"),
Err(e) => println!("{:<17} {}", "GitHub Token:", e),
Expand Down
15 changes: 4 additions & 11 deletions hipcheck/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ pub async fn initialize_plugins(
Ok(out)
}

struct ActivePlugin {
#[derive(Debug)]
pub struct ActivePlugin {
next_id: Mutex<usize>,
channel: PluginTransport,
}
Expand Down Expand Up @@ -91,9 +92,10 @@ impl ActivePlugin {
}
}

#[derive(Debug)]
pub struct HcPluginCore {
executor: PluginExecutor,
plugins: HashMap<String, ActivePlugin>,
pub plugins: HashMap<String, ActivePlugin>,
}
impl HcPluginCore {
// When this object is returned, the plugins are all connected but the
Expand Down Expand Up @@ -128,13 +130,4 @@ impl HcPluginCore {
// Now we have a set of started and initialized plugins to interact with
Ok(HcPluginCore { executor, plugins })
}
// @Temporary
pub async fn run(&mut self) -> Result<()> {
let handle = self.plugins.get("rand_data").unwrap();
let resp = handle
.query("rand_data".to_owned(), serde_json::json!(7))
.await?;
println!("Plugin response: {resp:?}");
Ok(())
}
}
4 changes: 4 additions & 0 deletions hipcheck/src/plugin/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Plugin {
}

// Hipcheck-facing version of struct from crate::hipcheck
#[derive(Clone, Debug)]
pub struct Schema {
pub query_name: String,
pub key_schema: Value,
Expand Down Expand Up @@ -118,6 +119,7 @@ impl std::fmt::Display for ConfigError {
}

// State for managing an actively running plugin process
#[derive(Debug)]
pub struct PluginContext {
pub plugin: Plugin,
pub port: u16,
Expand Down Expand Up @@ -273,6 +275,7 @@ impl TryFrom<Query> for PluginQuery {
}
}

#[derive(Debug)]
pub struct MultiplexedQueryReceiver {
rx: Streaming<PluginQuery>,
backlog: HashMap<i32, VecDeque<PluginQuery>>,
Expand Down Expand Up @@ -314,6 +317,7 @@ impl MultiplexedQueryReceiver {

// Encapsulate an "initialized" state of a Plugin with interfaces that abstract
// query chunking to produce whole messages for the Hipcheck engine
#[derive(Debug)]
pub struct PluginTransport {
pub schemas: HashMap<String, Schema>,
pub default_policy_expr: String, // TODO - update with policy_expr type
Expand Down

0 comments on commit 4ea121c

Please sign in to comment.