From 43cf33ffccb21e707f425913d0934835871bf555 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Tue, 5 Dec 2023 08:40:36 -0500 Subject: [PATCH 1/5] remove df-cli specific options from create external table options --- datafusion-cli/src/exec.rs | 11 ++++---- datafusion-cli/src/object_storage.rs | 39 ++++++++++++++-------------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 63862caab82a..c8777a847470 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -211,7 +211,7 @@ async fn exec_and_print( })?; let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?; for statement in statements { - let plan = ctx.state().statement_to_plan(statement).await?; + let mut plan = ctx.state().statement_to_plan(statement).await?; // For plans like `Explain` ignore `MaxRows` option and always display all rows let should_ignore_maxrows = matches!( @@ -221,10 +221,9 @@ async fn exec_and_print( | LogicalPlan::Analyze(_) ); - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { create_external_table(ctx, cmd).await?; } - let df = ctx.execute_logical_plan(plan).await?; let results = df.collect().await?; @@ -244,7 +243,7 @@ async fn exec_and_print( async fn create_external_table( ctx: &SessionContext, - cmd: &CreateExternalTable, + cmd: &mut CreateExternalTable, ) -> Result<()> { let table_path = ListingTableUrl::parse(&cmd.location)?; let scheme = table_path.scheme(); @@ -290,9 +289,9 @@ mod tests { async fn create_external_table_test(location: &str, sql: &str) -> Result<()> { let ctx = SessionContext::new(); - let plan = ctx.state().create_logical_plan(sql).await?; + let mut plan = ctx.state().create_logical_plan(sql).await?; - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { create_external_table(&ctx, cmd).await?; } else { return plan_err!("LogicalPlan is not a CreateExternalTable"); diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index c39d1915eb43..17d0f9b9e604 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -30,20 +30,21 @@ use url::Url; pub async fn get_s3_object_store_builder( url: &Url, - cmd: &CreateExternalTable, + cmd: &mut CreateExternalTable, ) -> Result { let bucket_name = get_bucket_name(url)?; let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name); if let (Some(access_key_id), Some(secret_access_key)) = ( - cmd.options.get("access_key_id"), - cmd.options.get("secret_access_key"), + cmd.options.remove("access_key_id"), + cmd.options.remove("secret_access_key"), ) { + println!("removing secret access key!"); builder = builder .with_access_key_id(access_key_id) .with_secret_access_key(secret_access_key); - if let Some(session_token) = cmd.options.get("session_token") { + if let Some(session_token) = cmd.options.remove("session_token") { builder = builder.with_token(session_token); } } else { @@ -66,7 +67,7 @@ pub async fn get_s3_object_store_builder( builder = builder.with_credentials(credentials); } - if let Some(region) = cmd.options.get("region") { + if let Some(region) = cmd.options.remove("region") { builder = builder.with_region(region); } @@ -99,7 +100,7 @@ impl CredentialProvider for S3CredentialProvider { pub fn get_oss_object_store_builder( url: &Url, - cmd: &CreateExternalTable, + cmd: &mut CreateExternalTable, ) -> Result { let bucket_name = get_bucket_name(url)?; let mut builder = AmazonS3Builder::from_env() @@ -109,15 +110,15 @@ pub fn get_oss_object_store_builder( .with_region("do_not_care"); if let (Some(access_key_id), Some(secret_access_key)) = ( - cmd.options.get("access_key_id"), - cmd.options.get("secret_access_key"), + cmd.options.remove("access_key_id"), + cmd.options.remove("secret_access_key"), ) { builder = builder .with_access_key_id(access_key_id) .with_secret_access_key(secret_access_key); } - if let Some(endpoint) = cmd.options.get("endpoint") { + if let Some(endpoint) = cmd.options.remove("endpoint") { builder = builder.with_endpoint(endpoint); } @@ -126,21 +127,21 @@ pub fn get_oss_object_store_builder( pub fn get_gcs_object_store_builder( url: &Url, - cmd: &CreateExternalTable, + cmd: &mut CreateExternalTable, ) -> Result { let bucket_name = get_bucket_name(url)?; let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name); - if let Some(service_account_path) = cmd.options.get("service_account_path") { + if let Some(service_account_path) = cmd.options.remove("service_account_path") { builder = builder.with_service_account_path(service_account_path); } - if let Some(service_account_key) = cmd.options.get("service_account_key") { + if let Some(service_account_key) = cmd.options.remove("service_account_key") { builder = builder.with_service_account_key(service_account_key); } if let Some(application_credentials_path) = - cmd.options.get("application_credentials_path") + cmd.options.remove("application_credentials_path") { builder = builder.with_application_credentials(application_credentials_path); } @@ -180,9 +181,9 @@ mod tests { let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'region' '{region}', 'session_token' {session_token}) LOCATION '{location}'"); let ctx = SessionContext::new(); - let plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx.state().create_logical_plan(&sql).await?; - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { let builder = get_s3_object_store_builder(table_url.as_ref(), cmd).await?; // get the actual configuration information, then assert_eq! let config = [ @@ -212,9 +213,9 @@ mod tests { let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'endpoint' '{endpoint}') LOCATION '{location}'"); let ctx = SessionContext::new(); - let plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx.state().create_logical_plan(&sql).await?; - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { let builder = get_oss_object_store_builder(table_url.as_ref(), cmd)?; // get the actual configuration information, then assert_eq! let config = [ @@ -244,9 +245,9 @@ mod tests { let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('service_account_path' '{service_account_path}', 'service_account_key' '{service_account_key}', 'application_credentials_path' '{application_credentials_path}') LOCATION '{location}'"); let ctx = SessionContext::new(); - let plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx.state().create_logical_plan(&sql).await?; - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { let builder = get_gcs_object_store_builder(table_url.as_ref(), cmd)?; // get the actual configuration information, then assert_eq! let config = [ From 960b5753625c46bf4a8f51bd811ad77d84872edc Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 6 Dec 2023 08:21:10 -0500 Subject: [PATCH 2/5] add test and comments --- datafusion-cli/Cargo.toml | 1 + datafusion-cli/src/exec.rs | 15 +++++++++++++++ datafusion-cli/src/object_storage.rs | 2 ++ 3 files changed, 18 insertions(+) diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index dd7a077988cb..b7877b245d0d 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -50,3 +50,4 @@ assert_cmd = "2.0" ctor = "0.2.0" predicates = "3.0" rstest = "0.17" +datafusion-common = {path="../datafusion/common"} diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index c8777a847470..597c88883644 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -221,6 +221,9 @@ async fn exec_and_print( | LogicalPlan::Analyze(_) ); + // Note that cmd is a mutable reference so that create_external_table function can remove all + // datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion + // will raise Configuration errors. if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { create_external_table(ctx, cmd).await?; } @@ -284,8 +287,11 @@ async fn create_external_table( #[cfg(test)] mod tests { + use std::str::FromStr; + use super::*; use datafusion::common::plan_err; + use datafusion_common::{FileTypeWriterOptions, file_options::StatementOptions}; async fn create_external_table_test(location: &str, sql: &str) -> Result<()> { let ctx = SessionContext::new(); @@ -293,6 +299,15 @@ mod tests { if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { create_external_table(&ctx, cmd).await?; + let options: Vec<_> = cmd.options.iter().map(|(k,v)| (k.clone(), v.clone())).collect(); + let statement_options = StatementOptions::new(options); + let file_type = datafusion_common::FileType::from_str(cmd.file_type.as_str())?; + + let _file_type_writer_options = FileTypeWriterOptions::build( + &file_type, + ctx.state().config_options(), + &statement_options, + )?; } else { return plan_err!("LogicalPlan is not a CreateExternalTable"); } diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 17d0f9b9e604..9d79c7e0ec78 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -36,6 +36,8 @@ pub async fn get_s3_object_store_builder( let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name); if let (Some(access_key_id), Some(secret_access_key)) = ( + // These options are datafusion-cli specific and must be removed before passing through to datafusion. + // Otherwise, a Configuration error will be raised. cmd.options.remove("access_key_id"), cmd.options.remove("secret_access_key"), ) { From f4a6adf9efdfcc35f957ca0dd0db15bd110e4305 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 6 Dec 2023 08:23:29 -0500 Subject: [PATCH 3/5] cargo fmt --- datafusion-cli/src/exec.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 597c88883644..8af534cd1375 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -291,7 +291,7 @@ mod tests { use super::*; use datafusion::common::plan_err; - use datafusion_common::{FileTypeWriterOptions, file_options::StatementOptions}; + use datafusion_common::{file_options::StatementOptions, FileTypeWriterOptions}; async fn create_external_table_test(location: &str, sql: &str) -> Result<()> { let ctx = SessionContext::new(); @@ -299,9 +299,14 @@ mod tests { if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { create_external_table(&ctx, cmd).await?; - let options: Vec<_> = cmd.options.iter().map(|(k,v)| (k.clone(), v.clone())).collect(); + let options: Vec<_> = cmd + .options + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); let statement_options = StatementOptions::new(options); - let file_type = datafusion_common::FileType::from_str(cmd.file_type.as_str())?; + let file_type = + datafusion_common::FileType::from_str(cmd.file_type.as_str())?; let _file_type_writer_options = FileTypeWriterOptions::build( &file_type, From 0f71c997a4c54bc7aa305bf3d5e8eea9a558913f Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 6 Dec 2023 08:54:16 -0500 Subject: [PATCH 4/5] merge main --- datafusion-cli/Cargo.lock | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 474d85ac4603..f88c907b052f 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1155,6 +1155,7 @@ dependencies = [ "clap", "ctor", "datafusion", + "datafusion-common", "dirs", "env_logger", "mimalloc", @@ -2157,9 +2158,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "wasi", @@ -2307,7 +2308,7 @@ dependencies = [ "quick-xml", "rand", "reqwest", - "ring 0.17.6", + "ring 0.17.7", "rustls-pemfile", "serde", "serde_json", @@ -2770,9 +2771,9 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.6" +version = "0.17.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" dependencies = [ "cc", "getrandom", @@ -2861,7 +2862,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" dependencies = [ "log", - "ring 0.17.6", + "ring 0.17.7", "rustls-webpki", "sct", ] @@ -2893,7 +2894,7 @@ version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring 0.17.6", + "ring 0.17.7", "untrusted 0.9.0", ] @@ -2962,7 +2963,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ - "ring 0.17.6", + "ring 0.17.7", "untrusted 0.9.0", ] @@ -3759,7 +3760,7 @@ version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" dependencies = [ - "ring 0.17.6", + "ring 0.17.7", "untrusted 0.9.0", ] From a83bd6c2943f9cc099ea4c55443df2cdefedeb6a Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 6 Dec 2023 09:16:28 -0500 Subject: [PATCH 5/5] cargo toml format --- datafusion-cli/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index b7877b245d0d..fd2dfd76c20e 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -48,6 +48,6 @@ url = "2.2" [dev-dependencies] assert_cmd = "2.0" ctor = "0.2.0" +datafusion-common = { path = "../datafusion/common" } predicates = "3.0" rstest = "0.17" -datafusion-common = {path="../datafusion/common"}