Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvements for plugins implementation #74

Merged
merged 5 commits into from
Aug 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ tracing-subscriber = "0.3.17"
async-trait = "0.1.73"
hyper-tls = "0.5.0"
axum-macros = "0.3.8"
sync_wrapper = "0.1.2"
tokio-util = { version = "0.7.8", features = ["io", "compat"] }

[dev-dependencies]
Expand Down
6 changes: 5 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub struct ConductorConfig {
pub logger: LoggerConfig,
pub sources: Vec<SourceDefinition>,
pub endpoints: Vec<EndpointDefinition>,
pub global_plugins: Option<Vec<PluginDefinition>>,
}

#[derive(Deserialize, Debug, Clone)]
Expand All @@ -23,6 +24,9 @@ pub struct EndpointDefinition {
pub enum PluginDefinition {
#[serde(rename = "verbose_logging")]
VerboseLogging,

#[serde(rename = "json_content_type_response")]
JSONContentTypeResponse,
}

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -91,7 +95,7 @@ pub struct GraphQLSourceConfig {
pub endpoint: String,
}

#[tracing::instrument]
#[tracing::instrument(level = "trace")]
pub async fn load_config(file_path: &String) -> ConductorConfig {
let path = Path::new(file_path);
let contents = read_to_string(file_path).expect("Failed to read config file");
Expand Down
18 changes: 7 additions & 11 deletions src/endpoint/endpoint_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
};
use axum::body::Body;
use serde_json::json;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

pub type EndpointResponse = hyper::Response<Body>;

Expand Down Expand Up @@ -39,7 +39,7 @@
pub struct EndpointRuntime {
pub config: EndpointDefinition,
pub plugin_manager: Arc<PluginManager>,
pub upstream: Arc<Mutex<dyn SourceService + Send>>,
pub upstream: Arc<Box<dyn SourceService + Send>>,
}

impl EndpointRuntime {
Expand All @@ -50,7 +50,7 @@
) -> Self {
Self {
config: endpoint_config,
upstream: Arc::new(Mutex::new(source)),
upstream: Arc::new(Box::new(source)),
plugin_manager,
}
}
Expand All @@ -63,16 +63,12 @@
Some(source_request) => {
// DOTAN: Can we avoid cloning here?
let upstream_request = SourceRequest::from_parts(
source_request.operation_name.clone(),
source_request.query.clone(),
Some(source_request.variables.clone()),
source_request.operation_name.as_ref().map(|e| e.as_str()),

Check warning on line 66 in src/endpoint/endpoint_runtime.rs

View workflow job for this annotation

GitHub Actions / clippy

called `.as_ref().map(|e| e.as_str())` on an Option value. This can be done more directly by calling `source_request.operation_name.as_deref()` instead

warning: called `.as_ref().map(|e| e.as_str())` on an Option value. This can be done more directly by calling `source_request.operation_name.as_deref()` instead --> src/endpoint/endpoint_runtime.rs:66:21 | 66 | source_request.operation_name.as_ref().map(|e| e.as_str()), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try using as_deref instead: `source_request.operation_name.as_deref()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#option_as_ref_deref = note: `#[warn(clippy::option_as_ref_deref)]` on by default
source_request.query.as_ref(),
Some(&source_request.variables),
);

let source_result = self
.upstream
.lock()
.expect("upstream service lock coudln't be acquired")
.call(upstream_request);
let source_result = self.upstream.call(upstream_request);

// DOTAN: We probably need some kind of handling for network-related errors here,
// I guess some kind of static "upstream is not healthy" error response?
Expand Down
38 changes: 26 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::config::SourceDefinition;
use crate::plugins::plugin_manager::PluginManager;
use crate::source::graphql_source::GraphQLSourceService;

#[derive(Clone)]
pub struct RouterState {
pub plugin_manager: Arc<PluginManager>,
}
Expand All @@ -37,7 +36,7 @@ pub async fn serve_graphiql_ide(req: Request<Body>) -> impl IntoResponse {
pub async fn handle_post(
Extension(endpoint): Extension<EndpointRuntime>,
headers: HeaderMap,
body_stream: BodyStream,
mut body_stream: BodyStream,
) -> impl IntoResponse {
// This represents the main context that is shared across the execution.
// We'll use this struct to pass data between the plugins, endpoint and source.
Expand All @@ -50,7 +49,9 @@ pub async fn handle_post(
// Execute plugins on the HTTP request.
// Later we need to think how to expose things like the request body to the
// plugins, if needed,without having to read it twice.
flow_ctx = endpoint.plugin_manager.on_downstream_http_request(flow_ctx);
endpoint
.plugin_manager
.on_downstream_http_request(&mut flow_ctx);

// In case the response was set by one of the plugins at this stage, just short-circuit and return it.
if let Some(sc_response) = flow_ctx.short_circuit_response {
Expand All @@ -60,30 +61,33 @@ pub async fn handle_post(
// In case the GraphQL operation was not extracted from the HTTP request yet, do it now.
// This is done in order to allow plugins to override/set the GraphQL operation, for use-cases like persisted operations.
if flow_ctx.downstream_graphql_request.is_none() {
flow_ctx = flow_ctx
.extract_graphql_request_from_http_request(body_stream)
flow_ctx
.extract_graphql_request_from_http_request(&mut body_stream)
.await;
}

// Execute plugins on the GraphQL request.
flow_ctx = endpoint
endpoint
.plugin_manager
.on_downstream_graphql_request(flow_ctx);
.on_downstream_graphql_request(&mut flow_ctx);

// In case the response was set by one of the plugins at this stage, just short-circuit and return it.
if let Some(sc_response) = flow_ctx.short_circuit_response {
return sc_response;
}

// Run the actual endpoint handler and get the response.
let endpoint_response = endpoint.handle_request(flow_ctx).await;
flow_ctx = endpoint_response.0;
let (mut flow_ctx, mut endpoint_response) = endpoint.handle_request(flow_ctx).await;

endpoint
.plugin_manager
.on_downstream_http_response(&mut flow_ctx);

endpoint
.plugin_manager
.on_downstream_http_response(flow_ctx);
.on_upstream_graphql_response(&mut endpoint_response);

match endpoint_response.1 {
match endpoint_response {
Ok(response) => response,
Err(e) => e.into(),
}
Expand All @@ -104,8 +108,18 @@ pub async fn run_services(config_file_path: String) {
let server_config = config_object.server.clone();
let mut http_router = Router::new();

let global_plugins = &config_object.global_plugins;

for endpoint_config in config_object.endpoints.into_iter() {
let plugin_manager = Arc::new(PluginManager::new(&endpoint_config.plugins));
let combined_plugins = global_plugins
.iter()
.chain(&endpoint_config.plugins)
.flat_map(|vec| vec.iter())
.cloned()
.collect::<Vec<_>>();

let plugin_manager = Arc::new(PluginManager::new(&Some(combined_plugins)));

let upstream_source = config_object
.sources
.iter()
Expand Down
23 changes: 11 additions & 12 deletions src/plugins/core.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use std::fmt::Debug;

use crate::source::base_source::SourceRequest;
use hyper::Body;

use crate::{endpoint::endpoint_runtime::EndpointError, source::base_source::SourceRequest};

use super::flow_context::FlowContext;

pub trait Plugin: Sync + Send {
fn on_downstream_http_request(&self, mut _ctx: FlowContext) -> FlowContext {
_ctx
}
fn on_downstream_http_response(&self, mut _ctx: FlowContext) -> FlowContext {
_ctx
}
fn on_downstream_graphql_request(&self, mut _ctx: FlowContext) -> FlowContext {
_ctx
}
fn on_upstream_graphql_request(&self, mut _req: SourceRequest) -> SourceRequest {
_req
fn on_downstream_http_request(&self, _ctx: &mut FlowContext) {}
fn on_downstream_http_response(&self, _ctx: &mut FlowContext) {}
fn on_downstream_graphql_request(&self, _ctx: &mut FlowContext) {}
fn on_upstream_graphql_request<'a>(&self, _req: &mut SourceRequest<'a>) {}

Check warning on line 13 in src/plugins/core.rs

View workflow job for this annotation

GitHub Actions / clippy

the following explicit lifetimes could be elided: 'a

warning: the following explicit lifetimes could be elided: 'a --> src/plugins/core.rs:13:5 | 13 | fn on_upstream_graphql_request<'a>(&self, _req: &mut SourceRequest<'a>) {} | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes = note: `#[warn(clippy::needless_lifetimes)]` on by default help: elide the lifetimes | 13 - fn on_upstream_graphql_request<'a>(&self, _req: &mut SourceRequest<'a>) {} 13 + fn on_upstream_graphql_request(&self, _req: &mut SourceRequest<'_>) {} |
fn on_upstream_graphql_response<'a>(

Check warning on line 14 in src/plugins/core.rs

View workflow job for this annotation

GitHub Actions / clippy

this lifetime isn't used in the function definition

warning: this lifetime isn't used in the function definition --> src/plugins/core.rs:14:37 | 14 | fn on_upstream_graphql_response<'a>( | ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes = note: `#[warn(clippy::extra_unused_lifetimes)]` on by default
&self,
_response: &mut Result<hyper::Response<Body>, EndpointError>,
) {
}
}

Expand Down
9 changes: 4 additions & 5 deletions src/plugins/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ pub struct FlowContext {
}

impl FlowContext {
#[tracing::instrument]
#[tracing::instrument(level = "trace")]
pub async fn extract_graphql_request_from_http_request(
mut self,
body_stream: BodyStream,
) -> Self {
&mut self,
body_stream: &mut BodyStream,
) {
let content_type = self
.downstream_headers
.get(axum::http::header::CONTENT_TYPE)
Expand All @@ -41,6 +41,5 @@ impl FlowContext {
.unwrap();

self.downstream_graphql_request = Some(graphql_request);
self
}
}
18 changes: 18 additions & 0 deletions src/plugins/json_content_type_response_plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use crate::endpoint::endpoint_runtime::EndpointError;

use super::core::Plugin;
use hyper::{
header::{HeaderValue, CONTENT_TYPE},
Body,
};

pub struct JSONContentTypePlugin {}

impl Plugin for JSONContentTypePlugin {
fn on_upstream_graphql_response(&self, req: &mut Result<hyper::Response<Body>, EndpointError>) {
if let Ok(res) = req {
let headers = res.headers_mut();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
}
}
}
1 change: 1 addition & 0 deletions src/plugins/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod core;
pub mod flow_context;
pub mod json_content_type_response_plugin;
pub mod plugin_manager;
pub mod verbose_logging_plugin;
Loading
Loading