Skip to content

Commit

Permalink
fix(server): Consume all upstream responses (#680)
Browse files Browse the repository at this point in the history
Relays do not reuse connections for store requests. The default response
transformer for upstream requests does not consume the response payload,
which is a potential cause for this.
  • Loading branch information
jan-auer authored Jul 28, 2020
1 parent dfbf98a commit eba6e4b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

**Bug Fixes**:

- Reuse connections for upstream event submission requests when the server supports connection keepalive. Relay did not consume the response body of all requests, which caused it to reopen a new connection for every event. ([#680](https://github.com/getsentry/relay/pull/680))

**Internal**:

- Extract the event `timestamp` from Minidump files during event normalization. ([#662](https://github.com/getsentry/relay/pull/662))
Expand Down
17 changes: 13 additions & 4 deletions relay-server/src/actors/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::sync::Arc;
use ::actix::fut;
use ::actix::prelude::*;
use actix_web::client::{ClientRequest, ClientRequestBuilder, ClientResponse, SendRequestError};
use actix_web::error::{JsonPayloadError, PayloadError};
use actix_web::http::{header, Method, StatusCode};
use actix_web::{error::JsonPayloadError, Error as ActixError, HttpMessage};
use actix_web::{Error as ActixError, HttpMessage};
use failure::Fail;
use futures::prelude::*;
use itertools::Itertools;
Expand Down Expand Up @@ -40,6 +41,9 @@ pub enum UpstreamRequestError {
#[fail(display = "failed to create upstream request: {}", _0)]
BuildFailed(ActixError),

#[fail(display = "failed to receive response from upstream")]
ResponseFailed(#[cause] PayloadError),

#[fail(display = "upstream requests rate limited")]
RateLimited(UpstreamRateLimits),

Expand Down Expand Up @@ -375,10 +379,15 @@ where
}

impl ResponseTransformer for () {
type Result = Result<(), UpstreamRequestError>;
type Result = ResponseFuture<(), UpstreamRequestError>;

fn transform_response(self, response: ClientResponse) -> Self::Result {
let future = response
.payload()
.for_each(|_| Ok(()))
.map_err(UpstreamRequestError::ResponseFailed);

fn transform_response(self, _: ClientResponse) -> Self::Result {
Ok(())
Box::new(future)
}
}

Expand Down
10 changes: 6 additions & 4 deletions relay-server/src/middlewares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,12 @@ pub struct ReadRequestMiddleware;

impl<S> Middleware<S> for ReadRequestMiddleware {
fn response(&self, req: &HttpRequest<S>, resp: HttpResponse) -> Result<Response, Error> {
let consume_request = req.payload().for_each(|_| Ok(()));
let future = req
.payload()
.for_each(|_| Ok(()))
.map(|_| resp)
.map_err(Error::from);

Ok(Response::Future(Box::new(
consume_request.map(|_| resp).map_err(Error::from),
)))
Ok(Response::Future(Box::new(future)))
}
}

0 comments on commit eba6e4b

Please sign in to comment.