Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support named query parameters #8384

Merged
merged 29 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7afeb8b
Minor: Improve the document format of JoinHashMap
Asura7969 Nov 8, 2023
6332bec
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 10, 2023
cc5e0c7
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 10, 2023
a114310
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 11, 2023
928c811
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 11, 2023
839093e
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 12, 2023
a836cde
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 13, 2023
5648dc7
Merge branch 'apache:main' into main
Asura7969 Nov 13, 2023
a670409
Merge branch 'apache:main' into main
Asura7969 Nov 14, 2023
22894a3
Merge branch 'apache:main' into main
Asura7969 Nov 14, 2023
73a59d2
Merge branch 'apache:main' into main
Asura7969 Nov 15, 2023
46409c2
Merge branch 'apache:main' into main
Asura7969 Nov 16, 2023
8a86a4c
Merge branch 'apache:main' into main
Asura7969 Nov 17, 2023
cf5c584
Merge branch 'apache:main' into main
Asura7969 Nov 17, 2023
62ae9b9
Merge branch 'apache:main' into main
Asura7969 Nov 19, 2023
da02fa2
Merge branch 'apache:main' into main
Asura7969 Nov 20, 2023
d98eb2e
Merge branch 'apache:main' into main
Asura7969 Nov 21, 2023
79e7216
Merge branch 'apache:main' into main
Asura7969 Nov 21, 2023
ba51abd
Merge branch 'apache:main' into main
Asura7969 Nov 23, 2023
2468f52
Merge branch 'apache:main' into main
Asura7969 Nov 23, 2023
180c303
Merge branch 'apache:main' into main
Asura7969 Nov 24, 2023
68980ba
Merge branch 'apache:main' into main
Asura7969 Nov 27, 2023
9411940
Merge branch 'apache:main' into main
Asura7969 Nov 27, 2023
ba28346
Merge branch 'apache:main' into main
Asura7969 Nov 28, 2023
df0942f
Merge branch 'apache:main' into main
Asura7969 Nov 29, 2023
d6381c2
support named query parameters
Asura7969 Nov 30, 2023
69a2080
cargo fmt
Asura7969 Dec 1, 2023
49e617e
add `ParamValues` conversion
Asura7969 Dec 2, 2023
5b8c3b7
improve doc
Asura7969 Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions datafusion/common/src/param_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,24 @@ impl From<Vec<ScalarValue>> for ParamValues {
}
}

impl From<HashMap<String, ScalarValue>> for ParamValues {
fn from(value: HashMap<String, ScalarValue>) -> Self {
impl<K> From<Vec<(K, ScalarValue)>> for ParamValues
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌 very nice

where
K: Into<String>,
{
fn from(value: Vec<(K, ScalarValue)>) -> Self {
let value: HashMap<String, ScalarValue> =
value.into_iter().map(|(k, v)| (k.into(), v)).collect();
Self::MAP(value)
}
}

impl<K> From<HashMap<K, ScalarValue>> for ParamValues
where
K: Into<String>,
{
fn from(value: HashMap<K, ScalarValue>) -> Self {
let value: HashMap<String, ScalarValue> =
value.into_iter().map(|(k, v)| (k.into(), v)).collect();
Self::MAP(value)
}
}
4 changes: 2 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,7 @@ impl DataFrame {
/// .with_param_values(vec![
/// // value at index 0 --> $1
/// ScalarValue::from(2i64)
/// ].into())?
/// ])?
/// .collect()
/// .await?;
/// assert_batches_eq!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also add a demonstration of how to use the hashmap version too so that it is easier to discover by a casual user?

Perhaps add this to the example:

    /// // Note you can also provide named parameters
    /// let results = ctx
    ///   .sql("SELECT a FROM example WHERE b = $my_param")
    ///   .await?
    ///    // replace $my_param with value 2
    ///    // Note you can also use a HashMap as well
    ///   .with_param_values(vec![
    ///      "my_param",
    ///      ScalarValue::from(2i64)
    ///    ])?
    ///   .collect()
    ///   .await?;
    /// assert_batches_eq!(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your comment, I'm done

Expand All @@ -1231,7 +1231,7 @@ impl DataFrame {
/// # Ok(())
/// # }
/// ```
pub fn with_param_values(self, query_values: ParamValues) -> Result<Self> {
pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
let plan = self.plan.with_param_values(query_values)?;
Ok(Self::new(self.session_state, plan))
}
Expand Down
23 changes: 9 additions & 14 deletions datafusion/core/tests/sql/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
// under the License.

use super::*;
use datafusion_common::{ParamValues, ScalarValue};
use std::collections::HashMap;
use datafusion_common::ScalarValue;
use tempfile::TempDir;

#[tokio::test]
Expand Down Expand Up @@ -491,8 +490,7 @@ async fn test_prepare_statement() -> Result<()> {
ctx.sql("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2 FROM test WHERE c1 > $2 AND c1 < $1").await?;

// prepare logical plan to logical plan without parameters
let param_values: ParamValues =
vec![ScalarValue::Int32(Some(3)), ScalarValue::Float64(Some(0.0))].into();
let param_values = vec![ScalarValue::Int32(Some(3)), ScalarValue::Float64(Some(0.0))];
let dataframe = dataframe.with_param_values(param_values)?;
let results = dataframe.collect().await?;

Expand Down Expand Up @@ -535,18 +533,15 @@ async fn test_named_query_parameters() -> Result<()> {

// sql to statement then to logical plan with parameters
// c1 defined as UINT32, c2 defined as UInt64
let dataframe = ctx
let results = ctx
.sql("SELECT c1, c2 FROM test WHERE c1 > $coo AND c1 < $foo")
.await?
.with_param_values(vec![
("foo", ScalarValue::UInt32(Some(3))),
("coo", ScalarValue::UInt32(Some(0))),
])?
.collect()
.await?;

// to logical plan without parameters
let param_values = vec![
("foo".to_string(), ScalarValue::UInt32(Some(3))),
("coo".to_string(), ScalarValue::UInt32(Some(0))),
];
let param_values: HashMap<_, _> = param_values.into_iter().collect();
let dataframe = dataframe.with_param_values(param_values.into())?;
let results = dataframe.collect().await?;
let expected = vec![
"+----+----+",
"| c1 | c2 |",
Expand Down
8 changes: 6 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,14 +981,18 @@ impl LogicalPlan {
/// // Fill in the parameter $1 with a literal 3
/// let plan = plan.with_param_values(vec![
/// ScalarValue::from(3i32) // value at index 0 --> $1
/// ].into()).unwrap();
/// ]).unwrap();
///
/// assert_eq!("Filter: t1.id = Int32(3)\
/// \n TableScan: t1",
/// plan.display_indent().to_string()
/// );
/// ```
Asura7969 marked this conversation as resolved.
Show resolved Hide resolved
pub fn with_param_values(self, param_values: ParamValues) -> Result<LogicalPlan> {
pub fn with_param_values(
self,
param_values: impl Into<ParamValues>,
) -> Result<LogicalPlan> {
let param_values = param_values.into();
match self {
LogicalPlan::Prepare(prepare_lp) => {
param_values.verify(&prepare_lp.data_types)?;
Expand Down
32 changes: 14 additions & 18 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2678,7 +2678,7 @@ fn prepare_stmt_quick_test(

fn prepare_stmt_replace_params_quick_test(
plan: LogicalPlan,
param_values: ParamValues,
param_values: impl Into<ParamValues>,
expected_plan: &str,
) -> LogicalPlan {
// replace params
Expand Down Expand Up @@ -3708,7 +3708,7 @@ fn test_prepare_statement_to_plan_no_param() {

///////////////////
// replace params with values
let param_values = vec![ScalarValue::Int32(Some(10))].into();
let param_values = vec![ScalarValue::Int32(Some(10))];
let expected_plan = "Projection: person.id, person.age\
\n Filter: person.age = Int64(10)\
\n TableScan: person";
Expand All @@ -3730,7 +3730,7 @@ fn test_prepare_statement_to_plan_no_param() {

///////////////////
// replace params with values
let param_values = vec![].into();
let param_values: Vec<ScalarValue> = vec![];
let expected_plan = "Projection: person.id, person.age\
\n Filter: person.age = Int64(10)\
\n TableScan: person";
Expand All @@ -3744,7 +3744,7 @@ fn test_prepare_statement_to_plan_one_param_no_value_panic() {
let sql = "PREPARE my_plan(INT) AS SELECT id, age FROM person WHERE age = 10";
let plan = logical_plan(sql).unwrap();
// declare 1 param but provide 0
let param_values = vec![].into();
let param_values: Vec<ScalarValue> = vec![];
assert_eq!(
plan.with_param_values(param_values)
.unwrap_err()
Expand All @@ -3759,7 +3759,7 @@ fn test_prepare_statement_to_plan_one_param_one_value_different_type_panic() {
let sql = "PREPARE my_plan(INT) AS SELECT id, age FROM person WHERE age = 10";
let plan = logical_plan(sql).unwrap();
// declare 1 param but provide 0
let param_values = vec![ScalarValue::Float64(Some(20.0))].into();
let param_values = vec![ScalarValue::Float64(Some(20.0))];
assert_eq!(
plan.with_param_values(param_values)
.unwrap_err()
Expand All @@ -3774,7 +3774,7 @@ fn test_prepare_statement_to_plan_no_param_on_value_panic() {
let sql = "PREPARE my_plan AS SELECT id, age FROM person WHERE age = 10";
let plan = logical_plan(sql).unwrap();
// declare 1 param but provide 0
let param_values = vec![ScalarValue::Int32(Some(10))].into();
let param_values = vec![ScalarValue::Int32(Some(10))];
assert_eq!(
plan.with_param_values(param_values)
.unwrap_err()
Expand All @@ -3795,7 +3795,7 @@ fn test_prepare_statement_to_plan_params_as_constants() {

///////////////////
// replace params with values
let param_values = vec![ScalarValue::Int32(Some(10))].into();
let param_values = vec![ScalarValue::Int32(Some(10))];
let expected_plan = "Projection: Int32(10)\n EmptyRelation";

prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
Expand All @@ -3811,7 +3811,7 @@ fn test_prepare_statement_to_plan_params_as_constants() {

///////////////////
// replace params with values
let param_values = vec![ScalarValue::Int32(Some(10))].into();
let param_values = vec![ScalarValue::Int32(Some(10))];
let expected_plan = "Projection: Int64(1) + Int32(10)\n EmptyRelation";

prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
Expand All @@ -3830,8 +3830,7 @@ fn test_prepare_statement_to_plan_params_as_constants() {
let param_values = vec![
ScalarValue::Int32(Some(10)),
ScalarValue::Float64(Some(10.0)),
]
.into();
];
let expected_plan = "Projection: Int64(1) + Int32(10) + Float64(10)\n EmptyRelation";

prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
Expand Down Expand Up @@ -4068,7 +4067,7 @@ fn test_prepare_statement_to_plan_one_param() {

///////////////////
// replace params with values
let param_values = vec![ScalarValue::Int32(Some(10))].into();
let param_values = vec![ScalarValue::Int32(Some(10))];
let expected_plan = "Projection: person.id, person.age\
\n Filter: person.age = Int32(10)\
\n TableScan: person";
Expand All @@ -4093,7 +4092,7 @@ fn test_prepare_statement_to_plan_data_type() {

///////////////////
// replace params with values still succeed and use Float64
let param_values = vec![ScalarValue::Float64(Some(10.0))].into();
let param_values = vec![ScalarValue::Float64(Some(10.0))];
let expected_plan = "Projection: person.id, person.age\
\n Filter: person.age = Float64(10)\
\n TableScan: person";
Expand Down Expand Up @@ -4126,8 +4125,7 @@ fn test_prepare_statement_to_plan_multi_params() {
ScalarValue::Int32(Some(20)),
ScalarValue::Float64(Some(200.0)),
ScalarValue::Utf8(Some("xyz".to_string())),
]
.into();
];
let expected_plan =
"Projection: person.id, person.age, Utf8(\"xyz\")\
\n Filter: person.age IN ([Int32(10), Int32(20)]) AND person.salary > Float64(100) AND person.salary < Float64(200) OR person.first_name < Utf8(\"abc\")\
Expand Down Expand Up @@ -4164,8 +4162,7 @@ fn test_prepare_statement_to_plan_having() {
ScalarValue::Float64(Some(100.0)),
ScalarValue::Float64(Some(200.0)),
ScalarValue::Float64(Some(300.0)),
]
.into();
];
let expected_plan =
"Projection: person.id, SUM(person.age)\
\n Filter: SUM(person.age) < Int32(10) AND SUM(person.age) > Int64(10) OR SUM(person.age) IN ([Float64(200), Float64(300)])\
Expand Down Expand Up @@ -4195,8 +4192,7 @@ fn test_prepare_statement_to_plan_value_list() {
let param_values = vec![
ScalarValue::Utf8(Some("a".to_string())),
ScalarValue::Utf8(Some("b".to_string())),
]
.into();
];
let expected_plan = "Projection: t.num, t.letter\
\n SubqueryAlias: t\
\n Projection: column1 AS num, column2 AS letter\
Expand Down