Skip to content

Commit

Permalink
fix(cache): set content length for put artifact (#9183)
Browse files Browse the repository at this point in the history
### Description

Fixes #9177

In #8081 we changed from setting the body of our `PUT` requests from a
vec of bytes to a stream of bytes.

This results in the underlying `hyper` request having a body with
`Kind::Wrapped` instead of `Kind::Once`. This results in the body no
longer having an [exact
length](https://github.com/hyperium/hyper/blob/0.14.x/src/body/body.rs#L437).
With the body no longer having an exact length, `hyper` would no longer
set `Content-Length` for us
[source](https://github.com/hyperium/hyper/blob/0.14.x/src/proto/h2/client.rs#L377).

This PR explicitly sets the content length header. It would be nice if
we could set the length on the body itself, but `hyper` doesn't allow
for this flexibility. (We cannot simply implement a size hint on
`UploadProgress`, but the size hint should return the size of the
stream, not the number of bytes in the stream)

### Testing Instructions

Added an assertion on the mock `PUT /artifacts` endpoint to make sure
that `Content-Length` gets set.
  • Loading branch information
chris-olszewski authored Oct 2, 2024
1 parent 0c34728 commit dfe3ca2
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 2 deletions.
41 changes: 40 additions & 1 deletion crates/turborepo-api-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub trait CacheClient {
&self,
hash: &str,
artifact_body: impl tokio_stream::Stream<Item = Result<bytes::Bytes>> + Send + Sync + 'static,
body_len: usize,
duration: u64,
tag: Option<&str>,
token: &str,
Expand Down Expand Up @@ -372,6 +373,7 @@ impl CacheClient for APIClient {
&self,
hash: &str,
artifact_body: impl tokio_stream::Stream<Item = Result<bytes::Bytes>> + Send + Sync + 'static,
body_length: usize,
duration: u64,
tag: Option<&str>,
token: &str,
Expand Down Expand Up @@ -403,6 +405,7 @@ impl CacheClient for APIClient {
.header("Content-Type", "application/octet-stream")
.header("x-artifact-duration", duration.to_string())
.header("User-Agent", self.user_agent.clone())
.header("Content-Length", body_length)
.body(stream);

if allow_auth {
Expand Down Expand Up @@ -805,10 +808,11 @@ mod test {
use std::time::Duration;

use anyhow::Result;
use bytes::Bytes;
use turborepo_vercel_api_mock::start_test_server;
use url::Url;

use crate::{APIClient, Client};
use crate::{APIClient, CacheClient, Client};

#[tokio::test]
async fn test_do_preflight() -> Result<()> {
Expand Down Expand Up @@ -898,4 +902,39 @@ mod test {
let err = APIClient::handle_403(response).await;
assert_eq!(err.to_string(), "unknown status forbidden: Not authorized");
}

#[tokio::test]
async fn test_content_length() -> Result<()> {
let port = port_scanner::request_open_port().unwrap();
let handle = tokio::spawn(start_test_server(port));
let base_url = format!("http://localhost:{}", port);

let client = APIClient::new(
&base_url,
Some(Duration::from_secs(200)),
None,
"2.0.0",
true,
)?;
let body = b"hello world!";
let artifact_body = tokio_stream::once(Ok(Bytes::copy_from_slice(body)));

client
.put_artifact(
"eggs",
artifact_body,
body.len(),
123,
None,
"token",
None,
None,
)
.await?;

handle.abort();
let _ = handle.await;

Ok(())
}
}
1 change: 1 addition & 0 deletions crates/turborepo-auth/src/auth/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ mod tests {
> + Send
+ Sync
+ 'static,
_body_len: usize,
_duration: u64,
_tag: Option<&str>,
_token: &str,
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-auth/src/auth/sso.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ mod tests {
> + Send
+ Sync
+ 'static,
_body_len: usize,
_duration: u64,
_tag: Option<&str>,
_token: &str,
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-auth/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ mod tests {
> + Send
+ Sync
+ 'static,
_body_len: usize,
_duration: u64,
_tag: Option<&str>,
_token: &str,
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-cache/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl HTTPCache {
.put_artifact(
hash,
progress,
bytes,
duration,
tag.as_deref(),
&self.api_auth.token,
Expand Down
7 changes: 6 additions & 1 deletion crates/turborepo-vercel-api-mock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::Result;
use axum::{
body::Body,
extract::Path,
http::{HeaderMap, HeaderValue, StatusCode},
http::{header::CONTENT_LENGTH, HeaderMap, HeaderValue, StatusCode},
routing::{get, head, options, patch, post, put},
Json, Router,
};
Expand Down Expand Up @@ -162,6 +162,11 @@ pub async fn start_test_server(port: u16) -> Result<()> {
.and_then(|duration| duration.parse::<u32>().ok())
.expect("x-artifact-duration header is missing");

assert!(
headers.get(CONTENT_LENGTH).is_some(),
"expected to get content-length"
);

let mut durations_map = put_durations_ref.lock().await;
durations_map.insert(hash.clone(), duration);

Expand Down

0 comments on commit dfe3ca2

Please sign in to comment.