From 7e14ac0d453215f2ba36e6b2cfba7bc7df2c669c Mon Sep 17 00:00:00 2001 From: TCeason <33082201+TCeason@users.noreply.github.com> Date: Thu, 26 Sep 2024 17:57:34 +0800 Subject: [PATCH] feat(query): Procedure support drop if exists and create or replace (#16500) * feat(query): support drop procedure if exists * support create or replace procedure * fix conversation * refactor: support create_option and drop if exisits * add_procedure return Result> --- src/meta/app/src/principal/procedure.rs | 14 +-- src/query/ast/src/ast/statements/procedure.rs | 13 ++- src/query/ast/src/parser/statement.rs | 26 ++++- src/query/ast/tests/it/parser.rs | 19 +++ src/query/ast/tests/it/testdata/stmt.txt | 110 ++++++++++++++++++ .../management/src/procedure/procedure_mgr.rs | 25 +--- .../interpreter_procedure_create.rs | 25 +++- .../interpreter_procedure_drop.rs | 2 +- .../sql/src/planner/binder/ddl/procedure.rs | 5 +- .../sql/src/planner/plans/ddl/procedure.rs | 8 +- src/query/users/src/user_procedure.rs | 33 +++++- .../base/15_procedure/15_0002_procedure.test | 55 ++++++++- 12 files changed, 275 insertions(+), 60 deletions(-) diff --git a/src/meta/app/src/principal/procedure.rs b/src/meta/app/src/principal/procedure.rs index e6d693edb017..490d45130b1b 100644 --- a/src/meta/app/src/principal/procedure.rs +++ b/src/meta/app/src/principal/procedure.rs @@ -25,7 +25,6 @@ use databend_common_expression::types::DataType; use crate::principal::procedure_id_ident::ProcedureIdIdent; use crate::principal::procedure_name_ident::ProcedureNameIdent; use crate::principal::ProcedureIdentity; -use crate::schema::CreateOption; use crate::tenant::Tenant; use crate::tenant::ToTenant; use crate::KeyWithTenant; @@ -85,22 +84,15 @@ impl Display for ProcedureMeta { #[derive(Clone, Debug, PartialEq)] pub struct CreateProcedureReq { - pub create_option: CreateOption, pub name_ident: ProcedureNameIdent, pub meta: ProcedureMeta, } impl Display for CreateProcedureReq { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let typ = match self.create_option { - CreateOption::Create => "create_procedure", - CreateOption::CreateIfNotExists => "create_procedure_if_not_exists", - CreateOption::CreateOrReplace => "create_or_replace_procedure", - }; write!( f, - "{}:{}/{}={:?}", - typ, + "{}/{}={:?}", self.name_ident.tenant_name(), self.name_ident.procedure_name(), self.meta @@ -137,7 +129,6 @@ pub struct RenameProcedureReply {} #[derive(Clone, Debug, PartialEq, Eq)] pub struct DropProcedureReq { - pub if_exists: bool, pub name_ident: ProcedureNameIdent, } @@ -145,8 +136,7 @@ impl Display for DropProcedureReq { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!( f, - "drop_procedure(if_exists={}):{}/{}", - self.if_exists, + "drop_procedure:{}/{}", self.name_ident.tenant_name(), self.name_ident.procedure_name(), ) diff --git a/src/query/ast/src/ast/statements/procedure.rs b/src/query/ast/src/ast/statements/procedure.rs index 336649120ed4..a2d75d52ea2a 100644 --- a/src/query/ast/src/ast/statements/procedure.rs +++ b/src/query/ast/src/ast/statements/procedure.rs @@ -98,7 +98,11 @@ impl Display for CreateProcedureStmt { if let CreateOption::CreateOrReplace = self.create_option { write!(f, "OR REPLACE ")?; } - write!(f, "PROCEDURE {}", self.name.name)?; + write!(f, "PROCEDURE ")?; + if let CreateOption::CreateIfNotExists = self.create_option { + write!(f, "IF NOT EXISTS ")?; + } + write!(f, "{}", self.name.name)?; if let Some(args) = &self.args { if args.is_empty() { write!(f, "() ")?; @@ -137,12 +141,17 @@ impl Display for CreateProcedureStmt { #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub struct DropProcedureStmt { + pub if_exists: bool, pub name: ProcedureIdentity, } impl Display for DropProcedureStmt { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "DROP PROCEDURE {}", self.name)?; + write!(f, "DROP PROCEDURE ")?; + if self.if_exists { + write!(f, "IF EXISTS ")?; + } + write!(f, "{}", self.name)?; Ok(()) } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 0877ac754811..a985f0e9ee15 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -2067,10 +2067,25 @@ pub fn statement_body(i: Input) -> IResult { // [ COMMENT = '' ] AS let create_procedure = map_res( rule! { - CREATE ~ ( OR ~ ^REPLACE )? ~ PROCEDURE ~ #ident ~ #procedure_arg ~ RETURNS ~ #procedure_return ~ LANGUAGE ~ SQL ~ (COMMENT ~ "=" ~ #literal_string)? ~ AS ~ #code_string + CREATE ~ ( OR ~ ^REPLACE )? ~ PROCEDURE ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ #ident ~ #procedure_arg ~ RETURNS ~ #procedure_return ~ LANGUAGE ~ SQL ~ (COMMENT ~ "=" ~ #literal_string)? ~ AS ~ #code_string }, - |(_, opt_or_replace, _, name, args, _, return_type, _, _, opt_comment, _, script)| { - let create_option = parse_create_option(opt_or_replace.is_some(), false)?; + |( + _, + opt_or_replace, + _, + opt_if_not_exists, + name, + args, + _, + return_type, + _, + _, + opt_comment, + _, + script, + )| { + let create_option = + parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?; let name = ProcedureIdentity { name: name.to_string(), @@ -2137,10 +2152,11 @@ pub fn statement_body(i: Input) -> IResult { let drop_procedure = map( rule! { - DROP ~ PROCEDURE ~ #ident ~ #procedure_type_name + DROP ~ PROCEDURE ~ ( IF ~ ^EXISTS )? ~ #ident ~ #procedure_type_name }, - |(_, _, name, args)| { + |(_, _, opt_if_exists, name, args)| { Statement::DropProcedure(DropProcedureStmt { + if_exists: opt_if_exists.is_some(), name: ProcedureIdentity { name: name.to_string(), args_type: if args.is_empty() { diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index c484eff311df..ca1566d5cbd6 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -822,10 +822,29 @@ fn test_statement() { r#"describe PROCEDURE p1()"#, r#"describe PROCEDURE p1(string, timestamp)"#, r#"drop PROCEDURE p1()"#, + r#"drop PROCEDURE if exists p1()"#, r#"drop PROCEDURE p1(int, string)"#, r#"call PROCEDURE p1()"#, r#"call PROCEDURE p1(1, 'x', '2022-02-02'::Date)"#, r#"show PROCEDURES like 'p1%'"#, + r#"create or replace PROCEDURE p1() returns string not null language sql comment = 'test' as $$ + BEGIN + LET sum := 0; + FOR x IN SELECT * FROM numbers(100) DO + sum := sum + x.number; + END FOR; + RETURN sum; + END; + $$;"#, + r#"create PROCEDURE if not exists p1() returns string not null language sql comment = 'test' as $$ + BEGIN + LET sum := 0; + FOR x IN SELECT * FROM numbers(100) DO + sum := sum + x.number; + END FOR; + RETURN sum; + END; + $$;"#, r#"create PROCEDURE p1() returns string not null language sql comment = 'test' as $$ BEGIN LET sum := 0; diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 25e029ec6d15..3d4e878352a9 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -23030,6 +23030,23 @@ DROP PROCEDURE p1() ---------- AST ------------ DropProcedure( DropProcedureStmt { + if_exists: false, + name: ProcedureIdentity { + name: "p1", + args_type: "", + }, + }, +) + + +---------- Input ---------- +drop PROCEDURE if exists p1() +---------- Output --------- +DROP PROCEDURE IF EXISTS p1() +---------- AST ------------ +DropProcedure( + DropProcedureStmt { + if_exists: true, name: ProcedureIdentity { name: "p1", args_type: "", @@ -23045,6 +23062,7 @@ DROP PROCEDURE p1(Int32,STRING) ---------- AST ------------ DropProcedure( DropProcedureStmt { + if_exists: false, name: ProcedureIdentity { name: "p1", args_type: "Int32,STRING", @@ -23130,6 +23148,98 @@ ShowProcedures { } +---------- Input ---------- +create or replace PROCEDURE p1() returns string not null language sql comment = 'test' as $$ +BEGIN + LET sum := 0; + FOR x IN SELECT * FROM numbers(100) DO + sum := sum + x.number; + END FOR; + RETURN sum; +END; +$$; +---------- Output --------- +CREATE OR REPLACE PROCEDURE p1() RETURNS STRING NOT NULL LANGUAGE SQL COMMENT='test' AS $$ +BEGIN + LET sum := 0; + FOR x IN SELECT * FROM numbers(100) DO + sum := sum + x.number; + END FOR; + RETURN sum; +END; +$$ +---------- AST ------------ +CreateProcedure( + CreateProcedureStmt { + create_option: CreateOrReplace, + name: ProcedureIdentity { + name: "p1", + args_type: "", + }, + language: SQL, + args: None, + return_type: [ + ProcedureType { + name: None, + data_type: NotNull( + String, + ), + }, + ], + comment: Some( + "test", + ), + script: "BEGIN\n LET sum := 0;\n FOR x IN SELECT * FROM numbers(100) DO\n sum := sum + x.number;\n END FOR;\n RETURN sum;\nEND;", + }, +) + + +---------- Input ---------- +create PROCEDURE if not exists p1() returns string not null language sql comment = 'test' as $$ +BEGIN + LET sum := 0; + FOR x IN SELECT * FROM numbers(100) DO + sum := sum + x.number; + END FOR; + RETURN sum; +END; +$$; +---------- Output --------- +CREATE PROCEDURE IF NOT EXISTS p1() RETURNS STRING NOT NULL LANGUAGE SQL COMMENT='test' AS $$ +BEGIN + LET sum := 0; + FOR x IN SELECT * FROM numbers(100) DO + sum := sum + x.number; + END FOR; + RETURN sum; +END; +$$ +---------- AST ------------ +CreateProcedure( + CreateProcedureStmt { + create_option: CreateIfNotExists, + name: ProcedureIdentity { + name: "p1", + args_type: "", + }, + language: SQL, + args: None, + return_type: [ + ProcedureType { + name: None, + data_type: NotNull( + String, + ), + }, + ], + comment: Some( + "test", + ), + script: "BEGIN\n LET sum := 0;\n FOR x IN SELECT * FROM numbers(100) DO\n sum := sum + x.number;\n END FOR;\n RETURN sum;\nEND;", + }, +) + + ---------- Input ---------- create PROCEDURE p1() returns string not null language sql comment = 'test' as $$ BEGIN diff --git a/src/query/management/src/procedure/procedure_mgr.rs b/src/query/management/src/procedure/procedure_mgr.rs index 27f4d31caeb5..f85e7c3b784c 100644 --- a/src/query/management/src/procedure/procedure_mgr.rs +++ b/src/query/management/src/procedure/procedure_mgr.rs @@ -22,7 +22,6 @@ use databend_common_meta_app::principal::procedure::ProcedureInfo; use databend_common_meta_app::principal::procedure_id_ident::ProcedureIdIdent; use databend_common_meta_app::principal::CreateProcedureReply; use databend_common_meta_app::principal::CreateProcedureReq; -use databend_common_meta_app::principal::DropProcedureReq; use databend_common_meta_app::principal::GetProcedureReply; use databend_common_meta_app::principal::GetProcedureReq; use databend_common_meta_app::principal::ListProcedureReq; @@ -31,7 +30,6 @@ use databend_common_meta_app::principal::ProcedureIdToNameIdent; use databend_common_meta_app::principal::ProcedureIdentity; use databend_common_meta_app::principal::ProcedureMeta; use databend_common_meta_app::principal::ProcedureNameIdent; -use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; @@ -55,11 +53,11 @@ impl ProcedureMgr { pub async fn create_procedure( &self, req: CreateProcedureReq, + overriding: bool, ) -> Result { debug!(req :? =(&req); "SchemaApi: {}", func_name!()); let name_ident = &req.name_ident; let meta = &req.meta; - let overriding = req.create_option.is_overriding(); let name_ident_raw = serialize_struct(name_ident.procedure_name())?; let create_res = self @@ -74,19 +72,7 @@ impl ProcedureMgr { match create_res { Ok(id) => Ok(CreateProcedureReply { procedure_id: *id }), - Err(existent) => match req.create_option { - CreateOption::Create => { - Err(AppError::from(name_ident.exist_error(func_name!())).into()) - } - CreateOption::CreateIfNotExists => Ok(CreateProcedureReply { - procedure_id: *existent.data, - }), - CreateOption::CreateOrReplace => { - unreachable!( - "create_procedure: CreateOrReplace should never conflict with existent" - ); - } - }, + Err(_) => Err(AppError::from(name_ident.exist_error(func_name!())).into()), } } @@ -94,13 +80,12 @@ impl ProcedureMgr { #[async_backtrace::framed] pub async fn drop_procedure( &self, - req: DropProcedureReq, + name_ident: &ProcedureNameIdent, ) -> Result, SeqV)>, KVAppError> { - debug!(req :? =(&req); "SchemaApi: {}", func_name!()); - let name_ident = req.name_ident; + debug!(name_ident :? =(name_ident); "SchemaApi: {}", func_name!()); let dropped = self .kv_api - .remove_id_value(&name_ident, |id| { + .remove_id_value(name_ident, |id| { vec![ProcedureIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key()] }) .await?; diff --git a/src/query/service/src/interpreters/interpreter_procedure_create.rs b/src/query/service/src/interpreters/interpreter_procedure_create.rs index 31f7a7022810..86e51d1586ee 100644 --- a/src/query/service/src/interpreters/interpreter_procedure_create.rs +++ b/src/query/service/src/interpreters/interpreter_procedure_create.rs @@ -15,7 +15,10 @@ use std::sync::Arc; use databend_common_exception::Result; +use databend_common_meta_api::kv_app_error::KVAppError; +use databend_common_meta_app::app_error::AppError; use databend_common_meta_app::principal::CreateProcedureReq; +use databend_common_meta_app::schema::CreateOption; use databend_common_sql::plans::CreateProcedurePlan; use databend_common_users::UserApiProvider; use log::debug; @@ -55,10 +58,24 @@ impl Interpreter for CreateProcedureInterpreter { let tenant = self.plan.tenant.clone(); let create_procedure_req: CreateProcedureReq = self.plan.clone().into(); - let _ = UserApiProvider::instance() - .add_procedure(&tenant, create_procedure_req) - .await?; + let overriding = self.plan.create_option.is_overriding(); - Ok(PipelineBuildResult::create()) + return if let Err(e) = UserApiProvider::instance() + .add_procedure(&tenant, create_procedure_req, overriding) + .await? + { + match e { + KVAppError::AppError(AppError::ProcedureAlreadyExists(_)) => { + if self.plan.create_option != CreateOption::CreateIfNotExists { + Err(e.into()) + } else { + Ok(PipelineBuildResult::create()) + } + } + _ => Err(e.into()), + } + } else { + Ok(PipelineBuildResult::create()) + }; } } diff --git a/src/query/service/src/interpreters/interpreter_procedure_drop.rs b/src/query/service/src/interpreters/interpreter_procedure_drop.rs index abb710155fa4..86f318abd286 100644 --- a/src/query/service/src/interpreters/interpreter_procedure_drop.rs +++ b/src/query/service/src/interpreters/interpreter_procedure_drop.rs @@ -56,7 +56,7 @@ impl Interpreter for DropProcedureInterpreter { let drop_procedure_req: DropProcedureReq = self.plan.clone().into(); let _ = UserApiProvider::instance() - .drop_procedure(&tenant, drop_procedure_req) + .drop_procedure(&tenant, drop_procedure_req, self.plan.if_exists) .await?; Ok(PipelineBuildResult::create()) diff --git a/src/query/sql/src/planner/binder/ddl/procedure.rs b/src/query/sql/src/planner/binder/ddl/procedure.rs index 69e32716ae04..17f078433c92 100644 --- a/src/query/sql/src/planner/binder/ddl/procedure.rs +++ b/src/query/sql/src/planner/binder/ddl/procedure.rs @@ -81,12 +81,11 @@ impl Binder { } pub async fn bind_drop_procedure(&mut self, stmt: &DropProcedureStmt) -> Result { - let DropProcedureStmt { name } = stmt; + let DropProcedureStmt { name, if_exists } = stmt; let tenant = self.ctx.get_tenant(); - // TODO: need parser name: ProcedureNameIdent = name + args Ok(Plan::DropProcedure(Box::new(DropProcedurePlan { - if_exists: false, + if_exists: *if_exists, tenant: tenant.to_owned(), name: ProcedureNameIdent::new(tenant, ProcedureIdentity::from(name.clone())), }))) diff --git a/src/query/sql/src/planner/plans/ddl/procedure.rs b/src/query/sql/src/planner/plans/ddl/procedure.rs index ca38ad6e3d94..4104c34ebfe2 100644 --- a/src/query/sql/src/planner/plans/ddl/procedure.rs +++ b/src/query/sql/src/planner/plans/ddl/procedure.rs @@ -46,7 +46,6 @@ pub struct CreateProcedurePlan { impl From for CreateProcedureReq { fn from(p: CreateProcedurePlan) -> Self { CreateProcedureReq { - create_option: p.create_option, name_ident: p.name, meta: p.meta, } @@ -56,7 +55,6 @@ impl From for CreateProcedureReq { impl From<&CreateProcedurePlan> for CreateProcedureReq { fn from(p: &CreateProcedurePlan) -> Self { CreateProcedureReq { - create_option: p.create_option, name_ident: p.name.clone(), meta: p.meta.clone(), } @@ -72,17 +70,13 @@ pub struct DropProcedurePlan { impl From for DropProcedureReq { fn from(p: DropProcedurePlan) -> Self { - DropProcedureReq { - if_exists: p.if_exists, - name_ident: p.name, - } + DropProcedureReq { name_ident: p.name } } } impl From<&DropProcedurePlan> for DropProcedureReq { fn from(p: &DropProcedurePlan) -> Self { DropProcedureReq { - if_exists: p.if_exists, name_ident: p.name.clone(), } } diff --git a/src/query/users/src/user_procedure.rs b/src/query/users/src/user_procedure.rs index 3266c8364be3..ed6039500096 100644 --- a/src/query/users/src/user_procedure.rs +++ b/src/query/users/src/user_procedure.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_meta_api::kv_app_error::KVAppError; use databend_common_meta_app::app_error::AppError; +use databend_common_meta_app::principal::CreateProcedureReply; use databend_common_meta_app::principal::CreateProcedureReq; use databend_common_meta_app::principal::DropProcedureReq; use databend_common_meta_app::principal::GetProcedureReply; @@ -26,10 +29,16 @@ use crate::UserApiProvider; impl UserApiProvider { // Add a new Procedure. #[async_backtrace::framed] - pub async fn add_procedure(&self, tenant: &Tenant, req: CreateProcedureReq) -> Result<()> { + pub async fn add_procedure( + &self, + tenant: &Tenant, + req: CreateProcedureReq, + overriding: bool, + ) -> Result> { let procedure_api = self.procedure_api(tenant); - let _ = procedure_api.create_procedure(req).await?; - Ok(()) + let replay = procedure_api.create_procedure(req, overriding).await; + + Ok(replay) } #[async_backtrace::framed] @@ -48,8 +57,22 @@ impl UserApiProvider { // Drop a Procedure by name. #[async_backtrace::framed] - pub async fn drop_procedure(&self, tenant: &Tenant, req: DropProcedureReq) -> Result<()> { - let _ = self.procedure_api(tenant).drop_procedure(req).await?; + pub async fn drop_procedure( + &self, + tenant: &Tenant, + req: DropProcedureReq, + if_exists: bool, + ) -> Result<()> { + let dropped = self + .procedure_api(tenant) + .drop_procedure(&req.name_ident) + .await?; + if dropped.is_none() && !if_exists { + return Err(ErrorCode::UnknownProcedure(format!( + "Unknown procedure '{}' while drop procedure", + req.name_ident + ))); + } Ok(()) } } diff --git a/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test b/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test index 8bc52f539ab9..1d0ad55352bc 100644 --- a/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test +++ b/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test @@ -2,7 +2,25 @@ statement ok set global enable_experimental_procedure=1; statement ok -CREATE PROCEDURE p1() RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ +drop procedure if exists p1(); + +statement ok +drop procedure if exists p1(UInt8, UInt8); + +statement ok +CREATE PROCEDURE if not exists p1() RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ +BEGIN + LET x := -1; + LET sum := 0; + FOR x IN x TO x + 3 DO + sum := sum + x; + END FOR; + RETURN sum; +END; +$$; + +statement ok +CREATE PROCEDURE if not exists p1() RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ BEGIN LET x := -1; LET sum := 0; @@ -19,6 +37,23 @@ call procedure p1(); ---- 2 +statement ok +CREATE or replace PROCEDURE p1() RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ +BEGIN + LET x := -1; + LET sum := 10; + FOR x IN x TO x + 3 DO + sum := sum + x; + END FOR; + RETURN sum; +END; +$$; + +query T +call procedure p1(); +---- +12 + statement ok CREATE PROCEDURE p1(x UInt8, sum UInt8) RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ BEGIN @@ -39,6 +74,18 @@ BEGIN END; $$; +statement ok +CREATE OR REPLACE PROCEDURE p1() RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ +BEGIN + LET x := -1; + LET sum := 0; + FOR x IN x TO x + 3 DO + sum := sum + x; + END FOR; + RETURN sum; +END; +$$; + query T call procedure p1(); ---- @@ -71,5 +118,11 @@ select count(name) from system.procedures ---- 0 +statement ok +drop procedure if exists not_exists_p(); + +statement error 3130 +drop procedure not_exists_p(); + statement ok unset global enable_experimental_procedure;