Skip to content

Commit

Permalink
test(multipart): replace SlowStream helper
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Jul 6, 2024
1 parent 5c9e6e7 commit 6ae131c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 44 deletions.
1 change: 1 addition & 0 deletions actix-multipart/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ assert_matches = "1"
awc = "3"
env_logger = "0.11"
futures-util = { version = "0.3.17", default-features = false, features = ["alloc"] }
futures-test = "0.3"
multer = "3"
tokio = { version = "1.24.2", features = ["sync"] }
tokio-stream = "0.1"
Expand Down
58 changes: 14 additions & 44 deletions actix-multipart/src/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ mod tests {
FromRequest,
};
use assert_matches::assert_matches;
use futures_util::{future::lazy, StreamExt as _};
use futures_test::stream::StreamTestExt as _;
use futures_util::{future::lazy, stream, StreamExt as _};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

Expand Down Expand Up @@ -545,45 +546,6 @@ mod tests {
)
}

// Stream that returns from a Bytes, one char at a time and Pending every other poll()
struct SlowStream {
bytes: Bytes,
pos: usize,
ready: bool,
}

impl SlowStream {
fn new(bytes: Bytes) -> SlowStream {
SlowStream {
bytes,
pos: 0,
ready: false,
}
}
}

impl Stream for SlowStream {
type Item = Result<Bytes, PayloadError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if !this.ready {
this.ready = true;
cx.waker().wake_by_ref();
return Poll::Pending;
}

if this.pos == this.bytes.len() {
return Poll::Ready(None);
}

let res = Poll::Ready(Some(Ok(this.bytes.slice(this.pos..(this.pos + 1)))));
this.pos += 1;
this.ready = false;
res
}
}

fn create_simple_request_with_header() -> (Bytes, HeaderMap) {
let (body, headers) = crate::test::create_form_data_payload_and_headers_with_boundary(
BOUNDARY,
Expand Down Expand Up @@ -721,7 +683,9 @@ mod tests {
#[actix_rt::test]
async fn test_stream() {
let (bytes, headers) = create_double_request_with_header();
let payload = SlowStream::new(bytes);
let payload = stream::iter(bytes)
.map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
.interleave_pending();

let mut multipart = Multipart::new(&headers, payload);
match multipart.next().await.unwrap() {
Expand Down Expand Up @@ -899,7 +863,9 @@ mod tests {
"multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
),
);
let payload = SlowStream::new(bytes);
let payload = stream::iter(bytes)
.map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
.interleave_pending();

let mut multipart = Multipart::new(&headers, payload);
let res = multipart.next().await.unwrap();
Expand Down Expand Up @@ -929,7 +895,9 @@ mod tests {
"multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
),
);
let payload = SlowStream::new(bytes);
let payload = stream::iter(bytes)
.map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
.interleave_pending();

let mut multipart = Multipart::new(&headers, payload);
let res = multipart.next().await.unwrap();
Expand All @@ -955,7 +923,9 @@ mod tests {
"multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
),
);
let payload = SlowStream::new(bytes);
let payload = stream::iter(bytes)
.map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
.interleave_pending();

let mut multipart = Multipart::new(&headers, payload);
let res = multipart.next().await.unwrap();
Expand Down

0 comments on commit 6ae131c

Please sign in to comment.