WIP push requests into the queue (no priority yet)
Still to do:
- consuming loop not done
- figure out if we can run the consuming loop in the actor thread, or whether we need to spawn a thread (and change the request sender not to receive self); a sketch of the actor-thread option follows below
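
For the actor-thread option, here is a rough sketch of what the consuming loop could look like (illustrative only, not part of this commit): a pump_queue helper scheduled with ctx.run_interval, draining the shared messages queue added in this diff while the in-flight counter stays below the limit. The helper name and the interval are assumptions; only the struct fields and request_sender below exist in the diff.

use ::actix::prelude::*;
use std::sync::atomic::Ordering;

impl UpstreamRelay {
    // Hypothetical helper: runs on the actor thread, so it can borrow `self` directly.
    fn pump_queue(&mut self, _ctx: &mut Context<Self>) {
        // keep dispatching while there is capacity for more in-flight requests
        while self.num_inflight_requests.load(Ordering::Relaxed) < self.max_inflight_requests {
            let next = match self.messages.lock() {
                // requests are pushed to the front, so the oldest one sits at the back
                Ok(mut queue) => queue.pop_back(),
                Err(_) => return, // lock poisoned: a sender thread panicked
            };
            match next {
                Some(request) => {
                    self.num_inflight_requests.fetch_add(1, Ordering::Relaxed);
                    self.request_sender(request);
                }
                None => return, // queue drained
            }
        }
    }
}

// scheduled from Actor::started, for example:
//     ctx.run_interval(Duration::from_millis(100), |act, ctx| act.pump_queue(ctx));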
RaduW committed Jul 24, 2020
1 parent 2a3c783 commit 830bb9e
relay-server/src/actors/upstream.rs: 117 additions & 57 deletions
@@ -1,8 +1,9 @@
//! This actor can be used for sending signed requests to the upstream relay.
use std::borrow::Cow;
use std::collections::VecDeque;
use std::str;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use ::actix::fut;
use ::actix::prelude::*;
@@ -25,6 +26,8 @@ use relay_quotas::{
use crate::actors::outcome::SendOutcomes;
use crate::actors::project_upstream::GetProjectStates;
use crate::utils;
use futures::future::Shared;
use futures::sync::oneshot;

#[derive(Fail, Debug)]
pub enum UpstreamRequestError {
@@ -48,6 +51,9 @@ pub enum UpstreamRequestError {

#[fail(display = "upstream request returned error {}", _0)]
ResponseError(StatusCode),

#[fail(display = "channel closed")]
ChannelClosed,
}

/// Represents the current auth state.
@@ -132,12 +138,20 @@ impl UpstreamRateLimits {
}
}

struct UpstreamRequest {
response_sender: oneshot::Sender<Result<ClientResponse, UpstreamRequestError>>,
method: Method,
path: String,
build: Box<dyn Send + FnOnce(&mut ClientRequestBuilder) -> Result<ClientRequest, ActixError>>,
}

pub struct UpstreamRelay {
backoff: RetryBackoff,
config: Arc<Config>,
auth_state: AuthState,
max_inflight_requests: usize,
num_inflight_requests: Arc<AtomicUsize>,
messages: Arc<Mutex<VecDeque<UpstreamRequest>>>,
}

impl UpstreamRelay {
@@ -149,6 +163,7 @@ impl UpstreamRelay {
// TODO get the real value from a config and use it with ClientConnector::limit
max_inflight_requests: 100,
num_inflight_requests: Arc::new(AtomicUsize::new(0)),
messages: Arc::new(Mutex::new(VecDeque::new())),
}
}

@@ -168,68 +183,32 @@ impl UpstreamRelay {
build: F,
) -> ResponseFuture<ClientResponse, UpstreamRequestError>
where
F: FnOnce(&mut ClientRequestBuilder) -> Result<ClientRequest, ActixError>,
F: 'static + Send + FnOnce(&mut ClientRequestBuilder) -> Result<ClientRequest, ActixError>,
P: AsRef<str>,
{
let host_header = self
.config
.http_host_header()
.unwrap_or_else(|| self.config.upstream_descriptor().host());
let (tx, rx) = oneshot::channel::<Result<ClientResponse, UpstreamRequestError>>();

let mut builder = ClientRequest::build();
builder
.method(method)
.uri(self.config.upstream_descriptor().get_url(path.as_ref()))
.set_header("Host", host_header);
let request = UpstreamRequest {
method: method,
path: path.as_ref().to_owned(),
response_sender: tx,
build: Box::new(build),
};

if let Some(ref credentials) = self.config.credentials() {
builder.header("X-Sentry-Relay-Id", credentials.id.to_string());
match self.messages.lock() {
Ok(mut queue) => queue.push_front(request),
Err(_) => log::error!(
"Could not access the message queue in UpstreamRelay,\nthread died without releasing the lock."
),
}

let request = tryf!(build(&mut builder).map_err(UpstreamRequestError::BuildFailed));
let future = request
.send()
// We currently use the main connection pool size limit to control how many events get
// sent out at once, and "queue" up the rest (queueing means that there are a lot of
// futures hanging around, waiting for an open connection). We need to adjust this
// timeout to prevent the queued events from timing out while waiting for a free
// connection in the pool.
//
// This may not be good enough in the long run. Right now, filling up the "request
// queue" means that requests unrelated to `store` (queries, proxied/forwarded requests)
// are blocked by store requests. Ideally, those requests would bypass this queue.
//
// Two options come to mind:
// 1. Have own connection pool for `store` requests
// 2. Buffer up/queue/synchronize events before creating the request
.wait_timeout(self.config.event_buffer_expiry())
// This is the timeout after wait + connect.
.timeout(self.config.http_timeout())
.map_err(UpstreamRequestError::SendFailed)
.and_then(|response| match response.status() {
StatusCode::TOO_MANY_REQUESTS => {
let headers = response.headers();
let retry_after = headers
.get(header::RETRY_AFTER)
.and_then(|v| v.to_str().ok());

let rate_limits = headers
.get_all(utils::RATE_LIMITS_HEADER)
.iter()
.filter_map(|v| v.to_str().ok())
.join(", ");

let upstream_limits = UpstreamRateLimits::new()
.retry_after(retry_after)
.rate_limits(rate_limits);

Err(UpstreamRequestError::RateLimited(upstream_limits))
}
code if !code.is_success() => Err(UpstreamRequestError::ResponseError(code)),
_ => Ok(response),
});
let future = rx
// map errors caused by the oneshot channel being closed (unlikely)
.map_err(|_| UpstreamRequestError::ChannelClosed)
//unwrap the result (this is how we transport the http failure through the channel)
.and_then(|result| result);

Box::new(future)
return Box::new(future);
}

fn send_query<Q: UpstreamQuery>(
@@ -263,6 +242,87 @@ impl UpstreamRelay {

Box::new(future)
}
fn request_sender(&self, request: UpstreamRequest) {
let UpstreamRequest {
response_sender,
method,
path,
build,
} = request;

let host_header = self
.config
.http_host_header()
.unwrap_or_else(|| self.config.upstream_descriptor().host());

let mut builder = ClientRequest::build();
builder
.method(method)
.uri(self.config.upstream_descriptor().get_url(path.as_ref()))
.set_header("Host", host_header);

if let Some(ref credentials) = self.config.credentials() {
builder.header("X-Sentry-Relay-Id", credentials.id.to_string());
}

//TODO this is work in progress
let future = build(&mut builder)
.map_err(|e| {
response_sender.send(Err(UpstreamRequestError::BuildFailed(e)));
()
})
.map(|client_request| {
client_request
.send()
// We currently use the main connection pool size limit to control how many events get
// sent out at once, and "queue" up the rest (queueing means that there are a lot of
// futures hanging around, waiting for an open connection). We need to adjust this
// timeout to prevent the queued events from timing out while waiting for a free
// connection in the pool.
//
// This may not be good enough in the long run. Right now, filling up the "request
// queue" means that requests unrelated to `store` (queries, proxied/forwarded requests)
// are blocked by store requests. Ideally, those requests would bypass this queue.
//
// Two options come to mind:
// 1. Have own connection pool for `store` requests
// 2. Buffer up/queue/synchronize events before creating the request
.wait_timeout(self.config.event_buffer_expiry())
// This is the timeout after wait + connect.
.timeout(self.config.http_timeout())
//TODO send error into the channel
.map_err(UpstreamRequestError::SendFailed)
.and_then(|response| match response.status() {
StatusCode::TOO_MANY_REQUESTS => {
let headers = response.headers();
let retry_after = headers
.get(header::RETRY_AFTER)
.and_then(|v| v.to_str().ok());

let rate_limits = headers
.get_all(utils::RATE_LIMITS_HEADER)
.iter()
.filter_map(|v| v.to_str().ok())
.join(", ");

let upstream_limits = UpstreamRateLimits::new()
.retry_after(retry_after)
.rate_limits(rate_limits);
//TODO send error into the channel
Err(UpstreamRequestError::RateLimited(upstream_limits))
}
code if !code.is_success() => {
//TODO send error into the channel
Err(UpstreamRequestError::ResponseError(code))
}
//TODO send success into the channel
_ => Ok(response),
})
});
// TODO RaduW send to the channel on error and on resolve
// TODO see what to do with the future (spawn it or return it to be spawned by caller)
// TODO this should be called from the queue emptying loop
}
}

impl Actor for UpstreamRelay {
@@ -416,7 +476,7 @@ impl<B> Message for SendRequest<B> {

impl<B> Handler<SendRequest<B>> for UpstreamRelay
where
B: RequestBuilder,
B: RequestBuilder + Send,
{
type Result = ResponseFuture<(), UpstreamRequestError>;
