Skip to content

Commit

Permalink
add two retry strategies to allow requests to timeout gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
arlyon committed May 3, 2024
1 parent c42a40a commit 4477bec
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ either = "1.9.0"
futures = "0.3.26"
futures-retry = "0.6.0"
hex = "0.4.3"
httpmock = { version = "0.6.8", default-features = false }
httpmock = { version = "0.7.0", default-features = false }
image = { version = "0.25.0", default-features = false }
indexmap = "1.9.2"
indicatif = "0.17.3"
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-api-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rustls-tls = ["reqwest/rustls-tls-native-roots"]

[dev-dependencies]
http = "0.2.9"
httpmock = { workspace = true }
port_scanner = { workspace = true }
test-case = { workspace = true }
turborepo-vercel-api-mock = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/turborepo-api-client/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ impl AnalyticsClient for APIClient {
.await?
.json(&events);

retry::make_retryable_request(request_builder)
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand Down
64 changes: 44 additions & 20 deletions crates/turborepo-api-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![feature(async_closure)]
#![feature(error_generic_member_access)]
#![feature(assert_matches)]
#![deny(clippy::all)]

use std::{backtrace::Backtrace, env, future::Future, time::Duration};
Expand Down Expand Up @@ -134,9 +135,11 @@ impl Client for APIClient {
.header("User-Agent", self.user_agent.clone())
.header("Authorization", format!("Bearer {}", token))
.header("Content-Type", "application/json");
let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand All @@ -149,9 +152,11 @@ impl Client for APIClient {
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token));

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand Down Expand Up @@ -194,9 +199,11 @@ impl Client for APIClient {
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token));

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand All @@ -208,9 +215,11 @@ impl Client for APIClient {
.query(&[("token", token), ("tokenName", token_name)])
.header("User-Agent", self.user_agent.clone());

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

let verification_response: VerificationResponse = response.json().await?;

Expand Down Expand Up @@ -310,7 +319,9 @@ impl CacheClient for APIClient {

request_builder = Self::add_team_params(request_builder, team_id, team_slug);

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout).await?;
let response = response.into_response();

match response.status() {
StatusCode::FORBIDDEN => Err(Self::handle_403(response).await),
Expand Down Expand Up @@ -391,7 +402,10 @@ impl CacheClient for APIClient {
request_builder = request_builder.header("x-artifact-tag", tag);
}

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Connection)
.await?
.into_response();

if response.status() == StatusCode::FORBIDDEN {
return Err(Self::handle_403(response).await);
Expand All @@ -416,9 +430,11 @@ impl CacheClient for APIClient {

let request_builder = Self::add_team_params(request_builder, team_id, team_slug);

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand Down Expand Up @@ -451,7 +467,9 @@ impl TokenClient for APIClient {
invalid_token: bool,
}

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout).await?;
let response = response.into_response();
let status = response.status();
// Give a better error message for invalid tokens. This endpoint returns the
// following statuses:
Expand Down Expand Up @@ -503,7 +521,10 @@ impl TokenClient for APIClient {
invalid_token: bool,
}

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response();
let status = response.status();
// Give a better error message for invalid tokens. This endpoint returns the
// following statuses:
Expand Down Expand Up @@ -604,7 +625,10 @@ impl APIClient {
.header("Access-Control-Request-Headers", request_headers)
.header("Authorization", format!("Bearer {}", token));

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response();

let headers = response.headers();
let location = if let Some(location) = headers.get("Location") {
Expand Down
129 changes: 118 additions & 11 deletions crates/turborepo-api-client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@ const MIN_SLEEP_TIME_SECS: u64 = 2;
const MAX_SLEEP_TIME_SECS: u64 = 10;
const RETRY_MAX: u32 = 2;

/// Successful outcome of `make_retryable_request`: the response together with
/// how it was obtained.
#[derive(Debug)]
pub enum Retry {
/// The request builder could not be cloned, so the request was sent exactly
/// once.
Once(Response),
/// The request was sent from a cloned builder inside the retry loop. The
/// `u32` is the zero-based loop index of the attempt that succeeded, so `0`
/// means the very first attempt.
// NOTE(review): presumably marked dead code because the count is only read via `retry_count` — confirm.
#[allow(dead_code)]
Retried(Response, u32),
}

impl Retry {
    /// Consumes the wrapper and hands back the underlying response, dropping
    /// any attempt bookkeeping.
    pub fn into_response(self) -> Response {
        match self {
            Retry::Once(response) | Retry::Retried(response, _) => response,
        }
    }

    /// Returns the attempt counter stored alongside the response, or `None`
    /// for the `Once` variant, which carries no counter.
    #[allow(dead_code)]
    pub fn retry_count(&self) -> Option<u32> {
        if let Retry::Retried(_, attempts) = self {
            Some(*attempts)
        } else {
            None
        }
    }
}

/// Retries a request until `RETRY_MAX` is reached, the `should_retry_request`
/// function returns false, or the future succeeds. Uses an exponential backoff
/// with a base of 2 to delay between retries.
Expand All @@ -15,11 +39,13 @@ const RETRY_MAX: u32 = 2;
///
/// * `request_builder`: The request builder with everything, i.e. headers and
/// body already set. NOTE: This must be cloneable, so no streams are allowed.
/// * `strategy`: The strategy to use for retrying requests.
///
/// returns: Result<Retry, Error>
pub(crate) async fn make_retryable_request(
request_builder: RequestBuilder,
) -> Result<Response, Error> {
strategy: RetryStrategy,
) -> Result<Retry, Error> {
let mut last_error = None;
for retry_count in 0..RETRY_MAX {
// A request builder can fail to clone for two reasons:
Expand All @@ -28,12 +54,12 @@ pub(crate) async fn make_retryable_request(
// - the request body is a stream, in this case we'll just send the one request
// we have
let Some(builder) = request_builder.try_clone() else {
return Ok(request_builder.send().await?);
return Ok(Retry::Once(request_builder.send().await?));
};
match builder.send().await {
Ok(value) => return Ok(value),
Ok(value) => return Ok(Retry::Retried(value, retry_count)),
Err(err) => {
if !should_retry_request(&err) {
if !strategy.should_retry(&err) {
return Err(err.into());
}
last_error = Some(err);
Expand All @@ -49,16 +75,97 @@ pub(crate) async fn make_retryable_request(
Err(Error::TooManyFailures(Box::new(last_error.unwrap())))
}

fn should_retry_request(error: &reqwest::Error) -> bool {
if let Some(status) = error.status() {
if status == StatusCode::TOO_MANY_REQUESTS {
return true;
/// A retry strategy. Note that error statuses and TOO_MANY_REQUESTS are always
/// retried.
pub enum RetryStrategy {
/// Retry in the case of connection issues, but ignore timeouts.
Connection,
/// Retry in the case of connection issues and timeouts.
Timeout,
}

impl RetryStrategy {
fn should_retry(&self, error: &reqwest::Error) -> bool {
if let Some(status) = error.status() {
if status == StatusCode::TOO_MANY_REQUESTS {
return true;
}

if status.as_u16() >= 500 && status.as_u16() != 501 {
return true;
}
}

if status.as_u16() >= 500 && status.as_u16() != 501 {
return true;
match self {
RetryStrategy::Connection => error.is_connect(),
RetryStrategy::Timeout => error.is_timeout(),
}
}
}

error.is_request() || error.is_timeout()
#[cfg(test)]
mod test {
    use std::{assert_matches::assert_matches, time::Duration};

    use crate::{
        retry::{make_retryable_request, RetryStrategy},
        Error,
    };

    /// With the `Timeout` strategy a request that keeps timing out is sent
    /// once per allowed attempt and then reported as `TooManyFailures`.
    #[tokio::test]
    async fn handles_too_many_failures() {
        let server = httpmock::MockServer::start_async().await;
        let slow_endpoint = server
            .mock_async(|when, then| {
                when.method(httpmock::Method::GET);
                // Far longer than the client timeout below, so every attempt
                // times out.
                then.delay(Duration::from_secs(100));
            })
            .await;

        let request_builder = reqwest::Client::new()
            .get(server.url("/"))
            .timeout(Duration::from_millis(10));
        let result = make_retryable_request(request_builder, RetryStrategy::Timeout).await;

        slow_endpoint.assert_hits_async(2).await;
        assert_matches!(result, Err(Error::TooManyFailures(_)));
    }

    /// A failure to establish the connection is classified as retryable by
    /// the `Connection` strategy.
    #[tokio::test]
    async fn handles_connection_timeout() {
        let client = reqwest::Client::builder()
            .connect_timeout(Duration::from_millis(10))
            .build()
            .unwrap();

        // Port 1 is a deliberately bad port, so the send is expected to fail
        // while connecting.
        let send_error = client
            .get("http://localhost:1")
            .send()
            .await
            .unwrap_err();

        assert!(RetryStrategy::Connection.should_retry(&send_error));
    }

    /// With the `Connection` strategy a timeout that happens after the
    /// connection was established is not retried.
    #[tokio::test]
    async fn handles_connection_timeout_retries() {
        let client = reqwest::Client::builder()
            .timeout(Duration::from_millis(20))
            .connect_timeout(Duration::from_millis(10))
            .build()
            .unwrap();

        let server = httpmock::MockServer::start_async().await;
        let slow_endpoint = server
            .mock_async(|when, then| {
                when.method(httpmock::Method::GET);
                then.delay(Duration::from_secs(100));
            })
            .await;

        // The mock server is reachable; it just answers far too slowly for
        // the 20ms request timeout.
        let request_builder = client.get(server.url("/"));
        let result = make_retryable_request(request_builder, RetryStrategy::Connection).await;

        // we should make at most one request and give up if it times out after
        // connecting
        assert_matches!(result, Err(_));
        slow_endpoint.assert_hits_async(1).await;
    }
}
14 changes: 9 additions & 5 deletions crates/turborepo-api-client/src/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ impl APIClient {
.await?
.json(&payload);

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand All @@ -176,8 +178,9 @@ impl APIClient {
.await?
.json(&task);

retry::make_retryable_request(request_builder)
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand All @@ -201,8 +204,9 @@ impl APIClient {
.await?
.json(&payload);

retry::make_retryable_request(request_builder)
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion crates/turborepo-api-client/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ impl TelemetryClient for AnonAPIClient {
.header("x-turbo-session-id", session_id)
.json(&events);

retry::make_retryable_request(telemetry_request)
retry::make_retryable_request(telemetry_request, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand Down

0 comments on commit 4477bec

Please sign in to comment.