From c07e9de78946f97926d618efb9b72f70e38e2cd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurynas=20Aleksi=C5=ABnas?= Date: Wed, 10 May 2023 16:54:05 +0300 Subject: [PATCH] Add partial file stream download --- src/cursor/mod.rs | 1 + src/error.rs | 9 + src/gridfs/download.rs | 140 ++++++- src/gridfs/options.rs | 21 + src/sync/gridfs.rs | 12 +- src/sync/test.rs | 2 +- src/test/spec/gridfs.rs | 445 +++++++++++++++++++++- src/test/spec/unified_runner/operation.rs | 2 +- 8 files changed, 594 insertions(+), 38 deletions(-) diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 04c63514f..347eb836f 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -148,6 +148,7 @@ impl Cursor { } /// Whether this cursor has any additional items to return. + #[allow(dead_code)] pub(crate) fn has_next(&self) -> bool { !self.is_exhausted() || !self diff --git a/src/error.rs b/src/error.rs index 068900755..8fe5ac5d2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -905,6 +905,15 @@ pub enum GridFsErrorKind { /// [`GridFsUploadStream`](crate::gridfs::GridFsUploadStream) while a write was still in /// progress. WriteInProgress, + + /// Partial file download range is invalid when start is greater than end + InvalidPartialDownloadRange { start: u64, end: u64 }, + + /// Partial file download range is invalid when start or end are greater than file length + PartialDownloadRangeOutOfBounds { + out_of_bounds_value: u64, + file_length: u64, + }, } /// An identifier for a file stored in a GridFS bucket. 
diff --git a/src/gridfs/download.rs b/src/gridfs/download.rs index c996394c0..8e636164c 100644 --- a/src/gridfs/download.rs +++ b/src/gridfs/download.rs @@ -14,11 +14,14 @@ use super::{options::GridFsDownloadByNameOptions, Chunk, FilesCollectionDocument use crate::{ bson::{doc, Bson}, error::{ErrorKind, GridFsErrorKind, GridFsFileIdentifier, Result}, + gridfs::GridFsDownloadByIdOptions, options::{FindOneOptions, FindOptions}, Collection, Cursor, }; +struct DownloadRange(Option, Option); + // Utility functions for finding files within the bucket. impl GridFsBucket { async fn find_file_by_id(&self, id: &Bson) -> Result { @@ -214,7 +217,7 @@ impl GridFsBucket { /// use futures_util::io::AsyncReadExt; /// /// let mut buf = Vec::new(); -/// let mut download_stream = bucket.open_download_stream(id).await?; +/// let mut download_stream = bucket.open_download_stream(id, None).await?; /// download_stream.read_to_end(&mut buf).await?; /// # Ok(()) /// # } @@ -228,7 +231,7 @@ impl GridFsBucket { /// # async fn compat_example(bucket: GridFsBucket, id: Bson) -> Result<()> { /// use tokio_util::compat::FuturesAsyncReadCompatExt; /// -/// let futures_upload_stream = bucket.open_download_stream(id).await?; +/// let futures_upload_stream = bucket.open_download_stream(id, None).await?; /// let tokio_upload_stream = futures_upload_stream.compat(); /// # Ok(()) /// # } @@ -236,7 +239,10 @@ impl GridFsBucket { pub struct GridFsDownloadStream { state: State, current_n: u32, + total_n: u32, file: FilesCollectionDocument, + to_skip: u64, + to_take: u64, } type GetBytesFuture = BoxFuture<'static, Result<(Vec, Box>>)>>; @@ -264,25 +270,71 @@ impl State { } } +fn validate_range_value(range_value: Option, file_length: u64) -> Result<()> { + if let Some(range) = range_value { + if range > file_length { + return Err( + ErrorKind::GridFs(GridFsErrorKind::PartialDownloadRangeOutOfBounds { + file_length, + out_of_bounds_value: range, + }) + .into(), + ); + } + } + + Ok(()) +} + impl 
GridFsDownloadStream { async fn new( file: FilesCollectionDocument, chunks: &Collection>, + range: DownloadRange, ) -> Result { - let initial_state = if file.length == 0 { + validate_range_value(range.0, file.length)?; + validate_range_value(range.1, file.length)?; + + let is_empty_range = match range { + DownloadRange(Some(start), Some(end)) => start == end, + _ => false, + }; + + let to_skip = range.0.unwrap_or(0); + let to_take = range.1.unwrap_or(file.length) - to_skip; + let chunk_size = file.chunk_size_bytes as u64; + let chunks_to_skip = to_skip / chunk_size; + let total_chunks = range + .1 + .map(|end| end / chunk_size + u64::from(end % chunk_size != 0)); + + let initial_state = if file.length == 0 || is_empty_range { State::Done } else { - let options = FindOptions::builder().sort(doc! { "n": 1 }).build(); - let cursor = chunks.find(doc! { "files_id": &file.id }, options).await?; + let options = FindOptions::builder() + .sort(doc! { "n": 1 }) + .limit(total_chunks.map(|end| (end - chunks_to_skip) as i64)) + .build(); + let cursor = chunks + .find( + doc! { "files_id": &file.id, "n": { "$gte": chunks_to_skip as i64 } }, + options, + ) + .await?; + State::Idle(Some(Idle { buffer: Vec::new(), cursor: Box::new(cursor), })) }; + Ok(Self { state: initial_state, - current_n: 0, + current_n: chunks_to_skip as u32, + total_n: total_chunks.map(|value| value as u32).unwrap_or(file.n()), file, + to_skip: to_skip % chunk_size, + to_take, }) } } @@ -303,12 +355,12 @@ impl AsyncRead for GridFsDownloadStream { Ok((buffer, cursor)) } else { let chunks_in_buf = FilesCollectionDocument::n_from_vals( - buf.len() as u64, + stream.to_skip + buf.len() as u64, stream.file.chunk_size_bytes, ); // We should read from current_n to chunks_in_buf + current_n, or, if that would // exceed the total number of chunks in the file, to the last chunk in the file. 
- let final_n = std::cmp::min(chunks_in_buf + stream.current_n, stream.file.n()); + let final_n = std::cmp::min(chunks_in_buf + stream.current_n, stream.total_n); let n_range = stream.current_n..final_n; stream.current_n = final_n; @@ -320,10 +372,13 @@ impl AsyncRead for GridFsDownloadStream { n_range, stream.file.chunk_size_bytes, stream.file.length, + stream.to_skip, ) .boxed(), ); + stream.to_skip = 0; + match new_future.poll_unpin(cx) { Poll::Ready(result) => result, Poll::Pending => return Poll::Pending, @@ -340,13 +395,19 @@ impl AsyncRead for GridFsDownloadStream { match result { Ok((mut buffer, cursor)) => { - let bytes_to_write = std::cmp::min(buffer.len(), buf.len()); + let mut bytes_to_write = std::cmp::min(buffer.len(), buf.len()); + + if bytes_to_write as u64 > stream.to_take { + bytes_to_write = stream.to_take as usize; + } + buf[..bytes_to_write].copy_from_slice(buffer.drain(0..bytes_to_write).as_slice()); + stream.to_take -= bytes_to_write as u64; - stream.state = if !buffer.is_empty() || cursor.has_next() { - State::Idle(Some(Idle { buffer, cursor })) - } else { + stream.state = if stream.to_take == 0 { State::Done + } else { + State::Idle(Some(Idle { buffer, cursor })) }; Poll::Ready(Ok(bytes_to_write)) @@ -365,6 +426,7 @@ async fn get_bytes( n_range: Range, chunk_size_bytes: u32, file_len: u64, + mut to_skip: u64, ) -> Result<(Vec, Box>>)> { for n in n_range { if !cursor.advance().await? 
{ @@ -389,19 +451,53 @@ async fn get_bytes( .into()); } - buffer.extend_from_slice(chunk_bytes); + if to_skip >= chunk_bytes.len() as u64 { + to_skip -= chunk_bytes.len() as u64; + } else if to_skip > 0 { + buffer.extend_from_slice(&chunk_bytes[to_skip as usize..]); + to_skip = 0; + } else { + buffer.extend_from_slice(chunk_bytes); + } } Ok((buffer, cursor)) } +fn create_download_range(start: Option, end: Option) -> Result { + match (start, end) { + (Some(start), Some(end)) => { + if start <= end { + Ok(DownloadRange(Some(start), Some(end))) + } else { + Err( + ErrorKind::GridFs(GridFsErrorKind::InvalidPartialDownloadRange { start, end }) + .into(), + ) + } + } + _ => Ok(DownloadRange(start, end)), + } +} + // User functions for creating download streams. impl GridFsBucket { /// Opens and returns a [`GridFsDownloadStream`] from which the application can read /// the contents of the stored file specified by `id`. - pub async fn open_download_stream(&self, id: Bson) -> Result { + pub async fn open_download_stream( + &self, + id: Bson, + options: impl Into>, + ) -> Result { + let options: Option = options.into(); let file = self.find_file_by_id(&id).await?; - GridFsDownloadStream::new(file, self.chunks()).await + + let range = create_download_range( + options.as_ref().and_then(|options| options.start), + options.as_ref().and_then(|options| options.end), + )?; + + GridFsDownloadStream::new(file, self.chunks(), range).await } /// Opens and returns a [`GridFsDownloadStream`] from which the application can read @@ -416,9 +512,15 @@ impl GridFsBucket { filename: impl AsRef, options: impl Into>, ) -> Result { - let file = self - .find_file_by_name(filename.as_ref(), options.into()) - .await?; - GridFsDownloadStream::new(file, self.chunks()).await + let options: Option = options.into(); + + let range = create_download_range( + options.as_ref().and_then(|options| options.start), + options.as_ref().and_then(|options| options.end), + )?; + + let file = 
self.find_file_by_name(filename.as_ref(), options).await?; + + GridFsDownloadStream::new(file, self.chunks(), range).await } } diff --git a/src/gridfs/options.rs b/src/gridfs/options.rs index 594eb89f5..33812d084 100644 --- a/src/gridfs/options.rs +++ b/src/gridfs/options.rs @@ -43,6 +43,20 @@ pub struct GridFsUploadOptions { pub metadata: Option, } +/// Contains the options for downloading a file from a [`GridFsBucket`](crate::gridfs::GridFsBucket) +/// by id. +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[builder(field_defaults(default, setter(into)))] +#[non_exhaustive] +pub struct GridFsDownloadByIdOptions { + /// 0-indexed non-negative byte offset from the beginning of the file. + pub start: Option, + + /// 0-indexed non-negative byte offset to the end of the file contents to be returned by the + /// stream. end is non-inclusive. + pub end: Option, +} + /// Contains the options for downloading a file from a [`GridFsBucket`](crate::gridfs::GridFsBucket) /// by name. #[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] @@ -60,6 +74,13 @@ pub struct GridFsDownloadByNameOptions { /// -2 = the second most recent revision /// -1 = the most recent revision pub revision: Option, + + /// 0-indexed non-negative byte offset from the beginning of the file. + pub start: Option, + + /// 0-indexed non-negative byte offset to the end of the file contents to be returned by the + /// stream. end is non-inclusive. + pub end: Option, } /// Contains the options for finding diff --git a/src/sync/gridfs.rs b/src/sync/gridfs.rs index 711c60843..e8275367e 100644 --- a/src/sync/gridfs.rs +++ b/src/sync/gridfs.rs @@ -25,6 +25,7 @@ use crate::{ }; pub use crate::gridfs::FilesCollectionDocument; +use crate::gridfs::GridFsDownloadByIdOptions; /// A `GridFsBucket` provides the functionality for storing and retrieving binary BSON data that /// exceeds the 16 MiB size limit of a MongoDB document. 
Users may upload and download large amounts @@ -98,7 +99,7 @@ impl GridFsBucket { /// use std::io::Read; /// /// let mut buf = Vec::new(); -/// let mut download_stream = bucket.open_download_stream(id)?; +/// let mut download_stream = bucket.open_download_stream(id, None)?; /// download_stream.read_to_end(&mut buf)?; /// # Ok(()) /// # } @@ -123,8 +124,13 @@ impl GridFsDownloadStream { impl GridFsBucket { /// Opens and returns a [`GridFsDownloadStream`] from which the application can read /// the contents of the stored file specified by `id`. - pub fn open_download_stream(&self, id: Bson) -> Result { - runtime::block_on(self.async_bucket.open_download_stream(id)).map(GridFsDownloadStream::new) + pub fn open_download_stream( + &self, + id: Bson, + options: impl Into>, + ) -> Result { + runtime::block_on(self.async_bucket.open_download_stream(id, options)) + .map(GridFsDownloadStream::new) } /// Opens and returns a [`GridFsDownloadStream`] from which the application can read diff --git a/src/sync/test.rs b/src/sync/test.rs index 590c6f2f7..2202fa2f3 100644 --- a/src/sync/test.rs +++ b/src/sync/test.rs @@ -455,7 +455,7 @@ fn gridfs() { upload_stream.close().unwrap(); let mut download_stream = bucket - .open_download_stream(upload_stream.id().clone()) + .open_download_stream(upload_stream.id().clone(), None) .unwrap(); download_stream.read_to_end(&mut download).unwrap(); diff --git a/src/test/spec/gridfs.rs b/src/test/spec/gridfs.rs index 10bfe7311..f1e7c738f 100644 --- a/src/test/spec/gridfs.rs +++ b/src/test/spec/gridfs.rs @@ -1,11 +1,11 @@ -use std::time::Duration; +use std::{ops::Range, time::Duration}; use futures_util::io::{AsyncReadExt, AsyncWriteExt}; use crate::{ bson::{doc, Bson, Document}, error::{Error, ErrorKind, GridFsErrorKind}, - gridfs::{GridFsBucket, GridFsUploadStream}, + gridfs::{GridFsBucket, GridFsDownloadByIdOptions, GridFsUploadStream}, options::{GridFsBucketOptions, GridFsUploadOptions}, runtime, test::{ @@ -38,19 +38,10 @@ async fn 
download_stream_across_buffers() { let client = TestClient::new().await; - let options = GridFsBucketOptions::builder().chunk_size_bytes(3).build(); - let bucket = client - .database("download_stream_across_buffers") - .gridfs_bucket(options); - bucket.drop().await.unwrap(); - - let data: Vec = (0..20).collect(); - let id = bucket - .upload_from_futures_0_3_reader("test", &data[..], None) - .await - .unwrap(); + let (id, data, bucket) = + mock_bucket_for_download(&client, "download_stream_across_buffers", 3, 0..20).await; - let mut download_stream = bucket.open_download_stream(id.into()).await.unwrap(); + let mut download_stream = bucket.open_download_stream(id, None).await.unwrap(); let mut buf = vec![0u8; 12]; // read in a partial chunk @@ -78,6 +69,411 @@ async fn download_stream_across_buffers() { assert_eq!(buf, data); } +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_start_single_chunk() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, data, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_start_single_chunk", + 25, + 0..20, + ) + .await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder().start(5).build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(&buf, &data[5..]); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_end_single_chunk() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, data, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_end_single_chunk", + 25, + 0..20, + ) + .await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + 
GridFsDownloadByIdOptions::builder().end(7).build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(&buf, &data[..7]); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_start_end_single_chunk() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, data, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_start_end_single_chunk", + 25, + 0..20, + ) + .await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder() + .start(5) + .end(12) + .build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(&buf, &data[5..12]); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_start_multiple_chunks() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, data, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_start_multiple_chunks", + 3, + 0..20, + ) + .await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder().start(4).build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(&buf, &data[4..]); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_end_multiple_chunks() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, data, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_end_multiple_chunks", + 3, + 
0..20, + ) + .await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder().end(7).build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(&buf, &data[..7]); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_start_end_multiple_chunks() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, data, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_start_end_multiple_chunks", + 3, + 0..20, + ) + .await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder() + .start(5) + .end(13) + .build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(&buf, &data[5..13]); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_start_end_exact_multiple_chunks() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, data, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_start_end_exact_multiple_chunks", + 3, + 0..20, + ) + .await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder() + .start(6) + .end(15) + .build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(&buf, &data[6..15]); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_start_end_extract_single_chunk_in_multiple_chunks() { + let _guard = 
LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, data, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_start_end_extract_single_chunk", + 3, + 0..20, + ) + .await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder() + .start(9) + .end(12) + .build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(&buf, &data[9..12]); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_empty_range() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, _, bucket) = + mock_bucket_for_download(&client, "partial_download_stream_empty_range", 3, 0..20).await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder().start(9).end(9).build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(buf, Vec::::new()); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_empty_range_at_end() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, _, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_empty_range_at_end", + 3, + 0..20, + ) + .await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder() + .start(20) + .end(20) + .build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(buf, Vec::::new()); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", 
async_std::test)] +async fn partial_download_stream_limit_to_end() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, data, bucket) = + mock_bucket_for_download(&client, "partial_download_stream_limit_to_end", 3, 0..20).await; + + let mut download_stream = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder() + .start(9) + .end(20) + .build(), + ) + .await + .unwrap(); + + let mut buf = Vec::new(); + + download_stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(&buf, &data[9..20]); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_invalid_range_error() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, _, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_invalid_range_error", + 3, + 0..20, + ) + .await; + + let download_error = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder() + .start(10) + .end(9) + .build(), + ) + .await + .err(); + + assert!(matches!( + *download_error.unwrap().kind, + ErrorKind::GridFs(GridFsErrorKind::InvalidPartialDownloadRange { start: 10, end: 9 }) + )); +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn partial_download_stream_out_of_bounds_error() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + + let (id, _, bucket) = mock_bucket_for_download( + &client, + "partial_download_stream_out_of_bounds_error", + 3, + 0..20, + ) + .await; + + let download_error = bucket + .open_download_stream( + id.into(), + GridFsDownloadByIdOptions::builder() + .start(18) + .end(21) + .build(), + ) + .await + .err(); + + assert!(matches!( + *download_error.unwrap().kind, + ErrorKind::GridFs(GridFsErrorKind::PartialDownloadRangeOutOfBounds { + 
file_length: 20, + out_of_bounds_value: 21, + }) + )); +} + #[cfg_attr(feature = "tokio-runtime", tokio::test)] #[cfg_attr(feature = "async-std-runtime", async_std::test)] async fn upload_stream() { @@ -346,3 +742,24 @@ async fn assert_no_chunks_written(bucket: &GridFsBucket, id: &Bson) { .unwrap() .is_none()); } + +async fn mock_bucket_for_download( + client: &TestClient, + db_name: &str, + chunk_size_bytes: u32, + data: Range, +) -> (Bson, Vec, GridFsBucket) { + let options = GridFsBucketOptions::builder() + .chunk_size_bytes(chunk_size_bytes) + .build(); + let bucket = client.database(db_name).gridfs_bucket(options); + bucket.drop().await.unwrap(); + + let data: Vec = data.collect(); + let id = bucket + .upload_from_futures_0_3_reader("test", &data[..], None) + .await + .unwrap(); + + (id.into(), data, bucket) +} diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index 429213463..d17febf91 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -2744,7 +2744,7 @@ impl TestOperation for Download { // Next, read via the open_download_stream API. let mut buf: Vec = vec![]; - let mut stream = bucket.open_download_stream(self.id.clone()).await?; + let mut stream = bucket.open_download_stream(self.id.clone(), None).await?; stream.read_to_end(&mut buf).await?; let stream_data = hex::encode(buf);