WIP added send message loop.
- still missing free http connection counters
RaduW committed Jul 27, 2020
1 parent 830bb9e commit 71f9123
207 changes: 124 additions & 83 deletions relay-server/src/actors/upstream.rs
@@ -1,9 +1,9 @@
//! This actor can be used for sending signed requests to the upstream relay.
use std::borrow::Cow;
use std::collections::VecDeque;
use std::str;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::{str, thread, time};

use ::actix::fut;
use ::actix::prelude::*;
@@ -145,6 +145,128 @@ struct UpstreamRequest {
build: Box<dyn Send + FnOnce(&mut ClientRequestBuilder) -> Result<ClientRequest, ActixError>>,
}
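
For reference, `build` is a boxed one-shot closure over the request builder, so a call site might enqueue a request roughly as follows. This is a hypothetical illustration, not commit code: the field order follows the destructuring in `send_http_request` below, the path is made up, and `response_sender` is assumed to be a futures 0.1 oneshot sender.

// Hypothetical call site, assuming futures::sync::oneshot.
let (tx, rx) = oneshot::channel();
let request = UpstreamRequest {
    response_sender: tx,
    method: Method::GET,
    path: "/api/relay/example/".into(), // made-up path
    build: Box::new(|builder| builder.finish()),
};
// `rx` resolves once the loop has sent the request and pushed
// the outcome back through `tx`.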

/// Structure controlling the HTTP request loop.
struct UpstreamHttpRequestLoopState {
config: Arc<Config>,
max_inflight_requests: usize,
num_inflight_requests: Arc<AtomicUsize>,
messages: Arc<Mutex<VecDeque<UpstreamRequest>>>,
}

impl UpstreamHttpRequestLoopState {
fn send_http_request(&self, request: UpstreamRequest) {
let UpstreamRequest {
response_sender,
method,
path,
build,
} = request;

let host_header = self
.config
.http_host_header()
.unwrap_or_else(|| self.config.upstream_descriptor().host());

let mut builder = ClientRequest::build();
builder
.method(method)
.uri(self.config.upstream_descriptor().get_url(path.as_ref()))
.set_header("Host", host_header);

if let Some(ref credentials) = self.config.credentials() {
builder.header("X-Sentry-Relay-Id", credentials.id.to_string());
}

//TODO this is work in progress
let future = build(&mut builder)
.map_err(|e| {
// Ignore a dropped receiver; there is nobody left to be notified.
let _ = response_sender.send(Err(UpstreamRequestError::BuildFailed(e)));
})
.map(|client_request| {
client_request
.send()
// We currently use the main connection pool size limit to control how many events get
// sent out at once, and "queue" up the rest (queueing means that there are a lot of
// futures hanging around, waiting for an open connection). We need to adjust this
// timeout to prevent the queued events from timing out while waiting for a free
// connection in the pool.
//
// This may not be good enough in the long run. Right now, filling up the "request
// queue" means that requests unrelated to `store` (queries, proxied/forwarded requests)
// are blocked by store requests. Ideally, those requests would bypass this queue.
//
// Two options come to mind:
// 1. Have own connection pool for `store` requests
// 2. Buffer up/queue/synchronize events before creating the request
.wait_timeout(self.config.event_buffer_expiry())
// This is the timeout after wait + connect.
.timeout(self.config.http_timeout())
//TODO send error into the channel
.map_err(UpstreamRequestError::SendFailed)
.and_then(|response| match response.status() {
StatusCode::TOO_MANY_REQUESTS => {
let headers = response.headers();
let retry_after = headers
.get(header::RETRY_AFTER)
.and_then(|v| v.to_str().ok());

let rate_limits = headers
.get_all(utils::RATE_LIMITS_HEADER)
.iter()
.filter_map(|v| v.to_str().ok())
.join(", ");

let upstream_limits = UpstreamRateLimits::new()
.retry_after(retry_after)
.rate_limits(rate_limits);
//TODO send error into the channel
Err(UpstreamRequestError::RateLimited(upstream_limits))
}
code if !code.is_success() => {
//TODO send error into the channel
Err(UpstreamRequestError::ResponseError(code))
}
//TODO send success into the channel
_ => Ok(response),
})
});
// TODO RaduW send to the channel on error and on resolve
// TODO see what to do with the future (spawn it or return it to be spawned by caller)
// TODO this should be called from the queue emptying loop
}
}
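
The TODO block above is the heart of the WIP: the future is never spawned, nothing is sent through `response_sender` on success, and `num_inflight_requests` — the missing "free http connection counters" from the commit message — is never updated. Below is a sketch of the settle-and-notify pattern that could close these out, assuming futures 0.1 combinators (which actix 0.7 builds on). It glosses over the fact that `build` returns a `Result` and that `response_sender` is partially consumed by the earlier `map_err`, so read it as a shape, not a drop-in patch.

// Hypothetical completion sketch, not part of this commit.
let num_inflight = self.num_inflight_requests.clone();
// Claim a connection slot before the request goes out.
num_inflight.fetch_add(1, Ordering::SeqCst);
let future = future.then(move |result| {
    // Release the slot whether the request succeeded or failed.
    num_inflight.fetch_sub(1, Ordering::SeqCst);
    // Push the outcome through the channel; a dropped receiver just
    // means nobody is waiting for this answer anymore.
    response_sender.send(result).ok();
    Ok::<(), ()>(())
});
// Spawning is the open question in the last TODO: this loop runs on a
// plain thread, so the future would have to be handed to an actix
// Arbiter rather than spawned here directly.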

/// Runs the request loop, sending queued messages while connection slots are free.
fn upstream_http_request_loop(loop_state: UpstreamHttpRequestLoopState) {
//TODO should we make this configurable?
let sleep_time = time::Duration::from_millis(5);
loop {
// While there are free connection slots, drain the message queue.
// Note: `load` accepts only `Relaxed`, `Acquire`, or `SeqCst`; `AcqRel` panics at runtime.
while loop_state.num_inflight_requests.load(Ordering::Acquire)
< loop_state.max_inflight_requests
{
let mut message = None;
match loop_state.messages.lock() {
Ok(mut queue) => {
message = queue.pop_back();
}
Err(_) => {
// A poisoned lock means another thread panicked while holding it.
log::error!(
"Could not access the message queue in UpstreamRelay: a thread panicked while holding the lock."
);
break; //TODO should we panic here?
}
}

if let Some(message) = message {
loop_state.send_http_request(message);
} else {
// Queue is empty; stop draining and fall through to the sleep
// below instead of spinning hot on the lock.
break;
}
}
thread::sleep(sleep_time);
}
}
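
For orientation, wiring the loop up could look roughly like the following. None of this appears in the commit, and the in-flight limit is a made-up placeholder since the commit does not show where it comes from.

// Hypothetical wiring, not part of this commit.
let messages = Arc::new(Mutex::new(VecDeque::new()));
let loop_state = UpstreamHttpRequestLoopState {
    config: Arc::clone(&config),
    max_inflight_requests: 100, // placeholder; presumably read from Config
    num_inflight_requests: Arc::new(AtomicUsize::new(0)),
    messages: Arc::clone(&messages),
};
thread::spawn(move || upstream_http_request_loop(loop_state));
// The actor keeps its own clone of `messages` and enqueues new requests;
// `push_front` is the partner of the loop's `pop_back` if FIFO ordering
// is wanted.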

pub struct UpstreamRelay {
backoff: RetryBackoff,
config: Arc<Config>,
@@ -208,7 +330,7 @@ impl UpstreamRelay {
//unwrap the result (this is how we transport the http failure through the channel)
.and_then(|result| result);

return Box::new(future);
Box::new(future)
}
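
The `.and_then(|result| result)` above is doing double duty: the oneshot receiver resolves to a nested `Result`, and this flattens it into a single error type. A standalone illustration follows, with an assumed `ChannelClosed`-style variant for the cancellation case (the commit does not show which variant is actually used).

// receiver is a futures 0.1 oneshot::Receiver<Result<T, UpstreamRequestError>>,
// so its future resolves to Result<Result<T, UpstreamRequestError>, Canceled>.
let flattened = receiver
    .map_err(|_canceled| UpstreamRequestError::ChannelClosed) // assumed variant
    .and_then(|result| result); // yields Item = T, Error = UpstreamRequestError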

fn send_query<Q: UpstreamQuery>(
@@ -242,87 +364,6 @@

Box::new(future)
}
fn request_sender(&self, request: UpstreamRequest) {
let UpstreamRequest {
response_sender,
method,
path,
build,
} = request;

let host_header = self
.config
.http_host_header()
.unwrap_or_else(|| self.config.upstream_descriptor().host());

let mut builder = ClientRequest::build();
builder
.method(method)
.uri(self.config.upstream_descriptor().get_url(path.as_ref()))
.set_header("Host", host_header);

if let Some(ref credentials) = self.config.credentials() {
builder.header("X-Sentry-Relay-Id", credentials.id.to_string());
}

//TODO this is work in progress
let future = build(&mut builder)
.map_err(|e| {
response_sender.send(Err(UpstreamRequestError::BuildFailed(e)));
()
})
.map(|client_request| {
client_request
.send()
// We currently use the main connection pool size limit to control how many events get
// sent out at once, and "queue" up the rest (queueing means that there are a lot of
// futures hanging around, waiting for an open connection). We need to adjust this
// timeout to prevent the queued events from timing out while waiting for a free
// connection in the pool.
//
// This may not be good enough in the long run. Right now, filling up the "request
// queue" means that requests unrelated to `store` (queries, proxied/forwarded requests)
// are blocked by store requests. Ideally, those requests would bypass this queue.
//
// Two options come to mind:
// 1. Have own connection pool for `store` requests
// 2. Buffer up/queue/synchronize events before creating the request
.wait_timeout(self.config.event_buffer_expiry())
// This is the timeout after wait + connect.
.timeout(self.config.http_timeout())
//TODO send error into the channel
.map_err(UpstreamRequestError::SendFailed)
.and_then(|response| match response.status() {
StatusCode::TOO_MANY_REQUESTS => {
let headers = response.headers();
let retry_after = headers
.get(header::RETRY_AFTER)
.and_then(|v| v.to_str().ok());

let rate_limits = headers
.get_all(utils::RATE_LIMITS_HEADER)
.iter()
.filter_map(|v| v.to_str().ok())
.join(", ");

let upstream_limits = UpstreamRateLimits::new()
.retry_after(retry_after)
.rate_limits(rate_limits);
//TODO send error into the channel
Err(UpstreamRequestError::RateLimited(upstream_limits))
}
code if !code.is_success() => {
//TODO send error into the channel
Err(UpstreamRequestError::ResponseError(code))
}
//TODO send success into the channel
_ => Ok(response),
})
});
// TODO RaduW send to the channel on error and on resolve
// TODO see what to do with the future (spawn it or return it to be spawned by caller)
// TODO this should be called from the queue emptying loop
}
}

impl Actor for UpstreamRelay {
