Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Remove df-cli specific SQL statment options before executing with DataFusion #8426

Merged
merged 5 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,6 @@ url = "2.2"
[dev-dependencies]
assert_cmd = "2.0"
ctor = "0.2.0"
datafusion-common = { path = "../datafusion/common" }
predicates = "3.0"
rstest = "0.17"
31 changes: 25 additions & 6 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async fn exec_and_print(
})?;
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let plan = ctx.state().statement_to_plan(statement).await?;
let mut plan = ctx.state().statement_to_plan(statement).await?;

// For plans like `Explain` ignore `MaxRows` option and always display all rows
let should_ignore_maxrows = matches!(
Expand All @@ -221,10 +221,12 @@ async fn exec_and_print(
| LogicalPlan::Analyze(_)
);

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
// Note that cmd is a mutable reference so that create_external_table function can remove all
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
create_external_table(ctx, cmd).await?;
}

let df = ctx.execute_logical_plan(plan).await?;
let results = df.collect().await?;

Expand All @@ -244,7 +246,7 @@ async fn exec_and_print(

async fn create_external_table(
ctx: &SessionContext,
cmd: &CreateExternalTable,
cmd: &mut CreateExternalTable,
) -> Result<()> {
let table_path = ListingTableUrl::parse(&cmd.location)?;
let scheme = table_path.scheme();
Expand Down Expand Up @@ -285,15 +287,32 @@ async fn create_external_table(

#[cfg(test)]
mod tests {
use std::str::FromStr;

use super::*;
use datafusion::common::plan_err;
use datafusion_common::{file_options::StatementOptions, FileTypeWriterOptions};

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(sql).await?;
let mut plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
create_external_table(&ctx, cmd).await?;
let options: Vec<_> = cmd
.options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let statement_options = StatementOptions::new(options);
let file_type =
datafusion_common::FileType::from_str(cmd.file_type.as_str())?;

let _file_type_writer_options = FileTypeWriterOptions::build(
&file_type,
ctx.state().config_options(),
&statement_options,
)?;
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}
Expand Down
41 changes: 22 additions & 19 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,23 @@ use url::Url;

pub async fn get_s3_object_store_builder(
url: &Url,
cmd: &CreateExternalTable,
cmd: &mut CreateExternalTable,
) -> Result<AmazonS3Builder> {
let bucket_name = get_bucket_name(url)?;
let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);

if let (Some(access_key_id), Some(secret_access_key)) = (
cmd.options.get("access_key_id"),
cmd.options.get("secret_access_key"),
// These options are datafusion-cli specific and must be removed before passing through to datafusion.
// Otherwise, a Configuration error will be raised.
cmd.options.remove("access_key_id"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very subtle change -- maybe we can add a comment explaining why these options are removed

Likewise below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments explaining why the keys are removed/plan made mutable within datafusion-cli.

I also updated one of the unit tests to invoke FileTypeWriterOptions::build to test for this issue in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you

cmd.options.remove("secret_access_key"),
) {
println!("removing secret access key!");
builder = builder
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key);

if let Some(session_token) = cmd.options.get("session_token") {
if let Some(session_token) = cmd.options.remove("session_token") {
builder = builder.with_token(session_token);
}
} else {
Expand All @@ -66,7 +69,7 @@ pub async fn get_s3_object_store_builder(
builder = builder.with_credentials(credentials);
}

if let Some(region) = cmd.options.get("region") {
if let Some(region) = cmd.options.remove("region") {
builder = builder.with_region(region);
}

Expand Down Expand Up @@ -99,7 +102,7 @@ impl CredentialProvider for S3CredentialProvider {

pub fn get_oss_object_store_builder(
url: &Url,
cmd: &CreateExternalTable,
cmd: &mut CreateExternalTable,
) -> Result<AmazonS3Builder> {
let bucket_name = get_bucket_name(url)?;
let mut builder = AmazonS3Builder::from_env()
Expand All @@ -109,15 +112,15 @@ pub fn get_oss_object_store_builder(
.with_region("do_not_care");

if let (Some(access_key_id), Some(secret_access_key)) = (
cmd.options.get("access_key_id"),
cmd.options.get("secret_access_key"),
cmd.options.remove("access_key_id"),
cmd.options.remove("secret_access_key"),
) {
builder = builder
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key);
}

if let Some(endpoint) = cmd.options.get("endpoint") {
if let Some(endpoint) = cmd.options.remove("endpoint") {
builder = builder.with_endpoint(endpoint);
}

Expand All @@ -126,21 +129,21 @@ pub fn get_oss_object_store_builder(

pub fn get_gcs_object_store_builder(
url: &Url,
cmd: &CreateExternalTable,
cmd: &mut CreateExternalTable,
) -> Result<GoogleCloudStorageBuilder> {
let bucket_name = get_bucket_name(url)?;
let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);

if let Some(service_account_path) = cmd.options.get("service_account_path") {
if let Some(service_account_path) = cmd.options.remove("service_account_path") {
builder = builder.with_service_account_path(service_account_path);
}

if let Some(service_account_key) = cmd.options.get("service_account_key") {
if let Some(service_account_key) = cmd.options.remove("service_account_key") {
builder = builder.with_service_account_key(service_account_key);
}

if let Some(application_credentials_path) =
cmd.options.get("application_credentials_path")
cmd.options.remove("application_credentials_path")
{
builder = builder.with_application_credentials(application_credentials_path);
}
Expand Down Expand Up @@ -180,9 +183,9 @@ mod tests {
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'region' '{region}', 'session_token' {session_token}) LOCATION '{location}'");

let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(&sql).await?;
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
let builder = get_s3_object_store_builder(table_url.as_ref(), cmd).await?;
// get the actual configuration information, then assert_eq!
let config = [
Expand Down Expand Up @@ -212,9 +215,9 @@ mod tests {
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'endpoint' '{endpoint}') LOCATION '{location}'");

let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(&sql).await?;
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
let builder = get_oss_object_store_builder(table_url.as_ref(), cmd)?;
// get the actual configuration information, then assert_eq!
let config = [
Expand Down Expand Up @@ -244,9 +247,9 @@ mod tests {
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('service_account_path' '{service_account_path}', 'service_account_key' '{service_account_key}', 'application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");

let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(&sql).await?;
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
let builder = get_gcs_object_store_builder(table_url.as_ref(), cmd)?;
// get the actual configuration information, then assert_eq!
let config = [
Expand Down