From 6c3d08c218027ac2ae5a34a758755b4bffe23b0b Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Mon, 5 Aug 2024 14:40:37 +0000
Subject: [PATCH] chore: apply suggestions from CR

---
 integrations/parquet/Cargo.toml               | 12 +++-
 integrations/parquet/examples/async_writer.rs | 45 ++++++++++++
 integrations/parquet/src/async_writer.rs      | 71 ++++++++++++-------
 integrations/parquet/src/lib.rs               | 63 ++++++++++------
 4 files changed, 145 insertions(+), 46 deletions(-)
 create mode 100644 integrations/parquet/examples/async_writer.rs

diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml
index 2f5141a21f30..7171060b07f9 100644
--- a/integrations/parquet/Cargo.toml
+++ b/integrations/parquet/Cargo.toml
@@ -25,7 +25,11 @@ homepage = "https://opendal.apache.org/"
 license = "Apache-2.0"
 repository = "https://github.com/apache/opendal"
 rust-version = "1.75"
-version = "0.45.0"
+version = "0.0.1"
+
+[features]
+default = ["arrow"]
+arrow = ["dep:arrow"]
 
 [dependencies]
 async-trait = "0.1"
@@ -36,6 +40,7 @@ parquet = { version = "52.0", default-features = false, features = [
   "async",
   "arrow",
 ] }
+arrow = { version = "52.0", optional = true }
 
 [dev-dependencies]
 opendal = { version = "0.48.0", path = "../../core", features = [
@@ -44,3 +49,8 @@ opendal = { version = "0.48.0", path = "../../core", features = [
 ] }
 rand = "0.8.5"
 tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }
+
+[[example]]
+name = "async_writer"
+path = "examples/async_writer.rs"
+required-features = ["arrow"]
diff --git a/integrations/parquet/examples/async_writer.rs b/integrations/parquet/examples/async_writer.rs
new file mode 100644
index 000000000000..9f16f69eac51
--- /dev/null
+++ b/integrations/parquet/examples/async_writer.rs
@@ -0,0 +1,45 @@
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+
+use opendal::{services::S3Config, Operator};
+use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+use parquet_opendal::AsyncWriter;
+
+#[tokio::main]
+async fn main() {
+    let mut cfg = S3Config::default();
+    cfg.access_key_id = Some("my_access_key".to_string());
+    cfg.secret_access_key = Some("my_secret_key".to_string());
+    cfg.endpoint = Some("my_endpoint".to_string());
+    cfg.region = Some("my_region".to_string());
+    cfg.bucket = "my_bucket".to_string();
+
+    // Create a new operator
+    let operator = Operator::from_config(cfg).unwrap().finish();
+    let path = "/path/to/file.parquet";
+
+    // Create an async writer
+    let writer = AsyncWriter::new(
+        operator
+            .writer_with(path)
+            .chunk(32 * 1024 * 1024)
+            .concurrent(8)
+            .await
+            .unwrap(),
+    );
+
+    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+    let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+    let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+    writer.write(&to_write).await.unwrap();
+    writer.close().await.unwrap();
+
+    let buffer = operator.read(path).await.unwrap().to_bytes();
+    let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+        .unwrap()
+        .build()
+        .unwrap();
+    let read = reader.next().unwrap().unwrap();
+    assert_eq!(to_write, read);
+}
diff --git a/integrations/parquet/src/async_writer.rs b/integrations/parquet/src/async_writer.rs
index f2fa9387c400..4d0bc7b82a6a 100644
--- a/integrations/parquet/src/async_writer.rs
+++ b/integrations/parquet/src/async_writer.rs
@@ -22,46 +22,67 @@ use parquet::errors::{ParquetError, Result};
 use futures::future::BoxFuture;
 use opendal::Writer;
 
-/// OpendalAsyncWriter implements AsyncFileWriter trait by using opendal.
+/// AsyncWriter implements AsyncFileWriter trait by using opendal.
 ///
 /// ```no_run
-/// use parquet::arrow::async_writer::AsyncFileWriter;
-/// use parquet::OpendalAsyncWriter;
-/// use opendal::services::S3;
-/// use opendal::{Builder, Operator};
-///
+/// use std::sync::Arc;
+///
+/// use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+///
+/// use opendal::{services::S3Config, Operator};
+/// use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+/// use parquet_opendal::AsyncWriter;
+///
 /// #[tokio::main]
 /// async fn main() {
-///     let builder = S3::from_map(
-///         vec![
-///             ("access_key".to_string(), "my_access_key".to_string()),
-///             ("secret_key".to_string(), "my_secret_key".to_string()),
-///             ("endpoint".to_string(), "my_endpoint".to_string()),
-///             ("region".to_string(), "my_region".to_string()),
-///         ]
-///         .into_iter()
-///         .collect(),
-///     ).unwrap();
-///
+///     let mut cfg = S3Config::default();
+///     cfg.access_key_id = Some("my_access_key".to_string());
+///     cfg.secret_access_key = Some("my_secret_key".to_string());
+///     cfg.endpoint = Some("my_endpoint".to_string());
+///     cfg.region = Some("my_region".to_string());
+///     cfg.bucket = "my_bucket".to_string();
+///
 ///     // Create a new operator
-///     let operator = Operator::new(builder).unwrap().finish();
+///     let operator = Operator::from_config(cfg).unwrap().finish();
 ///     let path = "/path/to/file.parquet";
-///     // Create a new object store
-///     let mut writer = Arc::new(OpendalAsyncWriter::new(operator.writer(path)));
+///
+///     // Create an async writer
+///     let writer = AsyncWriter::new(
+///         operator
+///             .writer_with(path)
+///             .chunk(32 * 1024 * 1024)
+///             .concurrent(8)
+///             .await
+///             .unwrap(),
+///     );
+///
+///     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+///     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+///     let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+///     writer.write(&to_write).await.unwrap();
+///     writer.close().await.unwrap();
+///
+///     let buffer = operator.read(path).await.unwrap().to_bytes();
+///     let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+///         .unwrap()
+///         .build()
+///         .unwrap();
+///     let read = reader.next().unwrap().unwrap();
+///     assert_eq!(to_write, read);
/// }
/// ```
-pub struct OpendalAsyncWriter {
+pub struct AsyncWriter {
     inner: Writer,
 }
 
-impl OpendalAsyncWriter {
+impl AsyncWriter {
     /// Create a [`OpendalAsyncWriter`] by given [`Writer`].
     pub fn new(writer: Writer) -> Self {
         Self { inner: writer }
     }
 }
 
-impl AsyncFileWriter for OpendalAsyncWriter {
+impl AsyncFileWriter for AsyncWriter {
     fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
         Box::pin(async move {
             self.inner
@@ -90,7 +111,7 @@ mod tests {
     async fn test_basic() {
         let op = Operator::new(services::Memory::default()).unwrap().finish();
         let path = "data/test.txt";
-        let mut writer = OpendalAsyncWriter::new(op.writer(path).await.unwrap());
+        let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
         let bytes = Bytes::from_static(b"hello, world!");
         writer.write(bytes).await.unwrap();
         let bytes = Bytes::from_static(b"hello, OpenDAL!");
@@ -105,7 +126,7 @@ mod tests {
     async fn test_abort() {
         let op = Operator::new(services::Memory::default()).unwrap().finish();
         let path = "data/test.txt";
-        let mut writer = OpendalAsyncWriter::new(op.writer(path).await.unwrap());
+        let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
         let bytes = Bytes::from_static(b"hello, world!");
         writer.write(bytes).await.unwrap();
         let bytes = Bytes::from_static(b"hello, OpenDAL!");
diff --git a/integrations/parquet/src/lib.rs b/integrations/parquet/src/lib.rs
index 945cc1375d8d..87d95539949f 100644
--- a/integrations/parquet/src/lib.rs
+++ b/integrations/parquet/src/lib.rs
@@ -17,33 +17,56 @@
 
 //! parquet_opendal provides parquet IO utils.
 //!
-//! ```no_run
-//! use parquet::arrow::async_writer::AsyncFileWriter;
-//! use parquet::OpendalAsyncWriter;
-//! use opendal::services::S3;
-//! use opendal::{Builder, Operator};
+//! AsyncWriter implements AsyncFileWriter trait by using opendal.
 //!
+//! ```no_run
+//! use std::sync::Arc;
+//!
+//! use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+//!
+//! use opendal::{services::S3Config, Operator};
+//! use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+//! use parquet_opendal::AsyncWriter;
+//!
 //! #[tokio::main]
 //! async fn main() {
-//!     let builder = S3::from_map(
-//!         vec![
-//!             ("access_key".to_string(), "my_access_key".to_string()),
-//!             ("secret_key".to_string(), "my_secret_key".to_string()),
-//!             ("endpoint".to_string(), "my_endpoint".to_string()),
-//!             ("region".to_string(), "my_region".to_string()),
-//!         ]
-//!         .into_iter()
-//!         .collect(),
-//!     ).unwrap();
-//!
+//!     let mut cfg = S3Config::default();
+//!     cfg.access_key_id = Some("my_access_key".to_string());
+//!     cfg.secret_access_key = Some("my_secret_key".to_string());
+//!     cfg.endpoint = Some("my_endpoint".to_string());
+//!     cfg.region = Some("my_region".to_string());
+//!     cfg.bucket = "my_bucket".to_string();
+//!
 //!     // Create a new operator
-//!     let operator = Operator::new(builder).unwrap().finish();
+//!     let operator = Operator::from_config(cfg).unwrap().finish();
 //!     let path = "/path/to/file.parquet";
-//!     // Create a new object store
-//!     let mut writer = Arc::new(OpendalAsyncWriter::new(operator.writer(path)));
+//!
+//!     // Create an async writer
+//!     let writer = AsyncWriter::new(
+//!         operator
+//!             .writer_with(path)
+//!             .chunk(32 * 1024 * 1024)
+//!             .concurrent(8)
+//!             .await
+//!             .unwrap(),
+//!     );
+//!
+//!     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+//!     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+//!     let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+//!     writer.write(&to_write).await.unwrap();
+//!     writer.close().await.unwrap();
+//!
+//!     let buffer = operator.read(path).await.unwrap().to_bytes();
+//!     let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+//!         .unwrap()
+//!         .build()
+//!         .unwrap();
+//!     let read = reader.next().unwrap().unwrap();
+//!     assert_eq!(to_write, read);
 //! }
 //! ```
 
 mod async_writer;
 
-pub use async_writer::OpendalAsyncWriter;
+pub use async_writer::AsyncWriter;
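
Reviewer note: the S3 example added above cannot be run without real credentials. As a quick way to exercise the new `AsyncWriter` locally, here is a minimal sketch of the same write-then-read round trip against OpenDAL's in-memory service, mirroring the unit tests in this patch; the Memory backend and the file name are illustrative assumptions, not part of the change.

```rust
// Hypothetical local smoke test (not part of the patch): the same round trip as
// examples/async_writer.rs, but backed by services::Memory so it runs offline.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use opendal::{services, Operator};
use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
use parquet_opendal::AsyncWriter;

#[tokio::main]
async fn main() {
    // In-memory operator: no credentials, data lives only for this process.
    let operator = Operator::new(services::Memory::default()).unwrap().finish();
    let path = "data/test.parquet"; // illustrative path

    // Plain writer; the chunk/concurrent tuning shown in the S3 example is
    // unnecessary for the Memory backend.
    let writer = AsyncWriter::new(operator.writer(path).await.unwrap());

    // Write a single-column RecordBatch through AsyncArrowWriter.
    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
    let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
    writer.write(&to_write).await.unwrap();
    writer.close().await.unwrap();

    // Read the bytes back and verify the round trip, as the example does.
    let buffer = operator.read(path).await.unwrap().to_bytes();
    let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
        .unwrap()
        .build()
        .unwrap();
    let read = reader.next().unwrap().unwrap();
    assert_eq!(to_write, read);
}
```

With the `[[example]]` target added in Cargo.toml, the S3 variant itself should run from `integrations/parquet` via `cargo run --example async_writer`, since the required `arrow` feature is enabled by default.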