From 3ed445ec3c44812b22d7fa9923f7ad7d0781376e Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sat, 3 Aug 2024 08:32:53 +0000 Subject: [PATCH 01/10] chore: init parquet crate --- integrations/parquet/.gitignore | 1 + integrations/parquet/Cargo.toml | 32 ++++++++++++++++++++++++++++++++ integrations/parquet/src/lib.rs | 17 +++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 integrations/parquet/.gitignore create mode 100644 integrations/parquet/Cargo.toml create mode 100644 integrations/parquet/src/lib.rs diff --git a/integrations/parquet/.gitignore b/integrations/parquet/.gitignore new file mode 100644 index 00000000000..03314f77b5a --- /dev/null +++ b/integrations/parquet/.gitignore @@ -0,0 +1 @@ +Cargo.lock diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml new file mode 100644 index 00000000000..a9634a17350 --- /dev/null +++ b/integrations/parquet/Cargo.toml @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +description = "parquet Integration for Apache OpenDAL" +name = "parquet_opendal" + +authors = ["Apache OpenDAL "] +edition = "2021" +homepage = "https://opendal.apache.org/" +license = "Apache-2.0" +repository = "https://github.com/apache/opendal" +rust-version = "1.75" +version = "0.45.0" + +[dependencies] +async-trait = "0.1" +parquet = { version = "52.0", default-features = false, features = ["async"] } diff --git a/integrations/parquet/src/lib.rs b/integrations/parquet/src/lib.rs new file mode 100644 index 00000000000..5cd17fb5a64 --- /dev/null +++ b/integrations/parquet/src/lib.rs @@ -0,0 +1,17 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + From eea7ec6e86ea3fa9f82e888cedd900d2faaedeba Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sat, 3 Aug 2024 09:07:06 +0000 Subject: [PATCH 02/10] feat: implement the `OpendalAsyncWriter` --- integrations/parquet/Cargo.toml | 16 ++- integrations/parquet/src/async_writer.rs | 118 +++++++++++++++++++++++ integrations/parquet/src/lib.rs | 32 ++++++ 3 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 integrations/parquet/src/async_writer.rs diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml index a9634a17350..2f5141a21f3 100644 --- a/integrations/parquet/Cargo.toml +++ b/integrations/parquet/Cargo.toml @@ -29,4 +29,18 @@ version = "0.45.0" [dependencies] async-trait = "0.1" -parquet = { version = "52.0", default-features = false, features = ["async"] } +bytes = "1" +futures = "0.3" +opendal = { version = "0.48.0", path = "../../core" } +parquet = { version = "52.0", default-features = false, features = [ + "async", + "arrow", +] } + +[dev-dependencies] +opendal = { version = "0.48.0", path = "../../core", features = [ + "services-memory", + "services-s3", +] } +rand = "0.8.5" +tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] } diff --git a/integrations/parquet/src/async_writer.rs b/integrations/parquet/src/async_writer.rs new file mode 100644 index 00000000000..f2fa9387c40 --- /dev/null +++ b/integrations/parquet/src/async_writer.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Bytes; +use parquet::arrow::async_writer::AsyncFileWriter; +use parquet::errors::{ParquetError, Result}; + +use futures::future::BoxFuture; +use opendal::Writer; + +/// OpendalAsyncWriter implements AsyncFileWriter trait by using opendal. +/// +/// ```no_run +/// use parquet::arrow::async_writer::AsyncFileWriter; +/// use parquet::OpendalAsyncWriter; +/// use opendal::services::S3; +/// use opendal::{Builder, Operator}; +/// +/// #[tokio::main] +/// async fn main() { +/// let builder = S3::from_map( +/// vec![ +/// ("access_key".to_string(), "my_access_key".to_string()), +/// ("secret_key".to_string(), "my_secret_key".to_string()), +/// ("endpoint".to_string(), "my_endpoint".to_string()), +/// ("region".to_string(), "my_region".to_string()), +/// ] +/// .into_iter() +/// .collect(), +/// ).unwrap(); +/// +/// // Create a new operator +/// let operator = Operator::new(builder).unwrap().finish(); +/// let path = "/path/to/file.parquet"; +/// // Create a new object store +/// let mut writer = Arc::new(OpendalAsyncWriter::new(operator.writer(path))); +/// } +/// ``` +pub struct OpendalAsyncWriter { + inner: Writer, +} + +impl OpendalAsyncWriter { + /// Create a [`OpendalAsyncWriter`] by given [`Writer`]. + pub fn new(writer: Writer) -> Self { + Self { inner: writer } + } +} + +impl AsyncFileWriter for OpendalAsyncWriter { + fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + self.inner + .write(bs) + .await + .map_err(|err| ParquetError::External(Box::new(err))) + }) + } + + fn complete(&mut self) -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + self.inner + .close() + .await + .map_err(|err| ParquetError::External(Box::new(err))) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use opendal::{services, Operator}; + + #[tokio::test] + async fn test_basic() { + let op = Operator::new(services::Memory::default()).unwrap().finish(); + let path = "data/test.txt"; + let mut writer = OpendalAsyncWriter::new(op.writer(path).await.unwrap()); + let bytes = Bytes::from_static(b"hello, world!"); + writer.write(bytes).await.unwrap(); + let bytes = Bytes::from_static(b"hello, OpenDAL!"); + writer.write(bytes).await.unwrap(); + writer.complete().await.unwrap(); + + let bytes = op.read(path).await.unwrap().to_vec(); + assert_eq!(bytes, b"hello, world!hello, OpenDAL!"); + } + + #[tokio::test] + async fn test_abort() { + let op = Operator::new(services::Memory::default()).unwrap().finish(); + let path = "data/test.txt"; + let mut writer = OpendalAsyncWriter::new(op.writer(path).await.unwrap()); + let bytes = Bytes::from_static(b"hello, world!"); + writer.write(bytes).await.unwrap(); + let bytes = Bytes::from_static(b"hello, OpenDAL!"); + writer.write(bytes).await.unwrap(); + drop(writer); + + let exist = op.is_exist(path).await.unwrap(); + assert!(!exist); + } +} diff --git a/integrations/parquet/src/lib.rs b/integrations/parquet/src/lib.rs index 5cd17fb5a64..945cc1375d8 100644 --- a/integrations/parquet/src/lib.rs +++ b/integrations/parquet/src/lib.rs @@ -15,3 +15,35 @@ // specific language governing permissions and limitations // under the License. +//! parquet_opendal provides parquet IO utils. +//! +//! ```no_run +//! use parquet::arrow::async_writer::AsyncFileWriter; +//! use parquet::OpendalAsyncWriter; +//! use opendal::services::S3; +//! use opendal::{Builder, Operator}; +//! +//! #[tokio::main] +//! async fn main() { +//! let builder = S3::from_map( +//! vec![ +//! ("access_key".to_string(), "my_access_key".to_string()), +//! ("secret_key".to_string(), "my_secret_key".to_string()), +//! ("endpoint".to_string(), "my_endpoint".to_string()), +//! ("region".to_string(), "my_region".to_string()), +//! ] +//! .into_iter() +//! .collect(), +//! ).unwrap(); +//! +//! // Create a new operator +//! let operator = Operator::new(builder).unwrap().finish(); +//! let path = "/path/to/file.parquet"; +//! // Create a new object store +//! let mut writer = Arc::new(OpendalAsyncWriter::new(operator.writer(path))); +//! } +//! ``` + +mod async_writer; + +pub use async_writer::OpendalAsyncWriter; From aa461080f8fa59e48f93298724c5be07c62b8805 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 5 Aug 2024 14:40:37 +0000 Subject: [PATCH 03/10] chore: apply suggestions from CR --- integrations/parquet/Cargo.toml | 12 +++- integrations/parquet/examples/async_writer.rs | 45 +++++++++++++ integrations/parquet/src/async_writer.rs | 67 ++++++++++++------- integrations/parquet/src/lib.rs | 59 +++++++++++----- 4 files changed, 141 insertions(+), 42 deletions(-) create mode 100644 integrations/parquet/examples/async_writer.rs diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml index 2f5141a21f3..7171060b07f 100644 --- a/integrations/parquet/Cargo.toml +++ b/integrations/parquet/Cargo.toml @@ -25,7 +25,11 @@ homepage = "https://opendal.apache.org/" license = "Apache-2.0" repository = "https://github.com/apache/opendal" rust-version = "1.75" -version = "0.45.0" +version = "0.0.1" + +[features] +default = ["arrow"] +arrow = ["dep:arrow"] [dependencies] async-trait = "0.1" @@ -36,6 +40,7 @@ parquet = { version = "52.0", default-features = false, features = [ "async", "arrow", ] } +arrow = { version = "52.0", optional = true } [dev-dependencies] opendal = { version = "0.48.0", path = "../../core", features = [ @@ -44,3 +49,8 @@ opendal = { version = "0.48.0", path = "../../core", features = [ ] } rand = "0.8.5" tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] } + +[[example]] +name = "async_writer" +path = "examples/async_writer.rs" +required-features = ["arrow"] diff --git a/integrations/parquet/examples/async_writer.rs b/integrations/parquet/examples/async_writer.rs new file mode 100644 index 00000000000..9f16f69eac5 --- /dev/null +++ b/integrations/parquet/examples/async_writer.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int64Array, RecordBatch}; + +use opendal::{services::S3Config, Operator}; +use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter}; +use parquet_opendal::AsyncWriter; + +#[tokio::main] +async fn main() { + let mut cfg = S3Config::default(); + cfg.access_key_id = Some("my_access_key".to_string()); + cfg.secret_access_key = Some("my_secret_key".to_string()); + cfg.endpoint = Some("my_endpoint".to_string()); + cfg.region = Some("my_region".to_string()); + cfg.bucket = "my_bucket".to_string(); + + // Create a new operator + let operator = Operator::from_config(cfg).unwrap().finish(); + let path = "/path/to/file.parquet"; + + // Create an async writer + let writer = AsyncWriter::new( + operator + .writer_with(path) + .chunk(32 * 1024 * 1024) + .concurrent(8) + .await + .unwrap(), + ); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap(); + writer.write(&to_write).await.unwrap(); + writer.close().await.unwrap(); + + let buffer = operator.read(path).await.unwrap().to_bytes(); + let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer) + .unwrap() + .build() + .unwrap(); + let read = reader.next().unwrap().unwrap(); + assert_eq!(to_write, read); +} diff --git a/integrations/parquet/src/async_writer.rs b/integrations/parquet/src/async_writer.rs index f2fa9387c40..adc8b53b31e 100644 --- a/integrations/parquet/src/async_writer.rs +++ b/integrations/parquet/src/async_writer.rs @@ -22,46 +22,67 @@ use parquet::errors::{ParquetError, Result}; use futures::future::BoxFuture; use opendal::Writer; -/// OpendalAsyncWriter implements AsyncFileWriter trait by using opendal. +/// AsyncWriter implements AsyncFileWriter trait by using opendal. /// /// ```no_run -/// use parquet::arrow::async_writer::AsyncFileWriter; -/// use parquet::OpendalAsyncWriter; -/// use opendal::services::S3; -/// use opendal::{Builder, Operator}; +/// use std::sync::Arc; +/// +/// use arrow::array::{ArrayRef, Int64Array, RecordBatch}; +/// +/// use opendal::{services::S3Config, Operator}; +/// use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter}; +/// use parquet_opendal::AsyncWriter; /// /// #[tokio::main] /// async fn main() { -/// let builder = S3::from_map( -/// vec![ -/// ("access_key".to_string(), "my_access_key".to_string()), -/// ("secret_key".to_string(), "my_secret_key".to_string()), -/// ("endpoint".to_string(), "my_endpoint".to_string()), -/// ("region".to_string(), "my_region".to_string()), -/// ] -/// .into_iter() -/// .collect(), -/// ).unwrap(); +/// let mut cfg = S3Config::default(); +/// cfg.access_key_id = Some("my_access_key".to_string()); +/// cfg.secret_access_key = Some("my_secret_key".to_string()); +/// cfg.endpoint = Some("my_endpoint".to_string()); +/// cfg.region = Some("my_region".to_string()); +/// cfg.bucket = "my_bucket".to_string(); /// /// // Create a new operator -/// let operator = Operator::new(builder).unwrap().finish(); +/// let operator = Operator::from_config(cfg).unwrap().finish(); /// let path = "/path/to/file.parquet"; -/// // Create a new object store -/// let mut writer = Arc::new(OpendalAsyncWriter::new(operator.writer(path))); +/// +/// // Create an async writer +/// let writer = AsyncWriter::new( +/// operator +/// .writer_with(path) +/// .chunk(32 * 1024 * 1024) +/// .concurrent(8) +/// .await +/// .unwrap(), +/// ); +/// +/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; +/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); +/// let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap(); +/// writer.write(&to_write).await.unwrap(); +/// writer.close().await.unwrap(); +/// +/// let buffer = operator.read(path).await.unwrap().to_bytes(); +/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer) +/// .unwrap() +/// .build() +/// .unwrap(); +/// let read = reader.next().unwrap().unwrap(); +/// assert_eq!(to_write, read); /// } /// ``` -pub struct OpendalAsyncWriter { +pub struct AsyncWriter { inner: Writer, } -impl OpendalAsyncWriter { +impl AsyncWriter { /// Create a [`OpendalAsyncWriter`] by given [`Writer`]. pub fn new(writer: Writer) -> Self { Self { inner: writer } } } -impl AsyncFileWriter for OpendalAsyncWriter { +impl AsyncFileWriter for AsyncWriter { fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { Box::pin(async move { self.inner @@ -90,7 +111,7 @@ mod tests { async fn test_basic() { let op = Operator::new(services::Memory::default()).unwrap().finish(); let path = "data/test.txt"; - let mut writer = OpendalAsyncWriter::new(op.writer(path).await.unwrap()); + let mut writer = AsyncWriter::new(op.writer(path).await.unwrap()); let bytes = Bytes::from_static(b"hello, world!"); writer.write(bytes).await.unwrap(); let bytes = Bytes::from_static(b"hello, OpenDAL!"); @@ -105,7 +126,7 @@ mod tests { async fn test_abort() { let op = Operator::new(services::Memory::default()).unwrap().finish(); let path = "data/test.txt"; - let mut writer = OpendalAsyncWriter::new(op.writer(path).await.unwrap()); + let mut writer = AsyncWriter::new(op.writer(path).await.unwrap()); let bytes = Bytes::from_static(b"hello, world!"); writer.write(bytes).await.unwrap(); let bytes = Bytes::from_static(b"hello, OpenDAL!"); diff --git a/integrations/parquet/src/lib.rs b/integrations/parquet/src/lib.rs index 945cc1375d8..ded082d8237 100644 --- a/integrations/parquet/src/lib.rs +++ b/integrations/parquet/src/lib.rs @@ -17,33 +17,56 @@ //! parquet_opendal provides parquet IO utils. //! +//! AsyncWriter implements AsyncFileWriter trait by using opendal. +//! //! ```no_run -//! use parquet::arrow::async_writer::AsyncFileWriter; -//! use parquet::OpendalAsyncWriter; -//! use opendal::services::S3; -//! use opendal::{Builder, Operator}; +//! use std::sync::Arc; +//! +//! use arrow::array::{ArrayRef, Int64Array, RecordBatch}; +//! +//! use opendal::{services::S3Config, Operator}; +//! use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter}; +//! use parquet_opendal::AsyncWriter; //! //! #[tokio::main] //! async fn main() { -//! let builder = S3::from_map( -//! vec![ -//! ("access_key".to_string(), "my_access_key".to_string()), -//! ("secret_key".to_string(), "my_secret_key".to_string()), -//! ("endpoint".to_string(), "my_endpoint".to_string()), -//! ("region".to_string(), "my_region".to_string()), -//! ] -//! .into_iter() -//! .collect(), -//! ).unwrap(); +//! let mut cfg = S3Config::default(); +//! cfg.access_key_id = Some("my_access_key".to_string()); +//! cfg.secret_access_key = Some("my_secret_key".to_string()); +//! cfg.endpoint = Some("my_endpoint".to_string()); +//! cfg.region = Some("my_region".to_string()); +//! cfg.bucket = "my_bucket".to_string(); //! //! // Create a new operator -//! let operator = Operator::new(builder).unwrap().finish(); +//! let operator = Operator::from_config(cfg).unwrap().finish(); //! let path = "/path/to/file.parquet"; -//! // Create a new object store -//! let mut writer = Arc::new(OpendalAsyncWriter::new(operator.writer(path))); +//! +//! // Create an async writer +//! let writer = AsyncWriter::new( +//! operator +//! .writer_with(path) +//! .chunk(32 * 1024 * 1024) +//! .concurrent(8) +//! .await +//! .unwrap(), +//! ); +//! +//! let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; +//! let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); +//! let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap(); +//! writer.write(&to_write).await.unwrap(); +//! writer.close().await.unwrap(); +//! +//! let buffer = operator.read(path).await.unwrap().to_bytes(); +//! let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer) +//! .unwrap() +//! .build() +//! .unwrap(); +//! let read = reader.next().unwrap().unwrap(); +//! assert_eq!(to_write, read); //! } //! ``` mod async_writer; -pub use async_writer::OpendalAsyncWriter; +pub use async_writer::AsyncWriter; From 927641f2c10f3819528b91a592b723b3b0fafbed Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 5 Aug 2024 15:58:01 +0000 Subject: [PATCH 04/10] chore: remove arrow dep from default --- integrations/parquet/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml index 7171060b07f..9ca14a7544f 100644 --- a/integrations/parquet/Cargo.toml +++ b/integrations/parquet/Cargo.toml @@ -28,7 +28,7 @@ rust-version = "1.75" version = "0.0.1" [features] -default = ["arrow"] +default = [] arrow = ["dep:arrow"] [dependencies] From 45a1f1d60c1e1055b7607567fdb95e8cee5915c9 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 5 Aug 2024 15:59:29 +0000 Subject: [PATCH 05/10] chore(ci): add ci for opendal_parquet --- .github/workflows/ci_integration_parquet.yml | 47 ++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 .github/workflows/ci_integration_parquet.yml diff --git a/.github/workflows/ci_integration_parquet.yml b/.github/workflows/ci_integration_parquet.yml new file mode 100644 index 00000000000..78d8ce996d2 --- /dev/null +++ b/.github/workflows/ci_integration_parquet.yml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Integration Object Store CI + +on: + push: + branches: + - main + pull_request: + branches: + - main + paths: + - "integrations/parquet/**" + - "core/**" + - ".github/workflows/ci_integration_parquet.yml" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + check_clippy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Setup Rust toolchain + uses: ./.github/actions/setup + + - name: Cargo clippy + working-directory: integrations/parquet + run: cargo clippy --all-targets --all-features -- -D warnings From aff70da1147c75a6fb1e31dabf1c575b18d421ac Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 5 Aug 2024 16:03:08 +0000 Subject: [PATCH 06/10] test: add test for async writer --- integrations/parquet/Cargo.toml | 2 ++ integrations/parquet/src/async_writer.rs | 33 ++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml index 9ca14a7544f..7c4e4fa89ce 100644 --- a/integrations/parquet/Cargo.toml +++ b/integrations/parquet/Cargo.toml @@ -49,6 +49,8 @@ opendal = { version = "0.48.0", path = "../../core", features = [ ] } rand = "0.8.5" tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] } +arrow = { version = "52.0" } + [[example]] name = "async_writer" diff --git a/integrations/parquet/src/async_writer.rs b/integrations/parquet/src/async_writer.rs index adc8b53b31e..027c9214c05 100644 --- a/integrations/parquet/src/async_writer.rs +++ b/integrations/parquet/src/async_writer.rs @@ -104,8 +104,12 @@ impl AsyncFileWriter for AsyncWriter { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; + use arrow::array::{ArrayRef, Int64Array, RecordBatch}; use opendal::{services, Operator}; + use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter}; #[tokio::test] async fn test_basic() { @@ -136,4 +140,33 @@ mod tests { let exist = op.is_exist(path).await.unwrap(); assert!(!exist); } + + #[tokio::test] + async fn test_async_writer() { + let operator = Operator::new(services::Memory::default()).unwrap().finish(); + let path = "/path/to/file.parquet"; + + let writer = AsyncWriter::new( + operator + .writer_with(path) + .chunk(32 * 1024 * 1024) + .concurrent(8) + .await + .unwrap(), + ); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap(); + writer.write(&to_write).await.unwrap(); + writer.close().await.unwrap(); + + let buffer = operator.read(path).await.unwrap().to_bytes(); + let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer) + .unwrap() + .build() + .unwrap(); + let read = reader.next().unwrap().unwrap(); + assert_eq!(to_write, read); + } } From f6a3a5fc42d57ef7502efb64de3294d802f1c0cf Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 5 Aug 2024 16:04:22 +0000 Subject: [PATCH 07/10] chore: remove arrow dep --- integrations/parquet/Cargo.toml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml index 7c4e4fa89ce..1efe19b75a8 100644 --- a/integrations/parquet/Cargo.toml +++ b/integrations/parquet/Cargo.toml @@ -27,10 +27,6 @@ repository = "https://github.com/apache/opendal" rust-version = "1.75" version = "0.0.1" -[features] -default = [] -arrow = ["dep:arrow"] - [dependencies] async-trait = "0.1" bytes = "1" @@ -40,7 +36,6 @@ parquet = { version = "52.0", default-features = false, features = [ "async", "arrow", ] } -arrow = { version = "52.0", optional = true } [dev-dependencies] opendal = { version = "0.48.0", path = "../../core", features = [ @@ -55,4 +50,3 @@ arrow = { version = "52.0" } [[example]] name = "async_writer" path = "examples/async_writer.rs" -required-features = ["arrow"] From c2de994a5afd3e9cc1582537a1fc058b450c983e Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 5 Aug 2024 16:08:10 +0000 Subject: [PATCH 08/10] chore(ci): add doc test --- .github/workflows/docs.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index fbf561af63c..7b9d9e0dbaa 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -406,6 +406,29 @@ jobs: name: virtiofs-opendal-docs path: ./integrations/virtiofs/target/doc + build-parquet-opendal-doc: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Setup Rust toolchain + uses: ./.github/actions/setup + + - name: Setup Rust Nightly + run: | + rustup toolchain install ${{ env.RUST_DOC_TOOLCHAIN }} + + - name: Build parquet-opendal doc + working-directory: "integrations/parquet" + run: cargo +${{ env.RUST_DOC_TOOLCHAIN }} doc --lib --no-deps --all-features + + - name: Upload docs + uses: actions/upload-artifact@v3 + with: + name: object-parquet-docs + path: ./integrations/parquet/target/doc + build-website: runs-on: ubuntu-latest needs: @@ -423,6 +446,7 @@ jobs: - build-fuse3-opendal-doc - build-unftp-sbe-opendal-doc - build-virtiofs-opendal-doc + - build-parquet-opendal-doc steps: - uses: actions/checkout@v4 From 4beb294412ea7f73aa3877a42c116944ca4567ef Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 6 Aug 2024 00:10:55 +0800 Subject: [PATCH 09/10] Update .github/workflows/ci_integration_parquet.yml --- .github/workflows/ci_integration_parquet.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci_integration_parquet.yml b/.github/workflows/ci_integration_parquet.yml index 78d8ce996d2..1c32c7aec26 100644 --- a/.github/workflows/ci_integration_parquet.yml +++ b/.github/workflows/ci_integration_parquet.yml @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -name: Integration Object Store CI +name: Integration Parquet CI on: push: From 2a73d954706039ff48057391ea386e68cf4d58f5 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 5 Aug 2024 16:11:46 +0000 Subject: [PATCH 10/10] chore(ci): run cargo test --- .github/workflows/ci_integration_parquet.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/ci_integration_parquet.yml b/.github/workflows/ci_integration_parquet.yml index 1c32c7aec26..421faa418d5 100644 --- a/.github/workflows/ci_integration_parquet.yml +++ b/.github/workflows/ci_integration_parquet.yml @@ -45,3 +45,7 @@ jobs: - name: Cargo clippy working-directory: integrations/parquet run: cargo clippy --all-targets --all-features -- -D warnings + + - name: Cargo test + working-directory: integrations/parquet + run: cargo test