From 6abab333c48adf7c3d730e205a2245d7ba146054 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Fri, 2 Aug 2024 18:58:46 +0100 Subject: [PATCH 01/18] chore: include opendal/services-gcs --- crates/iceberg/Cargo.toml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index de5b7cdc5..a22cac624 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,12 +29,13 @@ license = { workspace = true } keywords = ["iceberg"] [features] -default = ["storage-memory", "storage-fs", "storage-s3", "tokio"] -storage-all = ["storage-memory", "storage-fs", "storage-s3"] +default = ["storage-memory", "storage-fs", "storage-s3", "tokio", "storage-gcs"] +storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] storage-s3 = ["opendal/services-s3"] +storage-gcs = ["opendal/services-gcs"] async-std = ["dep:async-std"] tokio = ["dep:tokio"] From 4b86f549734f2653bd392fba95bab4761392a5d7 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Fri, 2 Aug 2024 19:02:59 +0100 Subject: [PATCH 02/18] feat: basic gcs scaffolding --- crates/iceberg/src/io/mod.rs | 4 +++ crates/iceberg/src/io/storage.rs | 15 ++++++++- crates/iceberg/src/io/storage_gcs.rs | 50 ++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/io/storage_gcs.rs diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 12ad8097e..3260b48a0 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -82,3 +82,7 @@ pub use storage_s3::*; mod storage_fs; #[cfg(feature = "storage-fs")] use storage_fs::*; +#[cfg(feature = "storage-gcs")] +mod storage_gcs; +//#[cfg(feature = "storage-gcs")] +//use storage_s3::*; diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index 870e61ec6..f2f03603b 100644 --- 
a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -17,6 +17,8 @@ use std::sync::Arc; +#[cfg(feature = "storage-gcs")] +use opendal::services::GcsConfig; #[cfg(feature = "storage-s3")] use opendal::services::S3Config; use opendal::{Operator, Scheme}; @@ -38,6 +40,8 @@ pub(crate) enum Storage { scheme_str: String, config: Arc, }, + #[cfg(feature = "storage-gcs")] + Gcs { config: Arc }, } impl Storage { @@ -56,6 +60,8 @@ impl Storage { scheme_str, config: super::s3_config_parse(props)?.into(), }), + #[cfg(feature = "storage-gcs")] + Scheme::Gcs => Ok(Self::Gcs { config: todo!() }), _ => Err(Error::new( ErrorKind::FeatureUnsupported, format!("Constructing file io from scheme: {scheme} not supported now",), @@ -117,7 +123,13 @@ impl Storage { )) } } - #[cfg(all(not(feature = "storage-s3"), not(feature = "storage-fs")))] + #[cfg(feature = "storage-gcs")] + Storage::Gcs { config: _ } => todo!(), + #[cfg(all( + not(feature = "storage-s3"), + not(feature = "storage-fs"), + not(feature = "storage-gcs") + ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, "No storage service has been enabled", @@ -131,6 +143,7 @@ impl Storage { "memory" => Ok(Scheme::Memory), "file" | "" => Ok(Scheme::Fs), "s3" | "s3a" => Ok(Scheme::S3), + "gs" => Ok(Scheme::Gcs), s => Ok(s.parse::()?), } } diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs new file mode 100644 index 000000000..d281b2b4e --- /dev/null +++ b/crates/iceberg/src/io/storage_gcs.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use opendal::services::GcsConfig; +use opendal::Operator; +use url::Url; + +use crate::{Error, ErrorKind, Result}; + +// [Google Cloud Storage](https://py.iceberg.apache.org/configuration/#google-cloud-storage) +// configuration parameters + +/// Google Project ID +pub const GOOGLE_PROJECT_ID: &str = "gcs.project-id"; +/// Google Cloud OAuth token +pub const GCS_ENDPOINT: &str = "gcs.endpoint"; +pub const GCS_OAUTH: &str = "gcs.oauth2.token"; +/// Google Cloud default location +pub const GCS_DEFAULT_LOCATION: &str = "gcs.default-location"; + +/// Parse iceberg properties to Google Cloud config. +pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result { + let mut cfg = GcsConfig::default(); + if let Some(endpoint) = m.remove(GCS_ENDPOINT) { + cfg.endpoint = Some(endpoint); + }; + + Ok(cfg) +} + +/// Build a new OpenDAL [`Operator`] based on a known [`GcsConfig`]. 
+pub(crate) fn gcs_config_build(_cfg: &GcsConfig) -> Result { + todo!(); +} From 10a124d00a4684ab942f3272a42cf2ee2e9ea21d Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Fri, 2 Aug 2024 22:37:12 +0100 Subject: [PATCH 03/18] feat: populate config parse with basic details --- crates/iceberg/src/io/mod.rs | 4 ++-- crates/iceberg/src/io/storage.rs | 17 +++++++++++-- crates/iceberg/src/io/storage_gcs.rs | 36 +++++++++++++++++++--------- 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 3260b48a0..b8b7e2718 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -84,5 +84,5 @@ mod storage_fs; use storage_fs::*; #[cfg(feature = "storage-gcs")] mod storage_gcs; -//#[cfg(feature = "storage-gcs")] -//use storage_s3::*; +#[cfg(feature = "storage-gcs")] +pub use storage_gcs::*; diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index f2f03603b..b894a9f3f 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -61,7 +61,9 @@ impl Storage { config: super::s3_config_parse(props)?.into(), }), #[cfg(feature = "storage-gcs")] - Scheme::Gcs => Ok(Self::Gcs { config: todo!() }), + Scheme::Gcs => Ok(Self::Gcs { + config: super::gcs_config_parse(props)?.into(), + }), _ => Err(Error::new( ErrorKind::FeatureUnsupported, format!("Constructing file io from scheme: {scheme} not supported now",), @@ -124,7 +126,18 @@ impl Storage { } } #[cfg(feature = "storage-gcs")] - Storage::Gcs { config: _ } => todo!(), + Storage::Gcs { config } => { + let operator = super::gcs_config_build(config)?; + let prefix = format!("gs://{}/", operator.info().name()); + if path.starts_with(&prefix) { + Ok((operator, &path[prefix.len()..])) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid gcs url: {}, should start with {}", path, prefix), + )) + } + } #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), diff 
--git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs index d281b2b4e..7aa956e0d 100644 --- a/crates/iceberg/src/io/storage_gcs.rs +++ b/crates/iceberg/src/io/storage_gcs.rs @@ -14,37 +14,51 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +//! Google Cloud Storage properties use std::collections::HashMap; use opendal::services::GcsConfig; use opendal::Operator; -use url::Url; use crate::{Error, ErrorKind, Result}; -// [Google Cloud Storage](https://py.iceberg.apache.org/configuration/#google-cloud-storage) -// configuration parameters +// Reference: https://github.com/apache/iceberg/blob/main/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java +/// Google Cloud Storage bucket name +pub const GCS_BUCKET: &str = "gcs.bucket"; /// Google Project ID -pub const GOOGLE_PROJECT_ID: &str = "gcs.project-id"; -/// Google Cloud OAuth token +pub const GCS_PROJECT_ID: &str = "gcs.project-id"; +/// Google Cloud Storage endpoint pub const GCS_ENDPOINT: &str = "gcs.endpoint"; -pub const GCS_OAUTH: &str = "gcs.oauth2.token"; -/// Google Cloud default location -pub const GCS_DEFAULT_LOCATION: &str = "gcs.default-location"; +/// Google Cloud Storage OAuth token +pub const GCS_OAUTH2_TOKEN: &str = "gcs.oauth2.token"; +/// Google Cloud Storage working (root) directory +pub const GCS_ROOT: &str = "gcs.root"; -/// Parse iceberg properties to Google Cloud config. +/// Parse iceberg properties to [`GcsConfig`]. 
pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result { let mut cfg = GcsConfig::default(); if let Some(endpoint) = m.remove(GCS_ENDPOINT) { cfg.endpoint = Some(endpoint); }; + if let Some(bucket) = m.remove(GCS_BUCKET) { + cfg.bucket = bucket; + } + + if let Some(root) = m.remove(GCS_ROOT) { + cfg.root = Some(root) + } + + if let Some(credential) = m.remove(GCS_OAUTH2_TOKEN) { + cfg.credential = Some(credential) + } + Ok(cfg) } /// Build a new OpenDAL [`Operator`] based on a known [`GcsConfig`]. -pub(crate) fn gcs_config_build(_cfg: &GcsConfig) -> Result { - todo!(); +pub(crate) fn gcs_config_build(cfg: &GcsConfig) -> Result { + Ok(Operator::from_config(cfg.clone())?.finish()) } From d5a009e53633e3954e6e9149949dc7afa7ce8735 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Fri, 2 Aug 2024 22:37:49 +0100 Subject: [PATCH 04/18] feat: include docker-compose integration tests --- .../testdata/file_io_gcs/docker-compose.yaml | 24 +++++++ crates/iceberg/tests/file_io_gcs_test.rs | 69 +++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 crates/iceberg/testdata/file_io_gcs/docker-compose.yaml create mode 100644 crates/iceberg/tests/file_io_gcs_test.rs diff --git a/crates/iceberg/testdata/file_io_gcs/docker-compose.yaml b/crates/iceberg/testdata/file_io_gcs/docker-compose.yaml new file mode 100644 index 000000000..a71bcbe45 --- /dev/null +++ b/crates/iceberg/testdata/file_io_gcs/docker-compose.yaml @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +services: + gcs-server: + # v1.49 + image: fsouza/fake-gcs-server@sha256:36b0116fae5236e8def76ccb07761a9ca323e476f366a5f4bf449cac19deaf2d + expose: + - 4443 + #environment: diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/iceberg/tests/file_io_gcs_test.rs new file mode 100644 index 000000000..be485a9b8 --- /dev/null +++ b/crates/iceberg/tests/file_io_gcs_test.rs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for FileIO Google Cloud Storage (GCS). 
+ +use std::net::SocketAddr; +use std::sync::RwLock; + +use ctor::{ctor, dtor}; +use iceberg::io::{FileIO, FileIOBuilder, GCS_BUCKET, GCS_ENDPOINT}; +use iceberg_test_utils::docker::DockerCompose; +use iceberg_test_utils::{normalize_test_name, set_up}; + +const FAKE_GCS_PORT: u16 = 4443; +static DOCKER_COMPOSE_ENV: RwLock> = RwLock::new(None); + +#[ctor] +fn before_all() { + let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); + let docker_compose = DockerCompose::new( + normalize_test_name(module_path!()), + format!("{}/testdata/file_io_gcs", env!("CARGO_MANIFEST_DIR")), + ); + docker_compose.run(); + guard.replace(docker_compose); +} + +#[dtor] +fn after_all() { + let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); + guard.take(); +} + +async fn get_file_io() -> FileIO { + set_up(); + + let guard = DOCKER_COMPOSE_ENV.read().unwrap(); + let docker_compose = guard.as_ref().unwrap(); + let container_ip = docker_compose.get_container_ip("gcs-server"); + let gcs_socket_addr = SocketAddr::new(container_ip, FAKE_GCS_PORT); + + FileIOBuilder::new("gcs") + .with_props(vec![ + (GCS_ENDPOINT, format!("http://{}", gcs_socket_addr)), + (GCS_BUCKET, "my-test-bucket".to_string()), + ]) + .build() + .unwrap() +} + +#[tokio::test] +async fn test_file_io_gcs_exists() { + let file_io = get_file_io().await; + assert!(file_io.is_exist("gs://my-test-bucket/").await.unwrap()); +} From ab2eb6f83dcc86537e5d57eba4b53fc442c747ca Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Sun, 4 Aug 2024 12:01:07 +0100 Subject: [PATCH 05/18] feat: add extra iceberg properties --- crates/iceberg/src/io/storage_gcs.rs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs index 7aa956e0d..18544441c 100644 --- a/crates/iceberg/src/io/storage_gcs.rs +++ b/crates/iceberg/src/io/storage_gcs.rs @@ -27,38 +27,48 @@ use crate::{Error, ErrorKind, Result}; /// Google Cloud Storage bucket name pub 
const GCS_BUCKET: &str = "gcs.bucket"; -/// Google Project ID -pub const GCS_PROJECT_ID: &str = "gcs.project-id"; /// Google Cloud Storage endpoint pub const GCS_ENDPOINT: &str = "gcs.endpoint"; /// Google Cloud Storage OAuth token pub const GCS_OAUTH2_TOKEN: &str = "gcs.oauth2.token"; /// Google Cloud Storage working (root) directory pub const GCS_ROOT: &str = "gcs.root"; +/// Google Cloud Storage working (root) directory +pub const GCS_CREDENTIAL_PATH: &str = "gcs.credential-path"; /// Parse iceberg properties to [`GcsConfig`]. pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result { let mut cfg = GcsConfig::default(); - if let Some(endpoint) = m.remove(GCS_ENDPOINT) { - cfg.endpoint = Some(endpoint); - }; if let Some(bucket) = m.remove(GCS_BUCKET) { cfg.bucket = bucket; + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Bucket name is required for GCS", + )); } if let Some(root) = m.remove(GCS_ROOT) { cfg.root = Some(root) } - if let Some(credential) = m.remove(GCS_OAUTH2_TOKEN) { - cfg.credential = Some(credential) + if let Some(endpoint) = m.remove(GCS_ENDPOINT) { + cfg.endpoint = Some(endpoint); + } + + if let Some(cred_path) = m.remove(GCS_CREDENTIAL_PATH) { + cfg.credential_path = Some(cred_path); + } + + if let Some(token) = m.remove(GCS_OAUTH2_TOKEN) { + cfg.credential = Some(token); } Ok(cfg) } -/// Build a new OpenDAL [`Operator`] based on a known [`GcsConfig`]. +/// Build a new OpenDAL [`Operator`] based on a provided [`GcsConfig`]. pub(crate) fn gcs_config_build(cfg: &GcsConfig) -> Result { Ok(Operator::from_config(cfg.clone())?.finish()) } From 6a595c92e3dac020fbd6c89892d010ed2294a0e1 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Sun, 4 Aug 2024 12:02:42 +0100 Subject: [PATCH 06/18] feat: add tests for gcs read/write These are currently conditional tests with a todo comment using the test_with proc macro. 
More work needs to be done on investigating/potentially expanding OpenDAL to allow unauthenticated requests to fake-gcs-server. At the moment this always ends up reaching the final VM metadata check. --- crates/iceberg/Cargo.toml | 1 + crates/iceberg/tests/file_io_gcs_test.rs | 106 +++++++++++++++-------- 2 files changed, 71 insertions(+), 36 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index a22cac624..4b52a152e 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -85,3 +85,4 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] } pretty_assertions = { workspace = true } tempfile = { workspace = true } tera = { workspace = true } +test-with = "0.13.0" diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/iceberg/tests/file_io_gcs_test.rs index be485a9b8..435fa8aef 100644 --- a/crates/iceberg/tests/file_io_gcs_test.rs +++ b/crates/iceberg/tests/file_io_gcs_test.rs @@ -17,53 +17,87 @@ //! Integration tests for FileIO Google Cloud Storage (GCS). 
-use std::net::SocketAddr; -use std::sync::RwLock; +use bytes::Bytes; +use iceberg::io::{FileIO, FileIOBuilder, GCS_BUCKET, GCS_CREDENTIAL_PATH}; +use iceberg_test_utils::set_up; -use ctor::{ctor, dtor}; -use iceberg::io::{FileIO, FileIOBuilder, GCS_BUCKET, GCS_ENDPOINT}; -use iceberg_test_utils::docker::DockerCompose; -use iceberg_test_utils::{normalize_test_name, set_up}; +// static DOCKER_COMPOSE_ENV: RwLock> = RwLock::new(None); -const FAKE_GCS_PORT: u16 = 4443; -static DOCKER_COMPOSE_ENV: RwLock> = RwLock::new(None); - -#[ctor] -fn before_all() { - let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); - let docker_compose = DockerCompose::new( - normalize_test_name(module_path!()), - format!("{}/testdata/file_io_gcs", env!("CARGO_MANIFEST_DIR")), - ); - docker_compose.run(); - guard.replace(docker_compose); -} - -#[dtor] -fn after_all() { - let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); - guard.take(); -} +// TODO: use compose with fake-gcs-server +//#[ctor] +//fn before_all() { +// let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); +// let docker_compose = DockerCompose::new( +// normalize_test_name(module_path!()), +// format!("{}/testdata/file_io_gcs", env!("CARGO_MANIFEST_DIR")), +// ); +// docker_compose.run(); +// guard.replace(docker_compose); +//} +// +//#[dtor] +//fn after_all() { +// let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); +// guard.take(); +//} -async fn get_file_io() -> FileIO { +async fn get_file_io_gcs() -> FileIO { set_up(); - let guard = DOCKER_COMPOSE_ENV.read().unwrap(); - let docker_compose = guard.as_ref().unwrap(); - let container_ip = docker_compose.get_container_ip("gcs-server"); - let gcs_socket_addr = SocketAddr::new(container_ip, FAKE_GCS_PORT); - FileIOBuilder::new("gcs") .with_props(vec![ - (GCS_ENDPOINT, format!("http://{}", gcs_socket_addr)), - (GCS_BUCKET, "my-test-bucket".to_string()), + (GCS_BUCKET, std::env::var("GCS_BUCKET").unwrap().to_string()), + ( + GCS_CREDENTIAL_PATH, + 
std::env::var("GCS_CREDENTIAL_PATH").unwrap().to_string(), + ), ]) .build() .unwrap() } +fn get_gs_path() -> String { + format!( + "gs://{}", + std::env::var("GCS_BUCKET").expect("Only runs with var enabled") + ) +} + #[tokio::test] -async fn test_file_io_gcs_exists() { - let file_io = get_file_io().await; - assert!(file_io.is_exist("gs://my-test-bucket/").await.unwrap()); +#[test_with::env(GCS_BUCKET, GCS_CREDENTIAL_PATH)] +async fn gcs_exists() { + let file_io = get_file_io_gcs().await; + assert!(file_io + .is_exist(format!("{}/", get_gs_path())) + .await + .unwrap()); +} + +#[tokio::test] +#[test_with::env(GCS_BUCKET, GCS_CREDENTIAL_PATH)] +async fn gcs_write() { + let gs_file = format!("{}/write-file", get_gs_path()); + let file_io = get_file_io_gcs().await; + let output = file_io.new_output(&gs_file).unwrap(); + output + .write(bytes::Bytes::from_static(b"iceberg-gcs!")) + .await + .expect("Write to test output file"); + assert!(file_io.is_exist(gs_file).await.unwrap()) +} + +#[tokio::test] +#[test_with::env(GCS_BUCKET, GCS_CREDENTIAL_PATH)] +async fn gcs_read() { + let gs_file = format!("{}/read-gcs", get_gs_path()); + let file_io = get_file_io_gcs().await; + let output = file_io.new_output(&gs_file).unwrap(); + output + .write(bytes::Bytes::from_static(b"iceberg!")) + .await + .expect("Write to test output file"); + assert!(file_io.is_exist(&gs_file).await.unwrap()); + + let input = file_io.new_input(gs_file).unwrap(); + assert_eq!(input.read().await.unwrap(), Bytes::from_static(b"iceberg!")); } From 3a2f166f0e07868532326be1a215766691de2791 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Sun, 4 Aug 2024 12:04:45 +0100 Subject: [PATCH 07/18] chore: minor cleanup for compose todo --- crates/iceberg/testdata/file_io_gcs/docker-compose.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/iceberg/testdata/file_io_gcs/docker-compose.yaml b/crates/iceberg/testdata/file_io_gcs/docker-compose.yaml index a71bcbe45..e152f2540 100644 --- 
a/crates/iceberg/testdata/file_io_gcs/docker-compose.yaml +++ b/crates/iceberg/testdata/file_io_gcs/docker-compose.yaml @@ -17,8 +17,6 @@ services: gcs-server: - # v1.49 image: fsouza/fake-gcs-server@sha256:36b0116fae5236e8def76ccb07761a9ca323e476f366a5f4bf449cac19deaf2d expose: - 4443 - #environment: From 0900c152b6b7a07d32c40a235ddab1b93bf66f6a Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Mon, 5 Aug 2024 17:09:40 +0100 Subject: [PATCH 08/18] fix: do not introduce new properties --- crates/iceberg/src/io/storage_gcs.rs | 35 ++++------------------------ 1 file changed, 4 insertions(+), 31 deletions(-) diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs index 18544441c..4f870b578 100644 --- a/crates/iceberg/src/io/storage_gcs.rs +++ b/crates/iceberg/src/io/storage_gcs.rs @@ -25,46 +25,19 @@ use crate::{Error, ErrorKind, Result}; // Reference: https://github.com/apache/iceberg/blob/main/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java -/// Google Cloud Storage bucket name -pub const GCS_BUCKET: &str = "gcs.bucket"; +/// Google Cloud Project ID +pub const GCS_PROJECT_ID: &str = "gcs.project-id"; /// Google Cloud Storage endpoint -pub const GCS_ENDPOINT: &str = "gcs.endpoint"; -/// Google Cloud Storage OAuth token -pub const GCS_OAUTH2_TOKEN: &str = "gcs.oauth2.token"; -/// Google Cloud Storage working (root) directory -pub const GCS_ROOT: &str = "gcs.root"; -/// Google Cloud Storage working (root) directory -pub const GCS_CREDENTIAL_PATH: &str = "gcs.credential-path"; +pub const GCS_SERVICE_PATH: &str = "gcs.service.path"; /// Parse iceberg properties to [`GcsConfig`]. 
pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result { let mut cfg = GcsConfig::default(); - if let Some(bucket) = m.remove(GCS_BUCKET) { - cfg.bucket = bucket; - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - "Bucket name is required for GCS", - )); - } - - if let Some(root) = m.remove(GCS_ROOT) { - cfg.root = Some(root) - } - - if let Some(endpoint) = m.remove(GCS_ENDPOINT) { + if let Some(endpoint) = m.remove(GCS_SERVICE_PATH) { cfg.endpoint = Some(endpoint); } - if let Some(cred_path) = m.remove(GCS_CREDENTIAL_PATH) { - cfg.credential_path = Some(cred_path); - } - - if let Some(token) = m.remove(GCS_OAUTH2_TOKEN) { - cfg.credential = Some(token); - } - Ok(cfg) } From 5e171cce500b1046a120afa7b79a974a075a7bb7 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Mon, 5 Aug 2024 17:30:36 +0100 Subject: [PATCH 09/18] feat: infer bucket from path --- crates/iceberg/src/io/storage.rs | 2 +- crates/iceberg/src/io/storage_gcs.rs | 15 +++++++++++++-- crates/iceberg/tests/file_io_gcs_test.rs | 14 ++------------ 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index b894a9f3f..71526e377 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -127,7 +127,7 @@ impl Storage { } #[cfg(feature = "storage-gcs")] Storage::Gcs { config } => { - let operator = super::gcs_config_build(config)?; + let operator = super::gcs_config_build(config, path)?; let prefix = format!("gs://{}/", operator.info().name()); if path.starts_with(&prefix) { Ok((operator, &path[prefix.len()..])) diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs index 4f870b578..e2641b0d1 100644 --- a/crates/iceberg/src/io/storage_gcs.rs +++ b/crates/iceberg/src/io/storage_gcs.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use opendal::services::GcsConfig; use opendal::Operator; +use url::Url; use crate::{Error, ErrorKind, Result}; @@ -42,6 +43,16 
@@ pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result Result { - Ok(Operator::from_config(cfg.clone())?.finish()) +pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result { + let url = Url::parse(path)?; + let bucket = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid gcs url: {}, bucket is required", path), + ) + })?; + + let mut cfg = cfg.clone(); + cfg.bucket = bucket.to_string(); + Ok(Operator::from_config(cfg)?.finish()) } diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/iceberg/tests/file_io_gcs_test.rs index 435fa8aef..4875c9e74 100644 --- a/crates/iceberg/tests/file_io_gcs_test.rs +++ b/crates/iceberg/tests/file_io_gcs_test.rs @@ -18,7 +18,7 @@ //! Integration tests for FileIO Google Cloud Storage (GCS). use bytes::Bytes; -use iceberg::io::{FileIO, FileIOBuilder, GCS_BUCKET, GCS_CREDENTIAL_PATH}; +use iceberg::io::{FileIO, FileIOBuilder}; use iceberg_test_utils::set_up; // static DOCKER_COMPOSE_ENV: RwLock> = RwLock::new(None); @@ -43,17 +43,7 @@ use iceberg_test_utils::set_up; async fn get_file_io_gcs() -> FileIO { set_up(); - - FileIOBuilder::new("gcs") - .with_props(vec![ - (GCS_BUCKET, std::env::var("GCS_BUCKET").unwrap().to_string()), - ( - GCS_CREDENTIAL_PATH, - std::env::var("GCS_CREDENTIAL_PATH").unwrap().to_string(), - ), - ]) - .build() - .unwrap() + FileIOBuilder::new("gcs").build().unwrap() } fn get_gs_path() -> String { From f7410550a368ed5ec04da41f509481b0a551e45a Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Mon, 5 Aug 2024 17:39:47 +0100 Subject: [PATCH 10/18] chore: add user-project const --- crates/iceberg/src/io/storage_gcs.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs index e2641b0d1..52537e420 100644 --- a/crates/iceberg/src/io/storage_gcs.rs +++ b/crates/iceberg/src/io/storage_gcs.rs @@ -30,6 +30,8 @@ use crate::{Error, ErrorKind, Result}; pub const GCS_PROJECT_ID: &str = 
"gcs.project-id"; /// Google Cloud Storage endpoint pub const GCS_SERVICE_PATH: &str = "gcs.service.path"; +/// Google Cloud user project +pub const GCS_USER_PROJECT: &str = "gcs.user-project"; /// Parse iceberg properties to [`GcsConfig`]. pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result { From 30f90d147dae02fdedec560e3be47f9178a3dc82 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Tue, 6 Aug 2024 14:47:38 +0100 Subject: [PATCH 11/18] feat: add allow_anonymous for test --- crates/iceberg/src/io/storage_gcs.rs | 6 ++++++ crates/iceberg/testdata/file_io_gcs/docker-compose.yaml | 1 + 2 files changed, 7 insertions(+) diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs index 52537e420..c7f7ce97d 100644 --- a/crates/iceberg/src/io/storage_gcs.rs +++ b/crates/iceberg/src/io/storage_gcs.rs @@ -32,6 +32,8 @@ pub const GCS_PROJECT_ID: &str = "gcs.project-id"; pub const GCS_SERVICE_PATH: &str = "gcs.service.path"; /// Google Cloud user project pub const GCS_USER_PROJECT: &str = "gcs.user-project"; +/// Allow unauthenticated requests +pub const GCS_NO_AUTH: &str = "gcs.no-auth"; /// Parse iceberg properties to [`GcsConfig`]. 
pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result { @@ -41,6 +43,10 @@ pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result Date: Tue, 6 Aug 2024 14:54:29 +0100 Subject: [PATCH 12/18] chore: remove test-with dep --- crates/iceberg/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 4b52a152e..a22cac624 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -85,4 +85,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] } pretty_assertions = { workspace = true } tempfile = { workspace = true } tera = { workspace = true } -test-with = "0.13.0" From c9f7c6494bdd9567edafaaab82c0cb28f789c9db Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Tue, 6 Aug 2024 14:54:58 +0100 Subject: [PATCH 13/18] feat: update with allow_anonymous functionality This requires the opendal allow_anonymous functionality with the GCS service to work. --- crates/iceberg/Cargo.toml | 3 +- crates/iceberg/tests/file_io_gcs_test.rs | 93 ++++++++++++++++-------- 2 files changed, 65 insertions(+), 31 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index a22cac624..42dc2595c 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -63,7 +63,8 @@ futures = { workspace = true } itertools = { workspace = true } murmur3 = { workspace = true } once_cell = { workspace = true } -opendal = { workspace = true } +# opendal = { workspace = true } +opendal = { git = "https://github.com/apache/opendal", rev = "0a3e98f"} ordered-float = { workspace = true } parquet = { workspace = true, features = ["async"] } reqwest = { workspace = true } diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/iceberg/tests/file_io_gcs_test.rs index 4875c9e74..e8ca2ba16 100644 --- a/crates/iceberg/tests/file_io_gcs_test.rs +++ b/crates/iceberg/tests/file_io_gcs_test.rs @@ -17,44 +17,79 @@ //! Integration tests for FileIO Google Cloud Storage (GCS). 
+use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::RwLock; + use bytes::Bytes; -use iceberg::io::{FileIO, FileIOBuilder}; -use iceberg_test_utils::set_up; - -// static DOCKER_COMPOSE_ENV: RwLock> = RwLock::new(None); - -// TODO: use compose with fake-gcs-server -//#[ctor] -//fn before_all() { -// let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); -// let docker_compose = DockerCompose::new( -// normalize_test_name(module_path!()), -// format!("{}/testdata/file_io_gcs", env!("CARGO_MANIFEST_DIR")), -// ); -// docker_compose.run(); -// guard.replace(docker_compose); -//} -// -//#[dtor] -//fn after_all() { -// let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); -// guard.take(); -//} +use ctor::{ctor, dtor}; +use iceberg::io::{FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH}; +use iceberg_test_utils::docker::DockerCompose; +use iceberg_test_utils::{normalize_test_name, set_up}; + +static DOCKER_COMPOSE_ENV: RwLock> = RwLock::new(None); +static FAKE_GCS_PORT: u16 = 4443; +static FAKE_GCS_BUCKET: &str = "test-bucket"; + +#[ctor] +fn before_all() { + let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); + let docker_compose = DockerCompose::new( + normalize_test_name(module_path!()), + format!("{}/testdata/file_io_gcs", env!("CARGO_MANIFEST_DIR")), + ); + docker_compose.run(); + guard.replace(docker_compose); +} + +#[dtor] +fn after_all() { + let mut guard = DOCKER_COMPOSE_ENV.write().unwrap(); + guard.take(); +} async fn get_file_io_gcs() -> FileIO { set_up(); - FileIOBuilder::new("gcs").build().unwrap() + + let ip = DOCKER_COMPOSE_ENV + .read() + .unwrap() + .as_ref() + .unwrap() + .get_container_ip("gcs-server"); + let addr = SocketAddr::new(ip, FAKE_GCS_PORT); + + // A bucket must exist for FileIO + create_bucket(FAKE_GCS_BUCKET, addr.to_string()) + .await + .unwrap(); + + FileIOBuilder::new("gcs") + .with_props(vec![ + (GCS_SERVICE_PATH, format!("http://{}", addr)), + (GCS_NO_AUTH, "true".to_string()), + ]) + .build() + .unwrap() +} + 
+// Create a bucket against the emulated GCS storage server. +async fn create_bucket(name: &str, server_addr: String) -> anyhow::Result<()> { + let mut bucket_data = HashMap::new(); + bucket_data.insert("name", name); + + let client = reqwest::Client::new(); + let endpoint = format!("http://{}/storage/v1/b", server_addr); + client.post(endpoint).json(&bucket_data).send().await?; + Ok(()) } fn get_gs_path() -> String { - format!( - "gs://{}", - std::env::var("GCS_BUCKET").expect("Only runs with var enabled") - ) + format!("gs://{}", FAKE_GCS_BUCKET) } #[tokio::test] -#[test_with::env(GCS_BUCKET, GCS_CREDENTIAL_PATH)] +//#[test_with::env(GCS_BUCKET, GCS_CREDENTIAL_PATH)] async fn gcs_exists() { let file_io = get_file_io_gcs().await; assert!(file_io @@ -64,7 +99,6 @@ async fn gcs_exists() { } #[tokio::test] -#[test_with::env(GCS_BUCKET, GCS_CREDENTIAL_PATH)] async fn gcs_write() { let gs_file = format!("{}/write-file", get_gs_path()); let file_io = get_file_io_gcs().await; @@ -77,7 +111,6 @@ async fn gcs_write() { } #[tokio::test] -#[test_with::env(GCS_BUCKET, GCS_CREDENTIAL_PATH)] async fn gcs_read() { let gs_file = format!("{}/read-gcs", get_gs_path()); let file_io = get_file_io_gcs().await; From e4e7dadf286222f9456fecbda84566864938a0d3 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Wed, 7 Aug 2024 16:27:15 +0100 Subject: [PATCH 14/18] ci: use cargo sort --- crates/iceberg/Cargo.toml | 3 +-- crates/iceberg/tests/file_io_gcs_test.rs | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 42dc2595c..c6e758b34 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -63,8 +63,7 @@ futures = { workspace = true } itertools = { workspace = true } murmur3 = { workspace = true } once_cell = { workspace = true } -# opendal = { workspace = true } -opendal = { git = "https://github.com/apache/opendal", rev = "0a3e98f"} +opendal = { git = "https://github.com/apache/opendal", rev = 
"0a3e98f" } ordered-float = { workspace = true } parquet = { workspace = true, features = ["async"] } reqwest = { workspace = true } diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/iceberg/tests/file_io_gcs_test.rs index e8ca2ba16..98539e9c4 100644 --- a/crates/iceberg/tests/file_io_gcs_test.rs +++ b/crates/iceberg/tests/file_io_gcs_test.rs @@ -89,7 +89,6 @@ fn get_gs_path() -> String { } #[tokio::test] -//#[test_with::env(GCS_BUCKET, GCS_CREDENTIAL_PATH)] async fn gcs_exists() { let file_io = get_file_io_gcs().await; assert!(file_io From 5284405bd56d2409d6c56162c6e1fc601ffc9e21 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Fri, 9 Aug 2024 09:33:20 +0100 Subject: [PATCH 15/18] chore: undo storage-gcs default feature --- crates/iceberg/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index c6e758b34..131b7d591 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,7 +29,7 @@ license = { workspace = true } keywords = ["iceberg"] [features] -default = ["storage-memory", "storage-fs", "storage-s3", "tokio", "storage-gcs"] +default = ["storage-memory", "storage-fs", "storage-s3", "tokio"] storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] storage-memory = ["opendal/services-memory"] From 7f2e8d860ab04ddbe866a1daea23ed7feaf2cd38 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Fri, 9 Aug 2024 12:56:59 +0100 Subject: [PATCH 16/18] feat: include disable_ params for GCS_NO_AUTH --- crates/iceberg/src/io/storage_gcs.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs index c7f7ce97d..0a2410799 100644 --- a/crates/iceberg/src/io/storage_gcs.rs +++ b/crates/iceberg/src/io/storage_gcs.rs @@ -45,6 +45,8 @@ pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result Date: Fri, 9 Aug 2024 13:15:51 +0100 Subject: [PATCH 17/18] ci: use storage-all for 
async-std tests --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d155b2949..38f450bf7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -113,7 +113,7 @@ jobs: run: cargo test --no-fail-fast --all-targets --all-features --workspace - name: Async-std Test - run: cargo test --no-fail-fast --all-targets --no-default-features --features "async-std" --features "storage-fs" --workspace + run: cargo test --no-fail-fast --all-targets --no-default-features --features "async-std" --features "storage-all" --workspace - name: Doc Test run: cargo test --no-fail-fast --doc --all-features --workspace From 3ab0a376690f0592861fa82fc738e10ddb050fef Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Tue, 13 Aug 2024 11:31:43 +0100 Subject: [PATCH 18/18] revert: use opendal from workspace Now that v0.49 has been released, this work does not need to pin to a particular version! --- crates/iceberg/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 131b7d591..84e29dfcb 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -63,7 +63,7 @@ futures = { workspace = true } itertools = { workspace = true } murmur3 = { workspace = true } once_cell = { workspace = true } -opendal = { git = "https://github.com/apache/opendal", rev = "0a3e98f" } +opendal = { workspace = true } ordered-float = { workspace = true } parquet = { workspace = true, features = ["async"] } reqwest = { workspace = true }