From 756869fd3e6e02f63f8ed059c570665ee990f0b5 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Fri, 1 Jul 2022 11:44:29 -0400 Subject: [PATCH 01/23] push public api skeleton --- src/db/mod.rs | 11 ++++ src/gridfs.rs | 168 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 180 insertions(+) create mode 100644 src/gridfs.rs diff --git a/src/db/mod.rs b/src/db/mod.rs index 2916a4bfa..71d26f360 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -19,6 +19,7 @@ use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, error::{Error, ErrorKind, Result}, + gridfs::{GridFSBucket, GridFSBucketOptions}, operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand}, options::{ AggregateOptions, @@ -564,4 +565,14 @@ impl Database { .execute_watch_with_session(pipeline, options, target, None, session) .await } + + pub fn new_gridfs_bucket( + &self, + options: impl Into>, + ) -> GridFSBucket { + GridFSBucket { + db: self.clone(), + options: options.into(), + } + } } diff --git a/src/gridfs.rs b/src/gridfs.rs new file mode 100644 index 000000000..753e09509 --- /dev/null +++ b/src/gridfs.rs @@ -0,0 +1,168 @@ +use core::task::{Context, Poll}; +use std::{ + io::{self, Result}, + marker::PhantomPinned, + pin::Pin, +}; + +use crate::{ + concern::{ReadConcern, WriteConcern}, + cursor::Cursor, + selection_criteria::ReadPreference, + Database, +}; + +use bson::{oid::ObjectId, DateTime, Document}; +use serde::Deserialize; +use typed_builder::TypedBuilder; + +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[builder(field_defaults(setter(into)))] +#[non_exhaustive] +pub struct GridFSBucketOptions { + bucket_name: Option, + chunk_size_bytes: Option, + write_concern: Option, + read_concern: Option, + read_preference: Option, +} + +pub struct GridFSUploadOptions { + chunk_size_bytes: Option, + metadata: Option, +} + +pub struct GridFSDownloadByNameOptions { + revision: Option, +} + +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[builder(field_defaults(setter(into)))] +#[non_exhaustive] +pub struct GridFSFindOptions { + allow_disk_use: Option, + batch_size: Option, + limit: Option, + max_time_ms: Option, + no_cursor_timeout: Option, + skip: i32, + sort: Option, +} + +// Contained in a "chunks" collection for each user file +pub struct Chunk { + id: ObjectId, + files_id: T, + n: i32, + // default size is 255 KiB + data: Vec, +} + +// A collection in which information about stored files is stored. There will be one files +// collection document per stored file. +pub struct FilesCollectionDocument { + id: T, + length: i64, + chunk_size: i32, + upload_date: DateTime, + filename: String, + metadata: Document, +} + +pub struct GridFSBucket { + // Contains a "chunks" collection + pub db: Database, + pub options: Option, +} + +pub struct GridFSStream { + _pin: PhantomPinned, +} + +impl AsyncRead for GridFSStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + todo!() + } +} + +impl AsyncWrite for GridFSStream { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + todo!() + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } +} + +impl GridFSBucket { + pub fn open_upload_stream_with_id( + &self, + id: T, + filename: String, + options: GridFSUploadOptions, + ) -> GridFSStream { + todo!() + } + + pub fn upload_from_stream_with_id( + &self, + id: T, + filename: String, + source: GridFSStream, + option: GridFSUploadOptions, + ) { + todo!() + } + + pub fn open_download_stream(&self, id: T) { + todo!() + } + + pub fn download_to_stream(&self, id: T, destination: GridFSStream) { + todo!() + } + + pub fn delete(&self, id: T) { + todo!() + } + + pub fn find(&self, filter: Document, options: GridFSBucketOptions) -> Result> { + todo!() + } + + pub fn open_download_stream_by_name( + &self, + filename: String, + options: GridFSDownloadByNameOptions, + ) -> GridFSStream { + todo!() + } + + pub fn download_to_stream_by_name( + &self, + filename: String, + destination: GridFSStream, + options: GridFSDownloadByNameOptions, + ) { + todo!() + } + + pub fn rename(&self, id: T, new_filename: String) { + todo!() + } + + pub fn drop(&self) { + todo!() + } +} diff --git a/src/lib.rs b/src/lib.rs index 68b803005..76a2e1422 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -322,6 +322,7 @@ mod compression; mod concern; mod cursor; mod db; +pub mod gridfs; pub mod error; pub mod event; mod index; From b245a673c7437cf4f8a1b20a9ac896ea5c94c436 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Thu, 7 Jul 2022 10:48:37 -0400 Subject: [PATCH 02/23] add public api documentation --- src/db/mod.rs | 9 ++- src/gridfs.rs | 202 ++++++++++++++++++++++++++++++++++++++------------ src/lib.rs | 18 ++--- 3 files changed, 168 insertions(+), 61 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 71d26f360..c9a488990 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -19,7 +19,7 @@ use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, error::{Error, ErrorKind, Result}, - gridfs::{GridFSBucket, GridFSBucketOptions}, + gridfs::{GridFsBucket, GridFsBucketOptions}, operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand}, options::{ AggregateOptions, @@ -566,11 +566,12 @@ impl Database { .await } + /// Creates a new [`GridFsBucket`] in the database with the given options. pub fn new_gridfs_bucket( &self, - options: impl Into>, - ) -> GridFSBucket { - GridFSBucket { + options: impl Into>, + ) -> GridFsBucket { + GridFsBucket { db: self.clone(), options: options.into(), } diff --git a/src/gridfs.rs b/src/gridfs.rs index 753e09509..1e3cb17d9 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -8,7 +8,7 @@ use std::{ use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, - selection_criteria::ReadPreference, + selection_criteria::{ReadPreference, SelectionCriteria}, Database, }; @@ -18,41 +18,92 @@ use typed_builder::TypedBuilder; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +/// Contains the options for creating a [`GridFsBucket`]. #[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] #[builder(field_defaults(setter(into)))] #[non_exhaustive] -pub struct GridFSBucketOptions { - bucket_name: Option, - chunk_size_bytes: Option, - write_concern: Option, - read_concern: Option, - read_preference: Option, +pub struct GridFsBucketOptions { + /// The bucket name. Defaults to 'fs'. + pub bucket_name: Option, + + /// The chunk size in bytes. Defaults to 255 KiB. + pub chunk_size_bytes: Option, + + /// The write concern. Defaults to the write concern of the database. + pub write_concern: Option, + + /// The read concern. Defaults to the read concern of the database. + pub read_concern: Option, + + /// The read preference. Defaults to the read preference of the database. + pub read_preference: Option, } -pub struct GridFSUploadOptions { - chunk_size_bytes: Option, - metadata: Option, +/// Contains the options for creating a [`GridFsStream`] to upload a file to a +/// [`GridFsBucket`]. +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[builder(field_defaults(setter(into)))] +#[non_exhaustive] +pub struct GridFsUploadOptions { + /// The number of bytes per chunk of this file. Defaults to the chunk_size_bytes in the + /// GridFsBucketOptions. + pub chunk_size_bytes: Option, + + /// User data for the 'metadata' field of the files collection document. + pub metadata: Option, } -pub struct GridFSDownloadByNameOptions { - revision: Option, +/// Contains the options for creating [`GridFsStream`] to retrieve a stored file +/// from a [`GridFsBucket`]. +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[builder(field_defaults(setter(into)))] +#[non_exhaustive] +pub struct GridFsDownloadByNameOptions { + /// Which revision (documents with the same filename and different `upload_date`) + /// of the file to retrieve. Defaults to -1 (the most recent revision). + /// + /// Revision numbers are defined as follows: + /// 0 = the original stored file + /// 1 = the first revision + /// 2 = the second revision + /// etc... + /// -2 = the second most recent revision + /// -1 = the most recent revision + pub revision: Option, } +/// Contains the options for performing a find operation on a files collection. #[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] #[builder(field_defaults(setter(into)))] #[non_exhaustive] -pub struct GridFSFindOptions { - allow_disk_use: Option, - batch_size: Option, - limit: Option, - max_time_ms: Option, - no_cursor_timeout: Option, - skip: i32, - sort: Option, +pub struct GridFsFindOptions { + /// Enables writing to temporary files on the server. When set to true, the + /// server can write temporary data to disk while executing the find operation + /// on the files collection. + pub allow_disk_use: Option, + + /// The number of documents to return per batch. + pub batch_size: Option, + + /// The maximum number of documents to return. + pub limit: Option, + + /// The maximum amount of time to allow the query to run. + pub max_time_ms: Option, + + /// The server normally times out idle cursors after an inactivity period + /// (10 minutes) to prevent excess memory use. Set this option to prevent that. + pub no_cursor_timeout: Option, + + /// The number of documents to skip before returning. + pub skip: i32, + + /// The order by which to sort results. Defaults to not sorting. + pub sort: Option, } // Contained in a "chunks" collection for each user file -pub struct Chunk { +struct Chunk { id: ObjectId, files_id: T, n: i32, @@ -62,7 +113,7 @@ pub struct Chunk { // A collection in which information about stored files is stored. There will be one files // collection document per stored file. -pub struct FilesCollectionDocument { +struct FilesCollectionDocument { id: T, length: i64, chunk_size: i32, @@ -71,17 +122,18 @@ pub struct FilesCollectionDocument { metadata: Document, } -pub struct GridFSBucket { +/// Struct for storing GridFS managed files within a [`Database`]. +pub struct GridFsBucket { // Contains a "chunks" collection - pub db: Database, - pub options: Option, + pub(crate) db: Database, + pub(crate) options: Option, } -pub struct GridFSStream { +pub struct GridFsStream { _pin: PhantomPinned, } -impl AsyncRead for GridFSStream { +impl AsyncRead for GridFsStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -91,7 +143,7 @@ impl AsyncRead for GridFSStream { } } -impl AsyncWrite for GridFSStream { +impl AsyncWrite for GridFsStream { fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { todo!() } @@ -105,63 +157,117 @@ impl AsyncWrite for GridFSStream { } } -impl GridFSBucket { +impl GridFsBucket { + /// Gets the read concern of the [`GridFsBucket`]. + pub fn read_concern(&self) -> Option<&ReadConcern> { + if let Some(options) = &self.options { + if let Some(ref rc) = options.read_concern { + return Some(rc); + } + } + self.db.read_concern() + } + + /// Gets the write concern of the [`GridFsBucket`]. + pub fn write_concern(&self) -> Option<&WriteConcern> { + if let Some(options) = &self.options { + if let Some(ref wc) = options.write_concern { + return Some(wc); + } + } + self.db.write_concern() + } + + /// Gets the read preference of the [`GridFsBucket`]. + pub fn read_preference(&self) -> Option<&ReadPreference> { + if let Some(options) = &self.options { + if let Some(ref rp) = options.read_preference { + return Some(rp); + } + } + if let Some(SelectionCriteria::ReadPreference(ref rp)) = self.db.selection_criteria() { + Some(rp) + } else { + None + } + } + + /// Opens a [`GridFsStream`] that the application can write the contents of the file to. + /// The application provides a custom file id. + /// + /// Returns a [`GridFsStream`] to which the application will write the contents. pub fn open_upload_stream_with_id( &self, id: T, filename: String, - options: GridFSUploadOptions, - ) -> GridFSStream { + options: GridFsUploadOptions, + ) -> GridFsStream { todo!() } + /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. pub fn upload_from_stream_with_id( &self, id: T, filename: String, - source: GridFSStream, - option: GridFSUploadOptions, + source: GridFsStream, + option: GridFsUploadOptions, ) { todo!() } - pub fn open_download_stream(&self, id: T) { + /// Opens and returns a [`GridFsStream`] from which the application can read + /// the contents of the stored file specified by `id`. + pub fn open_download_stream(&self, id: T) -> GridFsStream { todo!() } - pub fn download_to_stream(&self, id: T, destination: GridFSStream) { + /// Opens and returns a [`GridFsStream`] from which the application can read + /// the contents of the stored file specified by `filename` and the revision + /// in `options`. + pub fn open_download_stream_by_name( + &self, + filename: String, + options: GridFsDownloadByNameOptions, + ) -> GridFsStream { todo!() } - pub fn delete(&self, id: T) { + /// Downloads the contents of the stored file specified by `id` and writes + /// the contents to the destination [`GridFsStream`]. + pub fn download_to_stream(&self, id: T, destination: GridFsStream) { todo!() } - pub fn find(&self, filter: Document, options: GridFSBucketOptions) -> Result> { + /// Downloads the contents of the stored file specified by `filename` and by + /// the revision in `options` and writes the contents to the destination + /// [`GridFsStream`]. + pub fn download_to_stream_by_name( + &self, + filename: String, + destination: GridFsStream, + options: GridFsDownloadByNameOptions, + ) { todo!() } - pub fn open_download_stream_by_name( - &self, - filename: String, - options: GridFSDownloadByNameOptions, - ) -> GridFSStream { + /// Given an `id`, deletes the stored file's files collection document and + /// associated chunks from a [`GridFsBucket`]. + pub fn delete(&self, id: T) { todo!() } - pub fn download_to_stream_by_name( - &self, - filename: String, - destination: GridFSStream, - options: GridFSDownloadByNameOptions, - ) { + /// Finds and returns the files collection documents that match the filter. + pub fn find(&self, filter: Document, options: GridFsBucketOptions) -> Result> { todo!() } + /// Renames the stored file with the specified `id`. pub fn rename(&self, id: T, new_filename: String) { todo!() } + /// Drops the files associated with this bucket. pub fn drop(&self) { todo!() } diff --git a/src/lib.rs b/src/lib.rs index 76a2e1422..738496ff4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -322,11 +322,11 @@ mod compression; mod concern; mod cursor; mod db; -pub mod gridfs; pub mod error; pub mod event; -mod index; +mod gridfs; mod hello; +mod index; mod operation; pub mod results; pub(crate) mod runtime; @@ -344,18 +344,18 @@ mod test; extern crate derive_more; pub use crate::{ - client::{Client, session::ClientSession}, + client::{session::ClientSession, Client}, coll::Collection, - cursor::{Cursor, session::{SessionCursor, SessionCursorStream}}, + cursor::{ + session::{SessionCursor, SessionCursorStream}, + Cursor, + }, db::Database, }; -pub use {coll::Namespace, index::IndexModel, client::session::ClusterTime, sdam::public::*}; +pub use {client::session::ClusterTime, coll::Namespace, index::IndexModel, sdam::public::*}; -#[cfg(all( - feature = "tokio-runtime", - feature = "async-std-runtime", -))] +#[cfg(all(feature = "tokio-runtime", feature = "async-std-runtime",))] compile_error!( "`tokio-runtime` and `async-std-runtime` can't both be enabled; either disable \ `async-std-runtime` or set `default-features = false` in your Cargo.toml" From 796c1c08ac329be5c902ed1b074aa4eb25ec83bf Mon Sep 17 00:00:00 2001 From: sanav33 Date: Tue, 12 Jul 2022 16:39:46 -0400 Subject: [PATCH 03/23] convert generic tfileid to bson --- src/db/mod.rs | 2 +- src/gridfs.rs | 163 +++++++++++++----------------------------- src/gridfs/options.rs | 93 ++++++++++++++++++++++++ 3 files changed, 144 insertions(+), 114 deletions(-) create mode 100644 src/gridfs/options.rs diff --git a/src/db/mod.rs b/src/db/mod.rs index c9a488990..1b14f72a0 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -19,7 +19,7 @@ use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, error::{Error, ErrorKind, Result}, - gridfs::{GridFsBucket, GridFsBucketOptions}, + gridfs::{GridFsBucket, options::GridFsBucketOptions}, operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand}, options::{ AggregateOptions, diff --git a/src/gridfs.rs b/src/gridfs.rs index 1e3cb17d9..64e834771 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -1,3 +1,5 @@ +pub mod options; + use core::task::{Context, Poll}; use std::{ io::{self, Result}, @@ -8,104 +10,19 @@ use std::{ use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, - selection_criteria::{ReadPreference, SelectionCriteria}, + selection_criteria::SelectionCriteria, Database, }; -use bson::{oid::ObjectId, DateTime, Document}; -use serde::Deserialize; -use typed_builder::TypedBuilder; +use options::*; +use bson::{oid::ObjectId, DateTime, Document, Bson}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -/// Contains the options for creating a [`GridFsBucket`]. -#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] -#[builder(field_defaults(setter(into)))] -#[non_exhaustive] -pub struct GridFsBucketOptions { - /// The bucket name. Defaults to 'fs'. - pub bucket_name: Option, - - /// The chunk size in bytes. Defaults to 255 KiB. - pub chunk_size_bytes: Option, - - /// The write concern. Defaults to the write concern of the database. - pub write_concern: Option, - - /// The read concern. Defaults to the read concern of the database. - pub read_concern: Option, - - /// The read preference. Defaults to the read preference of the database. - pub read_preference: Option, -} - -/// Contains the options for creating a [`GridFsStream`] to upload a file to a -/// [`GridFsBucket`]. -#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] -#[builder(field_defaults(setter(into)))] -#[non_exhaustive] -pub struct GridFsUploadOptions { - /// The number of bytes per chunk of this file. Defaults to the chunk_size_bytes in the - /// GridFsBucketOptions. - pub chunk_size_bytes: Option, - - /// User data for the 'metadata' field of the files collection document. - pub metadata: Option, -} - -/// Contains the options for creating [`GridFsStream`] to retrieve a stored file -/// from a [`GridFsBucket`]. -#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] -#[builder(field_defaults(setter(into)))] -#[non_exhaustive] -pub struct GridFsDownloadByNameOptions { - /// Which revision (documents with the same filename and different `upload_date`) - /// of the file to retrieve. Defaults to -1 (the most recent revision). - /// - /// Revision numbers are defined as follows: - /// 0 = the original stored file - /// 1 = the first revision - /// 2 = the second revision - /// etc... - /// -2 = the second most recent revision - /// -1 = the most recent revision - pub revision: Option, -} - -/// Contains the options for performing a find operation on a files collection. -#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] -#[builder(field_defaults(setter(into)))] -#[non_exhaustive] -pub struct GridFsFindOptions { - /// Enables writing to temporary files on the server. When set to true, the - /// server can write temporary data to disk while executing the find operation - /// on the files collection. - pub allow_disk_use: Option, - - /// The number of documents to return per batch. - pub batch_size: Option, - - /// The maximum number of documents to return. - pub limit: Option, - - /// The maximum amount of time to allow the query to run. - pub max_time_ms: Option, - - /// The server normally times out idle cursors after an inactivity period - /// (10 minutes) to prevent excess memory use. Set this option to prevent that. - pub no_cursor_timeout: Option, - - /// The number of documents to skip before returning. - pub skip: i32, - - /// The order by which to sort results. Defaults to not sorting. - pub sort: Option, -} - // Contained in a "chunks" collection for each user file -struct Chunk { +struct Chunk { id: ObjectId, - files_id: T, + files_id: Bson, n: i32, // default size is 255 KiB data: Vec, @@ -113,8 +30,8 @@ struct Chunk { // A collection in which information about stored files is stored. There will be one files // collection document per stored file. -struct FilesCollectionDocument { - id: T, +pub struct FilesCollectionDocument { + id: Bson, length: i64, chunk_size: i32, upload_date: DateTime, @@ -129,7 +46,9 @@ pub struct GridFsBucket { pub(crate) options: Option, } +// TODO: RUST-1399 Add documentation and example code for this struct. pub struct GridFsStream { + pub id: Bson, _pin: PhantomPinned, } @@ -179,46 +98,64 @@ impl GridFsBucket { } /// Gets the read preference of the [`GridFsBucket`]. - pub fn read_preference(&self) -> Option<&ReadPreference> { + pub fn read_preference(&self) -> Option<&SelectionCriteria> { if let Some(options) = &self.options { - if let Some(ref rp) = options.read_preference { - return Some(rp); + if options.read_preference.is_some() { + return options.read_preference.as_ref(); } } - if let Some(SelectionCriteria::ReadPreference(ref rp)) = self.db.selection_criteria() { - Some(rp) - } else { - None - } + self.db.selection_criteria() } /// Opens a [`GridFsStream`] that the application can write the contents of the file to. /// The application provides a custom file id. /// /// Returns a [`GridFsStream`] to which the application will write the contents. - pub fn open_upload_stream_with_id( + pub fn open_upload_stream_with_id( &self, - id: T, + id: Bson, filename: String, - options: GridFsUploadOptions, + options: impl Into, ) -> GridFsStream { todo!() } + /// Opens a [`GridFsStream`] that the application can write the contents of the file to. + /// The driver generates a unique [`Bson::ObjectId`] for the file id. + /// + /// Returns a [`GridFsStream`] to which the application will write the contents. + pub fn open_upload_stream( + &self, + filename: String, + options: impl Into, + ) -> GridFsStream { + self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options) + } + /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. - pub fn upload_from_stream_with_id( + pub fn upload_from_stream_with_id( &self, - id: T, + id: Bson, filename: String, source: GridFsStream, - option: GridFsUploadOptions, + options: impl Into, ) { todo!() } + /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for the file id. + pub fn upload_from_stream( + &self, + filename: String, + source: GridFsStream, + options: impl Into, + ) { + self.upload_from_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, source, options) + } + /// Opens and returns a [`GridFsStream`] from which the application can read /// the contents of the stored file specified by `id`. - pub fn open_download_stream(&self, id: T) -> GridFsStream { + pub fn open_download_stream(&self, id: Bson) -> GridFsStream { todo!() } @@ -228,14 +165,14 @@ impl GridFsBucket { pub fn open_download_stream_by_name( &self, filename: String, - options: GridFsDownloadByNameOptions, + options: impl Into, ) -> GridFsStream { todo!() } /// Downloads the contents of the stored file specified by `id` and writes /// the contents to the destination [`GridFsStream`]. - pub fn download_to_stream(&self, id: T, destination: GridFsStream) { + pub fn download_to_stream(&self, id: Bson, destination: GridFsStream) { todo!() } @@ -246,24 +183,24 @@ impl GridFsBucket { &self, filename: String, destination: GridFsStream, - options: GridFsDownloadByNameOptions, + options: impl Into, ) { todo!() } /// Given an `id`, deletes the stored file's files collection document and /// associated chunks from a [`GridFsBucket`]. - pub fn delete(&self, id: T) { + pub fn delete(&self, id: Bson) { todo!() } /// Finds and returns the files collection documents that match the filter. - pub fn find(&self, filter: Document, options: GridFsBucketOptions) -> Result> { + pub fn find(&self, filter: Document, options: impl Into) -> Result> { todo!() } /// Renames the stored file with the specified `id`. - pub fn rename(&self, id: T, new_filename: String) { + pub fn rename(&self, id: Bson, new_filename: String) { todo!() } diff --git a/src/gridfs/options.rs b/src/gridfs/options.rs new file mode 100644 index 000000000..8cee8f038 --- /dev/null +++ b/src/gridfs/options.rs @@ -0,0 +1,93 @@ +use serde::Deserialize; +use typed_builder::TypedBuilder; +use crate::{ + concern::{ReadConcern, WriteConcern}, + selection_criteria::SelectionCriteria, +}; + +use bson::Document; + + +/// Contains the options for creating a [`GridFsBucket`]. +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[builder(field_defaults(setter(into)))] +#[non_exhaustive] +pub struct GridFsBucketOptions { + /// The bucket name. Defaults to 'fs'. + pub bucket_name: Option, + + /// The chunk size in bytes used to break the user file into chunks. Defaults to 255 KiB. + pub chunk_size_bytes: Option, + + /// The write concern. Defaults to the write concern of the database. + pub write_concern: Option, + + /// The read concern. Defaults to the read concern of the database. + pub read_concern: Option, + + /// The read preference. Defaults to the read preference of the database. + pub read_preference: Option, +} + +/// Contains the options for creating a [`GridFsStream`] to upload a file to a +/// [`GridFsBucket`]. +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[builder(field_defaults(setter(into)))] +#[non_exhaustive] +pub struct GridFsUploadOptions { + /// The number of bytes per chunk of this file. Defaults to the `chunk_size_bytes` specified + /// in the [`GridFsBucketOptions`]. + pub chunk_size_bytes: Option, + + /// User data for the 'metadata' field of the files collection document. + pub metadata: Option, +} + +/// Contains the options for creating [`GridFsStream`] to retrieve a stored file +/// from a [`GridFsBucket`]. +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[builder(field_defaults(setter(into)))] +#[non_exhaustive] +pub struct GridFsDownloadByNameOptions { + /// Which revision (documents with the same filename and different `upload_date`) + /// of the file to retrieve. Defaults to -1 (the most recent revision). + /// + /// Revision numbers are defined as follows: + /// 0 = the original stored file + /// 1 = the first revision + /// 2 = the second revision + /// etc... + /// -2 = the second most recent revision + /// -1 = the most recent revision + pub revision: Option, +} + +/// Contains the options for performing a find operation on a files collection. +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[builder(field_defaults(setter(into)))] +#[non_exhaustive] +pub struct GridFsFindOptions { + /// Enables writing to temporary files on the server. When set to true, the + /// server can write temporary data to disk while executing the find operation + /// on the files collection. + pub allow_disk_use: Option, + + /// The number of documents to return per batch. + pub batch_size: Option, + + /// The maximum number of documents to return. + pub limit: Option, + + /// The maximum amount of time to allow the query to run. + pub max_time_ms: Option, + + /// The server normally times out idle cursors after an inactivity period + /// (10 minutes) to prevent excess memory use. Set this option to prevent that. + pub no_cursor_timeout: Option, + + /// The number of documents to skip before returning. + pub skip: i32, + + /// The order by which to sort results. Defaults to not sorting. + pub sort: Option, +} \ No newline at end of file From d91561460f874be6baa482d1a364cd8a4b9a36a3 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Tue, 12 Jul 2022 16:43:41 -0400 Subject: [PATCH 04/23] run fmt --- src/db/mod.rs | 2 +- src/gridfs.rs | 11 ++++++++--- src/gridfs/options.rs | 7 +++---- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 1b14f72a0..ff5dc1041 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -19,7 +19,7 @@ use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, error::{Error, ErrorKind, Result}, - gridfs::{GridFsBucket, options::GridFsBucketOptions}, + gridfs::{options::GridFsBucketOptions, GridFsBucket}, operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand}, options::{ AggregateOptions, diff --git a/src/gridfs.rs b/src/gridfs.rs index 64e834771..b5fe2f423 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -16,7 +16,7 @@ use crate::{ use options::*; -use bson::{oid::ObjectId, DateTime, Document, Bson}; +use bson::{oid::ObjectId, Bson, DateTime, Document}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; // Contained in a "chunks" collection for each user file @@ -143,7 +143,8 @@ impl GridFsBucket { todo!() } - /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for the file id. + /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for + /// the file id. pub fn upload_from_stream( &self, filename: String, @@ -195,7 +196,11 @@ impl GridFsBucket { } /// Finds and returns the files collection documents that match the filter. - pub fn find(&self, filter: Document, options: impl Into) -> Result> { + pub fn find( + &self, + filter: Document, + options: impl Into, + ) -> Result> { todo!() } diff --git a/src/gridfs/options.rs b/src/gridfs/options.rs index 8cee8f038..0426e737c 100644 --- a/src/gridfs/options.rs +++ b/src/gridfs/options.rs @@ -1,13 +1,12 @@ -use serde::Deserialize; -use typed_builder::TypedBuilder; use crate::{ concern::{ReadConcern, WriteConcern}, selection_criteria::SelectionCriteria, }; +use serde::Deserialize; +use typed_builder::TypedBuilder; use bson::Document; - /// Contains the options for creating a [`GridFsBucket`]. #[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] #[builder(field_defaults(setter(into)))] @@ -90,4 +89,4 @@ pub struct GridFsFindOptions { /// The order by which to sort results. Defaults to not sorting. pub sort: Option, -} \ No newline at end of file +} From 8cbfffe309ed79915da9cd21efb8f9c784573c2c Mon Sep 17 00:00:00 2001 From: sanav33 Date: Mon, 25 Jul 2022 10:41:05 -0400 Subject: [PATCH 05/23] split GridFsStream type --- src/gridfs.rs | 84 +++++++++++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/src/gridfs.rs b/src/gridfs.rs index b5fe2f423..67025a219 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -2,9 +2,8 @@ pub mod options; use core::task::{Context, Poll}; use std::{ - io::{self, Result}, - marker::PhantomPinned, pin::Pin, + io::{self, Error} }; use crate::{ @@ -47,31 +46,44 @@ pub struct GridFsBucket { } // TODO: RUST-1399 Add documentation and example code for this struct. -pub struct GridFsStream { +pub struct GridFsUploadStream { pub id: Bson, - _pin: PhantomPinned, } -impl AsyncRead for GridFsStream { - fn poll_read( +impl AsyncWrite for GridFsUploadStream { + fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { + buf: &[u8] + ) -> Poll> { todo!() } -} -impl AsyncWrite for GridFsStream { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_> + ) -> Poll> { todo!() } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_> + ) -> Poll> { todo!() } +} - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { +pub struct GridFsDownloadStream { + pub id: Bson, +} + +impl AsyncRead for GridFsDownloadStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_> + ) -> Poll> { todo!() } } @@ -107,28 +119,28 @@ impl GridFsBucket { self.db.selection_criteria() } - /// Opens a [`GridFsStream`] that the application can write the contents of the file to. + /// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to. /// The application provides a custom file id. /// - /// Returns a [`GridFsStream`] to which the application will write the contents. + /// Returns a [`GridFsUploadStream`] to which the application will write the contents. pub fn open_upload_stream_with_id( &self, id: Bson, filename: String, - options: impl Into, - ) -> GridFsStream { + options: impl Into>, + ) -> GridFsUploadStream { todo!() } - /// Opens a [`GridFsStream`] that the application can write the contents of the file to. + /// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to. /// The driver generates a unique [`Bson::ObjectId`] for the file id. /// - /// Returns a [`GridFsStream`] to which the application will write the contents. + /// Returns a [`GridFsUploadStream`] to which the application will write the contents. pub fn open_upload_stream( &self, filename: String, - options: impl Into, - ) -> GridFsStream { + options: impl Into>, + ) -> GridFsUploadStream { self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options) } @@ -137,8 +149,8 @@ impl GridFsBucket { &self, id: Bson, filename: String, - source: GridFsStream, - options: impl Into, + source: impl AsyncRead, + options: impl Into>, ) { todo!() } @@ -148,32 +160,32 @@ impl GridFsBucket { pub fn upload_from_stream( &self, filename: String, - source: GridFsStream, - options: impl Into, + source: impl AsyncRead, + options: impl Into>, ) { self.upload_from_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, source, options) } - /// Opens and returns a [`GridFsStream`] from which the application can read + /// Opens and returns a [`GridFsDownloadStream`] from which the application can read /// the contents of the stored file specified by `id`. - pub fn open_download_stream(&self, id: Bson) -> GridFsStream { + pub fn open_download_stream(&self, id: Bson) -> GridFsDownloadStream { todo!() } - /// Opens and returns a [`GridFsStream`] from which the application can read + /// Opens and returns a [`GridFsDownloadStream`] from which the application can read /// the contents of the stored file specified by `filename` and the revision /// in `options`. pub fn open_download_stream_by_name( &self, filename: String, - options: impl Into, - ) -> GridFsStream { + options: impl Into>, + ) -> GridFsDownloadStream { todo!() } /// Downloads the contents of the stored file specified by `id` and writes - /// the contents to the destination [`GridFsStream`]. - pub fn download_to_stream(&self, id: Bson, destination: GridFsStream) { + /// the contents to the destination [`GridFsDownloadStream`]. + pub fn download_to_stream(&self, id: Bson, destination: impl AsyncWrite) { todo!() } @@ -183,8 +195,8 @@ impl GridFsBucket { pub fn download_to_stream_by_name( &self, filename: String, - destination: GridFsStream, - options: impl Into, + destination: impl AsyncWrite, + options: impl Into>, ) { todo!() } @@ -199,8 +211,8 @@ impl GridFsBucket { pub fn find( &self, filter: Document, - options: impl Into, - ) -> Result> { + options: impl Into>, + ) -> io::Result> { todo!() } From e5f6e99b66357ac69d51ef04bd53fe72d2aa6217 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Tue, 26 Jul 2022 12:20:58 -0400 Subject: [PATCH 06/23] add finish --- src/gridfs.rs | 102 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 80 insertions(+), 22 deletions(-) diff --git a/src/gridfs.rs b/src/gridfs.rs index 67025a219..0e1aec124 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -2,8 +2,8 @@ pub mod options; use core::task::{Context, Poll}; use std::{ + io::{self, Error}, pin::Pin, - io::{self, Error} }; use crate::{ @@ -16,6 +16,7 @@ use crate::{ use options::*; use bson::{oid::ObjectId, Bson, DateTime, Document}; +use futures_util; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; // Contained in a "chunks" collection for each user file @@ -50,26 +51,27 @@ pub struct GridFsUploadStream { pub id: Bson, } +impl GridFsUploadStream { + /// Consumes the stream and uploads data in the stream to the server. + pub fn finish(self) { + todo!() + } +} + impl AsyncWrite for GridFsUploadStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &[u8] + buf: &[u8], ) -> Poll> { todo!() } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_> - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { todo!() } - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_> - ) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { todo!() } } @@ -82,7 +84,7 @@ impl AsyncRead for GridFsDownloadStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut ReadBuf<'_> + buf: &mut ReadBuf<'_>, ) -> Poll> { todo!() } @@ -144,26 +146,60 @@ impl GridFsBucket { self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options) } - /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. - pub fn upload_from_stream_with_id( + /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the + /// `tokio` runtime. + pub fn upload_from_stream_with_id_tokio( &self, id: Bson, filename: String, - source: impl AsyncRead, + source: impl tokio::io::AsyncRead, + options: impl Into>, + ) { + todo!() + } + + /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the + /// `futures` crate. + pub fn upload_from_stream_with_id_futures( + &self, + id: Bson, + filename: String, + source: impl futures_util::io::AsyncRead, options: impl Into>, ) { todo!() } /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for - /// the file id. - pub fn upload_from_stream( + /// the file id. Uses the `tokio` runtime. + pub fn upload_from_stream_tokio( &self, filename: String, source: impl AsyncRead, options: impl Into>, ) { - self.upload_from_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, source, options) + self.upload_from_stream_with_id_tokio( + Bson::ObjectId(ObjectId::new()), + filename, + source, + options, + ) + } + + /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for + /// the file id. Uses the `futures` crate. + pub fn upload_from_stream_futures( + &self, + filename: String, + source: impl futures_util::io::AsyncRead, + options: impl Into>, + ) { + self.upload_from_stream_with_id_futures( + Bson::ObjectId(ObjectId::new()), + filename, + source, + options, + ) } /// Opens and returns a [`GridFsDownloadStream`] from which the application can read @@ -184,18 +220,40 @@ impl GridFsBucket { } /// Downloads the contents of the stored file specified by `id` and writes - /// the contents to the destination [`GridFsDownloadStream`]. - pub fn download_to_stream(&self, id: Bson, destination: impl AsyncWrite) { + /// the contents to the destination [`GridFsDownloadStream`]. Uses the `tokio` runtime. + pub fn download_to_stream_tokio(&self, id: Bson, destination: impl tokio::io::AsyncWrite) { + todo!() + } + + /// Downloads the contents of the stored file specified by `id` and writes + /// the contents to the destination [`GridFsDownloadStream`]. Uses the `futures` crate. + pub fn download_to_stream_futures( + &self, + id: Bson, + destination: impl futures_util::io::AsyncWrite, + ) { + todo!() + } + + /// Downloads the contents of the stored file specified by `filename` and by + /// the revision in `options` and writes the contents to the destination + /// [`GridFsStream`]. Uses the `tokio` runtime. + pub fn download_to_stream_by_name_tokio( + &self, + filename: String, + destination: impl tokio::io::AsyncWrite, + options: impl Into>, + ) { todo!() } /// Downloads the contents of the stored file specified by `filename` and by /// the revision in `options` and writes the contents to the destination - /// [`GridFsStream`]. - pub fn download_to_stream_by_name( + /// [`GridFsStream`]. Uses the `futures` crate. + pub fn download_to_stream_by_name_futures( &self, filename: String, - destination: impl AsyncWrite, + destination: impl futures_util::io::AsyncWrite, options: impl Into>, ) { todo!() From 49cdb7d2782e4a881ffb4e22f0549bb0156232de Mon Sep 17 00:00:00 2001 From: sanav33 Date: Tue, 26 Jul 2022 16:03:57 -0400 Subject: [PATCH 07/23] make gridfs stream methods return result --- src/gridfs.rs | 59 ++++++++++++++++++++++++++------------------------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/src/gridfs.rs b/src/gridfs.rs index 0e1aec124..411e2fcd3 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -2,7 +2,7 @@ pub mod options; use core::task::{Context, Poll}; use std::{ - io::{self, Error}, + io, pin::Pin, }; @@ -11,6 +11,7 @@ use crate::{ cursor::Cursor, selection_criteria::SelectionCriteria, Database, + error::{Error, Result}, }; use options::*; @@ -58,20 +59,20 @@ impl GridFsUploadStream { } } -impl AsyncWrite for GridFsUploadStream { +impl tokio::io::AsyncWrite for GridFsUploadStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], - ) -> Poll> { + ) -> Poll> { todo!() } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { todo!() } - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { todo!() } } @@ -80,12 +81,12 @@ pub struct GridFsDownloadStream { pub id: Bson, } -impl AsyncRead for GridFsDownloadStream { +impl tokio::io::AsyncRead for GridFsDownloadStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, - ) -> Poll> { + ) -> Poll> { todo!() } } @@ -125,12 +126,12 @@ impl GridFsBucket { /// The application provides a custom file id. /// /// Returns a [`GridFsUploadStream`] to which the application will write the contents. - pub fn open_upload_stream_with_id( + pub async fn open_upload_stream_with_id( &self, id: Bson, filename: String, options: impl Into>, - ) -> GridFsUploadStream { + ) -> Result { todo!() } @@ -138,17 +139,17 @@ impl GridFsBucket { /// The driver generates a unique [`Bson::ObjectId`] for the file id. /// /// Returns a [`GridFsUploadStream`] to which the application will write the contents. - pub fn open_upload_stream( + pub async fn open_upload_stream( &self, filename: String, options: impl Into>, - ) -> GridFsUploadStream { - self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options) + ) -> Result { + self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options).await } /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the /// `tokio` runtime. - pub fn upload_from_stream_with_id_tokio( + pub async fn upload_from_stream_with_id_tokio( &self, id: Bson, filename: String, @@ -160,7 +161,7 @@ impl GridFsBucket { /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the /// `futures` crate. - pub fn upload_from_stream_with_id_futures( + pub async fn upload_from_stream_with_id_futures( &self, id: Bson, filename: String, @@ -172,7 +173,7 @@ impl GridFsBucket { /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for /// the file id. Uses the `tokio` runtime. - pub fn upload_from_stream_tokio( + pub async fn upload_from_stream_tokio( &self, filename: String, source: impl AsyncRead, @@ -183,12 +184,12 @@ impl GridFsBucket { filename, source, options, - ) + ).await } /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for /// the file id. Uses the `futures` crate. - pub fn upload_from_stream_futures( + pub async fn upload_from_stream_futures( &self, filename: String, source: impl futures_util::io::AsyncRead, @@ -199,35 +200,35 @@ impl GridFsBucket { filename, source, options, - ) + ).await } /// Opens and returns a [`GridFsDownloadStream`] from which the application can read /// the contents of the stored file specified by `id`. - pub fn open_download_stream(&self, id: Bson) -> GridFsDownloadStream { + pub async fn open_download_stream(&self, id: Bson) -> Result { todo!() } /// Opens and returns a [`GridFsDownloadStream`] from which the application can read /// the contents of the stored file specified by `filename` and the revision /// in `options`. - pub fn open_download_stream_by_name( + pub async fn open_download_stream_by_name( &self, filename: String, options: impl Into>, - ) -> GridFsDownloadStream { + ) -> Result { todo!() } /// Downloads the contents of the stored file specified by `id` and writes /// the contents to the destination [`GridFsDownloadStream`]. Uses the `tokio` runtime. - pub fn download_to_stream_tokio(&self, id: Bson, destination: impl tokio::io::AsyncWrite) { + pub async fn download_to_stream_tokio(&self, id: Bson, destination: impl tokio::io::AsyncWrite) { todo!() } /// Downloads the contents of the stored file specified by `id` and writes /// the contents to the destination [`GridFsDownloadStream`]. Uses the `futures` crate. - pub fn download_to_stream_futures( + pub async fn download_to_stream_futures( &self, id: Bson, destination: impl futures_util::io::AsyncWrite, @@ -238,7 +239,7 @@ impl GridFsBucket { /// Downloads the contents of the stored file specified by `filename` and by /// the revision in `options` and writes the contents to the destination /// [`GridFsStream`]. Uses the `tokio` runtime. - pub fn download_to_stream_by_name_tokio( + pub async fn download_to_stream_by_name_tokio( &self, filename: String, destination: impl tokio::io::AsyncWrite, @@ -250,7 +251,7 @@ impl GridFsBucket { /// Downloads the contents of the stored file specified by `filename` and by /// the revision in `options` and writes the contents to the destination /// [`GridFsStream`]. Uses the `futures` crate. - pub fn download_to_stream_by_name_futures( + pub async fn download_to_stream_by_name_futures( &self, filename: String, destination: impl futures_util::io::AsyncWrite, @@ -261,12 +262,12 @@ impl GridFsBucket { /// Given an `id`, deletes the stored file's files collection document and /// associated chunks from a [`GridFsBucket`]. - pub fn delete(&self, id: Bson) { + pub async fn delete(&self, id: Bson) { todo!() } /// Finds and returns the files collection documents that match the filter. - pub fn find( + pub async fn find( &self, filter: Document, options: impl Into>, @@ -275,12 +276,12 @@ impl GridFsBucket { } /// Renames the stored file with the specified `id`. - pub fn rename(&self, id: Bson, new_filename: String) { + pub async fn rename(&self, id: Bson, new_filename: String) { todo!() } /// Drops the files associated with this bucket. - pub fn drop(&self) { + pub async fn drop(&self) { todo!() } } From 6a8fddb96683d86d408b2f74cc59413f20c6fa91 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Wed, 27 Jul 2022 14:36:15 -0400 Subject: [PATCH 08/23] add abort method for upload stream --- src/gridfs.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/gridfs.rs b/src/gridfs.rs index 411e2fcd3..c97aa98d1 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -57,6 +57,11 @@ impl GridFsUploadStream { pub fn finish(self) { todo!() } + + /// Aborts the upload and discards the upload stream. + pub fn abort(self) { + todo!() + } } impl tokio::io::AsyncWrite for GridFsUploadStream { From 2849c7ccd9ca2f01d84e42c4a8505436a4a7fc23 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Wed, 27 Jul 2022 20:27:52 -0400 Subject: [PATCH 09/23] eliminate options in gridfsbucket struct --- src/db/mod.rs | 18 +++++++++++++++++- src/gridfs.rs | 49 +++++++++++++++++++++---------------------------- 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index ff5dc1041..8d127c8fc 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -571,9 +571,25 @@ impl Database { &self, options: impl Into>, ) -> GridFsBucket { + let options: GridFsBucketOptions = options.into().unwrap_or_default(); + let read_concern = options + .read_concern + .or_else(|| self.read_concern().cloned()); + let write_concern = options + .write_concern + .or_else(|| self.write_concern().cloned()); + let read_preference = options + .read_preference + .or_else(|| self.selection_criteria().cloned()); + let bucket_name = options.bucket_name.unwrap_or_else(|| "fs".to_string()); + let chunk_size_bytes = options.chunk_size_bytes.unwrap_or(255 * 1024); GridFsBucket { db: self.clone(), - options: options.into(), + bucket_name, + chunk_size_bytes, + read_concern, + write_concern, + read_preference, } } } diff --git a/src/gridfs.rs b/src/gridfs.rs index c97aa98d1..a959c68f2 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -1,17 +1,14 @@ pub mod options; use core::task::{Context, Poll}; -use std::{ - io, - pin::Pin, -}; +use std::{io, pin::Pin}; use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, + error::{Error, Result}, selection_criteria::SelectionCriteria, Database, - error::{Error, Result}, }; use options::*; @@ -43,8 +40,12 @@ pub struct FilesCollectionDocument { /// Struct for storing GridFS managed files within a [`Database`]. pub struct GridFsBucket { // Contains a "chunks" collection + pub(crate) bucket_name: String, pub(crate) db: Database, - pub(crate) options: Option, + pub(crate) chunk_size_bytes: i32, + pub(crate) read_concern: Option, + pub(crate) write_concern: Option, + pub(crate) read_preference: Option, } // TODO: RUST-1399 Add documentation and example code for this struct. @@ -99,32 +100,17 @@ impl tokio::io::AsyncRead for GridFsDownloadStream { impl GridFsBucket { /// Gets the read concern of the [`GridFsBucket`]. pub fn read_concern(&self) -> Option<&ReadConcern> { - if let Some(options) = &self.options { - if let Some(ref rc) = options.read_concern { - return Some(rc); - } - } - self.db.read_concern() + self.read_concern.as_ref() } /// Gets the write concern of the [`GridFsBucket`]. pub fn write_concern(&self) -> Option<&WriteConcern> { - if let Some(options) = &self.options { - if let Some(ref wc) = options.write_concern { - return Some(wc); - } - } - self.db.write_concern() + self.write_concern.as_ref() } /// Gets the read preference of the [`GridFsBucket`]. pub fn read_preference(&self) -> Option<&SelectionCriteria> { - if let Some(options) = &self.options { - if options.read_preference.is_some() { - return options.read_preference.as_ref(); - } - } - self.db.selection_criteria() + self.read_preference.as_ref() } /// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to. @@ -149,7 +135,8 @@ impl GridFsBucket { filename: String, options: impl Into>, ) -> Result { - self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options).await + self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options) + .await } /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the @@ -189,7 +176,8 @@ impl GridFsBucket { filename, source, options, - ).await + ) + .await } /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for @@ -205,7 +193,8 @@ impl GridFsBucket { filename, source, options, - ).await + ) + .await } /// Opens and returns a [`GridFsDownloadStream`] from which the application can read @@ -227,7 +216,11 @@ impl GridFsBucket { /// Downloads the contents of the stored file specified by `id` and writes /// the contents to the destination [`GridFsDownloadStream`]. Uses the `tokio` runtime. - pub async fn download_to_stream_tokio(&self, id: Bson, destination: impl tokio::io::AsyncWrite) { + pub async fn download_to_stream_tokio( + &self, + id: Bson, + destination: impl tokio::io::AsyncWrite, + ) { todo!() } From 72b39b3a66b01ebb784a3cca4aa2b4c44bf7d726 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Mon, 1 Aug 2022 09:40:09 -0400 Subject: [PATCH 10/23] change method names in gridfs api --- src/gridfs.rs | 73 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/src/gridfs.rs b/src/gridfs.rs index a959c68f2..90bd175ee 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -12,7 +12,7 @@ use crate::{ }; use options::*; - +use serde::{Deserialize, Serialize}; use bson::{oid::ObjectId, Bson, DateTime, Document}; use futures_util; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -26,8 +26,9 @@ struct Chunk { data: Vec, } -// A collection in which information about stored files is stored. There will be one files -// collection document per stored file. +/// A collection in which information about stored files is stored. There will be one files +/// collection document per stored file. +#[derive(Serialize, Deserialize)] pub struct FilesCollectionDocument { id: Bson, length: i64, @@ -50,17 +51,22 @@ pub struct GridFsBucket { // TODO: RUST-1399 Add documentation and example code for this struct. pub struct GridFsUploadStream { - pub id: Bson, + id: Bson, } impl GridFsUploadStream { + /// Gets the stream `id`. + pub fn stream_id(&self) -> &Bson { + &self.id + } + /// Consumes the stream and uploads data in the stream to the server. - pub fn finish(self) { + pub async fn finish(self) { todo!() } /// Aborts the upload and discards the upload stream. - pub fn abort(self) { + pub async fn abort(self) { todo!() } } @@ -84,7 +90,14 @@ impl tokio::io::AsyncWrite for GridFsUploadStream { } pub struct GridFsDownloadStream { - pub id: Bson, + id: Bson, +} + +impl GridFsDownloadStream { + /// Gets the stream `id`. + pub fn stream_id(&self) -> &Bson { + &self.id + } } impl tokio::io::AsyncRead for GridFsDownloadStream { @@ -140,8 +153,8 @@ impl GridFsBucket { } /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the - /// `tokio` runtime. - pub async fn upload_from_stream_with_id_tokio( + /// `tokio` crate's `AsyncRead` trait for the `source`. + pub async fn upload_from_tokio_reader_with_id( &self, id: Bson, filename: String, @@ -152,8 +165,8 @@ impl GridFsBucket { } /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the - /// `futures` crate. - pub async fn upload_from_stream_with_id_futures( + /// `futures` crate's `AsyncRead` trait for the `source`. + pub async fn upload_from_futures_reader_with_id( &self, id: Bson, filename: String, @@ -164,14 +177,14 @@ impl GridFsBucket { } /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for - /// the file id. Uses the `tokio` runtime. - pub async fn upload_from_stream_tokio( + /// the file id. Uses the `tokio` crate's `AsyncRead` trait for the `source`. + pub async fn upload_from_tokio_reader( &self, filename: String, - source: impl AsyncRead, + source: impl tokio::io::AsyncRead, options: impl Into>, ) { - self.upload_from_stream_with_id_tokio( + self.upload_from_tokio_reader_with_id( Bson::ObjectId(ObjectId::new()), filename, source, @@ -181,14 +194,14 @@ impl GridFsBucket { } /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for - /// the file id. Uses the `futures` crate. - pub async fn upload_from_stream_futures( + /// the file id. Uses the `futures` crate's `AsyncRead` trait for the `source`. + pub async fn upload_from_futures_reader( &self, filename: String, source: impl futures_util::io::AsyncRead, options: impl Into>, ) { - self.upload_from_stream_with_id_futures( + self.upload_from_futures_reader_with_id( Bson::ObjectId(ObjectId::new()), filename, source, @@ -215,8 +228,9 @@ impl GridFsBucket { } /// Downloads the contents of the stored file specified by `id` and writes - /// the contents to the destination [`GridFsDownloadStream`]. Uses the `tokio` runtime. - pub async fn download_to_stream_tokio( + /// the contents to the `destination`. Uses the `tokio` crate's `AsyncWrite` + /// trait for the `destination`. + pub async fn download_to_tokio_writer( &self, id: Bson, destination: impl tokio::io::AsyncWrite, @@ -225,8 +239,9 @@ impl GridFsBucket { } /// Downloads the contents of the stored file specified by `id` and writes - /// the contents to the destination [`GridFsDownloadStream`]. Uses the `futures` crate. - pub async fn download_to_stream_futures( + /// the contents to the `destination`. Uses the + /// `futures` crate's `AsyncWrite` trait for the `destination`. + pub async fn download_to_futures_writer( &self, id: Bson, destination: impl futures_util::io::AsyncWrite, @@ -235,9 +250,9 @@ impl GridFsBucket { } /// Downloads the contents of the stored file specified by `filename` and by - /// the revision in `options` and writes the contents to the destination - /// [`GridFsStream`]. Uses the `tokio` runtime. - pub async fn download_to_stream_by_name_tokio( + /// the revision in `options` and writes the contents to the `destination`. Uses the + /// `tokio` crate's `AsyncWrite` trait for the `destination`. + pub async fn download_to_tokio_writer_by_name( &self, filename: String, destination: impl tokio::io::AsyncWrite, @@ -247,9 +262,9 @@ impl GridFsBucket { } /// Downloads the contents of the stored file specified by `filename` and by - /// the revision in `options` and writes the contents to the destination - /// [`GridFsStream`]. Uses the `futures` crate. - pub async fn download_to_stream_by_name_futures( + /// the revision in `options` and writes the contents to the `destination`. Uses the + /// `futures` crate's `AsyncWrite` trait for the `destination`. + pub async fn download_to_futures_writer_by_name( &self, filename: String, destination: impl futures_util::io::AsyncWrite, @@ -269,7 +284,7 @@ impl GridFsBucket { &self, filter: Document, options: impl Into>, - ) -> io::Result> { + ) -> Result> { todo!() } From 6fa82675e56149af1ec52ad1026865e7a97eb975 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Mon, 1 Aug 2022 16:13:57 -0400 Subject: [PATCH 11/23] make type changes --- src/db/mod.rs | 12 ++++++------ src/gridfs.rs | 17 ++++++++++------- src/gridfs/options.rs | 19 ++++++++++--------- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 8d127c8fc..9ea9121dd 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -19,7 +19,7 @@ use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, error::{Error, ErrorKind, Result}, - gridfs::{options::GridFsBucketOptions, GridFsBucket}, + gridfs::{options::GridFsBucketOptions, GridFsBucket, DEFAULT_BUCKET_NAME, DEFAULT_CHUNK_SIZE_BYTES}, operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand}, options::{ AggregateOptions, @@ -578,18 +578,18 @@ impl Database { let write_concern = options .write_concern .or_else(|| self.write_concern().cloned()); - let read_preference = options - .read_preference + let selection_criteria = options + .selection_criteria .or_else(|| self.selection_criteria().cloned()); - let bucket_name = options.bucket_name.unwrap_or_else(|| "fs".to_string()); - let chunk_size_bytes = options.chunk_size_bytes.unwrap_or(255 * 1024); + let bucket_name = options.bucket_name.unwrap_or_else(|| DEFAULT_BUCKET_NAME.to_string()); + let chunk_size_bytes = options.chunk_size_bytes.unwrap_or(DEFAULT_CHUNK_SIZE_BYTES); GridFsBucket { db: self.clone(), bucket_name, chunk_size_bytes, read_concern, write_concern, - read_preference, + selection_criteria, } } } diff --git a/src/gridfs.rs b/src/gridfs.rs index 90bd175ee..42d13fd92 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -17,11 +17,14 @@ use bson::{oid::ObjectId, Bson, DateTime, Document}; use futures_util; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +pub const DEFAULT_BUCKET_NAME: &'static str = "fs"; +pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024; + // Contained in a "chunks" collection for each user file struct Chunk { id: ObjectId, files_id: Bson, - n: i32, + n: u32, // default size is 255 KiB data: Vec, } @@ -32,7 +35,7 @@ struct Chunk { pub struct FilesCollectionDocument { id: Bson, length: i64, - chunk_size: i32, + chunk_size: u32, upload_date: DateTime, filename: String, metadata: Document, @@ -43,10 +46,10 @@ pub struct GridFsBucket { // Contains a "chunks" collection pub(crate) bucket_name: String, pub(crate) db: Database, - pub(crate) chunk_size_bytes: i32, + pub(crate) chunk_size_bytes: u32, pub(crate) read_concern: Option, pub(crate) write_concern: Option, - pub(crate) read_preference: Option, + pub(crate) selection_criteria: Option, } // TODO: RUST-1399 Add documentation and example code for this struct. @@ -121,9 +124,9 @@ impl GridFsBucket { self.write_concern.as_ref() } - /// Gets the read preference of the [`GridFsBucket`]. - pub fn read_preference(&self) -> Option<&SelectionCriteria> { - self.read_preference.as_ref() + /// Gets the selection criteria of the [`GridFsBucket`]. + pub fn selection_criteria(&self) -> Option<&SelectionCriteria> { + self.selection_criteria.as_ref() } /// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to. diff --git a/src/gridfs/options.rs b/src/gridfs/options.rs index 0426e737c..e1b5df528 100644 --- a/src/gridfs/options.rs +++ b/src/gridfs/options.rs @@ -2,6 +2,7 @@ use crate::{ concern::{ReadConcern, WriteConcern}, selection_criteria::SelectionCriteria, }; +use std::time::Duration; use serde::Deserialize; use typed_builder::TypedBuilder; @@ -16,7 +17,7 @@ pub struct GridFsBucketOptions { pub bucket_name: Option, /// The chunk size in bytes used to break the user file into chunks. Defaults to 255 KiB. - pub chunk_size_bytes: Option, + pub chunk_size_bytes: Option, /// The write concern. Defaults to the write concern of the database. pub write_concern: Option, @@ -24,8 +25,8 @@ pub struct GridFsBucketOptions { /// The read concern. Defaults to the read concern of the database. pub read_concern: Option, - /// The read preference. Defaults to the read preference of the database. - pub read_preference: Option, + /// The selection criteria. Defaults to the selection criteria of the database. + pub selection_criteria: Option, } /// Contains the options for creating a [`GridFsStream`] to upload a file to a @@ -36,7 +37,7 @@ pub struct GridFsBucketOptions { pub struct GridFsUploadOptions { /// The number of bytes per chunk of this file. Defaults to the `chunk_size_bytes` specified /// in the [`GridFsBucketOptions`]. - pub chunk_size_bytes: Option, + pub chunk_size_bytes: Option, /// User data for the 'metadata' field of the files collection document. pub metadata: Option, @@ -72,20 +73,20 @@ pub struct GridFsFindOptions { pub allow_disk_use: Option, /// The number of documents to return per batch. - pub batch_size: Option, + pub batch_size: Option, /// The maximum number of documents to return. - pub limit: Option, + pub limit: Option, /// The maximum amount of time to allow the query to run. - pub max_time_ms: Option, + pub max_time_ms: Option, /// The server normally times out idle cursors after an inactivity period - /// (10 minutes) to prevent excess memory use. Set this option to prevent that. + /// to prevent excess memory use. Set this option to prevent that. pub no_cursor_timeout: Option, /// The number of documents to skip before returning. - pub skip: i32, + pub skip: Option, /// The order by which to sort results. Defaults to not sorting. pub sort: Option, From e7ae7a7c2c2e92f31a1ac652493f3baed714d1f9 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Mon, 1 Aug 2022 16:59:29 -0400 Subject: [PATCH 12/23] run fmt --- src/db/mod.rs | 11 +++++++++-- src/gridfs.rs | 4 ++-- src/gridfs/options.rs | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 9ea9121dd..5a5f1ec4b 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -19,7 +19,12 @@ use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, error::{Error, ErrorKind, Result}, - gridfs::{options::GridFsBucketOptions, GridFsBucket, DEFAULT_BUCKET_NAME, DEFAULT_CHUNK_SIZE_BYTES}, + gridfs::{ + options::GridFsBucketOptions, + GridFsBucket, + DEFAULT_BUCKET_NAME, + DEFAULT_CHUNK_SIZE_BYTES, + }, operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand}, options::{ AggregateOptions, @@ -581,7 +586,9 @@ impl Database { let selection_criteria = options .selection_criteria .or_else(|| self.selection_criteria().cloned()); - let bucket_name = options.bucket_name.unwrap_or_else(|| DEFAULT_BUCKET_NAME.to_string()); + let bucket_name = options + .bucket_name + .unwrap_or_else(|| DEFAULT_BUCKET_NAME.to_string()); let chunk_size_bytes = options.chunk_size_bytes.unwrap_or(DEFAULT_CHUNK_SIZE_BYTES); GridFsBucket { db: self.clone(), diff --git a/src/gridfs.rs b/src/gridfs.rs index 42d13fd92..d0759f1d8 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -11,10 +11,10 @@ use crate::{ Database, }; -use options::*; -use serde::{Deserialize, Serialize}; use bson::{oid::ObjectId, Bson, DateTime, Document}; use futures_util; +use options::*; +use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub const DEFAULT_BUCKET_NAME: &'static str = "fs"; diff --git a/src/gridfs/options.rs b/src/gridfs/options.rs index e1b5df528..b7a4d674f 100644 --- a/src/gridfs/options.rs +++ b/src/gridfs/options.rs @@ -2,8 +2,8 @@ use crate::{ concern::{ReadConcern, WriteConcern}, selection_criteria::SelectionCriteria, }; -use std::time::Duration; use serde::Deserialize; +use std::time::Duration; use typed_builder::TypedBuilder; use bson::Document; From 1487008e59687964a9fa5cfdb2b4980c053954b7 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Mon, 1 Aug 2022 17:19:31 -0400 Subject: [PATCH 13/23] remove noCursorTimeout option --- src/gridfs/options.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/gridfs/options.rs b/src/gridfs/options.rs index b7a4d674f..ab78b1c17 100644 --- a/src/gridfs/options.rs +++ b/src/gridfs/options.rs @@ -81,10 +81,6 @@ pub struct GridFsFindOptions { /// The maximum amount of time to allow the query to run. pub max_time_ms: Option, - /// The server normally times out idle cursors after an inactivity period - /// to prevent excess memory use. Set this option to prevent that. - pub no_cursor_timeout: Option, - /// The number of documents to skip before returning. pub skip: Option, From 8f8f0fc22fd097aef055c5e28b56ad5157b92a3d Mon Sep 17 00:00:00 2001 From: sanav33 Date: Mon, 1 Aug 2022 17:21:01 -0400 Subject: [PATCH 14/23] fixed docs for options --- src/gridfs/options.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/gridfs/options.rs b/src/gridfs/options.rs index ab78b1c17..36c57cedb 100644 --- a/src/gridfs/options.rs +++ b/src/gridfs/options.rs @@ -29,7 +29,7 @@ pub struct GridFsBucketOptions { pub selection_criteria: Option, } -/// Contains the options for creating a [`GridFsStream`] to upload a file to a +/// Contains the options for creating a [`GridFsUploadStream`] to upload a file to a /// [`GridFsBucket`]. #[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] #[builder(field_defaults(setter(into)))] @@ -43,7 +43,7 @@ pub struct GridFsUploadOptions { pub metadata: Option, } -/// Contains the options for creating [`GridFsStream`] to retrieve a stored file +/// Contains the options for creating a [`GridFsDownloadStream`] to retrieve a stored file /// from a [`GridFsBucket`]. #[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] #[builder(field_defaults(setter(into)))] From b34e2b534681fbf8436165d10de755a37c49ebd1 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Tue, 2 Aug 2022 12:39:17 -0400 Subject: [PATCH 15/23] change stream id field name to remvoe ambiguity --- src/gridfs.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/gridfs.rs b/src/gridfs.rs index d0759f1d8..9d75da6ac 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -54,13 +54,13 @@ pub struct GridFsBucket { // TODO: RUST-1399 Add documentation and example code for this struct. pub struct GridFsUploadStream { - id: Bson, + files_id: Bson, } impl GridFsUploadStream { - /// Gets the stream `id`. - pub fn stream_id(&self) -> &Bson { - &self.id + /// Gets the file `id` for the stream. + pub fn files_id(&self) -> &Bson { + &self.files_id } /// Consumes the stream and uploads data in the stream to the server. @@ -93,13 +93,13 @@ impl tokio::io::AsyncWrite for GridFsUploadStream { } pub struct GridFsDownloadStream { - id: Bson, + files_id: Bson, } impl GridFsDownloadStream { - /// Gets the stream `id`. + /// Gets the file `id` for the stream. pub fn stream_id(&self) -> &Bson { - &self.id + &self.files_id } } From 92b2a538e61278fa41e21d3de72b755270496dde Mon Sep 17 00:00:00 2001 From: sanav33 Date: Tue, 2 Aug 2022 13:02:02 -0400 Subject: [PATCH 16/23] update download stream id field name --- src/gridfs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gridfs.rs b/src/gridfs.rs index 9d75da6ac..bd7c7f7fd 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -98,7 +98,7 @@ pub struct GridFsDownloadStream { impl GridFsDownloadStream { /// Gets the file `id` for the stream. - pub fn stream_id(&self) -> &Bson { + pub fn files_id(&self) -> &Bson { &self.files_id } } From 86ac29d52e25ca40081ca9af632fde20a72e011d Mon Sep 17 00:00:00 2001 From: sanav33 Date: Tue, 2 Aug 2022 15:32:57 -0400 Subject: [PATCH 17/23] store options directly in bucket --- src/db/mod.rs | 22 +++++++++------------- src/gridfs.rs | 26 ++++++++++++++++---------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 5a5f1ec4b..808992b9e 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -574,29 +574,25 @@ impl Database { /// Creates a new [`GridFsBucket`] in the database with the given options. pub fn new_gridfs_bucket( &self, - options: impl Into>, + options: Option, ) -> GridFsBucket { - let options: GridFsBucketOptions = options.into().unwrap_or_default(); - let read_concern = options + let mut options: GridFsBucketOptions = options.unwrap_or_default(); + options.read_concern = options .read_concern .or_else(|| self.read_concern().cloned()); - let write_concern = options + options.write_concern = options .write_concern .or_else(|| self.write_concern().cloned()); - let selection_criteria = options + options.selection_criteria = options .selection_criteria .or_else(|| self.selection_criteria().cloned()); - let bucket_name = options + options.bucket_name = options .bucket_name - .unwrap_or_else(|| DEFAULT_BUCKET_NAME.to_string()); - let chunk_size_bytes = options.chunk_size_bytes.unwrap_or(DEFAULT_CHUNK_SIZE_BYTES); + .or_else(|| Some(DEFAULT_BUCKET_NAME.to_string())); + options.chunk_size_bytes = options.chunk_size_bytes.or(Some(DEFAULT_CHUNK_SIZE_BYTES)); GridFsBucket { db: self.clone(), - bucket_name, - chunk_size_bytes, - read_concern, - write_concern, - selection_criteria, + options, } } } diff --git a/src/gridfs.rs b/src/gridfs.rs index bd7c7f7fd..0f644a2a1 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -44,12 +44,8 @@ pub struct FilesCollectionDocument { /// Struct for storing GridFS managed files within a [`Database`]. pub struct GridFsBucket { // Contains a "chunks" collection - pub(crate) bucket_name: String, pub(crate) db: Database, - pub(crate) chunk_size_bytes: u32, - pub(crate) read_concern: Option, - pub(crate) write_concern: Option, - pub(crate) selection_criteria: Option, + pub(crate) options: GridFsBucketOptions, } // TODO: RUST-1399 Add documentation and example code for this struct. @@ -116,18 +112,28 @@ impl tokio::io::AsyncRead for GridFsDownloadStream { impl GridFsBucket { /// Gets the read concern of the [`GridFsBucket`]. pub fn read_concern(&self) -> Option<&ReadConcern> { - self.read_concern.as_ref() + if self.options.read_concern.is_some() { + self.options.read_concern.as_ref() + } else { + self.db.read_concern() + } } /// Gets the write concern of the [`GridFsBucket`]. pub fn write_concern(&self) -> Option<&WriteConcern> { - self.write_concern.as_ref() - } + if self.options.write_concern.is_some() { + self.options.write_concern.as_ref() + } else { + self.db.write_concern() + } } /// Gets the selection criteria of the [`GridFsBucket`]. pub fn selection_criteria(&self) -> Option<&SelectionCriteria> { - self.selection_criteria.as_ref() - } + if self.options.selection_criteria.is_some() { + self.options.selection_criteria.as_ref() + } else { + self.db.selection_criteria() + } } /// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to. /// The application provides a custom file id. From 81672d4140424b97b5f01d99ff0f0d2617906ef2 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Tue, 2 Aug 2022 17:14:32 -0400 Subject: [PATCH 18/23] change helpers for bucket --- src/gridfs.rs | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/src/gridfs.rs b/src/gridfs.rs index 0f644a2a1..c8099cb61 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -112,28 +112,18 @@ impl tokio::io::AsyncRead for GridFsDownloadStream { impl GridFsBucket { /// Gets the read concern of the [`GridFsBucket`]. pub fn read_concern(&self) -> Option<&ReadConcern> { - if self.options.read_concern.is_some() { - self.options.read_concern.as_ref() - } else { - self.db.read_concern() - } + self.options.read_concern.as_ref() } /// Gets the write concern of the [`GridFsBucket`]. pub fn write_concern(&self) -> Option<&WriteConcern> { - if self.options.write_concern.is_some() { - self.options.write_concern.as_ref() - } else { - self.db.write_concern() - } } + self.options.write_concern.as_ref() + } /// Gets the selection criteria of the [`GridFsBucket`]. pub fn selection_criteria(&self) -> Option<&SelectionCriteria> { - if self.options.selection_criteria.is_some() { - self.options.selection_criteria.as_ref() - } else { - self.db.selection_criteria() - } } + self.options.selection_criteria.as_ref() + } /// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to. /// The application provides a custom file id. From 9c9423744c527898992684970e9d5f94f69dad00 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Thu, 4 Aug 2022 09:39:19 -0400 Subject: [PATCH 19/23] add futures version to methods --- Cargo.toml | 1 + src/db/mod.rs | 7 ++--- src/gridfs.rs | 62 +++++++++++++++++++++++++++++++++---------- src/gridfs/options.rs | 2 +- 4 files changed, 52 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 073b6a59f..7e7dec0b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ bson = { git = "https://github.com/mongodb/bson-rust", branch = "main" } chrono = "0.4.7" derivative = "2.1.1" flate2 = { version = "1.0", optional = true } +futures = "0.3.14" futures-core = "0.3.14" futures-util = { version = "0.3.14", features = ["io"] } futures-executor = "0.3.14" diff --git a/src/db/mod.rs b/src/db/mod.rs index 808992b9e..4d392ff55 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -572,11 +572,8 @@ impl Database { } /// Creates a new [`GridFsBucket`] in the database with the given options. - pub fn new_gridfs_bucket( - &self, - options: Option, - ) -> GridFsBucket { - let mut options: GridFsBucketOptions = options.unwrap_or_default(); + pub fn gridfs_bucket(&self, options: impl Into>) -> GridFsBucket { + let mut options = options.into().unwrap_or_default(); options.read_concern = options .read_concern .or_else(|| self.read_concern().cloned()); diff --git a/src/gridfs.rs b/src/gridfs.rs index c8099cb61..559d08993 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -1,21 +1,21 @@ pub mod options; use core::task::{Context, Poll}; -use std::{io, pin::Pin}; +use std::pin::Pin; use crate::{ concern::{ReadConcern, WriteConcern}, cursor::Cursor, - error::{Error, Result}, + error::Result, selection_criteria::SelectionCriteria, Database, }; - use bson::{oid::ObjectId, Bson, DateTime, Document}; +use futures; use futures_util; use options::*; use serde::{Deserialize, Serialize}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::io::ReadBuf; pub const DEFAULT_BUCKET_NAME: &'static str = "fs"; pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024; @@ -88,6 +88,30 @@ impl tokio::io::AsyncWrite for GridFsUploadStream { } } +impl futures_util::io::AsyncWrite for GridFsUploadStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + todo!() + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } +} + pub struct GridFsDownloadStream { files_id: Bson, } @@ -109,6 +133,16 @@ impl tokio::io::AsyncRead for GridFsDownloadStream { } } +impl futures_util::io::AsyncRead for GridFsDownloadStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + todo!() + } +} + impl GridFsBucket { /// Gets the read concern of the [`GridFsBucket`]. pub fn read_concern(&self) -> Option<&ReadConcern> { @@ -164,8 +198,8 @@ impl GridFsBucket { } /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the - /// `futures` crate's `AsyncRead` trait for the `source`. - pub async fn upload_from_futures_reader_with_id( + /// `futures-0.3` crate's `AsyncRead` trait for the `source`. + pub async fn upload_from_futures_0_3_reader_with_id( &self, id: Bson, filename: String, @@ -193,14 +227,14 @@ impl GridFsBucket { } /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for - /// the file id. Uses the `futures` crate's `AsyncRead` trait for the `source`. - pub async fn upload_from_futures_reader( + /// the file id. Uses the `futures-0.3` crate's `AsyncRead` trait for the `source`. + pub async fn upload_from_futures_0_3_reader( &self, filename: String, source: impl futures_util::io::AsyncRead, options: impl Into>, ) { - self.upload_from_futures_reader_with_id( + self.upload_from_futures_0_3_reader_with_id( Bson::ObjectId(ObjectId::new()), filename, source, @@ -238,9 +272,9 @@ impl GridFsBucket { } /// Downloads the contents of the stored file specified by `id` and writes - /// the contents to the `destination`. Uses the - /// `futures` crate's `AsyncWrite` trait for the `destination`. - pub async fn download_to_futures_writer( + /// the contents to the `destination`. Uses the `futures-0.3` crate's `AsyncWrite` + /// trait for the `destination`. + pub async fn download_to_futures_0_3_writer( &self, id: Bson, destination: impl futures_util::io::AsyncWrite, @@ -262,8 +296,8 @@ impl GridFsBucket { /// Downloads the contents of the stored file specified by `filename` and by /// the revision in `options` and writes the contents to the `destination`. Uses the - /// `futures` crate's `AsyncWrite` trait for the `destination`. - pub async fn download_to_futures_writer_by_name( + /// `futures-0.3` crate's `AsyncWrite` trait for the `destination`. + pub async fn download_to_futures_0_3_writer_by_name( &self, filename: String, destination: impl futures_util::io::AsyncWrite, diff --git a/src/gridfs/options.rs b/src/gridfs/options.rs index 36c57cedb..cc4a31014 100644 --- a/src/gridfs/options.rs +++ b/src/gridfs/options.rs @@ -79,7 +79,7 @@ pub struct GridFsFindOptions { pub limit: Option, /// The maximum amount of time to allow the query to run. - pub max_time_ms: Option, + pub max_time: Option, /// The number of documents to skip before returning. pub skip: Option, From 4aa6cd9f7a76d3a5e76aa9eca6edbdd873f6abab Mon Sep 17 00:00:00 2001 From: sanav33 Date: Thu, 4 Aug 2022 14:33:37 -0400 Subject: [PATCH 20/23] remove futures-util import from gridfs --- src/gridfs.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/gridfs.rs b/src/gridfs.rs index 559d08993..247861028 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -12,7 +12,6 @@ use crate::{ }; use bson::{oid::ObjectId, Bson, DateTime, Document}; use futures; -use futures_util; use options::*; use serde::{Deserialize, Serialize}; use tokio::io::ReadBuf; @@ -88,7 +87,7 @@ impl tokio::io::AsyncWrite for GridFsUploadStream { } } -impl futures_util::io::AsyncWrite for GridFsUploadStream { +impl futures::io::AsyncWrite for GridFsUploadStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -133,7 +132,7 @@ impl tokio::io::AsyncRead for GridFsDownloadStream { } } -impl futures_util::io::AsyncRead for GridFsDownloadStream { +impl futures::io::AsyncRead for GridFsDownloadStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -203,7 +202,7 @@ impl GridFsBucket { &self, id: Bson, filename: String, - source: impl futures_util::io::AsyncRead, + source: impl futures::io::AsyncRead, options: impl Into>, ) { todo!() @@ -231,7 +230,7 @@ impl GridFsBucket { pub async fn upload_from_futures_0_3_reader( &self, filename: String, - source: impl futures_util::io::AsyncRead, + source: impl futures::io::AsyncRead, options: impl Into>, ) { self.upload_from_futures_0_3_reader_with_id( @@ -277,7 +276,7 @@ impl GridFsBucket { pub async fn download_to_futures_0_3_writer( &self, id: Bson, - destination: impl futures_util::io::AsyncWrite, + destination: impl futures::io::AsyncWrite, ) { todo!() } @@ -300,7 +299,7 @@ impl GridFsBucket { pub async fn download_to_futures_0_3_writer_by_name( &self, filename: String, - destination: impl futures_util::io::AsyncWrite, + destination: impl futures::io::AsyncWrite, options: impl Into>, ) { todo!() From fac86a05baf9b80272a78a876065b78ee6279f61 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Fri, 5 Aug 2022 10:19:35 -0400 Subject: [PATCH 21/23] remove futures dependency --- Cargo.toml | 1 - src/gridfs.rs | 14 +++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7e7dec0b0..073b6a59f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,7 +76,6 @@ bson = { git = "https://github.com/mongodb/bson-rust", branch = "main" } chrono = "0.4.7" derivative = "2.1.1" flate2 = { version = "1.0", optional = true } -futures = "0.3.14" futures-core = "0.3.14" futures-util = { version = "0.3.14", features = ["io"] } futures-executor = "0.3.14" diff --git a/src/gridfs.rs b/src/gridfs.rs index 247861028..676e7fe77 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -11,7 +11,7 @@ use crate::{ Database, }; use bson::{oid::ObjectId, Bson, DateTime, Document}; -use futures; +use futures_util; use options::*; use serde::{Deserialize, Serialize}; use tokio::io::ReadBuf; @@ -87,26 +87,26 @@ impl tokio::io::AsyncWrite for GridFsUploadStream { } } -impl futures::io::AsyncWrite for GridFsUploadStream { +impl futures_util::AsyncWrite for GridFsUploadStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], - ) -> Poll> { + ) -> Poll> { todo!() } fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { todo!() } fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { todo!() } } @@ -132,12 +132,12 @@ impl tokio::io::AsyncRead for GridFsDownloadStream { } } -impl futures::io::AsyncRead for GridFsDownloadStream { +impl futures_util::io::AsyncRead for GridFsDownloadStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { todo!() } } From 01238f504c7edb7310e2d133d9557c67b0d71b2f Mon Sep 17 00:00:00 2001 From: sanav33 Date: Fri, 5 Aug 2022 14:53:03 -0400 Subject: [PATCH 22/23] fix destination trait bound --- src/gridfs.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/gridfs.rs b/src/gridfs.rs index 676e7fe77..0ae949137 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -202,7 +202,7 @@ impl GridFsBucket { &self, id: Bson, filename: String, - source: impl futures::io::AsyncRead, + source: impl futures_util::AsyncRead, options: impl Into>, ) { todo!() @@ -230,7 +230,7 @@ impl GridFsBucket { pub async fn upload_from_futures_0_3_reader( &self, filename: String, - source: impl futures::io::AsyncRead, + source: impl futures_util::AsyncRead, options: impl Into>, ) { self.upload_from_futures_0_3_reader_with_id( @@ -276,7 +276,7 @@ impl GridFsBucket { pub async fn download_to_futures_0_3_writer( &self, id: Bson, - destination: impl futures::io::AsyncWrite, + destination: impl futures_util::AsyncWrite, ) { todo!() } @@ -299,7 +299,7 @@ impl GridFsBucket { pub async fn download_to_futures_0_3_writer_by_name( &self, filename: String, - destination: impl futures::io::AsyncWrite, + destination: impl futures_util::AsyncWrite, options: impl Into>, ) { todo!() From 8325459a46d21f383fa16d79ac1fe621427047e4 Mon Sep 17 00:00:00 2001 From: sanav33 Date: Mon, 8 Aug 2022 15:29:54 -0400 Subject: [PATCH 23/23] remove link from doc for bucket constructor --- src/db/mod.rs | 2 +- src/gridfs.rs | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 4d392ff55..71e9a92c8 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -571,7 +571,7 @@ impl Database { .await } - /// Creates a new [`GridFsBucket`] in the database with the given options. + /// Creates a new GridFsBucket in the database with the given options. pub fn gridfs_bucket(&self, options: impl Into>) -> GridFsBucket { let mut options = options.into().unwrap_or_default(); options.read_concern = options diff --git a/src/gridfs.rs b/src/gridfs.rs index 0ae949137..545a95828 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -11,12 +11,11 @@ use crate::{ Database, }; use bson::{oid::ObjectId, Bson, DateTime, Document}; -use futures_util; use options::*; use serde::{Deserialize, Serialize}; use tokio::io::ReadBuf; -pub const DEFAULT_BUCKET_NAME: &'static str = "fs"; +pub const DEFAULT_BUCKET_NAME: &str = "fs"; pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024; // Contained in a "chunks" collection for each user file @@ -47,7 +46,7 @@ pub struct GridFsBucket { pub(crate) options: GridFsBucketOptions, } -// TODO: RUST-1399 Add documentation and example code for this struct. +// TODO: RUST-1395 Add documentation and example code for this struct. pub struct GridFsUploadStream { files_id: Bson, }