From aca68c37002c31b93c278f9fa05ace47d7bfd135 Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Fri, 28 Jul 2023 16:10:06 +0200 Subject: [PATCH 1/2] wip poc using rusoto_s3 --- rust/Cargo.toml | 3 ++ rust/src/test_utils.rs | 47 +++++++++++---------- rust/tests/command_filesystem_check.rs | 8 ++-- rust/tests/integration_checkpoint.rs | 4 +- rust/tests/integration_concurrent_writes.rs | 2 +- rust/tests/integration_object_store.rs | 4 +- rust/tests/integration_read.rs | 4 +- rust/tests/repair_s3_rename_test.rs | 2 +- 8 files changed, 40 insertions(+), 34 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 492f781de5..e8219e24fc 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -60,6 +60,7 @@ rusoto_core = { version = "0.47", default-features = false, optional = true } rusoto_credential = { version = "0.47", optional = true } rusoto_sts = { version = "0.47", default-features = false, optional = true } rusoto_dynamodb = { version = "0.47", default-features = false, optional = true } +rusoto_s3 = { version = "0.47", default-features = false, optional = true } # Glue rusoto_glue = { version = "0.47", default-features = false, optional = true } @@ -131,6 +132,7 @@ s3-native-tls = [ "rusoto_credential", "rusoto_sts/native-tls", "rusoto_dynamodb/native-tls", + "rusoto_s3/native-tls", "dynamodb_lock/native-tls", "object_store/aws", ] @@ -139,6 +141,7 @@ s3 = [ "rusoto_credential", "rusoto_sts/rustls", "rusoto_dynamodb/rustls", + "rusoto_s3/rustls", "dynamodb_lock/rustls", "object_store/aws", ] diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index a11bff8c17..969439d8eb 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -19,7 +19,7 @@ pub struct IntegrationContext { } impl IntegrationContext { - pub fn new( + pub async fn new( integration: StorageIntegration, ) -> Result> { // environment variables are loaded from .env files if found. Otherwise @@ -46,7 +46,7 @@ impl IntegrationContext { account_path.as_path().to_str().unwrap(), ); } - integration.create_bucket(&bucket)?; + integration.create_bucket(&bucket).await?; let store_uri = match integration { StorageIntegration::Amazon => format!("s3://{}", &bucket), StorageIntegration::Microsoft => format!("az://{}", &bucket), @@ -177,14 +177,14 @@ impl StorageIntegration { } } - fn create_bucket(&self, name: impl AsRef) -> std::io::Result<()> { + async fn create_bucket(&self, name: impl AsRef) -> std::io::Result<()> { match self { Self::Microsoft => { az_cli::create_container(name)?; Ok(()) } Self::Amazon => { - s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?; + s3_cli::create_bucket(name.as_ref()).await.unwrap(); set_env_if_not_set( "DYNAMO_LOCK_PARTITION_KEY_VALUE", format!("s3://{}", name.as_ref()), @@ -336,26 +336,29 @@ pub mod s3_cli { use super::set_env_if_not_set; use crate::builder::s3_storage_options; use std::process::{Command, ExitStatus, Stdio}; + use rusoto_s3::{S3Client, S3, CreateBucketRequest, CreateBucketOutput, CreateBucketError}; + use rusoto_credential::EnvironmentProvider; + use rusoto_core::{HttpClient, Region, RusotoError}; /// Create a new bucket - pub fn create_bucket(bucket_name: impl AsRef) -> std::io::Result { - let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) - .expect("variable ENDPOINT must be set to connect to S3"); - let region = std::env::var(s3_storage_options::AWS_REGION) - .expect("variable AWS_REGION must be set to connect to S3"); - let mut child = Command::new("aws") - .args([ - "s3", - "mb", - bucket_name.as_ref(), - "--endpoint-url", - &endpoint, - "--region", - ®ion, - ]) - .spawn() - .expect("aws command is installed"); - child.wait() + pub async fn create_bucket(bucket_name: impl AsRef) -> Result> { + let region_name = std::env::var(s3_storage_options::AWS_REGION) + .expect("variable AWS_REGION must be set to connect to S3"); + let endpoint_url = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) + .expect("variable ENDPOINT must be set to connect to S3"); + + let region = Region::Custom { name: region_name, endpoint: endpoint_url }; + let s3_client = S3Client::new_with( + HttpClient::new().unwrap(), + EnvironmentProvider::default(), + region + ); + return s3_client.create_bucket( + CreateBucketRequest { + bucket: bucket_name.as_ref().to_string(), + ..CreateBucketRequest::default() + } + ).await; } /// delete bucket diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index ced317d990..4f2d3b946e 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -46,7 +46,7 @@ async fn test_filesystem_check_hdfs() -> TestResult { } async fn test_filesystem_check(storage: StorageIntegration) -> TestResult { - let context = IntegrationContext::new(storage)?; + let context = IntegrationContext::new(storage).await?; context.load_table(TestTables::Simple).await?; let file = "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet"; let path = Path::from_iter([&TestTables::Simple.as_name(), file]); @@ -89,7 +89,7 @@ async fn test_filesystem_check(storage: StorageIntegration) -> TestResult { #[serial] async fn test_filesystem_check_partitioned() -> TestResult { let storage = StorageIntegration::Local; - let context = IntegrationContext::new(storage)?; + let context = IntegrationContext::new(storage).await?; context .load_table(TestTables::Delta0_8_0Partitioned) .await?; @@ -122,7 +122,7 @@ async fn test_filesystem_check_partitioned() -> TestResult { #[serial] async fn test_filesystem_check_fails_for_concurrent_delete() -> TestResult { // Validate failure when a non dry only executes on the latest version - let context = IntegrationContext::new(StorageIntegration::Local)?; + let context = IntegrationContext::new(StorageIntegration::Local).await?; context.load_table(TestTables::Simple).await?; let file = "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet"; let path = Path::from_iter([&TestTables::Simple.as_name(), file]); @@ -150,7 +150,7 @@ async fn test_filesystem_check_fails_for_concurrent_delete() -> TestResult { #[ignore = "should this actually fail? with conflcit resolution, we are re-trying again."] async fn test_filesystem_check_outdated() -> TestResult { // Validate failure when a non dry only executes on the latest version - let context = IntegrationContext::new(StorageIntegration::Local)?; + let context = IntegrationContext::new(StorageIntegration::Local).await?; context.load_table(TestTables::Simple).await?; let file = "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"; let path = Path::from_iter([&TestTables::Simple.as_name(), file]); diff --git a/rust/tests/integration_checkpoint.rs b/rust/tests/integration_checkpoint.rs index c4361ac7bf..a69177841b 100644 --- a/rust/tests/integration_checkpoint.rs +++ b/rust/tests/integration_checkpoint.rs @@ -13,7 +13,7 @@ use tokio::time::sleep; #[tokio::test] async fn cleanup_metadata_fs_test() -> TestResult { - let context = IntegrationContext::new(StorageIntegration::Local)?; + let context = IntegrationContext::new(StorageIntegration::Local).await?; cleanup_metadata_test(&context).await?; Ok(()) } @@ -22,7 +22,7 @@ async fn cleanup_metadata_fs_test() -> TestResult { #[tokio::test] #[serial] async fn cleanup_metadata_aws_test() -> TestResult { - let context = IntegrationContext::new(StorageIntegration::Amazon)?; + let context = IntegrationContext::new(StorageIntegration::Amazon).await?; cleanup_metadata_test(&context).await?; Ok(()) } diff --git a/rust/tests/integration_concurrent_writes.rs b/rust/tests/integration_concurrent_writes.rs index 91e4345963..25da51c9e7 100644 --- a/rust/tests/integration_concurrent_writes.rs +++ b/rust/tests/integration_concurrent_writes.rs @@ -40,7 +40,7 @@ async fn test_concurrent_writes_hdfs() -> TestResult { } async fn test_concurrent_writes(integration: StorageIntegration) -> TestResult { - let context = IntegrationContext::new(integration)?; + let context = IntegrationContext::new(integration).await?; let (_table, table_uri) = prepare_table(&context).await?; run_test(|name| Worker::new(&table_uri, name)).await; Ok(()) diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs index 6fd31d759b..9136c3cf27 100644 --- a/rust/tests/integration_object_store.rs +++ b/rust/tests/integration_object_store.rs @@ -49,7 +49,7 @@ async fn test_object_store_hdfs() -> TestResult { } async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> TestResult { - let context = IntegrationContext::new(integration)?; + let context = IntegrationContext::new(integration).await?; let delta_store = DeltaTableBuilder::from_uri(&context.root_uri()) .with_allow_http(true) .build_storage()?; @@ -425,7 +425,7 @@ async fn test_object_store_prefixes_local() -> TestResult { } async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResult { - let context = IntegrationContext::new(integration)?; + let context = IntegrationContext::new(integration).await?; let prefixes = &["table path", "table path/hello%3F", "你好/😊"]; for prefix in prefixes { let rooturi = format!("{}/{}", context.root_uri(), prefix); diff --git a/rust/tests/integration_read.rs b/rust/tests/integration_read.rs index 178b6692b1..93c0a1d5f4 100644 --- a/rust/tests/integration_read.rs +++ b/rust/tests/integration_read.rs @@ -117,7 +117,7 @@ mod s3 { } async fn read_tables(storage: StorageIntegration) -> TestResult { - let context = IntegrationContext::new(storage)?; + let context = IntegrationContext::new(storage).await?; context.load_table(TestTables::Simple).await?; context.load_table(TestTables::Golden).await?; context @@ -136,7 +136,7 @@ async fn read_table_paths( table_root: &str, upload_path: &str, ) -> TestResult { - let context = IntegrationContext::new(storage)?; + let context = IntegrationContext::new(storage).await?; context .load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path) .await?; diff --git a/rust/tests/repair_s3_rename_test.rs b/rust/tests/repair_s3_rename_test.rs index 25a9a5e060..09d2b83d7c 100644 --- a/rust/tests/repair_s3_rename_test.rs +++ b/rust/tests/repair_s3_rename_test.rs @@ -42,7 +42,7 @@ async fn repair_when_worker_pauses_after_rename_test() { async fn run_repair_test_case(path: &str, pause_copy: bool) -> Result<(), ObjectStoreError> { std::env::set_var("AWS_S3_LOCKING_PROVIDER", "dynamodb"); std::env::set_var("DYNAMO_LOCK_LEASE_DURATION", "2"); - let context = IntegrationContext::new(StorageIntegration::Amazon).unwrap(); + let context = IntegrationContext::new(StorageIntegration::Amazon).await.unwrap(); let root_path = Path::from(path); let src1 = root_path.child("src1"); From 0f015aac73e5e42efdd202fcfa4ab43497d6f0a6 Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Sat, 29 Jul 2023 10:35:53 +0200 Subject: [PATCH 2/2] drop bucket with --- rust/src/test_utils.rs | 106 ++++++++++++++------ rust/tests/command_filesystem_check.rs | 2 +- rust/tests/integration_checkpoint.rs | 2 +- rust/tests/integration_concurrent_writes.rs | 2 +- rust/tests/integration_object_store.rs | 2 +- rust/tests/integration_read.rs | 4 +- rust/tests/repair_s3_rename_test.rs | 4 +- 7 files changed, 85 insertions(+), 37 deletions(-) diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 969439d8eb..9af745d8c1 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -140,7 +140,7 @@ impl Drop for IntegrationContext { fn drop(&mut self) { match self.integration { StorageIntegration::Amazon => { - s3_cli::delete_bucket(self.root_uri()).unwrap(); + s3_cli::delete_bucket(&self.bucket); s3_cli::delete_lock_table().unwrap(); } StorageIntegration::Microsoft => { @@ -184,7 +184,7 @@ impl StorageIntegration { Ok(()) } Self::Amazon => { - s3_cli::create_bucket(name.as_ref()).await.unwrap(); + s3_cli::create_bucket(name.as_ref()).await; set_env_if_not_set( "DYNAMO_LOCK_PARTITION_KEY_VALUE", format!("s3://{}", name.as_ref()), @@ -335,48 +335,94 @@ pub mod az_cli { pub mod s3_cli { use super::set_env_if_not_set; use crate::builder::s3_storage_options; - use std::process::{Command, ExitStatus, Stdio}; - use rusoto_s3::{S3Client, S3, CreateBucketRequest, CreateBucketOutput, CreateBucketError}; + use rusoto_core::{HttpClient, Region}; use rusoto_credential::EnvironmentProvider; - use rusoto_core::{HttpClient, Region, RusotoError}; - + use rusoto_s3::{ + CreateBucketRequest, Delete, DeleteBucketRequest, DeleteObjectsRequest, + ListObjectsV2Request, ObjectIdentifier, S3Client, S3, + }; + use std::process::{Command, ExitStatus, Stdio}; /// Create a new bucket - pub async fn create_bucket(bucket_name: impl AsRef) -> Result> { - let region_name = std::env::var(s3_storage_options::AWS_REGION) - .expect("variable AWS_REGION must be set to connect to S3"); - let endpoint_url = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) - .expect("variable ENDPOINT must be set to connect to S3"); + pub async fn create_bucket(bucket_name: impl AsRef) { + let region = std::env::var(s3_storage_options::AWS_REGION) + .expect("variable AWS_REGION must be set to connect to S3"); + let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) + .expect("variable ENDPOINT must be set to connect to S3"); - let region = Region::Custom { name: region_name, endpoint: endpoint_url }; let s3_client = S3Client::new_with( HttpClient::new().unwrap(), EnvironmentProvider::default(), - region + Region::Custom { + name: region, + endpoint: endpoint, + }, ); - return s3_client.create_bucket( - CreateBucketRequest { + s3_client + .create_bucket(CreateBucketRequest { bucket: bucket_name.as_ref().to_string(), ..CreateBucketRequest::default() - } - ).await; + }) + .await + .unwrap(); } /// delete bucket - pub fn delete_bucket(bucket_name: impl AsRef) -> std::io::Result { + pub fn delete_bucket(bucket_name: impl AsRef) { + let region = std::env::var(s3_storage_options::AWS_REGION) + .expect("variable AWS_REGION must be set to connect to S3"); let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) .expect("variable ENDPOINT must be set to connect to S3"); - let mut child = Command::new("aws") - .args([ - "s3", - "rb", - bucket_name.as_ref(), - "--endpoint-url", - &endpoint, - "--force", - ]) - .spawn() - .expect("aws command is installed"); - child.wait() + + let s3_client = S3Client::new_with( + HttpClient::new().unwrap(), + EnvironmentProvider::default(), + Region::Custom { + name: region, + endpoint: endpoint, + }, + ); + + futures::executor::block_on(async { + // objects must be deleted before the bucket can be deleted + let objects: Vec = s3_client + .list_objects_v2(ListObjectsV2Request { + bucket: bucket_name.as_ref().to_string(), + ..ListObjectsV2Request::default() + }) + .await + .unwrap() + .contents + .into_iter() + .flatten() + .filter_map(|x| x.key) + .map(|key| ObjectIdentifier { + key: key, + version_id: None, + }) + .collect(); + + if !objects.is_empty() { + s3_client + .delete_objects(DeleteObjectsRequest { + bucket: bucket_name.as_ref().to_string(), + delete: Delete { + objects: objects, + quiet: Some(true), + }, + ..DeleteObjectsRequest::default() + }) + .await + .unwrap(); + } + + s3_client + .delete_bucket(DeleteBucketRequest { + bucket: bucket_name.as_ref().to_string(), + ..DeleteBucketRequest::default() + }) + .await + .unwrap(); + }); } /// copy directory diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index 4f2d3b946e..94d746112e 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -16,7 +16,7 @@ async fn test_filesystem_check_local() -> TestResult { } #[cfg(any(feature = "s3", feature = "s3-native-tls"))] -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn test_filesystem_check_aws() -> TestResult { set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true"); diff --git a/rust/tests/integration_checkpoint.rs b/rust/tests/integration_checkpoint.rs index a69177841b..fe64f8eb84 100644 --- a/rust/tests/integration_checkpoint.rs +++ b/rust/tests/integration_checkpoint.rs @@ -19,7 +19,7 @@ async fn cleanup_metadata_fs_test() -> TestResult { } #[cfg(any(feature = "s3", feature = "s3-native-tls"))] -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn cleanup_metadata_aws_test() -> TestResult { let context = IntegrationContext::new(StorageIntegration::Amazon).await?; diff --git a/rust/tests/integration_concurrent_writes.rs b/rust/tests/integration_concurrent_writes.rs index 25da51c9e7..424f4d2ff5 100644 --- a/rust/tests/integration_concurrent_writes.rs +++ b/rust/tests/integration_concurrent_writes.rs @@ -17,7 +17,7 @@ async fn test_concurrent_writes_local() -> TestResult { } #[cfg(feature = "s3")] -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn concurrent_writes_s3() -> TestResult { test_concurrent_writes(StorageIntegration::Amazon).await?; Ok(()) diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs index 9136c3cf27..7546b05a29 100644 --- a/rust/tests/integration_object_store.rs +++ b/rust/tests/integration_object_store.rs @@ -23,7 +23,7 @@ async fn test_object_store_azure() -> TestResult { } #[cfg(feature = "s3")] -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn test_object_store_aws() -> TestResult { test_object_store(StorageIntegration::Amazon, true).await?; diff --git a/rust/tests/integration_read.rs b/rust/tests/integration_read.rs index 93c0a1d5f4..5ec6351651 100644 --- a/rust/tests/integration_read.rs +++ b/rust/tests/integration_read.rs @@ -37,7 +37,7 @@ mod azure { mod local { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_read_tables_local() -> TestResult { read_tables(StorageIntegration::Local).await?; @@ -103,7 +103,7 @@ mod hdfs { #[cfg(any(feature = "s3", feature = "s3-native-tls"))] mod s3 { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_read_tables_aws() -> TestResult { read_tables(StorageIntegration::Amazon).await?; diff --git a/rust/tests/repair_s3_rename_test.rs b/rust/tests/repair_s3_rename_test.rs index 09d2b83d7c..f55f23baaa 100644 --- a/rust/tests/repair_s3_rename_test.rs +++ b/rust/tests/repair_s3_rename_test.rs @@ -42,7 +42,9 @@ async fn repair_when_worker_pauses_after_rename_test() { async fn run_repair_test_case(path: &str, pause_copy: bool) -> Result<(), ObjectStoreError> { std::env::set_var("AWS_S3_LOCKING_PROVIDER", "dynamodb"); std::env::set_var("DYNAMO_LOCK_LEASE_DURATION", "2"); - let context = IntegrationContext::new(StorageIntegration::Amazon).await.unwrap(); + let context = IntegrationContext::new(StorageIntegration::Amazon) + .await + .unwrap(); let root_path = Path::from(path); let src1 = root_path.child("src1");