Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support create object store source tables without depending on environment variables #5732

Merged
merged 11 commits into from
Apr 5, 2023
175 changes: 171 additions & 4 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,26 @@
use crate::{
command::{Command, OutputFormat},
helper::CliHelper,
object_storage::{
get_gcs_object_store_builder, get_oss_object_store_builder,
get_s3_object_store_builder,
},
print_options::PrintOptions,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion::{
datasource::listing::ListingTableUrl,
error::{DataFusionError, Result},
logical_expr::CreateExternalTable,
};
use datafusion::{logical_expr::LogicalPlan, prelude::SessionContext};
use object_store::ObjectStore;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::time::Instant;
use std::{fs::File, sync::Arc};
use url::Url;

/// run and execute SQL statements and commands from a file, against a context with the given print options
pub async fn exec_from_lines(
Expand Down Expand Up @@ -165,9 +175,166 @@ async fn exec_and_print(
sql: String,
) -> Result<()> {
let now = Instant::now();
let df = ctx.sql(&sql).await?;

let plan = ctx.state().create_logical_plan(&sql).await?;
let df = match &plan {
LogicalPlan::CreateExternalTable(cmd) => {
create_external_table(&ctx, cmd)?;
ctx.execute_logical_plan(plan).await?
}
_ => ctx.execute_logical_plan(plan).await?,
};

let results = df.collect().await?;
print_options.print_batches(&results, now)?;

Ok(())
}

fn create_external_table(ctx: &SessionContext, cmd: &CreateExternalTable) -> Result<()> {
let table_path = ListingTableUrl::parse(&cmd.location)?;
let scheme = table_path.scheme();
let url: &Url = table_path.as_ref();

// registering the cloud object store dynamically using cmd.options
let store = match scheme {
"s3" => {
let builder = get_s3_object_store_builder(url, cmd)?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"oss" => {
let builder = get_oss_object_store_builder(url, cmd)?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"gs" | "gcs" => {
let builder = get_gcs_object_store_builder(url, cmd)?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
_ => {
// for other types, try to get from the object_store_registry
ctx.runtime_env()
.object_store_registry
.get_store(url)
.map_err(|_| {
DataFusionError::Execution(format!(
"Unsupported object store scheme: {}",
scheme
))
})?
}
};

ctx.runtime_env().register_object_store(url, store);

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(&sql).await?;

match &plan {
LogicalPlan::CreateExternalTable(cmd) => {
create_external_table(&ctx, cmd)?;
}
_ => assert!(false),
};

ctx.runtime_env()
.object_store(ListingTableUrl::parse(location)?)?;

Ok(())
}

#[tokio::test]
async fn create_object_store_table_s3() -> Result<()> {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
let region = "fake_us-east-2";
let session_token = "fake_session_token";
let location = "s3://bucket/path/file.parquet";

// Missing region
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}') LOCATION '{location}'");
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();
assert!(err.to_string().contains("Missing region"));

// Should be OK
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'region' '{region}', 'session_token' '{session_token}') LOCATION '{location}'");
create_external_table_test(location, &sql).await?;

Ok(())
}

#[tokio::test]
async fn create_object_store_table_oss() -> Result<()> {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
let endpoint = "fake_endpoint";
let location = "oss://bucket/path/file.parquet";

// Should be OK
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'endpoint' '{endpoint}') LOCATION '{location}'");
create_external_table_test(location, &sql).await?;

Ok(())
}

#[tokio::test]
async fn create_object_store_table_gcs() -> Result<()> {
let service_account_path = "fake_service_account_path";
let service_account_key =
"{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
let application_credentials_path = "fake_application_credentials_path";
let location = "gcs://bucket/path/file.parquet";

// for service_account_path
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('service_account_path' '{service_account_path}') LOCATION '{location}'");
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();
assert!(err.to_string().contains("No such file or directory"));

// for service_account_key
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('service_account_key' '{service_account_key}') LOCATION '{location}'");
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();
assert!(err.to_string().contains("No RSA key found in pem file"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for negative tests


// for application_credentials_path
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();
assert!(err.to_string().contains("A configuration file was passed"));

Ok(())
}

#[tokio::test]
async fn create_external_table_local_file() -> Result<()> {
let location = "/path/to/file.parquet";

// Ensure that local files are also registered
let sql =
format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();
assert!(err.to_string().contains("No such file or directory"));

Ok(())
}
}
5 changes: 1 addition & 4 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use datafusion::execution::context::SessionConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
use datafusion_cli::object_storage::DatafusionCliObjectStoreRegistry;
use datafusion_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
};
Expand Down Expand Up @@ -148,9 +147,7 @@ pub async fn main() -> Result<()> {
}

fn create_runtime_env() -> Result<RuntimeEnv> {
let object_store_registry = DatafusionCliObjectStoreRegistry::new();
let rn_config =
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
let rn_config = RuntimeConfig::new();
RuntimeEnv::new(rn_config)
}

Expand Down
Loading